from hashlib import sha256 from datetime import datetime from math import log10 total_nonce_count, mining_tries = 0, 0 type_of_mining = 'Fine' if type_of_mining == 'Rough': leading_zero_count = 15 diff = '0' * leading_zero_count approx_nonce_count = 2**leading_zero_count elif type_of_mining == 'Fine': # So we don't have to generate a float and then convert to an integer difficulty = 2 * 10 ** 72 + 65 * 10 ** 70 approx_nonce_count = int(10**(256 * log10(2.0)) / difficulty) DUMMY_ARRAY = ['Dummy', 'array'] # merkle_tree builds a Merkle tree and returns (and keeps only) the head. # input -- leaf_data -- The data (lowest level) for the tree as an array of strings # return -- The tree head as a single hash def merkle_tree(leaf_data): next_level = [] data = [do_hash(s) for s in leaf_data] while True: while True: s = data.pop(0) if not data: break s += data.pop(0) if not data: break next_level.append(do_hash(s)) next_level.append(do_hash(s)) if len(next_level) == 1: break data = next_level[:] next_level = [] return next_level[0] # do_hash moves all the hash calls to a single point # input -- s -- The string to hash # return -- hash of s def do_hash(s): return sha256(s.encode('utf-8')).hexdigest() # course_grained_mining carries out repeated hash calculations with successive nonces until the difficulty is satisfied # input -- s -- the string that the nonces get concatenated with # input -- diff -- a string of zeroes that the successful hash will start with # input -- leading_zero_count -- the length of "diff" # input -- nonce -- the starting value for the nonce # input -- verbose_level -- a number controlling the number of print statements which get executed # return -- the successful nonce def course_grained_mining(s, diff, leading_zero_count, nonce = 1, verbose_level = 1): global mining_tries, total_nonce_count mining_tries += 1 if verbose_level > 0: print('Mining') while True: hsh = do_hash(s + str(nonce)) b = bin(int('1' + str(hsh), 16))[3:] if b[:leading_zero_count] == diff: if verbose_level > 1: print('''Hash with correct nonce ({0:d}) is: {1:s}'''.format(nonce, b)) elif verbose_level > 0: print('Nonce = {0:d}'.format(nonce)) break nonce += 1 total_nonce_count += nonce return nonce # fine_grained_mining carries out repeated hash calculations with successive nonces until the difficulty is satisfied # input -- s -- the string that the nonces get concatenated with # input -- difficulty -- an integer that is the upper limit on the acceptable hash # input -- nonce -- the starting value for the nonce # input -- verbose_level -- a number controlling the number of print statements which get executed # return -- the successful nonce def fine_grained_mining(s, difficulty, nonce = 1, verbose_level = 1): global mining_tries, total_nonce_count mining_tries += 1 if verbose_level > 0: print('Mining') while True: hsh = sha256((s + str(nonce)).encode('utf-8')).hexdigest() i = int(str(hsh), 16) if i < difficulty: if verbose_level > 1: print('''Hash with correct nonce ({0:d}) is: {1:d}'''.format(nonce, i)) elif verbose_level > 0: print('Nonce = {0:d}'.format(nonce)) break nonce += 1 total_nonce_count += nonce return nonce class Block: # input -- previous_block -- a pointer back to the preceding block # input -- is_first -- true for first block, false otherwise # input -- data_array -- the data in the block as a string array def __init__(self, previous_block, is_first, data_array): self.previous_block = previous_block self.is_first = is_first self.data_array = data_array self.time_stamp = datetime.today().strftime('D%Y-%m-%dT%H:%M:%S.%f') self.merkle = merkle_tree(data_array) if is_first: if type_of_mining == 'Rough': self.nonce = course_grained_mining(self.merkle + self.time_stamp, diff, leading_zero_count, verbose_level = 1) elif type_of_mining == 'Fine': self.nonce = fine_grained_mining(self.merkle + self.time_stamp, difficulty, verbose_level = 1) else: self.nonce = 42 self.header = self.merkle + self.time_stamp + str(self.nonce) if is_first: self.cumulative_hash_header = do_hash(self.header) else: self.cumulative_hash_header = '' self.next_block = None # reset_data_array is used to set the @cumulative_hash_header when a new block is added # input -- data_array -- a changed data array is input and an updated @header is calculated from the updated Merkle tree def reset_data_array(self, data_array): self.data_array = data_array self.merkle = merkle_tree(data_array) if type_of_mining == 'Rough': self.nonce = course_grained_mining(self.merkle + self.time_stamp, diff, leading_zero_count, verbose_level = 1) elif type_of_mining == 'Fine': self.nonce = fine_grained_mining(self.merkle + self.time_stamp, difficulty, verbose_level = 1) # print('Resetting nonce') self.header = self.merkle + self.time_stamp + str(self.nonce) self.cumulative_hash_header = do_hash(self.previous_block.cumulative_hash_header + self.header) class Chain: def __init__(self, data_array): self.tail = self.chain = Block(None, True, data_array) self.head = Block(self.chain, False, DUMMY_ARRAY) self.chain.next_block = self.head self.master_hash = self.chain.cumulative_hash_header # add_block updates the chain with a new block of data # input -- in_data_array -- the data for the new block def add_block(self, in_data_array): self.head.reset_data_array(in_data_array) self.chain = self.head self.head = Block(self.chain, False, DUMMY_ARRAY) self.chain.next_block = self.head self.master_hash = self.chain.cumulative_hash_header # tail_to_head recomputes the master hash for comparison with cumulative hash header # return -- a recalculated master hash def tail_to_head(self): present_block, next_block, block_array = self.tail, self.tail.next_block, [] while present_block != self.head: block_array.append(present_block) present_block = next_block next_block = present_block.next_block a_header = '' for b in block_array: a_header = do_hash(a_header + merkle_tree(b.data_array) + b.time_stamp + str(b.nonce)) return a_header # fine_grained_corruption_search walks the block-chain from tail to head looking for 1. any incompatibilites # between the data array and the stored merkle tree head 2. the merkle tree head, the time stamp, and the nonce, # compared to the stored header, and 3. the cumulative hashed header and the stored cumulative hashed header. # return -- either an all good or an error message def fine_grained_corruption_search(self): present_block, next_block, result = self.tail, self.tail.next_block, 'No incompatibilities found' hashed_header = '' while present_block != self.head: merkle = merkle_tree(present_block.data_array) if merkle != present_block.merkle: result = 'Merkle -- data incompatility' break header = merkle + present_block.time_stamp + str(present_block.nonce) if header != present_block.header: result = 'Header incompability' break hashed_header = do_hash(hashed_header + header) if hashed_header != present_block.cumulative_hash_header: result = 'Cumulative hashed header incompability in block beginning with "' + present_block.data_array[0] + '"' break present_block = next_block next_block = present_block.next_block return result # publish_master_hash does just that # return -- the header hash that insures an uncorrupted chain def publish_master_hash(self): return self.master_hash # integrity_check compares the master_hash with a newly minted one # return -- true if chain is uncorrupted, false if chain is corrupted def integrity_check(self): if self.tail_to_head() == self.master_hash: result = 'Chain is intact' else: result = 'Chain is corrupted' return result # to_s puts a nicely formatted chain into a string def to_s(self): present_block, next_block = self.tail, self.tail.next_block while present_block != self.head: print('{0:s} -- {1:s} -- {2:d} \n'.format(' '.join(present_block.data_array), present_block.time_stamp, present_block.nonce)) present_block = next_block next_block = present_block.next_block if __name__ == "__main__": if type_of_mining == 'Rough': print('The challenge is to find a (binary) hash with {0:d} leading zeroes.'.format(leading_zero_count)) elif type_of_mining == 'Fine': print('The challenge is to find a (decimal) hash < {0:d}.'.format(difficulty)) data = 'Nobody ever went broke underestimating the taste of the American public' a_chain = Chain(data.split()) data = 'To apologize is to lay the foundation for a future offense' a_chain.add_block(data.split()) data = 'Patience: A minor form of dispair disguised as a virtue' a_chain.add_block(data.split()) data = 'War is God\'s way of teaching Americans geography' a_chain.add_block(data.split()) data = 'Happiness: An agreeable sensation arising from contemplating the misery of another' a_chain.add_block(data.split()) data = 'Academy: A modern school where football is taught' a_chain.add_block(data.split()) data = 'Egotist: A person of low taste, more interested in himself than in me' a_chain.add_block(data.split()) print('\nThe master hash is: {0:s}'.format(a_chain.publish_master_hash())) print(a_chain.integrity_check()) print('The uncorrupted chain is, in order of insertion:\n') print(a_chain.to_s()) print('\nFine grained error search:') print(a_chain.fine_grained_corruption_search()) data = 'Fidelity: A virtue peculiar to those who are about to be betrayed' b = a_chain.head.previous_block.previous_block b.data_array = data.split() b.merkle = merkle_tree(b.data_array) if type_of_mining == 'Rough': b.nonce = course_grained_mining(b.merkle + b.time_stamp, diff, leading_zero_count, verbose_level = 1) elif type_of_mining == 'Fine': b.nonce = fine_grained_mining(b.merkle + b.time_stamp, difficulty, verbose_level = 1) b.header = b.merkle + b.time_stamp + str(b.nonce) b.cumulative_hash_header = do_hash(b.previous_block.cumulative_hash_header + b.header) print(a_chain.integrity_check()) print('The corrupted chain is, in order of insertion:\n') print(a_chain.to_s()) print('\nFine grained error search:') print(a_chain.fine_grained_corruption_search()) print('\nAverage nonce count = {0:d} (Predicted = {1:d}).'.format(total_nonce_count // mining_tries, approx_nonce_count))