Skip to content

Commit

Permalink
passing loop to asyncio functions is deprecated
Browse files Browse the repository at this point in the history
  • Loading branch information
eukreign committed Oct 18, 2022
1 parent 007e111 commit a1b8746
Show file tree
Hide file tree
Showing 33 changed files with 106 additions and 106 deletions.
6 changes: 3 additions & 3 deletions lbry/blob/blob_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def __init__(
self.blob_completed_callback = blob_completed_callback
self.blob_directory = blob_directory
self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
self.verified: asyncio.Event = asyncio.Event()
self.writing: asyncio.Event = asyncio.Event()
self.readers: typing.List[typing.BinaryIO] = []
self.added_on = added_on or time.time()
self.is_mine = is_mine
Expand Down Expand Up @@ -222,7 +222,7 @@ def get_blob_writer(self, peer_address: typing.Optional[str] = None,
peer_port: typing.Optional[int] = None) -> HashBlobWriter:
if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
fut = asyncio.Future(loop=self.loop)
fut = asyncio.Future()
writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
self.writers[(peer_address, peer_port)] = writer

Expand Down
10 changes: 5 additions & 5 deletions lbry/blob_exchange/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_timeout: typing.Optiona
self.buf = b''

# this is here to handle the race when the downloader is closed right as response_fut gets a result
self.closed = asyncio.Event(loop=self.loop)
self.closed = asyncio.Event()

def data_received(self, data: bytes):
if self.connection_manager:
Expand Down Expand Up @@ -111,7 +111,7 @@ async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClient
self.transport.write(msg)
if self.connection_manager:
self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout)
availability_response = response.get_availability_response()
price_response = response.get_price_response()
blob_response = response.get_blob_response()
Expand Down Expand Up @@ -151,7 +151,7 @@ async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClient
f" timeout in {self.peer_timeout}"
log.debug(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
await asyncio.wait_for(self.writer.finished, self.peer_timeout)
# wait for the io to finish
await self.blob.verified.wait()
log.info("%s at %fMB/s", msg,
Expand Down Expand Up @@ -187,7 +187,7 @@ async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optiona
try:
self._blob_bytes_received = 0
self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
self._response_fut = asyncio.Future(loop=self.loop)
self._response_fut = asyncio.Future()
return await self._download_blob()
except OSError:
# i'm not sure how to fix this race condition - jack
Expand Down Expand Up @@ -244,7 +244,7 @@ async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['Abstract
try:
if not connected_protocol:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop)
peer_connect_timeout)
connected_protocol = protocol
if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection
Expand Down
6 changes: 3 additions & 3 deletions lbry/blob_exchange/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manag
self.failures: typing.Dict['KademliaPeer', int] = {}
self.connection_failures: typing.Set['KademliaPeer'] = set()
self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {}
self.is_running = asyncio.Event(loop=self.loop)
self.is_running = asyncio.Event()

def should_race_continue(self, blob: 'AbstractBlob'):
max_probes = self.config.max_connections_per_download * (1 if self.connections else 10)
Expand Down Expand Up @@ -65,7 +65,7 @@ async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer

async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')

def cleanup_active(self):
if not self.active_connections and not self.connections:
Expand Down Expand Up @@ -126,7 +126,7 @@ def close(self):

async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node',
blob_hash: str) -> 'AbstractBlob':
search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
search_queue = asyncio.Queue(maxsize=config.max_connections_per_download)
search_queue.put_nowait(blob_hash)
peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue)
fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers)
Expand Down
12 changes: 6 additions & 6 deletions lbry/blob_exchange/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
self.idle_timeout = idle_timeout
self.transfer_timeout = transfer_timeout
self.server_task: typing.Optional[asyncio.Task] = None
self.started_listening = asyncio.Event(loop=self.loop)
self.started_listening = asyncio.Event()
self.buf = b''
self.transport: typing.Optional[asyncio.Transport] = None
self.lbrycrd_address = lbrycrd_address
self.peer_address_and_port: typing.Optional[str] = None
self.started_transfer = asyncio.Event(loop=self.loop)
self.transfer_finished = asyncio.Event(loop=self.loop)
self.started_transfer = asyncio.Event()
self.transfer_finished = asyncio.Event()
self.close_on_idle_task: typing.Optional[asyncio.Task] = None

