From be8ecfa7076ee998b7b5b2f5304629d6ac3b0d39 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 18:03:23 -0300 Subject: [PATCH 1/5] sort keys so helper scripts can send blobs using send_request --- lbry/stream/reflector/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 459e87b756..a70b5b526a 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -59,7 +59,7 @@ def data_received(self, data): return async def send_request(self, request_dict: typing.Dict, timeout: int = 180): - msg = json.dumps(request_dict) + msg = json.dumps(request_dict, sort_keys=True) try: self.transport.write(msg.encode()) self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) From 9bdf3d23e1290da1f50c5969b008d00b7f57ff48 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 18:03:53 -0300 Subject: [PATCH 2/5] test bug 3296, failing --- lbry/stream/reflector/server.py | 30 +++++++++++++++++------------ tests/unit/stream/test_reflector.py | 16 +++++++++++++-- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/lbry/stream/reflector/server.py b/lbry/stream/reflector/server.py index cd1dbd849e..2f95cfb75b 100644 --- a/lbry/stream/reflector/server.py +++ b/lbry/stream/reflector/server.py @@ -17,7 +17,7 @@ class ReflectorServerProtocol(asyncio.Protocol): def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000, stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None, - not_incoming_event: asyncio.Event = None): + not_incoming_event: asyncio.Event = None, partial_needs=False): self.loop = asyncio.get_event_loop() self.blob_manager = blob_manager self.server_task: asyncio.Task = None @@ -34,6 +34,7 @@ def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000 self.stop_event = stop_event or asyncio.Event(loop=self.loop) self.chunk_size = response_chunk_size self.wait_for_stop_task: typing.Optional[asyncio.Task] = None + self.partial_needs = partial_needs async def wait_for_stop(self): await self.stop_event.wait() @@ -115,10 +116,16 @@ async def handle_request(self, request: typing.Dict): # pylint: disable=too-man if self.writer: self.writer.close_handle() self.writer = None - self.send_response({"send_sd_blob": False, 'needed': [ - blob.blob_hash for blob in self.descriptor.blobs[:-1] - if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified() - ]}) + + needs = [blob.blob_hash + for blob in self.descriptor.blobs[:-1] + if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()] + print(self.partial_needs, needs) + if needs and self.partial_needs: + needs = needs[:3] + self.partial_needs = False + print(self.partial_needs, needs) + self.send_response({"send_sd_blob": False, 'needed_blobs': needs}) return return elif self.descriptor: @@ -153,7 +160,7 @@ async def handle_request(self, request: typing.Dict): # pylint: disable=too-man class ReflectorServer: def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000, stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None, - not_incoming_event: asyncio.Event = None): + not_incoming_event: asyncio.Event = None, partial_needs=False): self.loop = asyncio.get_event_loop() self.blob_manager = blob_manager self.server_task: typing.Optional[asyncio.Task] = None @@ -163,19 +170,18 @@ def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000 self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop) self.response_chunk_size = response_chunk_size self.stop_event = stop_event + self.partial_needs = partial_needs # for testing cases where it doesn't know what it wants def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): if self.server_task is not None: raise Exception("already running") async def _start_server(): - server = await self.loop.create_server( - lambda: ReflectorServerProtocol( - self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event, - self.not_incoming_event - ), - interface, port + proto = ReflectorServerProtocol( + self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event, + self.not_incoming_event, self.partial_needs ) + server = await self.loop.create_server(lambda: proto, interface, port) self.started_listening.set() self.stopped_listening.clear() log.info("Reflector server listening on TCP %s:%i", interface, port) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 4a928e2d26..0ca653523d 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -43,9 +43,18 @@ async def asyncSetUp(self): self.stream_manager.config.reflect_streams = False self.stream = await self.stream_manager.create(file_path) - async def _test_reflect_stream(self, response_chunk_size): - reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size) + async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False): + reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size, + partial_needs=partial_needs) reflector.start_server(5566, '127.0.0.1') + if partial_needs: + server_blob = self.server_blob_manager.get_blob(self.stream.sd_hash) + client_blob = self.blob_manager.get_blob(self.stream.sd_hash) + with client_blob.reader_context() as handle: + server_blob.set_length(client_blob.get_length()) + writer = server_blob.get_blob_writer('nobody', 0) + writer.write(handle.read()) + self.server_blob_manager.blob_completed(server_blob) await reflector.started_listening.wait() self.addCleanup(reflector.stop_server) self.assertEqual(0, self.stream.reflector_progress) @@ -71,6 +80,9 @@ async def _test_reflect_stream(self, response_chunk_size): async def test_reflect_stream(self): return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop) + async def test_reflect_stream_but_reflector_changes_its_mind(self): + return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3, loop=self.loop) + async def test_reflect_stream_small_response_chunks(self): return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop) From 352bf694098345c366bf9628a2ef6ad41b13db56 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 18:49:20 -0300 Subject: [PATCH 3/5] improve test --- lbry/stream/reflector/server.py | 19 +++++++++---------- tests/unit/stream/test_reflector.py | 5 +++++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/lbry/stream/reflector/server.py b/lbry/stream/reflector/server.py index 2f95cfb75b..aa41f7bc76 100644 --- a/lbry/stream/reflector/server.py +++ b/lbry/stream/reflector/server.py @@ -17,7 +17,7 @@ class ReflectorServerProtocol(asyncio.Protocol): def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000, stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None, - not_incoming_event: asyncio.Event = None, partial_needs=False): + not_incoming_event: asyncio.Event = None, partial_event: asyncio.Event = None): self.loop = asyncio.get_event_loop() self.blob_manager = blob_manager self.server_task: asyncio.Task = None @@ -34,7 +34,7 @@ def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000 self.stop_event = stop_event or asyncio.Event(loop=self.loop) self.chunk_size = response_chunk_size self.wait_for_stop_task: typing.Optional[asyncio.Task] = None - self.partial_needs = partial_needs + self.partial_event = partial_event async def wait_for_stop(self): await self.stop_event.wait() @@ -120,11 +120,9 @@ async def handle_request(self, request: typing.Dict): # pylint: disable=too-man needs = [blob.blob_hash for blob in self.descriptor.blobs[:-1] if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()] - print(self.partial_needs, needs) - if needs and self.partial_needs: + if needs and not self.partial_event.is_set(): needs = needs[:3] - self.partial_needs = False - print(self.partial_needs, needs) + self.partial_event.set() self.send_response({"send_sd_blob": False, 'needed_blobs': needs}) return return @@ -177,11 +175,12 @@ def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): raise Exception("already running") async def _start_server(): - proto = ReflectorServerProtocol( + partial_event = asyncio.Event() + if not self.partial_needs: + partial_event.set() + server = await self.loop.create_server(lambda: ReflectorServerProtocol( self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event, - self.not_incoming_event, self.partial_needs - ) - server = await self.loop.create_server(lambda: proto, interface, port) + self.not_incoming_event, partial_event), interface, port) self.started_listening.set() self.stopped_listening.clear() log.info("Reflector server listening on TCP %s:%i", interface, port) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 0ca653523d..89354b3cc4 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -60,6 +60,11 @@ async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False self.assertEqual(0, self.stream.reflector_progress) sent = await self.stream.upload_to_reflector('127.0.0.1', 5566) self.assertEqual(100, self.stream.reflector_progress) + if partial_needs: + self.assertFalse(self.stream.is_fully_reflected) + send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566) + self.assertGreater(0, len(send_more)) + sent.extend(send_more) self.assertSetEqual( set(sent), set(map(lambda b: b.blob_hash, From 1437871d884b88d6b8634b997b1421cf42ea14d2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 18:53:35 -0300 Subject: [PATCH 4/5] fix reflector client: only set completed when server says so --- lbry/stream/managed_stream.py | 3 --- tests/unit/stream/test_reflector.py | 5 ++++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 3e615dfc66..888dca2e26 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -372,9 +372,6 @@ async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]: protocol.transport.close() self.uploading_to_reflector = False - if not self.fully_reflected.is_set(): - self.fully_reflected.set() - await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") return sent async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 89354b3cc4..618f7486fe 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -63,13 +63,16 @@ async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False if partial_needs: self.assertFalse(self.stream.is_fully_reflected) send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566) - self.assertGreater(0, len(send_more)) + self.assertGreater(len(send_more), 0) sent.extend(send_more) + sent.append(self.stream.sd_hash) self.assertSetEqual( set(sent), set(map(lambda b: b.blob_hash, self.stream.descriptor.blobs[:-1] + [self.blob_manager.get_blob(self.stream.sd_hash)])) ) + send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566) + self.assertEqual(len(send_more), 0) self.assertTrue(self.stream.is_fully_reflected) server_sd_blob = self.server_blob_manager.get_blob(self.stream.sd_hash) self.assertTrue(server_sd_blob.get_is_verified()) From 142d182bc11734fd566d12c805416a9205d5343e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 20:11:18 -0300 Subject: [PATCH 5/5] if progress was made, retry without a delay --- lbry/stream/stream_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index cc46849763..a9ce211e6c 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -215,7 +215,7 @@ def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None, server, port = random.choice(self.config.reflector_servers) if stream.sd_hash in self.running_reflector_uploads: return self.running_reflector_uploads[stream.sd_hash] - task = self.loop.create_task(stream.upload_to_reflector(server, port)) + task = self.loop.create_task(self._retriable_reflect_stream(stream, server, port)) self.running_reflector_uploads[stream.sd_hash] = task task.add_done_callback( lambda _: None if stream.sd_hash not in self.running_reflector_uploads else @@ -223,6 +223,12 @@ def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None, ) return task + async def _retriable_reflect_stream(self, stream, host, port): + sent = await stream.upload_to_reflector(host, port) + while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: + stream.reflector_progress = 0 + sent = await stream.upload_to_reflector(host, port) + async def create(self, file_path: str, key: Optional[bytes] = None, iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: descriptor = await StreamDescriptor.create_stream(