fix: Fix Nayduck test sync_chunks_from_archival (#11759)
This fix is partial. It mostly fixes the serialization/deserialization
part.

The real issue is that the archival node fails to sync state because cold
storage is not initialized properly. To alleviate this problem, we disable
GC during the test by increasing the number of epochs to keep.

The real fix should be #11752 (or
something that forces archival nodes in the tests to be initialized with
a cold DB).
tayfunelmas authored Jul 10, 2024
1 parent f442d48 commit 5928beb
Showing 4 changed files with 126 additions and 61 deletions.
30 changes: 25 additions & 5 deletions pytest/lib/messages/block.py
@@ -163,11 +163,24 @@ def chunk_hash(inner):
import hashlib
from messages.crypto import crypto_schema
from serializer import BinarySerializer

# We combine the hash of this inner object (of type ShardChunkHeaderInner)
# and the encoded merkle root obtained from the versioned-inner object
# inside the variants of this inner object.
encoded_merkle_root = None
if inner.enum == 'V1':
encoded_merkle_root = inner.V1.encoded_merkle_root
elif inner.enum == 'V2':
encoded_merkle_root = inner.V2.encoded_merkle_root
elif inner.enum == 'V3':
encoded_merkle_root = inner.V3.encoded_merkle_root
assert encoded_merkle_root is not None, f"Unknown ShardChunkHeaderV3 enum variant: {inner.enum}"

inner_serialized = BinarySerializer(
dict(block_schema + crypto_schema)).serialize(inner)
inner_hash = hashlib.sha256(inner_serialized).digest()

return hashlib.sha256(inner_hash + inner.encoded_merkle_root).digest()
return hashlib.sha256(inner_hash + encoded_merkle_root).digest()


class ShardChunkHeaderInner:
@@ -217,13 +230,20 @@ def inner_header(self):
return header.V3.inner.V3
assert False, "unknown header version"

def header_version(self):
def chunk_hash(self):
version = self.enum
if version == 'V1':
return version
return ShardChunkHeaderV1.chunk_hash(self.V1.header.inner)
elif version == 'V2':
return self.V2.header.enum
assert False, "unknown partial encoded chunk version"
header = self.V2.header
header_version = header.enum
if header_version == 'V1':
return ShardChunkHeaderV1.chunk_hash(header.V1.inner)
elif header_version == 'V2':
return ShardChunkHeaderV2.chunk_hash(header.V2.inner)
elif header_version == 'V3':
return ShardChunkHeaderV3.chunk_hash(header.V3.inner)
assert False, "unknown header version"


class PartialEncodedChunkV1:
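
For reference, the hash the patched chunk_hash helpers compute is a two-step SHA-256 over the serialized inner header and the encoded Merkle root taken from the versioned inner variant. A minimal, self-contained sketch (the function and argument names are illustrative, not part of the library):

    import hashlib

    def chunk_hash_sketch(inner_serialized: bytes, encoded_merkle_root: bytes) -> bytes:
        # sha256(sha256(serialize(inner)) || encoded_merkle_root), as in block.py above.
        inner_hash = hashlib.sha256(inner_serialized).digest()
        return hashlib.sha256(inner_hash + encoded_merkle_root).digest()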
97 changes: 76 additions & 21 deletions pytest/lib/messages/network.py
@@ -40,7 +40,7 @@ class PeerChainInfoV2:
pass


class EdgeInfo:
class PartialEdgeInfo:
pass


@@ -52,7 +52,7 @@ class Edge:
pass


class SyncData:
class RoutingTableUpdate:
pass


@@ -132,6 +132,22 @@ class PartialSync:
pass


class SyncSnapshotHosts:
pass


class SnapshotHostInfo:
pass


class DistanceVector:
pass


class AdvertisedPeerDistance:
pass


network_schema = [
[
SocketAddr, {
@@ -151,9 +167,9 @@ class PartialSync:
['Handshake', Handshake],
['HandshakeFailure', (PeerInfo, HandshakeFailureReason)],
['LastEdge', Edge],
['Sync', SyncData],
['RequestUpdateNonce', EdgeInfo],
['ResponseUpdateNonce', Edge],
['SyncRoutingTable', RoutingTableUpdate],
['RequestUpdateNonce', PartialEdgeInfo],
['_UnusedResponseUpdateNonce', None],
['PeersRequest', ()],
['PeersResponse', [PeerInfo]],
['BlockHeadersRequest', [[32]]],
@@ -162,19 +178,19 @@ class PartialSync:
['Block', Block],
['Transaction', SignedTransaction],
['Routed', RoutedMessage],
['Disconnect'],
['Disconnect', ()],
['Challenge', None], # TODO
['HandshakeV2', HandshakeV2],
['EpochSyncRequest', None], # TODO
['EpochSyncResponse', None], # TODO
['EpochSyncFinalizationRequest', None], # TODO
['EpochSyncFinalizationResponse', None], # TODO
['RoutingTableSyncV2', RoutingTableSyncV2],
['DistanceVector', None],
['StateRequestHeader', None],
['StateRequestPart', None],
['VersionedStateResponse', None],
['SyncSnapshotHosts', None],
['_UnusedHandshakeV2', None],
['_UnusedEpochSyncRequest', None],
['_UnusedEpochSyncResponse', None],
['_UnusedEpochSyncFinalizationRequest', None],
['_UnusedEpochSyncFinalizationResponse', None],
['_UnusedRoutingTableSyncV2', RoutingTableSyncV2],
['DistanceVector', DistanceVector],
['StateRequestHeader', ('u64', [32])],
['StateRequestPart', ('u64', [32], 'u64')],
['VersionedStateResponse', None], # TODO
['SyncSnapshotHosts', SyncSnapshotHosts],
]
}
],
@@ -192,7 +208,7 @@ class PartialSync:
'type': 'u16'
}],
['chain_info', PeerChainInfoV2],
['edge_info', EdgeInfo],
['edge_info', PartialEdgeInfo],
]
}
],
@@ -210,7 +226,7 @@ class PartialSync:
'type': 'u16'
}],
['chain_info', PeerChainInfo],
['edge_info', EdgeInfo],
['edge_info', PartialEdgeInfo],
]
}
],
@@ -284,7 +300,7 @@ class PartialSync:
}
],
[
SyncData, {
RoutingTableUpdate, {
'kind': 'struct',
'fields': [
['edges', [Edge]],
@@ -293,7 +309,7 @@ class PartialSync:
}
],
[
EdgeInfo, {
PartialEdgeInfo, {
'kind': 'struct',
'fields': [
['nonce', 'u64'],
@@ -561,5 +577,44 @@ class PartialSync:
['ibf', [IbfElem]],
]
}
],
[
SyncSnapshotHosts, {
'kind': 'struct',
'fields': [['hosts', [SnapshotHostInfo]],]
}
],
[
SnapshotHostInfo, {
'kind':
'struct',
'fields': [
['peer_id', PublicKey],
['sync_hash', [32]],
['epoch_height', 'u64'],
['shards', ['u64']],
['signature', Signature],
]
}
],
[
AdvertisedPeerDistance, {
'kind': 'struct',
'fields': [
['destination', PublicKey],
['distance', 'u32'],
]
}
],
[
DistanceVector, {
'kind':
'struct',
'fields': [
['root', PublicKey],
['distances', [AdvertisedPeerDistance]],
['edges', [Edge]],
]
}
]
]
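
The renamed _Unused* entries are placeholders rather than deletions: the wire tag for an enum variant in this schema is simply its index in the values list (see the deserializer's value_ord lookup below), so dropping a retired variant would shift every tag that follows it. A hypothetical illustration with made-up variant names, not the real PeerMessage list:

    # The one-byte tag written or read for an enum value is its index in 'values'.
    values = ['Handshake', '_UnusedHandshakeV2', 'Block']
    assert values.index('Block') == 2
    # Removing the placeholder would silently re-tag 'Block' as 1 and break
    # compatibility with peers that use the unmodified variant order.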
6 changes: 5 additions & 1 deletion pytest/lib/serializer.py
@@ -164,10 +164,14 @@ def deserialize_struct(self, type_):
return ret
elif structSchema['kind'] == 'enum':
value_ord = self.deserialize_num(1)
if (value_ord < 0) or (len(structSchema['values']) <= value_ord):
raise IndexError(
f"Unknown enum variant {value_ord} for {type_}, num variants: {len(structSchema['values'])}"
)

logger.debug(
f"deserialize_struct {type_} {structSchema['kind']} enum value ord {value_ord} struct schema {len(structSchema['values'])}"
)

value_schema = structSchema['values'][value_ord]
logger.debug(
f"deserialize_struct {type_} {structSchema['kind']} enum value sch {value_schema}"
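
The added guard turns an out-of-range variant ordinal into a descriptive error instead of a bare list-index failure deeper in deserialization. A standalone sketch of the check in isolation (the helper name is hypothetical):

    def check_variant(value_ord: int, values: list) -> None:
        # Mirrors the bounds check added to deserialize_struct above.
        if value_ord < 0 or value_ord >= len(values):
            raise IndexError(
                f"Unknown enum variant {value_ord}, num variants: {len(values)}")

    check_variant(2, ['V1', 'V2', 'V3'])    # fine
    # check_variant(7, ['V1', 'V2', 'V3'])  # raises IndexError with a clear message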
54 changes: 20 additions & 34 deletions pytest/tests/sanity/sync_chunks_from_archival.py
@@ -55,17 +55,11 @@ async def handle(self, msg, fr, to):
if msg_kind == 'VersionedPartialEncodedChunk':
inner_header = msg.Routed.body.VersionedPartialEncodedChunk.inner_header(
)
header_version = msg.Routed.body.VersionedPartialEncodedChunk.header_version(
)
height = inner_header.height_created
shard_id = inner_header.shard_id

if header_version == 'V1':
hash_ = ShardChunkHeaderV1.chunk_hash(inner_header)
elif header_version == 'V2':
hash_ = ShardChunkHeaderV2.chunk_hash(inner_header)
elif header_version == 'V3':
hash_ = ShardChunkHeaderV3.chunk_hash(inner_header)
hash_ = msg.Routed.body.VersionedPartialEncodedChunk.chunk_hash(
)
self.hash_to_metadata[hash_] = (height, shard_id)

if msg_kind == 'PartialEncodedChunkRequest':
@@ -123,6 +117,22 @@ def left_seconds(self) -> float:
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

config = load_config()

archival_node_config_changes = {
"archive": True,
# This makes the node track all shards.
"tracked_shards": [0],
"network": {
"ttl_account_id_router": {
"secs": 1,
"nanos": 0
}
},
# TODO(#11760): This disables the GC during the test. Re-enable GC by
# removing this line after this issue is resolved (eg. #11752 is merged).
"gc_num_epochs_to_keep": 1000,
}

near_root, node_dirs = init_cluster(
2,
3,
@@ -148,32 +158,8 @@ def left_seconds(self) -> float:
],
["total_supply", "6120000000000000000000000000000000"]
],
{
4: {
"tracked_shards": [0, 1],
"archive": True
},
3: {
"archive": True,
"tracked_shards": [1],
"network": {
"ttl_account_id_router": {
"secs": 1,
"nanos": 0
}
}
},
2: {
"archive": True,
"tracked_shards": [0],
"network": {
"ttl_account_id_router": {
"secs": 1,
"nanos": 0
}
}
}
})
# Configure node2, node3, and node4 as archival nodes.
{i: archival_node_config_changes for i in [2, 3, 4]})

boot_node = spin_up_node(config, near_root, node_dirs[0], 0, proxy=proxy)
node1 = spin_up_node(config,
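
With chunk_hash now implemented on the message classes, the proxy handler in this test no longer branches on the header version itself. A condensed sketch of the updated flow (attribute names follow the pytest message classes; the surrounding handler code is elided):

    chunk = msg.Routed.body.VersionedPartialEncodedChunk
    inner_header = chunk.inner_header()
    # chunk_hash() dispatches on the V1/V2/V3 header variants internally.
    self.hash_to_metadata[chunk.chunk_hash()] = (inner_header.height_created,
                                                 inner_header.shard_id)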
