Skip to content

Commit

Permalink
Issue chromaway#135: make a threaded electrum connector
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmysong committed Feb 28, 2014
1 parent cac5cf9 commit 334a042
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 44 deletions.
2 changes: 1 addition & 1 deletion ngcccbase/color.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, config):
except Exception as e:
# use Electrum to request transactions
self.blockchain_state = EnhancedBlockchainState(
"electrum.cafebitcoin.com", 50001)
"electrum.datemas.de", 50001)

self.store_conn = DataStoreConnection(
params.get("colordb_path", "color.db"))
Expand Down
107 changes: 65 additions & 42 deletions ngcccbase/services/electrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
from bitcoin.core import b2x as to_hex
from coloredcoinlib import blockchain

import bitcoin.rpc
import json
import Queue
import socket
import sys
import threading
import time
import traceback
import urllib2
import bitcoin.rpc


class ConnectionError(Exception):
Expand All @@ -41,7 +43,11 @@ def __init__(self, host, port, debug=False):
self.connection = (host, port)
self.debug = debug
self.is_connected = False
self.q = Queue.Queue()
self.connect()
self.t = threading.Thread(target=self.grab_responses)
self.t.daemon = True
self.t.start()

def connect(self):
"""Connects to an electrum server via TCP.
Expand All @@ -63,19 +69,16 @@ def connect(self):
print ("connected to %s:%s" % self.connection ) # pragma: no cover
return True

def wait_for_response(self, target_id):
"""Get a response message from an electrum server with
the id of <target_id>
def grab_responses(self):
"""Get a response message from an electrum server and push onto queue
"""
try:
out = ''
while self.is_connected or self.connect():
try:
msg = self.sock.recv(1024)
if self.debug:
print (msg) # pragma: no cover
except socket.timeout: # pragma: no cover
print ("socket timed out") # pragma: no cover
print ("socket timed out") # pragma: no cover
self.is_connected = False # pragma: no cover
continue # pragma: no cover
except socket.error: # pragma: no cover
Expand All @@ -92,30 +95,28 @@ def wait_for_response(self, target_id):
# the last one isn't complete
out = raw_messages.pop()
for raw_message in raw_messages:
message = json.loads(raw_message)
try:
message = json.loads(raw_message)
except ValueError:
continue

id = message.get('id')
error = message.get('error')
result = message.get('result')

if id == target_id:
if error:
raise Exception( # pragma: no cover
"received error %s" % message) # pragma: no cover
else:
return result
if error:
raise Exception( # pragma: no cover
"received error %s" % message) # pragma: no cover
else:
# just print it for now
print (message) # pragma: no cover
if self.debug:
print "%s: %s" % (id, result) # pragma: no cover
self.q.put([id, result])
except: # pragma: no cover
traceback.print_exc(file=sys.stdout) # pragma: no cover

self.is_connected = False # pragma: no cover

def get_response(self, method, params):
"""Given a message that consists of <method> which
has <params>,
Return the string response of the message sent to electrum"""
def add_request(self, method, params):
"""Send a tx request to electrum and not wait
"""
current_id = self.message_counter
self.message_counter += 1
try:
Expand All @@ -128,39 +129,61 @@ def get_response(self, method, params):
except socket.error: # pragma: no cover
traceback.print_exc(file=sys.stdout) # pragma: no cover
return None # pragma: no cover
return self.wait_for_response(current_id)
return current_id

def get_response(self, target_id, blocking=False):
id, raw = self.q.get(blocking)
while id:
if id == target_id:
return raw
try:
self.q.put(id, raw)
id, raw = self.q.get(blocking)
except:
id = None
return None

def get_immediate_response(self, method, params):
target_id = self.add_request(method, params)
response = self.get_response(target_id, True)
self.q.task_done()
return response

def get_version(self):
"""Get the server version of the electrum server
that it's connected to.
"""
return self.get_response('server.version', ["1.9", "0.6"])

def get_raw_transaction(self, tx_id, height):
"""Get the raw transaction that has the transaction hash
of <tx_id> and height <height>.
Note you may need to use another method to get the height
from the transaction id hash.
"""
return self.get_response('blockchain.transaction.get', [tx_id, height])
return self.get_immediate_response('server.version', ["1.9", "0.6"])

def get_utxo(self, address):
"""Gets all the Unspent Transaction Outs from a given <address>
"""
script_pubkey = CBitcoinAddress(address).to_scriptPubKey()
txs = self.get_response('blockchain.address.get_history', [address])
txs = self.get_immediate_response('blockchain.address.get_history', [address])
if not txs:
return []
spent = {}
utxos = []
# start grabbing responses
ids = []
# add the requests in parallel
for tx in txs:
raw = self.get_raw_transaction(tx['tx_hash'], tx['height'])
data = CTransaction.deserialize(to_binary(raw))
for vin in data.vin:
spent[(to_little_endian_hex(vin.prevout.hash),
vin.prevout.n)] = 1
for outindex, vout in enumerate(data.vout):
if vout.scriptPubKey == script_pubkey:
utxos += [(tx['tx_hash'], outindex, vout.nValue,
to_hex(vout.scriptPubKey))]
id = self.add_request('blockchain.transaction.get',
(tx['tx_hash'], tx['height']))
ids.append((id,tx))

for id, tx in ids:
raw = self.get_response(id, True)
if tx:
data = CTransaction.deserialize(to_binary(raw))
for vin in data.vin:
spent[(to_little_endian_hex(vin.prevout.hash),
vin.prevout.n)] = 1
for outindex, vout in enumerate(data.vout):
if vout.scriptPubKey == script_pubkey:
utxos += [(tx['tx_hash'], outindex, vout.nValue,
to_hex(vout.scriptPubKey))]
self.q.task_done()
return [u for u in utxos if not u[0:2] in spent]


Expand Down
2 changes: 1 addition & 1 deletion ngcccbase/utxodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import urllib2
import json

DEFAULT_ELECTRUM_SERVER = "btc.it-zone.org"
DEFAULT_ELECTRUM_SERVER = "electrum.datemas.de"
DEFAULT_ELECTRUM_PORT = 50001


Expand Down

0 comments on commit 334a042

Please sign in to comment.