async def close_on_idle(self):
while self.transport:
try:
await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout, loop=self.loop)
await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout)
except asyncio.TimeoutError:
log.debug("closing idle connection from %s", self.peer_address_and_port)
return self.close()
Expand Down Expand Up @@ -101,7 +101,7 @@ async def handle_request(self, request: BlobRequest):
log.debug("send %s to %s:%i", blob_hash, peer_address, peer_port)
self.started_transfer.set()
try:
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop)
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout)
if sent and sent > 0:
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
self.loop = loop
self.blob_manager = blob_manager
self.server_task: typing.Optional[asyncio.Task] = None
self.started_listening = asyncio.Event(loop=self.loop)
self.started_listening = asyncio.Event()
self.lbrycrd_address = lbrycrd_address
self.idle_timeout = idle_timeout
self.transfer_timeout = transfer_timeout
Expand Down
2 changes: 1 addition & 1 deletion lbry/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def _update(self):

while True:
last = time.perf_counter()
await asyncio.sleep(0.1, loop=self.loop)
await asyncio.sleep(0.1)
self._status['incoming_bps'].clear()
self._status['outgoing_bps'].clear()
now = time.perf_counter()
Expand Down
4 changes: 2 additions & 2 deletions lbry/dht/blob_announcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
if not self.node.joined.is_set():
await self.node.joined.wait()
await asyncio.sleep(60, loop=self.loop)
await asyncio.sleep(60)
if not self.node.protocol.routing_table.get_peers():
log.warning("No peers in DHT, announce round skipped")
continue
Expand All @@ -59,7 +59,7 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)])
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
Expand Down
14 changes: 7 additions & 7 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
split_buckets_under_index, is_bootstrap_node)
self.listening_port: asyncio.DatagramTransport = None
self.joined = asyncio.Event(loop=self.loop)
self.joined = asyncio.Event()
self._join_task: asyncio.Task = None
self._refresh_task: asyncio.Task = None
self._storage = storage
Expand Down Expand Up @@ -79,7 +79,7 @@ async def refresh_node(self, force_once=False):
else:
if force_once:
break
fut = asyncio.Future(loop=self.loop)
fut = asyncio.Future()
self.loop.call_later(constants.REFRESH_INTERVAL // 4, fut.set_result, None)
await fut
continue
Expand All @@ -93,7 +93,7 @@ async def refresh_node(self, force_once=False):
if force_once:
break

fut = asyncio.Future(loop=self.loop)
fut = asyncio.Future()
self.loop.call_later(constants.REFRESH_INTERVAL, fut.set_result, None)
await fut

Expand All @@ -108,7 +108,7 @@ async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
for peer in peers:
log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
stored_to_tup = await asyncio.gather(
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers)
)
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
if stored_to:
Expand Down Expand Up @@ -182,14 +182,14 @@ def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, i
for address, udp_port in known_node_urls or []
]))
except socket.gaierror:
await asyncio.sleep(30, loop=self.loop)
await asyncio.sleep(30)
continue

self.protocol.peer_manager.reset()
self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)

await asyncio.sleep(1, loop=self.loop)
await asyncio.sleep(1)

def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
Expand Down Expand Up @@ -271,7 +271,7 @@ async def put_into_result_queue_after_pong(_peer):
def accumulate_peers(self, search_queue: asyncio.Queue,
peer_queue: typing.Optional[asyncio.Queue] = None
) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
queue = peer_queue or asyncio.Queue(loop=self.loop)
queue = peer_queue or asyncio.Queue()
return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))


Expand Down
2 changes: 1 addition & 1 deletion lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop,
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.iteration_queue = asyncio.Queue(loop=self.loop)
self.iteration_queue = asyncio.Queue()

self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
Expand Down
8 changes: 4 additions & 4 deletions lbry/dht/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def _process(self): # send up to 1 ping per second
del self._pending_contacts[peer]
self.maybe_ping(peer)
break
await asyncio.sleep(1, loop=self._loop)
await asyncio.sleep(1)

