Skip to content

Commit

Permalink
remove problematic "oneshot" methods (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
Graeme22 authored Jun 22, 2023
1 parent ce14929 commit 88f126b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 64 deletions.
14 changes: 12 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ The streamer is a websocket connection to the Tastytrade API that allows you to
subs_list = ['SPY', 'SPX']
# this function fetches quotes once, then closes the subscription
quotes = await streamer.oneshot(EventType.QUOTE, subs_list)
await streamer.subscribe(EventType.QUOTE, subs_list)
quotes = []
async for quote in streamer.listen():
quotes.append(quote)
if len(quotes) >= len(subs_list):
break
print(quotes)
>>> [Quote(eventSymbol='SPY', eventTime=0, sequence=0, timeNanoPart=0, bidTime=0, bidExchangeCode='Q', bidPrice=411.58, bidSize=400.0, askTime=0, askExchangeCode='Q', askPrice=411.6, askSize=1313.0), Quote(eventSymbol='SPX', eventTime=0, sequence=0, timeNanoPart=0, bidTime=0, bidExchangeCode='\x00', bidPrice=4122.49, bidSize='NaN', askTime=0, askExchangeCode='\x00', askPrice=4123.65, askSize='NaN')]
Expand Down Expand Up @@ -115,7 +120,12 @@ Options chain/streaming greeks
chain = get_option_chain(session, 'SPLG')
subs_list = [chain[date(2023, 6, 16)][0].streamer_symbol]
greeks = await streamer.oneshot(EventType.GREEKS, subs_list)
await streamer.subscribe(EventType.GREEKS, subs_list)
greeks = []
async for greek in streamer.listen():
greeks.append(greek)
if len(greeks) >= len(subs_list):
break
print(greeks)
>>> [Greeks(eventSymbol='.SPLG230616C23', eventTime=0, eventFlags=0, index=7235129486797176832, time=1684559855338, sequence=0, price=26.3380972233688, volatility=0.396983376650804, delta=0.999999999996191, gamma=4.81989763184255e-12, theta=-2.5212017514875e-12, rho=0.01834504287973133, vega=3.7003015672215e-12)]
Expand Down
2 changes: 1 addition & 1 deletion tastytrade/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

API_URL = 'https://api.tastyworks.com'
CERT_URL = 'https://api.cert.tastyworks.com'
VERSION = '5.5'
VERSION = '5.6'


logger = logging.getLogger(__name__)
Expand Down
68 changes: 7 additions & 61 deletions tastytrade/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,12 @@ class DataStreamer:
streamer = await DataStreamer.create(session)
subs = ['SPY', 'GLD'] # list of quotes to fetch
quote = await streamer.oneshot(EventType.QUOTE, subs)
await streamer.subscribe(EventType.QUOTE, subs)
quotes = []
async for quote in streamer.listen():
quotes.append(quote)
if len(quotes) >= len(subs):
break
"""
def __init__(self, session: Session):
Expand Down Expand Up @@ -286,17 +291,6 @@ async def create(cls, session: Session) -> 'DataStreamer':
while not self.client_id:
await asyncio.sleep(0.1)

# see Github issue #45:
# once the handshake completes, although setup is completed locally, remotely there
# is still some kind of setup process that hasn't happened that takes about 8-9
# seconds, and afterwards you're good to go. Unfortunately, there's no way to know
# when that process concludes remotely, as there's no kind of confirmation message
# sent. This is a hacky solution to ensure streamer setup completes.
await self.oneshot(EventType.QUOTE, ['SPY'])
# clear queue if there's any lingering data
while not self._queue.empty():
self._queue.get_nowait()

return self

async def _next_id(self):
Expand Down Expand Up @@ -410,8 +404,7 @@ async def _heartbeat(self) -> None:

async def subscribe(self, event_type: EventType, symbols: list[str], reset: bool = False) -> None:
"""
Subscribes to quotes for given list of symbols. Used for recurring data feeds;
if you just want to get a one-time quote, use :meth:`oneshot`.
Subscribes to quotes for given list of symbols. Used for recurring data feeds.
:param event_type: type of subscription to add
:param symbols: list of symbols to subscribe for
Expand Down Expand Up @@ -451,31 +444,6 @@ async def unsubscribe(self, event_type: EventType, symbols: list[str]) -> None:
logger.debug('sending unsubscription: %s', message)
await self._websocket.send(json.dumps([message]))

async def oneshot(self, event_type: EventType, symbols: list[str]) -> list[Event]:
"""
Using the given information, subscribes to the list of symbols passed, streams
the requested information once, then unsubscribes. If you want to maintain the
subscription open, add a subscription with :meth:`subscribe` and listen with
:meth:`listen`.
If you use this alongside :meth:`subscribe` and :meth:`listen`, you will get
some unexpected behavior. Most apps should use either this or :meth:`listen`
but not both.
:param event_type: the type of subscription to stream, either greeks or quotes
:param symbols: list of symbols to subscribe to
:return: list of :class:`~tastytrade.dxfeed.event.Event`s pulled.
"""
await self.subscribe(event_type, symbols)
data = []
async for item in self.listen():
data.append(item)
if len(data) >= len(symbols):
break
await self.unsubscribe(event_type, symbols)
return data

async def subscribe_candle(self, ticker: str, start_time: datetime, interval: str) -> None:
"""
Subscribes to candle-style 'OHLC' data for the given symbol.
Expand Down Expand Up @@ -520,28 +488,6 @@ async def unsubscribe_candle(self, ticker: str, interval: str) -> None:
logger.debug('sending unsubscription: %s', message)
await self._websocket.send(json.dumps([message]))

async def oneshot_candle(self, ticker: str, start_time: datetime, interval: str) -> list[Candle]:
"""
Subscribes to candle-style 'OHLC' data for the given symbol, waits for
the complete range to be received, then unsubscribes.
:param ticker: symbol to get date for
:param start_time: starting time for the data range
:param interval: the width of each candle in time, e.g. '5m', '1h', '3d', '1w', '1mo'
"""
await self.subscribe_candle(ticker, start_time, interval)
candles = []
async for candle in self.listen_candle():
candles.append(candle)
# until we hit the start date, keep going
# use timestamp to support timezone in start_time
if candle.time <= start_time.timestamp() * 1000:
break
await self.unsubscribe_candle(ticker, interval)

candles.reverse()
return candles

def _map_message(self, message) -> list[Event]:
"""
Takes the raw JSON data and returns a list of parsed :class:`~tastytrade.dxfeed.event.Event` objects.
Expand Down

0 comments on commit 88f126b

Please sign in to comment.