Skip to content

Commit

Permalink
Change state batching to have fixed limit of 15,000 and test more
Browse files Browse the repository at this point in the history
  • Loading branch information
Rigidity committed Jan 16, 2024
1 parent c360cbe commit 16bbe9e
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 73 deletions.
123 changes: 56 additions & 67 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,89 +415,78 @@ async def batch_coin_states_by_puzzle_hashes(
self,
puzzle_hashes: List[bytes32],
*,
start_height: uint32,
min_height: uint32 = uint32(0),
include_spent: bool = True,
include_unspent: bool = True,
include_hinted: bool = True,
max_items: int = 50000,
) -> Tuple[List[CoinState], List[bytes32], uint32]:
) -> Tuple[List[CoinState], uint32, bool]:
"""
Returns the coin states in the current batch, as well as the remaining puzzle hashes
to be searched by in the next batch and the next block height to start from.
Returns the coin states, as well as the next block height and whether finished.
Note that the maximum number of puzzle hashes is currently set to 15000.
"""

# Calculate the total number of groups of puzzle hashes to search through.
# Each query is be for both puzzle hash and hint, in addition to some additional filters.
other_variable_count = 3

if include_hinted:
group_size = SQLITE_MAX_VARIABLE_NUMBER // 2 - other_variable_count
else:
group_size = SQLITE_MAX_VARIABLE_NUMBER - other_variable_count

synced_puzzle_hashes = 0
assert len(puzzle_hashes) <= 15000

coin_states: List[CoinState] = []

async with self.db_wrapper.reader_no_transaction() as conn:
for select_batch in to_batches(puzzle_hashes, group_size):
select_batch_db = tuple(select_batch.entries)

if include_hinted:
cursor = await conn.execute(
f"SELECT cr.confirmed_index, cr.spent_index, cr.coinbase, cr.puzzle_hash, "
f"cr.coin_parent, cr.amount, cr.timestamp FROM coin_record cr "
f"LEFT JOIN hints h ON cr.coin_name = h.coin_id "
f'WHERE (cr.puzzle_hash in ({"?," * (len(select_batch.entries) - 1)}?) '
f'OR h.hint in ({"?," * (len(select_batch.entries) - 1)}?)) '
f"AND (cr.confirmed_index>=? OR cr.spent_index>=?) "
f"{'' if include_spent else 'AND cr.spent_index=0'} "
f"{'' if include_unspent else 'AND cr.spent_index>0'} "
f"ORDER BY MAX(cr.confirmed_index, cr.spent_index) ASC "
f"LIMIT ?",
select_batch_db
+ select_batch_db
+ (start_height, start_height, max_items - len(coin_states) + 1),
)
else:
cursor = await conn.execute(
f"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, "
f"coin_parent, amount, timestamp FROM coin_record INDEXED BY coin_puzzle_hash "
f'WHERE puzzle_hash in ({"?," * (len(select_batch.entries) - 1)}?) '
f"AND (confirmed_index>=? OR spent_index>=?) "
f"{'' if include_spent else 'AND spent_index=0'} "
f"{'' if include_unspent else 'AND spent_index>0'} "
f"ORDER BY MAX(confirmed_index, spent_index) ASC "
f"LIMIT ?",
select_batch_db + (start_height, start_height, max_items - len(coin_states) + 1),
)
puzzle_hashes_db = tuple(puzzle_hashes)
puzzle_hash_count = len(puzzle_hashes_db)

if include_hinted:
cursor = await conn.execute(
f"SELECT cr.confirmed_index, cr.spent_index, cr.coinbase, cr.puzzle_hash, "
f"cr.coin_parent, cr.amount, cr.timestamp FROM coin_record cr "
f"LEFT JOIN hints h ON cr.coin_name = h.coin_id "
f'WHERE (cr.puzzle_hash in ({"?," * (puzzle_hash_count - 1)}?) '
f'OR h.hint in ({"?," * (puzzle_hash_count - 1)}?)) '
f"AND (cr.confirmed_index>=? OR cr.spent_index>=?) "
f"{'' if include_spent else 'AND cr.spent_index=0'} "
f"{'' if include_unspent else 'AND cr.spent_index>0'} "
f"ORDER BY MAX(cr.confirmed_index, cr.spent_index) ASC "
f"LIMIT ?",
puzzle_hashes_db + puzzle_hashes_db + (min_height, min_height, max_items + 1),
)
else:
cursor = await conn.execute(
f"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, "
f"coin_parent, amount, timestamp FROM coin_record INDEXED BY coin_puzzle_hash "
f'WHERE puzzle_hash in ({"?," * (puzzle_hash_count - 1)}?) '
f"AND (confirmed_index>=? OR spent_index>=?) "
f"{'' if include_spent else 'AND spent_index=0'} "
f"{'' if include_unspent else 'AND spent_index>0'} "
f"ORDER BY MAX(confirmed_index, spent_index) ASC "
f"LIMIT ?",
puzzle_hashes_db + (min_height, min_height, max_items + 1),
)

for row in await cursor.fetchall():
coin_states.append(self.row_to_coin_state(row))
for row in await cursor.fetchall():
coin_states.append(self.row_to_coin_state(row))

