diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 785ab358..9d312fed 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -13,10 +13,12 @@ def test_stream(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) + barrier = threading.Barrier(2) events = [] def thread_fn(): stream = scoped_client.stream(fql("Product.all().toStream()")) + barrier.wait() with stream as iter: for evt in iter: @@ -28,10 +30,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() id = scoped_client.query(fql("Product.create({}).id")).data scoped_client.query(fql("Product.byId(${id})!.delete()", id=id)) @@ -45,10 +44,12 @@ def thread_fn(): def test_close_method(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) + barrier = threading.Barrier(2) events = [] def thread_fn(): stream = scoped_client.stream(fql("Product.all().toStream()")) + barrier.wait() with stream as iter: for evt in iter: @@ -60,10 +61,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() scoped_client.query(fql("Product.create({})")) scoped_client.query(fql("Product.create({})")) @@ -75,8 +73,11 @@ def thread_fn(): def test_error_on_stream(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) + barrier = threading.Barrier(2) + def thread_fn(): stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()")) + barrier.wait() with pytest.raises(QueryRuntimeError): with stream as iter: @@ -85,10 +86,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() scoped_client.query(fql("Product.create({foo: 10})")) scoped_client.query(fql("Product.create({foo: 10})")) @@ -132,10 +130,12 @@ def stream_func(*args, **kwargs): def test_last_ts_is_monotonic(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) + barrier = threading.Barrier(2) events = [] def thread_fn(): stream = scoped_client.stream(fql("Product.all().toStream()")) + barrier.wait() with stream as iter: last_ts = 0 @@ -153,10 +153,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() scoped_client.query(fql("Product.create({})")) scoped_client.query(fql("Product.create({})")) @@ -176,12 +173,14 @@ def test_providing_start_ts(scoped_client): createTwo = scoped_client.query(fql("Product.create({})")) createThree = scoped_client.query(fql("Product.create({})")) + barrier = threading.Barrier(2) events = [] def thread_fn(): # replay excludes the ts that was passed in, it provides events for all ts after the one provided stream = scoped_client.stream(stream_token, StreamOptions(start_ts=createOne.txn_ts)) + barrier.wait() with stream as iter: for event in iter: events.append(event) @@ -190,10 +189,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() createFour = scoped_client.query(fql("Product.create({})")) stream_thread.join() @@ -233,15 +229,16 @@ def test_rejects_cursor_with_fql_query(scoped_client): scoped_client.stream(fql("Collection.create({name: 'Product'})"), opts) -@pytest.mark.xfail(reason="not currently supported by core") def test_handle_status_events(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) + barrier = threading.Barrier(2) events = [] def thread_fn(): opts = StreamOptions(status_events=True) stream = scoped_client.stream(fql("Product.all().toStream()"), opts) + barrier.wait() with stream as iter: for evt in iter: @@ -253,10 +250,7 @@ def thread_fn(): stream_thread = threading.Thread(target=thread_fn) stream_thread.start() - - # adds a delay so the thread can open the stream, - # otherwise we could miss some events - time.sleep(0.5) + barrier.wait() scoped_client.query(fql("Product.create({})")) scoped_client.query(fql("Product.create({})"))