#!/usr/local/bin/python3
"""Generate and verify a Bitcoin-like block chain of any length.

Each block can contain any amount of data in the form of an array of separate
strings. On generation, the block's header consists of the head of the Merkle
tree of the block's data, a time stamp, and a nonce, which for now is just a
place holder (the number 42).

Following the chain code is a test case with seven blocks and an integrity
check. Then one string array in one of the blocks is modified, the Merkle tree
is updated for compatibility with the modified data, and the header is modified
to be consistent with the modified Merkle tree. The block's hashed header is
even modified to be compatible with the corrupted block. Then the integrity
check is repeated. This is a fairly sophisticated attack on the block chain,
but, as the test results show, it is readily detectable.

A more serious attack would successively modify the headers of blocks up to the
lead block (one short of the dummy header block), but this cannot be carried
out, because the hashed header of that last data block is published separately
from the block. That is why the block chain cannot be modified (anywhere)
without detection.
"""

from hashlib import sha256
from datetime import datetime

# Placeholder data for the not-yet-filled block that always sits at the head.
DUMMY_ARRAY = ['Dummy', 'array']


def merkle_tree(leaf_data):
    """Build a Merkle tree over leaf_data and return (and keep only) its head.

    Every leaf is hashed first. Each level is then reduced by hashing the
    concatenation of adjacent pairs; an unpaired trailing node is hashed on
    its own. At least one reduction pass always runs, so a single leaf yields
    the hash of its hash.

    input  -- leaf_data -- the data (lowest level) for the tree, as an
                           iterable of strings; it is not modified
    return -- the tree head as a single hex digest string
    raises -- IndexError if leaf_data is empty
    """
    level = [do_hash(s) for s in leaf_data]
    if not level:
        # An empty tree has no head; keep the exception type callers already see.
        raise IndexError('merkle_tree requires at least one leaf')
    while True:
        # level[i:i + 2] is a pair, or a single node at an odd-length tail.
        level = [do_hash(''.join(level[i:i + 2])) for i in range(0, len(level), 2)]
        if len(level) == 1:
            return level[0]


def do_hash(s):
    """Hash a string; all hash calls are funnelled through this single point.

    input  -- s -- the string to hash (encoded as UTF-8)
    return -- the SHA-256 hex digest of s
    """
    return sha256(s.encode('utf-8')).hexdigest()


class Block:
    """One link of the chain: a data array plus its header and hash state."""

    def __init__(self, previous_block, is_first, data_array):
        """Create a block.

        input -- previous_block -- a pointer back to the preceding block
                                   (None for the first block)
        input -- is_first -- true for first block, false otherwise
        input -- data_array -- the data in the block as a string array
        """
        self.previous_block = previous_block
        self.is_first = is_first
        self.data_array = data_array
        self.nonce = '42'  # For now
        # The time stamp is fixed here; reset_data_array does not refresh it.
        self.time_stamp = datetime.today().strftime('D%Y-%m-%dT%H:%M:%S.%f')
        self.merkle = merkle_tree(data_array)
        self.header = self.merkle + self.time_stamp + self.nonce
        # Only the first block has a meaningful cumulative hash at construction;
        # later blocks receive theirs in reset_data_array.
        self.cumulative_hash_header = do_hash(self.header) if is_first else ''
        self.next_block = None

    def reset_data_array(self, data_array):
        """Replace the block's data and recompute merkle, header and hash.

        The cumulative hash chains onto the previous block's cumulative hash.
        A block without a predecessor chains onto the empty string, which
        gives the same value the constructor computes for a first block.

        input -- data_array -- the changed data array
        """
        self.data_array = data_array
        self.merkle = merkle_tree(data_array)
        self.header = self.merkle + self.time_stamp + self.nonce
        if self.previous_block is None:
            previous_hash = ''
        else:
            previous_hash = self.previous_block.cumulative_hash_header
        self.cumulative_hash_header = do_hash(previous_hash + self.header)


