Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix missing _add_persisted_position #8179

Merged
merged 2 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8179.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add functions to `MultiWriterIdGen` used by events stream.
2 changes: 2 additions & 0 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ def _mark_id_as_finished(self, next_id: int):
curr = self._current_positions.get(self._instance_name, 0)
self._current_positions[self._instance_name] = max(curr, next_id)

self._add_persisted_position(next_id)

def get_current_token(self) -> int:
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
Expand Down
52 changes: 49 additions & 3 deletions tests/storage/test_id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,31 @@ def _create(conn):
return self.get_success(self.db_pool.runWithConnection(_create))

def _insert_rows(self, instance_name: str, number: int):
"""Insert N rows as the given instance, inserting with stream IDs pulled
from the postgres sequence.
"""

def _insert(txn):
for _ in range(number):
txn.execute(
"INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
(instance_name,),
)

self.get_success(self.db_pool.runInteraction("test_single_instance", _insert))
self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))

def _insert_row_with_id(self, instance_name: str, stream_id: int):
"""Insert one row as the given instance with given stream_id, updating
the postgres sequence position to match.
"""

def _insert(txn):
txn.execute(
"INSERT INTO foobar VALUES (?, ?)", (stream_id, instance_name,),
)
txn.execute("SELECT setval('foobar_seq', ?)", (stream_id,))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This select seems unused or is this what updates the sequence?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setval is the function that sets the value, SELECT .. is just the smallest valid sql statement that contains a function invocation. It is a bit bizarre that a SELECT mutates, but it is the standard way of calling such functions in postgres.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, weird. Thanks for the explanation!


self.get_success(self.db_pool.runInteraction("_insert_row_with_id", _insert))

def test_empty(self):
"""Test an ID generator against an empty database gives sensible
Expand Down Expand Up @@ -188,11 +205,17 @@ def test_get_persisted_upto_position(self):
positions.
"""

self._insert_rows("first", 3)
self._insert_rows("second", 5)
# The following tests are a bit cheeky in that we notify about new
# positions via `advance` without *actually* advancing the postgres
# sequence.

self._insert_row_with_id("first", 3)
self._insert_row_with_id("second", 5)

id_gen = self._create_id_generator("first")

self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})

# Min is 3 and there is a gap between 5, so we expect it to be 3.
self.assertEqual(id_gen.get_persisted_upto_position(), 3)

Expand All @@ -218,3 +241,26 @@ def test_get_persisted_upto_position(self):
id_gen.advance("first", 11)
id_gen.advance("second", 15)
self.assertEqual(id_gen.get_persisted_upto_position(), 11)

def test_get_persisted_upto_position_get_next(self):
"""Test that `get_persisted_upto_position` correctly tracks updates to
positions when `get_next` is called.
"""

self._insert_row_with_id("first", 3)
self._insert_row_with_id("second", 5)

id_gen = self._create_id_generator("first")

self.assertEqual(id_gen.get_positions(), {"first": 3, "second": 5})

self.assertEqual(id_gen.get_persisted_upto_position(), 3)
with self.get_success(id_gen.get_next()) as stream_id:
self.assertEqual(stream_id, 6)
self.assertEqual(id_gen.get_persisted_upto_position(), 3)

self.assertEqual(id_gen.get_persisted_upto_position(), 6)

# We assume that so long as `get_next` does correctly advance the
# `persisted_upto_position` in this case, then it will be correct in the
# other cases that are tested above (since they'll hit the same code).