Skip to content

Commit

Permalink
Refine subscribe and unsubscribe for Bybit (#2105)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunlei authored Dec 13, 2024
1 parent 6112080 commit 9103127
Showing 1 changed file with 29 additions and 84 deletions.
113 changes: 29 additions & 84 deletions nautilus_trader/adapters/bybit/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,7 @@ async def _authenticate(self) -> None:
self._log.debug("Waiting for private channel authentication")
await asyncio.sleep(0.1)

################################################################################
# Public
################################################################################

async def subscribe_order_book(self, symbol: str, depth: int) -> None:
subscription = f"orderbook.{depth}.{symbol}"
async def _subscribe(self, subscription: str) -> None:
if subscription in self._subscriptions:
self._log.warning(f"Cannot subscribe '{subscription}': already subscribed")
return
Expand All @@ -197,124 +192,74 @@ async def subscribe_order_book(self, symbol: str, depth: int) -> None:
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)

async def subscribe_trades(self, symbol: str) -> None:
subscription = f"publicTrade.{symbol}"
if subscription in self._subscriptions:
self._log.warning(f"Cannot subscribe '{subscription}': already subscribed")
async def _unsubscribe(self, subscription: str) -> None:
if subscription not in self._subscriptions:
self._log.warning(f"Cannot unsubscribe '{subscription}': not subscribed")
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
self._subscriptions.remove(subscription)
msg = {"op": "unsubscribe", "args": [subscription]}
await self._send(msg)

################################################################################
# Public
################################################################################

async def subscribe_order_book(self, symbol: str, depth: int) -> None:
subscription = f"orderbook.{depth}.{symbol}"
await self._subscribe(subscription)

async def subscribe_trades(self, symbol: str) -> None:
subscription = f"publicTrade.{symbol}"
await self._subscribe(subscription)

async def subscribe_tickers(self, symbol: str) -> None:
subscription = f"tickers.{symbol}"
if subscription in self._subscriptions:
self._log.warning(f"Cannot subscribe '{subscription}': already subscribed")
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def subscribe_klines(self, symbol: str, interval: str) -> None:
subscription = f"kline.{interval}.{symbol}"
if subscription in self._subscriptions:
self._log.warning(f"Cannot subscribe '{subscription}': already subscribed")
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def unsubscribe_order_book(self, symbol: str, depth: int) -> None:
subscription = f"orderbook.{depth}.{symbol}"
if subscription not in self._subscriptions:
self._log.warning(f"Cannot unsubscribe '{subscription}': not subscribed")
return

self._subscriptions.remove(subscription)
msg = {"op": "unsubscribe", "args": [subscription]}
await self._send(msg)
await self._unsubscribe(subscription)

async def unsubscribe_trades(self, symbol: str) -> None:
subscription = f"publicTrade.{symbol}"
if subscription not in self._subscriptions:
self._log.warning(f"Cannot unsubscribe '{subscription}': not subscribed")
return

self._subscriptions.remove(subscription)
msg = {"op": "unsubscribe", "args": [subscription]}
await self._send(msg)
await self._unsubscribe(subscription)

async def unsubscribe_tickers(self, symbol: str) -> None:
subscription = f"tickers.{symbol}"
if subscription not in self._subscriptions:
self._log.warning(f"Cannot unsubscribe '{subscription}': not subscribed")
return

self._subscriptions.remove(subscription)
msg = {"op": "unsubscribe", "args": [subscription]}
await self._send(msg)
await self._unsubscribe(subscription)

async def unsubscribe_klines(self, symbol: str, interval: str) -> None:
subscription = f"kline.{interval}.{symbol}"
if subscription not in self._subscriptions:
self._log.warning(f"Cannot unsubscribe '{subscription}': not subscribed")
return

self._subscriptions.remove(subscription)
msg = {"op": "unsubscribe", "args": [subscription]}
await self._send(msg)
await self._unsubscribe(subscription)

################################################################################
# Private
################################################################################

async def subscribe_account_position_update(self) -> None:
subscription = "position"
if subscription in self._subscriptions:
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def subscribe_orders_update(self) -> None:
subscription = "order"
if subscription in self._subscriptions:
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def subscribe_executions_update(self) -> None:
subscription = "execution"
if subscription in self._subscriptions:
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def subscribe_executions_update_fast(self) -> None:
subscription = "execution.fast"
if subscription in self._subscriptions:
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

async def subscribe_wallet_update(self) -> None:
subscription = "wallet"
if subscription in self._subscriptions:
return

self._subscriptions.append(subscription)
msg = {"op": "subscribe", "args": [subscription]}
await self._send(msg)
await self._subscribe(subscription)

def _get_signature(self):
expires = self._clock.timestamp_ms() + 5_000
Expand Down

0 comments on commit 9103127

Please sign in to comment.