From 334a0426a719be27ffe845b2eea18c1097b47fd4 Mon Sep 17 00:00:00 2001 From: Jimmy Song Date: Thu, 27 Feb 2014 22:34:33 -0600 Subject: [PATCH] Issue #135: make a threaded electrum connector --- ngcccbase/color.py | 2 +- ngcccbase/services/electrum.py | 107 ++++++++++++++++++++------------- ngcccbase/utxodb.py | 2 +- 3 files changed, 67 insertions(+), 44 deletions(-) diff --git a/ngcccbase/color.py b/ngcccbase/color.py index 74027c3..cb7ed43 100644 --- a/ngcccbase/color.py +++ b/ngcccbase/color.py @@ -42,7 +42,7 @@ def __init__(self, config): except Exception as e: # use Electrum to request transactions self.blockchain_state = EnhancedBlockchainState( - "electrum.cafebitcoin.com", 50001) + "electrum.datemas.de", 50001) self.store_conn = DataStoreConnection( params.get("colordb_path", "color.db")) diff --git a/ngcccbase/services/electrum.py b/ngcccbase/services/electrum.py index 9940dae..b719262 100644 --- a/ngcccbase/services/electrum.py +++ b/ngcccbase/services/electrum.py @@ -16,13 +16,15 @@ from bitcoin.core import b2x as to_hex from coloredcoinlib import blockchain +import bitcoin.rpc import json +import Queue import socket import sys +import threading import time import traceback import urllib2 -import bitcoin.rpc class ConnectionError(Exception): @@ -41,7 +43,11 @@ def __init__(self, host, port, debug=False): self.connection = (host, port) self.debug = debug self.is_connected = False + self.q = Queue.Queue() self.connect() + self.t = threading.Thread(target=self.grab_responses) + self.t.daemon = True + self.t.start() def connect(self): """Connects to an electrum server via TCP. @@ -63,19 +69,16 @@ def connect(self): print ("connected to %s:%s" % self.connection ) # pragma: no cover return True - def wait_for_response(self, target_id): - """Get a response message from an electrum server with - the id of + def grab_responses(self): + """Get a response message from an electrum server and push onto queue """ try: out = '' while self.is_connected or self.connect(): try: msg = self.sock.recv(1024) - if self.debug: - print (msg) # pragma: no cover except socket.timeout: # pragma: no cover - print ("socket timed out") # pragma: no cover + print ("socket timed out") # pragma: no cover self.is_connected = False # pragma: no cover continue # pragma: no cover except socket.error: # pragma: no cover @@ -92,30 +95,28 @@ def wait_for_response(self, target_id): # the last one isn't complete out = raw_messages.pop() for raw_message in raw_messages: - message = json.loads(raw_message) + try: + message = json.loads(raw_message) + except ValueError: + continue id = message.get('id') error = message.get('error') result = message.get('result') - if id == target_id: - if error: - raise Exception( # pragma: no cover - "received error %s" % message) # pragma: no cover - else: - return result + if error: + raise Exception( # pragma: no cover + "received error %s" % message) # pragma: no cover else: - # just print it for now - print (message) # pragma: no cover + if self.debug: + print "%s: %s" % (id, result) # pragma: no cover + self.q.put([id, result]) except: # pragma: no cover traceback.print_exc(file=sys.stdout) # pragma: no cover - self.is_connected = False # pragma: no cover - - def get_response(self, method, params): - """Given a message that consists of which - has , - Return the string response of the message sent to electrum""" + def add_request(self, method, params): + """Send a tx request to electrum and not wait + """ current_id = self.message_counter self.message_counter += 1 try: @@ -128,39 +129,61 @@ def get_response(self, method, params): except socket.error: # pragma: no cover traceback.print_exc(file=sys.stdout) # pragma: no cover return None # pragma: no cover - return self.wait_for_response(current_id) + return current_id + + def get_response(self, target_id, blocking=False): + id, raw = self.q.get(blocking) + while id: + if id == target_id: + return raw + try: + self.q.put(id, raw) + id, raw = self.q.get(blocking) + except: + id = None + return None + + def get_immediate_response(self, method, params): + target_id = self.add_request(method, params) + response = self.get_response(target_id, True) + self.q.task_done() + return response def get_version(self): """Get the server version of the electrum server that it's connected to. """ - return self.get_response('server.version', ["1.9", "0.6"]) - - def get_raw_transaction(self, tx_id, height): - """Get the raw transaction that has the transaction hash - of and height . - Note you may need to use another method to get the height - from the transaction id hash. - """ - return self.get_response('blockchain.transaction.get', [tx_id, height]) + return self.get_immediate_response('server.version', ["1.9", "0.6"]) def get_utxo(self, address): """Gets all the Unspent Transaction Outs from a given
""" script_pubkey = CBitcoinAddress(address).to_scriptPubKey() - txs = self.get_response('blockchain.address.get_history', [address]) + txs = self.get_immediate_response('blockchain.address.get_history', [address]) + if not txs: + return [] spent = {} utxos = [] + # start grabbing responses + ids = [] + # add the requests in parallel for tx in txs: - raw = self.get_raw_transaction(tx['tx_hash'], tx['height']) - data = CTransaction.deserialize(to_binary(raw)) - for vin in data.vin: - spent[(to_little_endian_hex(vin.prevout.hash), - vin.prevout.n)] = 1 - for outindex, vout in enumerate(data.vout): - if vout.scriptPubKey == script_pubkey: - utxos += [(tx['tx_hash'], outindex, vout.nValue, - to_hex(vout.scriptPubKey))] + id = self.add_request('blockchain.transaction.get', + (tx['tx_hash'], tx['height'])) + ids.append((id,tx)) + + for id, tx in ids: + raw = self.get_response(id, True) + if tx: + data = CTransaction.deserialize(to_binary(raw)) + for vin in data.vin: + spent[(to_little_endian_hex(vin.prevout.hash), + vin.prevout.n)] = 1 + for outindex, vout in enumerate(data.vout): + if vout.scriptPubKey == script_pubkey: + utxos += [(tx['tx_hash'], outindex, vout.nValue, + to_hex(vout.scriptPubKey))] + self.q.task_done() return [u for u in utxos if not u[0:2] in spent] diff --git a/ngcccbase/utxodb.py b/ngcccbase/utxodb.py index 28c4eba..7ef331b 100644 --- a/ngcccbase/utxodb.py +++ b/ngcccbase/utxodb.py @@ -31,7 +31,7 @@ import urllib2 import json -DEFAULT_ELECTRUM_SERVER = "btc.it-zone.org" +DEFAULT_ELECTRUM_SERVER = "electrum.datemas.de" DEFAULT_ELECTRUM_PORT = 50001