if len(coin_states) == max_items:
start_height = min_height
synced_puzzle_hashes += len(select_batch.entries)
break
elif len(coin_states) > max_items:
next_coin_state = coin_states.pop()
start_height = uint32(max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0))

while len(coin_states) > 0:
last_coin_state = coin_states[-1]
height = uint32(max(last_coin_state.created_height or 0, last_coin_state.spent_height or 0))
if height == start_height:
coin_states.pop()
else:
break
is_finished: bool

break
else:
start_height = min_height
synced_puzzle_hashes += len(select_batch.entries)
if len(coin_states) == max_items:
next_height = min_height
is_finished = True
elif len(coin_states) > max_items:
next_coin_state = coin_states.pop()
next_height = uint32(max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0))

while len(coin_states) > 0:
last_coin_state = coin_states[-1]
height = uint32(max(last_coin_state.created_height or 0, last_coin_state.spent_height or 0))
if height == min_height:
coin_states.pop()
else:
break

is_finished = False
else:
next_height = min_height
is_finished = True

return (coin_states, puzzle_hashes[synced_puzzle_hashes:], start_height)
return (coin_states, next_height, is_finished)

async def rollback_to_block(self, block_index: int) -> List[CoinRecord]:
"""
Expand Down
81 changes: 75 additions & 6 deletions tests/core/full_node/stores/test_coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from chia.types.coin_record import CoinRecord
from chia.types.full_block import FullBlock
from chia.types.generator_types import BlockGenerator
from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER
from chia.util.generator_tools import tx_removals_and_additions
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint64
Expand Down Expand Up @@ -505,7 +504,7 @@ async def test_coin_state_batches(db_version: int) -> None:
hints: List[Tuple[bytes32, bytes]] = []

# Make sure the count is even so we can filter by spent and unspent evenly.
count = SQLITE_MAX_VARIABLE_NUMBER * 2
count = 50000

for i in range(count):
is_spent = i % 2 == 0
Expand Down Expand Up @@ -549,18 +548,26 @@ async def test_coin_state_batches(db_version: int) -> None:
async def sync_states(*, include_spent: bool, include_unspent: bool, include_hinted: bool) -> List[CoinState]:
height = uint32(0)
all_coin_states: List[CoinState] = []
is_finished = False
remaining_phs = puzzle_hashes.copy()

while len(remaining_phs) > 0:
(coin_states, remaining_phs, height) = await coin_store.batch_coin_states_by_puzzle_hashes(
remaining_phs,
start_height=height,
while not is_finished:
(coin_states, height, is_finished) = await coin_store.batch_coin_states_by_puzzle_hashes(
remaining_phs[:15000],
min_height=height,
include_spent=include_spent,
include_unspent=include_unspent,
include_hinted=include_hinted,
)
all_coin_states += coin_states

if is_finished:
remaining_phs = remaining_phs[15000:]

if len(remaining_phs) > 0:
height = uint32(0)
is_finished = False

return all_coin_states

# Make sure all of the coin states are found when batching.
Expand Down Expand Up @@ -589,6 +596,68 @@ async def sync_states(*, include_spent: bool, include_unspent: bool, include_hin
assert len(all_coin_states) == 0


@pytest.mark.anyio
async def test_batch_many_coin_states(db_version: int) -> None:
async with DBConnection(db_version) as db_wrapper:
ph = bytes32(b"0" * 32)

# Generate coin records.
coin_records: List[CoinRecord] = []
count = 50000

for i in range(count):
created_height = uint32(i % 2 + 100)
coin = Coin(
std_hash(b"Parent Coin Id " + i.to_bytes(4, byteorder="big")),
ph,
uint64(i),
)
coin_records.append(
CoinRecord(
coin=coin,
confirmed_block_index=created_height,
spent_block_index=uint32(0),
coinbase=False,
timestamp=uint64(0),
)
)

# Initialize coin and hint stores.
coin_store = await CoinStore.create(db_wrapper)
await HintStore.create(db_wrapper)

await coin_store._add_coin_records(coin_records)

# Make sure all of the coin states are found.
(all_coin_states, next_height, is_finished) = await coin_store.batch_coin_states_by_puzzle_hashes([ph])
all_coin_states.sort(key=lambda cs: cs.coin.amount)

assert is_finished
assert next_height == 0
assert len(all_coin_states) == len(coin_records)

for i in range(min(len(coin_records), len(all_coin_states))):
assert coin_records[i].coin.name().hex() == all_coin_states[i].coin.name().hex(), i

await coin_store._add_coin_records(
[
CoinRecord(
coin=Coin(std_hash(b"extra coin"), ph, 0),
confirmed_block_index=uint32(50),
spent_block_index=uint32(0),
coinbase=False,
timestamp=uint64(0),
)
]
)

(all_coin_states, next_height, is_finished) = await coin_store.batch_coin_states_by_puzzle_hashes([ph])

assert not is_finished
assert next_height == 101
assert len(all_coin_states) == 50000


@pytest.mark.anyio
async def test_unsupported_version() -> None:
with pytest.raises(RuntimeError, match="CoinStore does not support database schema v1"):
Expand Down

0 comments on commit 16bbe9e

Please sign in to comment.