pytest: add end-to-end recompress-storage sanity test #6601

Merged · 5 commits · Apr 16, 2022

3 changes: 3 additions & 0 deletions nightly/pytest-sanity.txt
@@ -138,3 +138,6 @@ pytest sanity/rosetta.py --features nightly_protocol,nightly_protocol_features

# Make sure Docker image can be built and run
pytest --skip-build --timeout=1h sanity/docker.py

pytest sanity/recompress_storage.py
pytest sanity/recompress_storage.py --features nightly_protocol,nightly_protocol_features
Empty file.
108 changes: 108 additions & 0 deletions pytest/tests/sanity/recompress_storage.py
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""Tests whether node can continue working after its storage is recompressed.

The test starts a cluster of four nodes and submits a few transactions to the
network (the same transactions as those used in the rpc_tx_status.py test).

Once that’s done, each node is stopped and its storage processed via the
‘recompress-storage’ command. ‘view-state apply-range’ is then executed to
verify that the database has not been corrupted.

Finally, all the nodes are restarted and a few more transactions are sent to
verify that everything is in working order.

The above steps are done once for RPC nodes and again for archival nodes.
"""

import os
import pathlib
import subprocess
import sys
import threading
import time
import typing
import unittest

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
import cluster
import key
import transaction

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'tests'))

import sanity.rpc_tx_status


class RecompressStorageTestCase(unittest.TestCase):

    def __init__(self, *args, **kw) -> None:
        super().__init__(*args, **kw)
        self.nodes = ()

    def tearDown(self) -> None:
        for node in self.nodes:
            node.cleanup()
        self.nodes = ()

    def do_test_recompress_storage(self, *, archive: bool) -> None:
        self.nodes = sanity.rpc_tx_status.start_cluster(archive=archive)

        # Give the network some time to generate a few blocks. The same goes
        # for other sleeps in this method.
        time.sleep(5)

        # Execute a few transactions
        sanity.rpc_tx_status.test_tx_status(self.nodes)
        time.sleep(5)

        # Recompress storage on each node
        for idx, node in enumerate(self.nodes):
            logger.info(f'Stopping node{idx}')
            node.kill(gentle=True)

            node_dir = pathlib.Path(node.node_dir)
            self._call(node, 'recompress-storage',
                       '--output-dir=' + str(node_dir / 'data-new'))
            # Swap the recompressed database into place, keeping the old one
            # around in data-old.
            (node_dir / 'data').rename(node_dir / 'data-old')
            (node_dir / 'data-new').rename(node_dir / 'data')

            # Sanity-check the recompressed database.
            self._call(node, 'view-state', 'apply-range',
                       '--start-index=0', '--verbose-output')

        # Restart all nodes with the new database
        for idx, node in enumerate(self.nodes):
            logger.info(f'Starting node{idx}')
            node.start()

        # Execute a few more transactions
        time.sleep(5)
        sanity.rpc_tx_status.test_tx_status(self.nodes, nonce_offset=3)

    def _call(self, node: cluster.LocalNode,
              *args: typing.Union[str, pathlib.Path]) -> None:
        """Calls node’s neard with given arguments."""
        node_dir = pathlib.Path(node.node_dir)
        cmd = [
            pathlib.Path(node.near_root) / node.binary_name,
            f'--home={node_dir}',
        ] + list(args)
        logger.info('Running ' + ' '.join(str(arg) for arg in cmd))
        with open(node_dir / 'stdout', 'ab') as stdout, \
             open(node_dir / 'stderr', 'ab') as stderr:
            subprocess.check_call(cmd,
                                  stdin=subprocess.DEVNULL,
                                  stdout=stdout,
                                  stderr=stderr,
                                  env=dict(os.environ, RUST_LOG='debug'))

    def test_recompress_storage_rpc(self) -> None:
        self.do_test_recompress_storage(archive=False)

    def test_recompress_storage_archive(self) -> None:
        self.do_test_recompress_storage(archive=True)


if __name__ == '__main__':
    unittest.main()
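
For reference, the per-node workflow the test automates boils down to the following sketch. NEARD and HOME are hypothetical placeholders (not part of this PR); the subcommands and flags are the ones the test invokes above.

# Manual equivalent of the per-node steps in do_test_recompress_storage.
import pathlib
import subprocess

NEARD = './target/release/neard'       # placeholder path to the neard binary
HOME = pathlib.Path.home() / '.near'   # placeholder node home directory

# Recompress the database into a fresh directory.
subprocess.check_call([NEARD, f'--home={HOME}', 'recompress-storage',
                       f'--output-dir={HOME / "data-new"}'])
# Swap the recompressed database into place, keeping the old one around.
(HOME / 'data').rename(HOME / 'data-old')
(HOME / 'data-new').rename(HOME / 'data')
# Sanity-check the result the same way the test does.
subprocess.check_call([NEARD, f'--home={HOME}', 'view-state', 'apply-range',
                       '--start-index=0', '--verbose-output'])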
37 changes: 26 additions & 11 deletions pytest/tests/sanity/rpc_tx_status.py
@@ -6,7 +6,7 @@
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))
from cluster import start_cluster, Key
import cluster
from utils import load_test_contract
import transaction

@@ -36,27 +36,42 @@ def submit_tx_and_check(nodes, node_index, tx):
        f'receipt id from receipts {receipt_id_from_receipts}')


def test_tx_status():
    nodes = start_cluster(
        4, 0, 1, None,
        [["epoch_length", 1000], ["block_producer_kickout_threshold", 80],
         ["transaction_validity_period", 10000]], {})

def test_tx_status(nodes, *, nonce_offset: int = 0):
    signer_key = nodes[0].signer_key
    encoded_block_hash = nodes[0].get_latest_block().hash_bytes
    payment_tx = transaction.sign_payment_tx(signer_key, 'test1', 100, 1,
    payment_tx = transaction.sign_payment_tx(signer_key, 'test1', 100,
                                             nonce_offset + 1,
                                             encoded_block_hash)
    submit_tx_and_check(nodes, 0, payment_tx)

    deploy_contract_tx = transaction.sign_deploy_contract_tx(
        signer_key, load_test_contract(), 2, encoded_block_hash)
        signer_key, load_test_contract(), nonce_offset + 2, encoded_block_hash)
    submit_tx_and_check(nodes, 0, deploy_contract_tx)

    function_call_tx = transaction.sign_function_call_tx(
        signer_key, signer_key.account_id, 'write_key_value',
        struct.pack('<QQ', 42, 24), 300000000000000, 0, 3, encoded_block_hash)
        struct.pack('<QQ', 42, 24), 300000000000000, 0, nonce_offset + 3,
        encoded_block_hash)
    submit_tx_and_check(nodes, 0, function_call_tx)


def start_cluster(*, archive: bool = False):
    num_nodes = 4
    genesis_changes = [["epoch_length", 1000],
                       ["block_producer_kickout_threshold", 80],
                       ["transaction_validity_period", 10000]]
    config_changes = dict.fromkeys(range(num_nodes), {'archive': archive})
    return cluster.start_cluster(num_nodes=num_nodes,
                                 num_observers=0,
                                 num_shards=1,
                                 config=None,
                                 genesis_config_changes=genesis_changes,
                                 client_config_changes=config_changes)


def main():
    test_tx_status(start_cluster())


if __name__ == '__main__':
    test_tx_status()
    main()
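
The refactoring above lets test_tx_status run against an already started cluster and be invoked more than once against the same accounts: nonce_offset shifts the access-key nonces so the second round does not reuse the nonces of the first. A minimal sketch of the intended composition, mirroring what recompress_storage.py does (the sleep duration is arbitrary):

import time

import sanity.rpc_tx_status as rpc_tx_status

nodes = rpc_tx_status.start_cluster(archive=True)    # four archival nodes
time.sleep(5)                                         # let a few blocks appear
rpc_tx_status.test_tx_status(nodes)                   # uses nonces 1..3
# ... stop the nodes, recompress their storage, restart them ...
rpc_tx_status.test_tx_status(nodes, nonce_offset=3)   # uses nonces 4..6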