Skip to content

Commit

Permalink
New updates to protocol and initial tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Rigidity committed Jan 31, 2024
1 parent e26dd96 commit 2730f47
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 237 deletions.
3 changes: 3 additions & 0 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ async def batch_coin_states_by_puzzle_hashes(
# It can be changed later without breaking the protocol, but this is a practical limit for now.
assert len(puzzle_hashes) <= 15000

if len(puzzle_hashes) == 0:
return [], None

coin_states: List[CoinState] = []

async with self.db_wrapper.reader_no_transaction() as conn:
Expand Down
73 changes: 40 additions & 33 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1675,9 +1675,13 @@ async def request_remove_puzzle_subscriptions(
peer_id = peer.peer_node_id
subs = self.full_node.subscriptions

removed = subs.remove_puzzle_subscriptions(peer_id, request.puzzle_hashes)
if request.puzzle_hashes is None:
removed = list(subs.puzzle_subscriptions(peer_id))
subs.remove_puzzle_subscriptions(peer_id, removed)
else:
removed = list(subs.remove_puzzle_subscriptions(peer_id, request.puzzle_hashes))

response = wallet_protocol.RespondRemovePuzzleSubscriptions(list(removed))
response = wallet_protocol.RespondRemovePuzzleSubscriptions(removed)
msg = make_msg(ProtocolMessageTypes.respond_remove_puzzle_subscriptions, response)
return msg

Expand All @@ -1686,46 +1690,37 @@ async def request_remove_puzzle_subscriptions(
reply_types=[ProtocolMessageTypes.respond_remove_coin_subscriptions],
)
async def request_remove_coin_subscriptions(
self, request: wallet_protocol.RequestAddCoinSubscriptions, peer: WSChiaConnection
self, request: wallet_protocol.RequestRemoveCoinSubscriptions, peer: WSChiaConnection
) -> Message:
peer_id = peer.peer_node_id
subs = self.full_node.subscriptions

removed = subs.remove_coin_subscriptions(peer_id, request.coin_ids)
if request.coin_ids is None:
removed = list(subs.coin_subscriptions(peer_id))
subs.remove_coin_subscriptions(peer_id, removed)
else:
removed = list(subs.remove_coin_subscriptions(peer_id, request.coin_ids))

response = wallet_protocol.RespondRemoveCoinSubscriptions(list(removed))
response = wallet_protocol.RespondRemoveCoinSubscriptions(removed)
msg = make_msg(ProtocolMessageTypes.respond_remove_coin_subscriptions, response)
return msg

@api_request(
peer_required=True,
reply_types=[ProtocolMessageTypes.respond_reset_subscriptions],
)
async def request_reset_subscriptions(
self, _request: wallet_protocol.RequestResetSubscriptions, peer: WSChiaConnection
) -> Message:
peer_id = peer.peer_node_id
subs = self.full_node.subscriptions

ph_subs = subs.puzzle_subscriptions(peer_id)
coin_subs = subs.coin_subscriptions(peer_id)
subs.remove_peer(peer_id)

response = wallet_protocol.RespondResetSubscriptions(list(ph_subs), list(coin_subs))
msg = make_msg(ProtocolMessageTypes.respond_reset_subscriptions, response)
return msg

@api_request(peer_required=True, reply_types=[ProtocolMessageTypes.respond_puzzle_state])
async def request_puzzle_state(
self, request: wallet_protocol.RequestPuzzleState, peer: WSChiaConnection
) -> Message:
max_items = self.max_subscribe_response_items(peer)
max_subscriptions = self.max_subscriptions(peer)
header_hash = self.full_node.blockchain.height_to_hash(request.min_height)

if request.header_hash is not None and request.header_hash != header_hash:
rejection = wallet_protocol.RejectPuzzleState(header_hash)
msg = make_msg(ProtocolMessageTypes.reject_puzzle_state, rejection)
return msg

# This is a limit imposed by `batch_coin_states_by_puzzle_hashes`, due to the SQLite variable limit.
# It can be increased in the future, and this protocol should be written and tested in a way that
# this increase would not break the API.
count = 15000
puzzle_hashes = request.puzzle_hashes[:count]

Expand All @@ -1735,31 +1730,43 @@ async def request_puzzle_state(
include_spent=request.filters.include_spent,
include_unspent=request.filters.include_unspent,
include_hinted=request.filters.include_hinted,
max_items=max_items,
)

if next_height is None:
next_header_hash = None
peer_id = peer.peer_node_id
subs = self.full_node.subscriptions
max_subscriptions = self.max_subscriptions(peer)
subs.add_puzzle_subscriptions(peer_id, puzzle_hashes, max_subscriptions)
added = list(
self.full_node.subscriptions.add_puzzle_subscriptions(
peer.peer_node_id, puzzle_hashes, max_subscriptions
)
)
else:
next_header_hash = self.full_node.blockchain.height_to_hash(next_height)
added = []

response = wallet_protocol.RespondPuzzleState(puzzle_hashes, next_height, next_header_hash, coin_states)
response = wallet_protocol.RespondPuzzleState(added, next_height, next_header_hash, coin_states)
msg = make_msg(ProtocolMessageTypes.respond_puzzle_state, response)
return msg

@api_request(peer_required=True, reply_types=[ProtocolMessageTypes.respond_coin_state])
async def request_coin_state(self, request: wallet_protocol.RequestCoinState, peer: WSChiaConnection) -> Message:
coin_states = await self.full_node.coin_store.get_coin_states_by_ids(True, set(request.coin_ids))
max_items = self.max_subscribe_response_items(peer)
max_subscriptions = self.max_subscriptions(peer)
header_hash = self.full_node.blockchain.height_to_hash(request.min_height)

if request.header_hash is not None and request.header_hash != header_hash:
rejection = wallet_protocol.RejectCoinState(header_hash)
msg = make_msg(ProtocolMessageTypes.reject_coin_state, rejection)
return msg

coin_states = await self.full_node.coin_store.get_coin_states_by_ids(
True, set(request.coin_ids), min_height=request.min_height, max_items=max_items
)

peer_id = peer.peer_node_id
subs = self.full_node.subscriptions
max_subscriptions = self.max_subscriptions(peer)
subs.add_coin_subscriptions(peer_id, request.coin_ids, max_subscriptions)
added = self.full_node.subscriptions.add_coin_subscriptions(peer_id, request.coin_ids, max_subscriptions)

response = wallet_protocol.RespondCoinState(request.coin_ids, coin_states)
response = wallet_protocol.RespondCoinState(list(added), coin_states)
msg = make_msg(ProtocolMessageTypes.respond_coin_state, response)
return msg

Expand Down
13 changes: 6 additions & 7 deletions chia/protocols/protocol_message_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,11 @@ class ProtocolMessageTypes(Enum):
respond_remove_puzzle_subscriptions = 97
request_remove_coin_subscriptions = 98
respond_remove_coin_subscriptions = 99
request_reset_subscriptions = 100
respond_reset_subscriptions = 101
request_puzzle_state = 102
respond_puzzle_state = 103
reject_puzzle_state = 104
request_coin_state = 105
respond_coin_state = 106
request_puzzle_state = 100
respond_puzzle_state = 101
reject_puzzle_state = 102
request_coin_state = 103
respond_coin_state = 104
reject_coin_state = 105

error = 255
3 changes: 1 addition & 2 deletions chia/protocols/protocol_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@
pmt.request_add_coin_subscriptions: [pmt.respond_add_coin_subscriptions],
pmt.request_remove_puzzle_subscriptions: [pmt.respond_remove_puzzle_subscriptions],
pmt.request_remove_coin_subscriptions: [pmt.respond_remove_coin_subscriptions],
pmt.request_reset_subscriptions: [pmt.respond_reset_subscriptions],
pmt.request_puzzle_state: [pmt.respond_puzzle_state, pmt.reject_puzzle_state],
pmt.request_coin_state: [pmt.respond_coin_state],
pmt.request_coin_state: [pmt.respond_coin_state, pmt.reject_coin_state],
}


Expand Down
37 changes: 14 additions & 23 deletions chia/protocols/wallet_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,10 @@ class RespondFeeEstimates(Streamable):
estimates: FeeEstimateGroup


@streamable
@dataclass(frozen=True)
class CoinStateFilters(Streamable):
include_spent: bool
include_unspent: bool
include_hinted: bool


@streamable
@dataclass(frozen=True)
class RequestAddPuzzleSubscriptions(Streamable):
puzzle_hashes: List[bytes32]
min_height: uint32
header_hash: Optional[bytes32]


@streamable
Expand All @@ -304,8 +294,6 @@ class RespondAddPuzzleSubscriptions(Streamable):
@dataclass(frozen=True)
class RequestAddCoinSubscriptions(Streamable):
coin_ids: List[bytes32]
min_height: uint32
header_hash: Optional[bytes32]


@streamable
Expand All @@ -317,7 +305,7 @@ class RespondAddCoinSubscriptions(Streamable):
@streamable
@dataclass(frozen=True)
class RequestRemovePuzzleSubscriptions(Streamable):
puzzle_hashes: List[bytes32]
puzzle_hashes: Optional[List[bytes32]]


@streamable
Expand All @@ -329,7 +317,7 @@ class RespondRemovePuzzleSubscriptions(Streamable):
@streamable
@dataclass(frozen=True)
class RequestRemoveCoinSubscriptions(Streamable):
coin_ids: List[bytes32]
coin_ids: Optional[List[bytes32]]


@streamable
Expand All @@ -340,15 +328,10 @@ class RespondRemoveCoinSubscriptions(Streamable):

@streamable
@dataclass(frozen=True)
class RequestResetSubscriptions(Streamable):
pass


@streamable
@dataclass(frozen=True)
class RespondResetSubscriptions(Streamable):
puzzle_hashes: List[bytes32]
coin_ids: List[bytes32]
class CoinStateFilters(Streamable):
include_spent: bool
include_unspent: bool
include_hinted: bool


@streamable
Expand Down Expand Up @@ -380,6 +363,8 @@ class RejectPuzzleState(Streamable):
@dataclass(frozen=True)
class RequestCoinState(Streamable):
coin_ids: List[bytes32]
min_height: uint32
header_hash: Optional[bytes32]
subscribe: bool


Expand All @@ -388,3 +373,9 @@ class RequestCoinState(Streamable):
class RespondCoinState(Streamable):
coin_ids: List[bytes32]
coin_states: List[CoinState]


@streamable
@dataclass(frozen=True)
class RejectCoinState(Streamable):
header_hash: Optional[bytes32]
9 changes: 9 additions & 0 deletions tests/core/full_node/stores/test_coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,15 @@ async def test_batch_many_coin_states(db_version: int, cut_off_middle: bool) ->
assert len(all_coin_states) == (25001 if cut_off_middle else 50000)


@pytest.mark.anyio
async def test_batch_no_puzzle_hashes(db_version: int) -> None:
async with DBConnection(db_version) as db_wrapper:
# Initialize coin and hint stores.
coin_store = await CoinStore.create(db_wrapper)
await HintStore.create(db_wrapper)
await coin_store.batch_coin_states_by_puzzle_hashes([])


@pytest.mark.anyio
async def test_unsupported_version() -> None:
with pytest.raises(RuntimeError, match="CoinStore does not support database schema v1"):
Expand Down
3 changes: 1 addition & 2 deletions tests/util/build_network_protocol_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,12 @@ def visit_wallet_protocol(visitor: Callable[[Any, str], None]) -> None:
visitor(respond_add_coin_subscriptions, "respond_add_coin_subscriptions")
visitor(request_remove_coin_subscriptions, "request_remove_coin_subscriptions")
visitor(respond_remove_coin_subscriptions, "respond_remove_coin_subscriptions")
visitor(request_reset_subscriptions, "request_reset_subscriptions")
visitor(respond_reset_subscriptions, "respond_reset_subscriptions")
visitor(request_puzzle_state, "request_puzzle_state")
visitor(reject_puzzle_state, "reject_puzzle_state")
visitor(respond_puzzle_state, "respond_puzzle_state")
visitor(request_coin_state, "request_coin_state")
visitor(respond_coin_state, "respond_coin_state")
visitor(reject_coin_state, "reject_coin_state")


def visit_harvester_protocol(visitor: Callable[[Any, str], None]) -> None:
Expand Down
24 changes: 9 additions & 15 deletions tests/util/network_protocol_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,19 +682,11 @@
bytes32(bytes.fromhex("0e03ce4c43d7d60886f27af7da0ea9749a46b977b3743f3fd2e97b169dc539c1")),
]

