Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jackrobison committed Jul 13, 2020
1 parent 01f4916 commit 2c12bd9
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
49 changes: 30 additions & 19 deletions lbry/wallet/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ async def update_history(self, address, remote_status, address_manager: AddressM
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
we_need = set(remote_history) - set(local_history)
if not we_need:
remote_missing = set(local_history) - set(remote_history)
if remote_missing:
log.warning(
"%i transactions we have for %s are not in the remote address history",
len(remote_missing), address
)
return True

acquire_lock_tasks = []
Expand All @@ -511,18 +517,20 @@ async def update_history(self, address, remote_status, address_manager: AddressM
updated_cached_items = {}
already_synced = set()

already_synced_offset = 0
for i, (txid, remote_height) in enumerate(remote_history):
if not acquire_lock_tasks and i < len(local_history) and local_history[i] == (txid, remote_height):
if i == already_synced_offset and i < len(local_history) and local_history[i] == (txid, remote_height):
pending_synced_history[i] = f'{txid}:{remote_height}:'
already_synced.add((txid, remote_height))
already_synced_offset += 1
continue
cache_item = self._tx_cache.get(txid)
if cache_item is None:
cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item
if len(acquire_lock_tasks) > 10000:
await asyncio.wait(acquire_lock_tasks)
acquire_lock_tasks.clear()

for txid, remote_height in remote_history[already_synced_offset:]:
cache_item = self._tx_cache[txid]
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))

if acquire_lock_tasks:
Expand Down Expand Up @@ -576,11 +584,9 @@ async def update_history(self, address, remote_status, address_manager: AddressM
log.warning("lock was already released?")
pass

log.info("updating address count and status")
await self.db.save_transaction_io_batch(
[], address, self.address_to_hash160(address), synced_history
)
log.info("updated address count and status")

if address_manager is None:
address_manager = await self.get_address_manager_for_address(address)
Expand Down Expand Up @@ -623,7 +629,7 @@ async def update_history(self, address, remote_status, address_manager: AddressM
self._known_addresses_out_of_sync.add(address)
return False
else:
log.info("synced %s", address)
log.info("finished syncing transaction history for %s, %i known txs", address, len(local_history))
return True

async def cache_transaction(self, txid, remote_height, check_local=True):
Expand Down Expand Up @@ -682,14 +688,16 @@ async def _request_transaction_batch(self, to_request, remote_history_size, addr
batches = [[]]
remote_heights = {}
synced_txs = []
heights_in_batch = 1
heights_in_batch = 0
last_height = 0
for idx in sorted(to_request):
txid = to_request[idx][0]
height = to_request[idx][1]
remote_heights[txid] = height
if idx > 1 and height != remote_heights[batches[-1][-1]]:
if height != last_height:
heights_in_batch += 1
if len(batches[-1]) == 100 or heights_in_batch == 10:
last_height = height
if len(batches[-1]) == 100 or heights_in_batch == 20:
batches.append([])
heights_in_batch = 1
batches[-1].append(txid)
Expand Down Expand Up @@ -728,10 +736,8 @@ async def _single_batch(batch):
continue
cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id)
if cache_item is not None:
if cache_item.tx is None:
await cache_item.has_tx.wait()
assert cache_item.tx is not None
txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
if cache_item.tx is not None:
txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
else:
check_db_for_txos.append(txi.txo_ref.id)

Expand All @@ -740,12 +746,20 @@ async def _single_batch(batch):
txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True
)
}

for txi in tx.inputs:
if txi.txo_ref.txo is not None:
continue
referenced_txo = referenced_txos.get(txi.txo_ref.id)
if referenced_txo is not None:
txi.txo_ref = referenced_txo.ref
continue
cache_item = self._tx_cache.get(txi.txo_ref.id)
if cache_item is None:
cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem()
if cache_item.tx is not None:
txi.txo_ref = cache_item.tx.ref

synced_txs.append(tx)
this_batch_synced.append(tx)
await self.db.save_transaction_io_batch(
Expand All @@ -759,11 +773,8 @@ async def _single_batch(batch):
if last_showed_synced_count + 100 < len(synced_txs):
log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
last_showed_synced_count = len(synced_txs)

await asyncio.wait(
[_single_batch(batch) for batch in batches]
)
log.info("finished syncing history for %s", address)
for batch in batches:
await _single_batch(batch)
return synced_txs

async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
Expand Down
13 changes: 10 additions & 3 deletions tests/unit/wallet/test_ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ async def get_transaction(self, tx_hash, _=None):

async def get_transaction_and_merkle(self, tx_hash, known_height=None):
tx = await self.get_transaction(tx_hash)
merkle = {}
merkle = {'block_height': -1}
if known_height:
merkle = await self.get_merkle(tx_hash, known_height)
return tx, merkle

async def get_transaction_batch(self, txids):
return {
txid: await self.get_transaction_and_merkle(txid)
for txid in txids
}


class LedgerTestCase(AsyncioTestCase):

Expand Down Expand Up @@ -120,8 +126,9 @@ async def test_update_history(self):

self.ledger.network.get_history_called = []
self.ledger.network.get_transaction_called = []
for cache_item in self.ledger._tx_cache.values():
cache_item.tx.is_verified = True
self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
await self.ledger.update_history(address, '')
self.assertListEqual(self.ledger.network.get_history_called, [address])
self.assertListEqual(self.ledger.network.get_transaction_called, [])
Expand Down

0 comments on commit 2c12bd9

Please sign in to comment.