Skip to content

Commit

Permalink
[yugabyte#7857] Add analyzer for ysql/append jepsen test
Browse files Browse the repository at this point in the history
Summary:
This diff adds a transaction dump analyzer for the ysql/append Jepsen test.

Also added tablet id to apply intents, read and remove transaction events.
Moved shared code from BankAccountAnalyzer to AnalyzerBase.

Test Plan: Jenkins

Reviewers: rsami

Reviewed By: rsami

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D12246
  • Loading branch information
spolitov committed Jul 13, 2021
1 parent 5a23f48 commit a90ab43
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 113 deletions.
63 changes: 63 additions & 0 deletions python/yb/txndump/append_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python3

import sys

from time import monotonic
from typing import NamedTuple, List
from uuid import UUID
from yb.txndump.model import DocHybridTime, HybridTime, SubDocKey, Tombstone
from yb.txndump.parser import AnalyzerBase, DumpProcessor, TransactionBase, Update

# Column ids whose updates are analyzed. AppendAnalyzer.extract_key returns
# None (no key) for any sub-document key whose first sub key is not listed here.
kValueColumns = [2, 3]


class AppendTransaction(TransactionBase):
    """Transaction record created by AppendAnalyzer.

    Adds no state beyond what TransactionBase keeps; it only supplies a
    brace-wrapped textual representation of the base class fields.
    """

    def __init__(self, txn_id: UUID):
        super().__init__(txn_id)

    def __repr__(self) -> str:
        """Return the base class field dump wrapped in ``{ ... }``."""
        return " ".join(("{", self.fields_to_string(), "}"))


class AppendAnalyzer(AnalyzerBase[str, str]):
    """Transaction dump analyzer for the ysql/append Jepsen test.

    Both keys and values are strings. Every update of a tracked column is
    expected to extend the previous value of its key, i.e. the old value must
    be a prefix of the new one; any other update is reported via
    ``self.error``.
    """

    def __init__(self):
        super().__init__()
        self.log = []

    def create_transaction(self, txn_id: UUID):
        """Create the transaction object used to track ``txn_id``."""
        return AppendTransaction(txn_id)

    def extract_key(self, tablet: str, key: SubDocKey):
        """Translate a sub-document key into this analyzer's key.

        Returns ``None`` when the first sub key is not one of
        ``kValueColumns``. Otherwise returns
        ``"<tablet>_<column>_<first hash component>"``.
        """
        column = key.sub_keys[0]
        if column in kValueColumns:
            return "{}_{}_{}".format(tablet, column, key.hash_components[0])
        return None

    def extract_value(self, value):
        """Return ``value`` unchanged, mapping a tombstone to the empty string."""
        if value == Tombstone.kTombstone:
            return ''
        return value

    def check_transaction(self, transaction):
        """Perform no per-transaction checks; always returns ``None``."""
        return None

    def initial_value(self, key: str):
        """Return the value every key starts with: the empty string."""
        return ''

    def analyze_update(self, key: str, update: Update[str], old_value: str) -> str:
        """Check one update against the previous value of ``key``.

        Reports an error, tagged with the update's hybrid time and
        transaction id, when the new value does not start with ``old_value``.
        The new value is returned in either case.
        """
        when = update.doc_ht.hybrid_time
        appended = update.value
        if appended.startswith(old_value):
            return appended
        message = "Bad update for {}: {} => {}".format(key, old_value, appended)
        self.error(when, update.txn_id, message)
        return appended


def main():
    """Process the transaction dump named on the command line and analyze it.

    Expects the dump path as the first command-line argument. When it is
    missing, exits with a usage message instead of failing with an
    ``IndexError`` traceback.

    Prints the processing time after the analysis has run.
    """
    # Validate the command line before building any analyzer state.
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "append_analysis.py"
        sys.exit("Usage: {} <transaction dump path>".format(program))

    analyzer = AppendAnalyzer()
    processor = DumpProcessor(analyzer)
    processor.process(sys.argv[1])
    # Taken before analyze() so the reported time covers the interval from
    # processor.start_time to the end of process() and excludes the analysis.
    processing_time = monotonic() - processor.start_time
    analyzer.analyze()
    print("Processing time: {}".format(processing_time))


if __name__ == '__main__':
    main()
103 changes: 11 additions & 92 deletions python/yb/txndump/bank_accounts_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import NamedTuple, List
from uuid import UUID
from yb.txndump.model import DocHybridTime, HybridTime, SubDocKey
from yb.txndump.parser import AnalyzerBase, DumpProcessor, TransactionBase
from yb.txndump.parser import AnalyzerBase, DumpProcessor, TransactionBase, Update

kValueColumn = 1

Expand All @@ -40,59 +40,18 @@ def __repr__(self) -> str:
return result + " }"


class Update(NamedTuple):
doc_ht: DocHybridTime
txn_id: UUID
value: int
log_ht: HybridTime


class Read(NamedTuple):
read_time: HybridTime
value: int
write_time: DocHybridTime
txn_id: UUID
same_transaction: bool


class KeyData(NamedTuple):
updates: List[Update] = []
reads: List[Read] = []


class BankAccountsAnalyzer(AnalyzerBase):
class BankAccountsAnalyzer(AnalyzerBase[int, int]):
def __init__(self):
super().__init__()
self.rows = {}
self.log = []

def apply_row(self, txn_id: UUID, key: SubDocKey, value: int, log_ht: HybridTime):
def extract_key(self, tablet: str, key: SubDocKey):
if key.sub_keys[0] == kValueColumn:
row = key.hash_components[0]
self.get_row(row).updates.append(Update(key.doc_ht, txn_id, value, log_ht))

def read_value(
self, txn_id: UUID, key, value, read_time: HybridTime, write_time: DocHybridTime,
same_transaction: bool):
if key.sub_keys[0] == kValueColumn and (write_time.hybrid_time <= read_time):
self.get_row(key.hash_components[0]).reads.append(Read(
read_time, value, write_time, txn_id, same_transaction))

def get_row(self, key) -> KeyData:
if key not in self.rows:
self.rows[key] = KeyData()
return self.rows[key]

def get_transaction(self, txn_id: UUID):
if txn_id not in self.txns:
self.txns[txn_id] = BankAccountTransaction(txn_id)
return self.txns[txn_id]

def check_same_updates(self, key: int, update: Update, same_updates: int):
if same_updates < 3:
err_fmt = "Wrong number of same updates for key {}, update {}: {}"
self.error(
update.doc_ht.hybrid_time, update.txn_id, err_fmt.format(key, update, same_updates))
return key.hash_components[0]
return None

def create_transaction(self, txn_id: UUID):
return BankAccountTransaction(txn_id)

def analyze(self):
self.check_status_logs()
Expand All @@ -107,33 +66,10 @@ def analyze(self):
for line in sorted(self.log):
print(line)

def analyze_key(self, key):
updates = sorted(self.rows[key].updates,
key=lambda upd: (upd.doc_ht, upd.txn_id))
reads = sorted(self.rows[key].reads,
key=lambda read: read.read_time)
read_idx = 0
old_balance = 100 if key == 0 else 0
prev_update = None
same_updates = 3
for update in updates:
if prev_update is not None and prev_update == update:
same_updates += 1
continue
else:
self.check_same_updates(key, prev_update, same_updates)
same_updates = 1

new_balance: int = self.analyze_update(key, update, old_balance)
def initial_value(self, key: int):
return 100 if key == 0 else 0

read_idx = self.analyze_read(
key, reads, read_idx, update.doc_ht.hybrid_time, old_balance)

old_balance = new_balance
prev_update = update
self.check_same_updates(key, prev_update, same_updates)

def analyze_update(self, key: int, update: Update, old_balance: int) -> int:
def analyze_update(self, key: int, update: Update[int], old_balance: int) -> int:
new_balance = update.value
hybrid_time = update.doc_ht.hybrid_time
txn = self.txns[update.txn_id]
Expand Down Expand Up @@ -168,23 +104,6 @@ def analyze_update(self, key: int, update: Update, old_balance: int) -> int:
self.log.append((hybrid_time, 'w', txn))
return new_balance

def analyze_read(
self, key: int, reads: List[Read], read_idx: int, hybrid_time: HybridTime,
old_balance: int) -> int:
while read_idx < len(reads) and hybrid_time > reads[read_idx].read_time:
read_txn = reads[read_idx].txn_id
if read_txn in self.txns:
read_balance = reads[read_idx].value
if old_balance != read_balance:
self.error(
reads[read_idx].read_time,
read_txn,
"Bad read key: {}, actual: {}, read: {}".format(
key, old_balance, reads[read_idx]))
self.log.append((reads[read_idx].read_time, 'r', read_txn, key, read_balance))
read_idx += 1
return read_idx

def check_transaction(self, txn: BankAccountTransaction):
cnt_keys = (1 if txn.key1 is not None else 0) + (1 if txn.key2 is not None else 0)
if cnt_keys == 0:
Expand Down
3 changes: 3 additions & 0 deletions python/yb/txndump/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,6 @@ def read_varbytes(self) -> bytes:

def read(self, size: int = -1) -> bytes:
    """Read from the wrapped input and return the result.

    ``size`` is forwarded unchanged to the underlying input's ``read``.
    """
    stream = self._input
    return stream.read(size)

def read_string(self) -> str:
    """Read from the wrapped input with size ``-1`` and decode it as UTF-8."""
    payload = self._input.read(-1)
    return payload.decode('utf-8')
10 changes: 10 additions & 0 deletions python/yb/txndump/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class ValueType(Enum):
kInt64 = 73
kSystemColumnId = 74
kColumnId = 75
kString = 83
kTombstone = 88

@staticmethod
def read(inp: BinaryIO) -> 'ValueType':
Expand Down Expand Up @@ -211,6 +213,10 @@ def decode(key: bytes, has_hybrid_time: bool):
return SubDocKey(hash_components, range_components, sub_keys, doc_ht)


class Tombstone(Enum):
    """Single-member enum whose member is the decoded form of a tombstone value."""

    kTombstone = 0


def decode_value(value: bytes):
inp = BinaryIO(BytesIO(value))
value_type = ValueType.read(inp)
Expand All @@ -223,6 +229,10 @@ def decode_value(value: bytes):
return inp.read_be_int32()
if value_type == ValueType.kInt64:
return inp.read_be_int64()
if value_type == ValueType.kString:
return inp.read_string()
if value_type == ValueType.kTombstone:
return Tombstone.kTombstone
raise Exception('Not supported value type: {}'.format(value_type))


Expand Down
Loading

0 comments on commit a90ab43

Please sign in to comment.