Skip to content

Commit

Permalink
Run some range reads with limit in serializability checker KIKIMR-19054
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury committed Aug 16, 2023
1 parent 333783d commit 58f8ec0
Showing 1 changed file with 87 additions and 2 deletions.
89 changes: 87 additions & 2 deletions ydb/tests/tools/ydb_serializable/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,33 @@ def generate_query_range_reads(table, minvar='$minKey', maxvar='$maxKey'):
})


def generate_query_range_read_with_limit(table, minvar='$minKey', maxvar='$maxKey', limitvar='$limit'):
declares = [
f'''\
DECLARE {minvar} AS Uint64;
DECLARE {maxvar} AS Uint64;
DECLARE {limitvar} AS Uint64;
''',
]
statements = [
f'''\
SELECT key, value
FROM `{table}`
WHERE key >= {minvar} AND key <= {maxvar}
ORDER BY key
LIMIT {limitvar};
''',
]
return Query(
declares,
statements,
{
minvar: ydb.PrimitiveType.Uint64,
maxvar: ydb.PrimitiveType.Uint64,
limitvar: ydb.PrimitiveType.Uint64,
})


def generate_random_name(cnt=20):
return ''.join(
random.choice('abcdefghijklmnopqrstuvwxyz')
Expand Down Expand Up @@ -774,6 +801,61 @@ async def perform(session):

history.add(History.Commit('read_range', node.value, values)).apply_to(checker)

async def async_perform_range_reads_with_limit(self, history, table, options, checker, deadline):
range_query = generate_query_range_read_with_limit(table)

while time.time() < deadline and not self.is_stopping():
min_key = random.randint(0, options.keys)
max_key = random.randint(min_key, options.keys)
limit = random.randint(1, options.keys)
read_keys = list(range(min_key, max_key + 1))

node = history.add(History.Begin('read_range_limit', None, read_keys=read_keys)).apply_to(checker)

async def perform(session):
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
with history.log_op(node.value, f'range_read_limit_{limit}+commit' if simple_tx else f'range_read_limit_{limit}') as log:
rss = await tx.execute(
range_query,
parameters={
'$minKey': min_key,
'$maxKey': max_key,
'$limit': limit,
},
commit_tx=simple_tx,
)
if options.oplog_results:
log.result({row.key: row.value for row in rss[0].rows})
if not simple_tx:
with history.log_op(node.value, 'commit'):
await tx.commit()
return rss

try:
rss = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('read_range_limit', node.value)).apply_to(checker)
except ydb.Undetermined:
pass # transaction outcome unknown
else:
values = {}
num_observed = 0
last_observed = None
for row in rss[0].rows:
num_observed += 1
last_observed = row.key
values[row.key] = row.value

if num_observed > limit:
raise SerializabilityError(f'Tx {node.value} got {num_observed} rows with limit {limit}')

if num_observed == limit:
# Iteration stops when reaching limit so it's ok not to see beyond the last row
node.expected_read_keys = tuple(range(min_key, last_observed + 1))

history.add(History.Commit('read_range_limit', node.value, values)).apply_to(checker)

async def async_perform_read_tables(self, history, table, options, checker, deadline):
while time.time() < deadline and not self.is_stopping():
if options.read_table_ranges:
Expand Down Expand Up @@ -835,8 +917,11 @@ async def async_perform_test(self, history, table, options, checker):
futures.append(self.async_perform_point_reads_writes(history, table, options, checker, deadline=deadline, keysets=readwrite_keysets))
futures.append(self.async_perform_verifying_reads(history, table, options, checker, deadline=deadline, keysets=readwrite_keysets))

for _ in range(options.rangereaders):
futures.append(self.async_perform_range_reads(history, table, options, checker, deadline=deadline))
for i in range(options.rangereaders):
if i % 2 == 0:
futures.append(self.async_perform_range_reads(history, table, options, checker, deadline=deadline))
else:
futures.append(self.async_perform_range_reads_with_limit(history, table, options, checker, deadline=deadline))

for _ in range(options.readtablers):
futures.append(self.async_perform_read_tables(history, table, options, checker, deadline=deadline))
Expand Down

0 comments on commit 58f8ec0

Please sign in to comment.