def start(self):
assert not self._running
Expand Down Expand Up @@ -319,10 +319,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.ping_queue = PingQueue(self.loop, self)
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
self.rpc_timeout = rpc_timeout
self._split_lock = asyncio.Lock(loop=self.loop)
self._split_lock = asyncio.Lock()
self._to_remove: typing.Set['KademliaPeer'] = set()
self._to_add: typing.Set['KademliaPeer'] = set()
self._wakeup_routing_task = asyncio.Event(loop=self.loop)
self._wakeup_routing_task = asyncio.Event()
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None

@functools.lru_cache(128)
Expand Down Expand Up @@ -385,7 +385,7 @@ async def routing_table_task(self):
while self._to_add:
async with self._split_lock:
await self._add_peer(self._to_add.pop())
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop)
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1))
self._wakeup_routing_task.clear()

def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/componentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, conf: Config, analytics_manager=None, skip_components=None,
self.analytics_manager = analytics_manager
self.component_classes = {}
self.components = set()
self.started = asyncio.Event(loop=self.loop)
self.started = asyncio.Event()
self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop())

for component_name, component_class in self.default_component_classes.items():
Expand Down
4 changes: 2 additions & 2 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ async def _repeatedly_maintain_redirects(self, now=True):
while True:
if now:
await self._maintain_redirects()
await asyncio.sleep(360, loop=self.component_manager.loop)
await asyncio.sleep(360)

async def _maintain_redirects(self):
# setup the gateway if necessary
Expand Down Expand Up @@ -673,7 +673,7 @@ async def stop(self):
log.info("Removing upnp redirects: %s", self.upnp_redirects)
await asyncio.wait([
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
], loop=self.component_manager.loop)
])
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
self._maintain_redirects_task.cancel()

Expand Down
3 changes: 1 addition & 2 deletions lbry/file/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
loop=self.loop)
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
Expand Down
8 changes: 4 additions & 4 deletions lbry/file/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.analytics_manager = analytics_manager
self.downloader = None

self.saving = asyncio.Event(loop=self.loop)
self.finished_writing = asyncio.Event(loop=self.loop)
self.started_writing = asyncio.Event(loop=self.loop)
self.finished_write_attempt = asyncio.Event(loop=self.loop)
self.saving = asyncio.Event()
self.finished_writing = asyncio.Event()
self.started_writing = asyncio.Event()
self.finished_write_attempt = asyncio.Event()

# @classmethod
# async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
Expand Down
2 changes: 1 addition & 1 deletion lbry/file/source_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.storage = storage
self.analytics_manager = analytics_manager
self._sources: typing.Dict[str, ManagedDownloadSource] = {}
self.started = asyncio.Event(loop=self.loop)
self.started = asyncio.Event()

def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source
Expand Down
8 changes: 4 additions & 4 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manag
self.config = config
self.blob_manager = blob_manager
self.sd_hash = sd_hash
self.search_queue = asyncio.Queue(loop=loop) # blob hashes to feed into the iterative finder
self.peer_queue = asyncio.Queue(loop=loop) # new peers to try
self.search_queue = asyncio.Queue() # blob hashes to feed into the iterative finder
self.peer_queue = asyncio.Queue() # new peers to try
self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
self.descriptor: typing.Optional[StreamDescriptor] = descriptor
self.node: typing.Optional['Node'] = None
Expand Down Expand Up @@ -72,7 +72,7 @@ async def load_descriptor(self, connection_id: int = 0):
now = self.loop.time()
sd_blob = await asyncio.wait_for(
self.blob_downloader.download_blob(self.sd_hash, connection_id),
self.config.blob_download_timeout, loop=self.loop
self.config.blob_download_timeout
)
log.info("downloaded sd blob %s", self.sd_hash)
self.time_to_descriptor = self.loop.time() - now
Expand Down Expand Up @@ -111,7 +111,7 @@ async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int =
raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
blob = await asyncio.wait_for(
self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id),
self.config.blob_download_timeout * 10, loop=self.loop
self.config.blob_download_timeout * 10
)
return blob

Expand Down
Loading

0 comments on commit a1b8746

Please sign in to comment.