Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't set stream as reflected until reflector says it doesn't need any blob #3308

Merged
merged 5 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lbry/stream/managed_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,6 @@ async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
protocol.transport.close()
self.uploading_to_reflector = False

if not self.fully_reflected.is_set():
self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return sent

async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
Expand Down
2 changes: 1 addition & 1 deletion lbry/stream/reflector/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def data_received(self, data):
return

async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
msg = json.dumps(request_dict)
msg = json.dumps(request_dict, sort_keys=True)
try:
self.transport.write(msg.encode())
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
Expand Down
31 changes: 18 additions & 13 deletions lbry/stream/reflector/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class ReflectorServerProtocol(asyncio.Protocol):
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
not_incoming_event: asyncio.Event = None):
not_incoming_event: asyncio.Event = None, partial_event: asyncio.Event = None):
self.loop = asyncio.get_event_loop()
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None
Expand All @@ -34,6 +34,7 @@ def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000
self.stop_event = stop_event or asyncio.Event(loop=self.loop)
self.chunk_size = response_chunk_size
self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
self.partial_event = partial_event

async def wait_for_stop(self):
await self.stop_event.wait()
Expand Down Expand Up @@ -115,10 +116,14 @@ async def handle_request(self, request: typing.Dict): # pylint: disable=too-man
if self.writer:
self.writer.close_handle()
self.writer = None
self.send_response({"send_sd_blob": False, 'needed': [
blob.blob_hash for blob in self.descriptor.blobs[:-1]
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()
]})

needs = [blob.blob_hash
for blob in self.descriptor.blobs[:-1]
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()]
if needs and not self.partial_event.is_set():
needs = needs[:3]
self.partial_event.set()
self.send_response({"send_sd_blob": False, 'needed_blobs': needs})
return
return
elif self.descriptor:
Expand Down Expand Up @@ -153,7 +158,7 @@ async def handle_request(self, request: typing.Dict): # pylint: disable=too-man
class ReflectorServer:
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
not_incoming_event: asyncio.Event = None):
not_incoming_event: asyncio.Event = None, partial_needs=False):
self.loop = asyncio.get_event_loop()
self.blob_manager = blob_manager
self.server_task: typing.Optional[asyncio.Task] = None
Expand All @@ -163,19 +168,19 @@ def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000
self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop)
self.response_chunk_size = response_chunk_size
self.stop_event = stop_event
self.partial_needs = partial_needs # for testing cases where it doesn't know what it wants

def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
if self.server_task is not None:
raise Exception("already running")

async def _start_server():
server = await self.loop.create_server(
lambda: ReflectorServerProtocol(
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
self.not_incoming_event
),
interface, port
)
partial_event = asyncio.Event()
if not self.partial_needs:
partial_event.set()
server = await self.loop.create_server(lambda: ReflectorServerProtocol(
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
self.not_incoming_event, partial_event), interface, port)
self.started_listening.set()
self.stopped_listening.clear()
log.info("Reflector server listening on TCP %s:%i", interface, port)
Expand Down
8 changes: 7 additions & 1 deletion lbry/stream/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,20 @@ def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None,
server, port = random.choice(self.config.reflector_servers)
if stream.sd_hash in self.running_reflector_uploads:
return self.running_reflector_uploads[stream.sd_hash]
task = self.loop.create_task(stream.upload_to_reflector(server, port))
task = self.loop.create_task(self._retriable_reflect_stream(stream, server, port))
self.running_reflector_uploads[stream.sd_hash] = task
task.add_done_callback(
lambda _: None if stream.sd_hash not in self.running_reflector_uploads else
self.running_reflector_uploads.pop(stream.sd_hash)
)
return task

async def _retriable_reflect_stream(self, stream, host, port):
sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0
sent = await stream.upload_to_reflector(host, port)

async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
descriptor = await StreamDescriptor.create_stream(
Expand Down
24 changes: 22 additions & 2 deletions tests/unit/stream/test_reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,36 @@ async def asyncSetUp(self):
self.stream_manager.config.reflect_streams = False
self.stream = await self.stream_manager.create(file_path)

async def _test_reflect_stream(self, response_chunk_size):
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False):
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size,
partial_needs=partial_needs)
reflector.start_server(5566, '127.0.0.1')
if partial_needs:
server_blob = self.server_blob_manager.get_blob(self.stream.sd_hash)
client_blob = self.blob_manager.get_blob(self.stream.sd_hash)
with client_blob.reader_context() as handle:
server_blob.set_length(client_blob.get_length())
writer = server_blob.get_blob_writer('nobody', 0)
writer.write(handle.read())
self.server_blob_manager.blob_completed(server_blob)
await reflector.started_listening.wait()
self.addCleanup(reflector.stop_server)
self.assertEqual(0, self.stream.reflector_progress)
sent = await self.stream.upload_to_reflector('127.0.0.1', 5566)
self.assertEqual(100, self.stream.reflector_progress)
if partial_needs:
self.assertFalse(self.stream.is_fully_reflected)
send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566)
self.assertGreater(len(send_more), 0)
sent.extend(send_more)
sent.append(self.stream.sd_hash)
self.assertSetEqual(
set(sent),
set(map(lambda b: b.blob_hash,
self.stream.descriptor.blobs[:-1] + [self.blob_manager.get_blob(self.stream.sd_hash)]))
)
send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566)
self.assertEqual(len(send_more), 0)
self.assertTrue(self.stream.is_fully_reflected)
server_sd_blob = self.server_blob_manager.get_blob(self.stream.sd_hash)
self.assertTrue(server_sd_blob.get_is_verified())
Expand All @@ -71,6 +88,9 @@ async def _test_reflect_stream(self, response_chunk_size):
async def test_reflect_stream(self):
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)

async def test_reflect_stream_but_reflector_changes_its_mind(self):
return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3, loop=self.loop)

async def test_reflect_stream_small_response_chunks(self):
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)

Expand Down