add CACHE_ALL_TX_HASHES setting to optionally use more memory to save i/o

jackrobison committed Oct 16, 2021
1 parent b2de89c commit ba5312c
Showing 3 changed files with 51 additions and 15 deletions.
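
All three files serve a single mechanism: transaction hashes are stored under sequential tx_num keys, so an opt-in, in-memory mirror of that column (a list whose index doubles as tx_num, plus a dict for the reverse direction) can answer hash/num lookups with no disk reads. A minimal sketch of the pattern under those assumptions; TxDB and _disk are hypothetical stand-ins, not the server's real prefix_db API:

# Minimal sketch of the cache-or-fallback pattern this commit introduces.
from typing import Dict, List


class TxDB:
    def __init__(self, cache_all_tx_hashes: bool):
        self.cache_all_tx_hashes = cache_all_tx_hashes
        self.total_transactions: List[bytes] = []   # index doubles as tx_num
        self.tx_num_mapping: Dict[bytes, int] = {}  # reverse lookup
        self._disk: Dict[int, bytes] = {}           # stands in for the tx_hash column

    def add_tx(self, tx_num: int, tx_hash: bytes):
        self._disk[tx_num] = tx_hash                # always written to the db
        if self.cache_all_tx_hashes:
            assert tx_num == len(self.total_transactions)  # append in tx_num order
            self.total_transactions.append(tx_hash)
            self.tx_num_mapping[tx_hash] = tx_num

    def get_tx_hash(self, tx_num: int) -> bytes:
        if self.cache_all_tx_hashes:
            return self.total_transactions[tx_num]  # served from RAM
        return self._disk[tx_num]                   # otherwise one db read


db = TxDB(cache_all_tx_hashes=True)
db.add_tx(0, b'\x00' * 32)
assert db.get_tx_hash(0) == b'\x00' * 32

The append-in-tx_num-order invariant is what makes the reorg handling in backup_block below work: truncating the list back to the previous block's transaction count is enough to forget the undone transactions.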
16 changes: 11 additions & 5 deletions lbry/wallet/server/block_processor.py
@@ -490,9 +490,7 @@ def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_nu

         if signing_channel:
             raw_channel_tx = self.db.prefix_db.tx.get(
-                self.db.prefix_db.tx_hash.get(
-                    signing_channel.tx_num, deserialize_value=False
-                ), deserialize_value=False
+                self.db.get_tx_hash(signing_channel.tx_num), deserialize_value=False
             )
             channel_pub_key_bytes = None
             try:
@@ -1501,6 +1499,9 @@ def advance_block(self, block):
                     self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name)
             self.pending_transactions[tx_count] = tx_hash
             self.pending_transaction_num_mapping[tx_hash] = tx_count
+            if self.env.cache_all_tx_hashes:
+                self.db.total_transactions.append(tx_hash)
+                self.db.tx_num_mapping[tx_hash] = tx_count
             tx_count += 1

         # handle expired claims
@@ -1608,7 +1609,12 @@ async def backup_block(self):
         self.db.headers.pop()
         self.db.tx_counts.pop()
         self.tip = self.coin.header_hash(self.db.headers[-1])
-        self.tx_count = self.db.tx_counts[-1]
+        if self.env.cache_all_tx_hashes:
+            while len(self.db.total_transactions) > self.db.tx_counts[-1]:
+                self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
+                self.tx_count -= 1
+        else:
+            self.tx_count = self.db.tx_counts[-1]
         self.height -= 1
         # self.touched can include other addresses which is
         # harmless, but remove None.
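
On reorg, the cache must be unwound together with the chain state: hashes are popped from both containers until the list is no longer than the restored tip's cumulative transaction count, and tx_count ends up equal to tx_counts[-1] on either branch. A toy run of that loop, with plain containers standing in for the db attributes:

# Toy rollback mirroring backup_block's while-loop; values are illustrative only.
total_transactions = [b'aa', b'bb', b'cc', b'dd', b'ee']
tx_num_mapping = {h: n for n, h in enumerate(total_transactions)}
tx_counts = [2, 4]  # cumulative txs per block, after the undone block was popped
tx_count = 5

while len(total_transactions) > tx_counts[-1]:
    tx_num_mapping.pop(total_transactions.pop())
    tx_count -= 1

assert tx_count == tx_counts[-1] == len(total_transactions) == 4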
@@ -1659,7 +1665,7 @@ def get_pending_tx_num(self, tx_hash: bytes) -> int:
         if tx_hash in self.pending_transaction_num_mapping:
             return self.pending_transaction_num_mapping[tx_hash]
         else:
-            return self.db.prefix_db.tx_num.get(tx_hash).tx_num
+            return self.db.get_tx_num(tx_hash)

     def spend_utxo(self, tx_hash: bytes, nout: int):
         hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))

1 change: 1 addition & 0 deletions lbry/wallet/server/env.py
@@ -78,6 +78,7 @@ def __init__(self, coin=None):
         self.anon_logs = self.boolean('ANON_LOGS', False)
         self.log_sessions = self.integer('LOG_SESSIONS', 3600)
         self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
+        self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False)
         self.country = self.default('COUNTRY', 'US')
         # Peer discovery
         self.peer_discovery = self.peer_discovery_enum()

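The flag is read from the process environment like the other boolean settings above it. A hypothetical way to opt in from a launcher script; the exact set of truthy strings depends on Env.boolean's parsing, which is an assumption here:

# Hypothetical usage sketch: set the variable before Env is constructed.
# 'Yes' is assumed to parse as true; check Env.boolean for the accepted values.
import os

os.environ['CACHE_ALL_TX_HASHES'] = 'Yes'  # trade RAM for fewer tx_hash/tx_num reads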
49 changes: 39 additions & 10 deletions lbry/wallet/server/leveldb.py
@@ -113,6 +113,8 @@ def __init__(self, env):
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)

         self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
+        self.total_transactions: List[bytes] = []
+        self.tx_num_mapping: Dict[bytes, int] = {}

         self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
         self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@@ -201,7 +203,7 @@ def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes,
         normalized_name = name
         controlling_claim = self.get_controlling_claim(normalized_name)

-        tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
+        tx_hash = self.get_tx_hash(tx_num)
         height = bisect_right(self.tx_counts, tx_num)
         created_height = bisect_right(self.tx_counts, root_tx_num)
         last_take_over_height = controlling_claim.height
@@ -462,7 +464,7 @@ def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]
     def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
         expired = {}
         for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)):
-            tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False)
+            tx_hash = self.get_tx_hash(k.tx_num)
             tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False))
             # treat it like a claim spend so it will delete/abandon properly
             # the _spend_claim function this result is fed to expects a txi, so make a mock one
@@ -527,7 +529,7 @@ def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
             if not reposted_claim:
                 return
             reposted_metadata = self.get_claim_metadata(
-                self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position
+                self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
             )
             if not reposted_metadata:
                 return
@@ -541,7 +543,7 @@ def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
         reposted_fee_currency = None
         reposted_duration = None
         if reposted_claim:
-            reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False)
+            reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num)
             raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False)
             try:
                 reposted_claim_txo = self.coin.transaction(
@@ -793,6 +795,21 @@ def get_headers():
             assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
             self.headers = headers

+    async def _read_tx_hashes(self):
+        def _read_tx_hashes():
+            return list(self.prefix_db.tx_hash.iterate(include_key=False, fill_cache=False, deserialize_value=False))
+
+        self.logger.info("loading tx hashes")
+        self.total_transactions.clear()
+        self.tx_num_mapping.clear()
+        start = time.perf_counter()
+        self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(None, _read_tx_hashes))
+        self.tx_num_mapping = {
+            tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions)
+        }
+        ts = time.perf_counter() - start
+        self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))
+
     def estimate_timestamp(self, height: int) -> int:
         if height < len(self.headers):
             return struct.unpack('<I', self.headers[height][100:104])[0]
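
The full column scan in _read_tx_hashes is blocking database i/o, so it runs in the default thread-pool executor to keep the event loop responsive during startup; the reverse mapping is then derived from list positions, which is valid because the tx_hash column is keyed by sequential tx_num. The same shape in isolation; load_all_hashes is a hypothetical stand-in for the prefix_db iteration:

# Sketch of offloading a blocking scan to the default executor.
import asyncio
from typing import Dict, List, Tuple


def load_all_hashes() -> List[bytes]:
    return [bytes([i]) * 32 for i in range(3)]  # blocking db iteration in reality


async def warm_cache() -> Tuple[List[bytes], Dict[bytes, int]]:
    loop = asyncio.get_running_loop()
    hashes = await loop.run_in_executor(None, load_all_hashes)  # loop stays free
    return hashes, {tx_hash: tx_num for tx_num, tx_hash in enumerate(hashes)}


hashes, mapping = asyncio.run(warm_cache())
assert mapping[hashes[2]] == 2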
@@ -837,13 +854,25 @@ async def open_dbs(self):
         await self._read_tx_counts()
         await self._read_headers()
         await self._read_claim_txos()
+        if self.env.cache_all_tx_hashes:
+            await self._read_tx_hashes()

         # start search index
         await self.search_index.start()

     def close(self):
         self.prefix_db.close()

+    def get_tx_hash(self, tx_num: int) -> bytes:
+        if self.env.cache_all_tx_hashes:
+            return self.total_transactions[tx_num]
+        return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
+
+    def get_tx_num(self, tx_hash: bytes) -> int:
+        if self.env.cache_all_tx_hashes:
+            return self.tx_num_mapping[tx_hash]
+        return self.prefix_db.tx_num.get(tx_hash).tx_num
+
     # Header merkle cache

     async def populate_header_merkle_cache(self):
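
These two accessors are the single switch point for every call site touched above. The memory cost of flipping the switch is easy to ballpark: each transaction contributes one ~65-byte bytes object (referenced by both containers, so stored once), a list slot, and a dict entry. A rough estimate where the container overheads are CPython-specific assumptions, not measured figures:

# Back-of-envelope memory cost of the cache; overheads are rough CPython guesses.
import sys

tx_count = 50_000_000
hash_obj = sys.getsizeof(b'\x00' * 32)  # ~65 bytes for one 32-byte bytes object
list_slot = 8                           # one pointer per list element
dict_entry = 104                        # rough per-entry dict cost, assumed

total = tx_count * (hash_obj + list_slot + dict_entry)
print(f"~{total / 1024 ** 3:.1f} GiB")  # on the order of 8 GiB at 50M transactions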
@@ -900,7 +929,7 @@ def fs_tx_hash(self, tx_num):
         if tx_height > self.db_height:
             return None, tx_height
         try:
-            return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height
+            return self.get_tx_hash(tx_num), tx_height
         except IndexError:
             self.logger.exception(
                 "Failed to access a cached transaction, known bug #3142 "
@@ -964,13 +993,13 @@ def read_history(self, hashX: bytes, limit: int = 1000) -> List[Tuple[bytes, int
         txs = []
         txs_extend = txs.extend
         for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
-            txs_extend([
-                (self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num))
-                for tx_num in hist
-            ])
+            txs_extend(hist)
             if len(txs) >= limit:
                 break
-        return txs
+        return [
+            (self.get_tx_hash(tx_num), bisect_right(self.tx_counts, tx_num))
+            for tx_num in txs
+        ]

     async def limited_history(self, hashX, *, limit=1000):
         """Return an unpruned, sorted list of (tx_hash, height) tuples of
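
The read_history rework collects tx_nums first and resolves them to (tx_hash, height) pairs in one pass after the limit loop exits, so every resolution goes through get_tx_hash and can be served from the cache; as before, the last history row is taken whole, so the limit can be overshot slightly. The collect-then-resolve shape, with resolve standing in for the hash lookup plus the bisect for height:

# Two-phase history read mirroring the reworked loop; resolve() is a stand-in.
def read_history(history_rows, limit, resolve):
    tx_nums = []
    for hist in history_rows:      # cheap accumulation, no lookups yet
        tx_nums.extend(hist)
        if len(tx_nums) >= limit:
            break
    return [resolve(n) for n in tx_nums]  # single resolution pass at the end


print(read_history([[1, 2], [3, 4, 5]], 4, lambda n: f"hash_{n}"))
# -> ['hash_1', 'hash_2', 'hash_3', 'hash_4', 'hash_5']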