class Chain:
    """A linked list of Blocks running from tail (oldest) to head.

    The head is always a dummy placeholder block; add_block fills it with real
    data and appends a new placeholder. master_hash is the cumulative hash of
    the newest real block.
    """

    def __init__(self, data_array):
        """Start a chain whose first block holds data_array."""
        self.tail = self.chain = Block(None, True, data_array)
        self.head = Block(self.chain, False, DUMMY_ARRAY)
        self.chain.next_block = self.head
        self.master_hash = self.chain.cumulative_hash_header

    def _data_blocks(self):
        """Yield every real block from tail to head, skipping the dummy head."""
        block = self.tail
        while block is not self.head:
            yield block
            block = block.next_block

    def add_block(self, in_data_array):
        """Update the chain with a new block of data.

        input -- in_data_array -- the data for the new block
        """
        # The current placeholder becomes the new real block ...
        self.head.reset_data_array(in_data_array)
        self.chain = self.head
        # ... and a fresh placeholder takes its place at the head.
        self.head = Block(self.chain, False, DUMMY_ARRAY)
        self.chain.next_block = self.head
        self.master_hash = self.chain.cumulative_hash_header

    def tail_to_head(self):
        """Recompute the master hash from the blocks' data, time stamps and nonces.

        Stored merkle heads, headers and cumulative hashes are ignored; the
        Merkle head of each block is rebuilt from its data array.

        return -- a recalculated master hash
        """
        a_header = ''
        for b in self._data_blocks():
            a_header = do_hash(a_header + merkle_tree(b.data_array) + b.time_stamp + b.nonce)
        return a_header

    def fine_grained_corruption_search(self):
        """Walk the chain from tail to head and report the first inconsistency.

        For each block, in this order, compare
        1. the Merkle head rebuilt from the data array with the stored one,
        2. the merkle head + time stamp + nonce with the stored header, and
        3. the recomputed cumulative hashed header with the stored one.
        The result is not compared with master_hash; integrity_check does that.

        return -- either an all good or an error message
        """
        hashed_header = ''
        for block in self._data_blocks():
            merkle = merkle_tree(block.data_array)
            if merkle != block.merkle:
                return 'Merkle -- data incompatility'
            header = merkle + block.time_stamp + block.nonce
            if header != block.header:
                return 'Header incompability'
            hashed_header = do_hash(hashed_header + header)
            if hashed_header != block.cumulative_hash_header:
                return ('Cumulative hashed header incompability in block beginning with "'
                        + block.data_array[0] + '"')
        return 'No incompatibilities found'

    def publish_master_hash(self):
        """Return the header hash that insures an uncorrupted chain."""
        return self.master_hash

    def integrity_check(self):
        """Compare the master_hash with a newly minted one.

        return -- 'Chain is intact' if they match, 'Chain is corrupted' otherwise
        """
        if self.tail_to_head() == self.master_hash:
            return 'Chain is intact'
        return 'Chain is corrupted'

    def to_s(self):
        """Put a nicely formatted chain into a string.

        return -- one 'data -- time stamp -- nonce' entry per real block, in
                  order of insertion, with a blank line between entries
        """
        entries = [
            '{0:s} -- {1:s} -- {2:s} \n'.format(' '.join(block.data_array),
                                                 block.time_stamp, block.nonce)
            for block in self._data_blocks()
        ]
        return '\n'.join(entries)


def main():
    """Build a seven-block chain, verify it, corrupt one block and verify again."""
    data = 'Nobody ever went broke underestimating the taste of the American public'
    a_chain = Chain(data.split())
    data = 'To apologize is to lay the foundation for a future offense'
    a_chain.add_block(data.split())
    data = 'Patience: A minor form of dispair disguised as a virtue'
    a_chain.add_block(data.split())
    data = 'War is God\'s way of teaching Americans geography'
    a_chain.add_block(data.split())
    data = 'Happiness: An agreeable sensation arising from contemplating the misery of another'
    a_chain.add_block(data.split())
    data = 'Academy: A modern school where football is taught'
    a_chain.add_block(data.split())
    data = 'Egotist: A person of low taste, more interested in himself than in me'
    a_chain.add_block(data.split())

    print('\nThe master hash is: {0:s}'.format(a_chain.publish_master_hash()))
    print()
    print(a_chain.integrity_check())
    print()
    print('The uncorrupted chain is, in order of insertion:\n')
    print(a_chain.to_s())
    print('\nFine grained error search:')
    print(a_chain.fine_grained_corruption_search())

    # The attack: rewrite the second-to-last real block and patch up its
    # merkle head, header and cumulative hash so the block is self-consistent.
    data = 'Fidelity: A virtue peculiar to those who are about to be betrayed'
    b = a_chain.head.previous_block.previous_block
    b.data_array = data.split()
    b.merkle = merkle_tree(data.split())
    b.header = b.merkle + b.time_stamp + b.nonce
    b.cumulative_hash_header = do_hash(b.previous_block.cumulative_hash_header + b.header)

    print()
    print(a_chain.integrity_check())
    print('The corrupted chain is, in order of insertion:\n')
    print(a_chain.to_s())
    print('\nFine grained error search:')
    print(a_chain.fine_grained_corruption_search())


if __name__ == "__main__":
    main()

# Test output (blank lines omitted):
# The master hash is: 0b98be28b2f3bd882b112a352d2a666b78545ca4a69ccf3cb223cc96d72eea8b
# Chain is intact
# The uncorrupted chain is, in order of insertion:
# Nobody ever went broke underestimating the taste of the American public -- D2017-03-09T17:14:10.735397 -- 42
# To apologize is to lay the foundation for a future offense -- D2017-03-09T17:14:10.735583 -- 42
# Patience: A minor form of dispair disguised as a virtue -- D2017-03-09T17:14:10.735732 -- 42
# War is God's way of teaching Americans geography -- D2017-03-09T17:14:10.735869 -- 42
# Happiness: An agreeable sensation arising from contemplating the misery of another -- D2017-03-09T17:14:10.735980 -- 42
# Academy: A modern school where football is taught -- D2017-03-09T17:14:10.736124 -- 42
# Egotist: A person of low taste, more interested in himself than in me -- D2017-03-09T17:14:10.736235 -- 42
# Fine grained error search:
# No incompatibilities found
# Chain is corrupted
# The corrupted chain is, in order of insertion:
# Nobody ever went broke underestimating the taste of the American public -- D2017-03-09T17:14:10.735397 -- 42
# To apologize is to lay the foundation for a future offense -- D2017-03-09T17:14:10.735583 -- 42
# Patience: A minor form of dispair disguised as a virtue -- D2017-03-09T17:14:10.735732 -- 42
# War is God's way of teaching Americans geography -- D2017-03-09T17:14:10.735869 -- 42
# Happiness: An agreeable sensation arising from contemplating the misery of another -- D2017-03-09T17:14:10.735980 -- 42
# Fidelity: A virtue peculiar to those who are about to be betrayed -- D2017-03-09T17:14:10.736124 -- 42
# Egotist: A person of low taste, more interested in himself than in me -- D2017-03-09T17:14:10.736235 -- 42
# Fine grained error search:
# Cumulative hashed header incompability in block beginning with "Egotist:"