request_add_puzzle_subscriptions = wallet_protocol.RequestAddPuzzleSubscriptions(
hashes,
uint32(2585637244),
bytes32(bytes.fromhex("9620d602399252a8401a44669a9d7a6fc328358868a427e827d721c233e2b411")),
)
request_add_puzzle_subscriptions = wallet_protocol.RequestAddPuzzleSubscriptions(hashes)

respond_add_puzzle_subscriptions = wallet_protocol.RespondAddPuzzleSubscriptions(hashes)

request_add_coin_subscriptions = wallet_protocol.RequestAddCoinSubscriptions(
hashes,
uint32(2585637244),
bytes32(bytes.fromhex("9620d602399252a8401a44669a9d7a6fc328358868a427e827d721c233e2b411")),
)
request_add_coin_subscriptions = wallet_protocol.RequestAddCoinSubscriptions(hashes)

respond_add_coin_subscriptions = wallet_protocol.RespondAddCoinSubscriptions(hashes)

Expand All @@ -707,10 +699,6 @@

respond_remove_coin_subscriptions = wallet_protocol.RespondRemoveCoinSubscriptions(hashes)

request_reset_subscriptions = wallet_protocol.RequestResetSubscriptions()

respond_reset_subscriptions = wallet_protocol.RespondResetSubscriptions(hashes, hashes)

request_puzzle_state = wallet_protocol.RequestPuzzleState(
hashes,
uint32(0),
Expand All @@ -725,10 +713,16 @@
bytes32(bytes.fromhex("9620d602399252a8401a44669a9d7a6fc328358868a427e827d721c233e2b411"))
)

request_coin_state = wallet_protocol.RequestCoinState(hashes, False)
request_coin_state = wallet_protocol.RequestCoinState(
hashes, uint32(0), bytes32(bytes.fromhex("9620d602399252a8401a44669a9d7a6fc328358868a427e827d721c233e2b411")), False
)

respond_coin_state = wallet_protocol.RespondCoinState(hashes, [coin_state])

reject_coin_state = wallet_protocol.RejectCoinState(
bytes32(bytes.fromhex("9620d602399252a8401a44669a9d7a6fc328358868a427e827d721c233e2b411"))
)


### HARVESTER PROTOCOL
pool_difficulty = harvester_protocol.PoolDifficulty(
Expand Down
Binary file modified tests/util/protocol_messages_bytes-v1.0
Binary file not shown.
Loading

0 comments on commit 2730f47

Please sign in to comment.