hub (wallet server) now uses Elasticsearch to handle claim_search and resolve requests for added performance and reliability #3153

Merged · 104 commits · Mar 24, 2021
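
At a high level, claim metadata is mirrored into an Elasticsearch index so that claim_search and resolve become ES queries instead of SQLite reads. The sketch below only illustrates that shape with the async client; the index name, field names, and local endpoint are assumptions loosely based on the diffs further down, not the hub's actual schema.

import asyncio
from elasticsearch import AsyncElasticsearch  # elasticsearch[async], 7.x


async def claim_search_sketch():
    # Assumed endpoint: the docker-compose change in this PR exposes ES on 127.0.0.1:9200.
    es = AsyncElasticsearch(hosts=['http://127.0.0.1:9200'])
    try:
        result = await es.search(
            index='claims',                            # hypothetical index name
            body={
                'query': {'bool': {'must': [
                    {'term': {'tags': 'science'}},     # hypothetical field names
                    {'range': {'height': {'gte': 900_000}}},
                ]}},
                'sort': [{'trending_mixed': 'desc'}],
                'size': 20,
            },
        )
        return [hit['_source'] for hit in result['hits']['hits']]
    finally:
        await es.close()


if __name__ == '__main__':
    print(asyncio.run(claim_search_sketch()))
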
Commits (104)
3abdc01
index ES during sync
shyba Jan 17, 2021
488785d
add indexer task
shyba Jan 17, 2021
996686c
claim search and resolve translated to ES queries
shyba Jan 19, 2021
dc10f8c
ignore errors when deleting
shyba Jan 19, 2021
aa37faa
use porter analyzer with weights on full text search
shyba Jan 19, 2021
9924b7b
reposts and tag inheritance
shyba Jan 19, 2021
90106f5
all test_claim_commands tests green
shyba Jan 20, 2021
1870f30
add sync script
shyba Jan 20, 2021
edfd707
run ES on github actions
shyba Jan 20, 2021
6f5f8e5
add elasticsearch dep
shyba Jan 20, 2021
ab53cec
fix is_controlling sync
shyba Jan 20, 2021
1098f0d
use normalized name instead
shyba Jan 20, 2021
8b91b38
update winners in one go
shyba Jan 20, 2021
143d82d
normalized, not normalized_name
shyba Jan 20, 2021
ee7b37d
also normalize the name supplied by user
shyba Jan 20, 2021
82eec3d
use multiple clients on sync script indexing
shyba Jan 25, 2021
1010068
disable refresh interval. start with 3 shards
shyba Jan 25, 2021
0c6eaf5
fix resolve partial id
shyba Jan 27, 2021
78a9bad
no indexer_task
shyba Jan 27, 2021
dd950f5
tag can have empty space
shyba Jan 27, 2021
119e519
fix partial id
shyba Jan 27, 2021
e2441ea
use prefix from ES docs
shyba Jan 27, 2021
7295b7e
make sync parallel
shyba Jan 27, 2021
146b693
exclude title and description
shyba Jan 27, 2021
f9471f2
apply filter and block from ES script lang
shyba Jan 29, 2021
9989d8d
refresh after delete
shyba Jan 29, 2021
9a9df2f
apply filtering only to whats unfiltered
shyba Jan 29, 2021
9b56067
raise request timeout for content filtering
shyba Jan 29, 2021
5bc1a66
32 slices and add censor type to fields
shyba Jan 29, 2021
7674a0a
backport fixes from testing server
shyba Jan 30, 2021
0f2a85b
simplify sync
shyba Jan 30, 2021
8e68ba4
fix join, refresh before update
shyba Jan 31, 2021
d467dcf
increase sync queue
shyba Jan 31, 2021
84ff0b8
general timeout
shyba Jan 31, 2021
0cf9533
narrow update by query
shyba Jan 31, 2021
7b4838f
dont update more than 400 items a time
shyba Jan 31, 2021
0929088
missing refresh step
shyba Jan 31, 2021
e4d06a0
include the channel being filtered/blocked
shyba Jan 31, 2021
d4bf004
use a thread pool to sync changes
shyba Jan 31, 2021
afe7ed5
adjust size
shyba Jan 31, 2021
19f70d7
create changelog trigger
shyba Jan 31, 2021
e439a3a
advanced resolve
shyba Jan 31, 2021
ec9a3a4
do not page filtered
shyba Jan 31, 2021
1e53317
fix some of the tests
shyba Feb 1, 2021
e61874b
only repeat search if it has blocked items
shyba Feb 1, 2021
bf44bef
backport fixes from server
shyba Feb 2, 2021
dd412c0
delete sqlite fts
shyba Feb 3, 2021
87037c0
remove reader code
shyba Feb 3, 2021
5d3704c
reader mode
shyba Feb 4, 2021
038a5f9
cache encoded headers
shyba Feb 4, 2021
1ce328e
cache signature inspection
shyba Feb 6, 2021
e21f236
apply reorg deletion as well
shyba Feb 9, 2021
dfca153
claim id is also a keyword
shyba Feb 9, 2021
8d028ad
be a writer by default
shyba Feb 9, 2021
0a194b5
claim_ids query
shyba Feb 10, 2021
e12fab9
docker compose update
shyba Feb 12, 2021
9251c87
refresh after sync
shyba Feb 12, 2021
24d11de
torba-elastic-sync
jackrobison Feb 12, 2021
6781700
check ES synced without a process and wait for ES
shyba Feb 12, 2021
d9c7468
pin python3.7
shyba Feb 13, 2021
da8a8bd
filter+fts and tests for edge cases
shyba Feb 13, 2021
a9a0ac9
ignore unset flag
shyba Feb 13, 2021
a916c1f
check if db file exists before sync
shyba Feb 16, 2021
ec89bca
improve sync script for no-downtime maintenance
shyba Feb 17, 2021
920dad5
simplify sync and use asyncio Queue instead
shyba Feb 22, 2021
1949408
generate from queue
shyba Feb 22, 2021
d388527
log indexing errors
shyba Feb 22, 2021
1783ff2
dont delete claims on reorg
shyba Feb 23, 2021
bd8f371
bump referenced rows query limit up
shyba Feb 25, 2021
3254194
update dockerfile
shyba Feb 26, 2021
eb69242
round time to 10 minutes and fetch referenced by id
shyba Mar 2, 2021
5a9338a
use a dict on set_reference
shyba Mar 2, 2021
02eb789
caching for resolve
shyba Mar 2, 2021
319187d
log mempool task exceptions
shyba Mar 2, 2021
b1bb375
use right key on cache
shyba Mar 5, 2021
6b193ab
make indexing cooperative
shyba Mar 5, 2021
2641a9a
make better resolve cache
shyba Mar 5, 2021
57f1108
fix query being json serializable
shyba Mar 5, 2021
4d83d42
fix equality instead of mod
shyba Mar 5, 2021
f26394f
report deletions on docs that doesnt exist, but dont raise
shyba Mar 8, 2021
891b1e7
track results up to 200
shyba Mar 9, 2021
8f32303
apply search timeout
shyba Mar 9, 2021
c2e7b5a
restore some of the interrupt metrics
shyba Mar 9, 2021
20a5aec
fix lib exception to asyncio TimeoutError
shyba Mar 9, 2021
60a5940
cache the encoded output instead
shyba Mar 10, 2021
5dff02e
on resolve, get all claims at once
shyba Mar 11, 2021
063be00
cache inner parsing
shyba Mar 11, 2021
21e023f
fix search by channel
shyba Mar 11, 2021
c3e426c
fix search by channel for invalid channel
shyba Mar 11, 2021
6fa7da4
less slices
shyba Mar 12, 2021
6166a34
check cache item before locking
shyba Mar 12, 2021
73884b3
apply no_totals
shyba Mar 14, 2021
b81305a
index and allow has_source
shyba Mar 14, 2021
a3e146d
sort on index time
shyba Mar 14, 2021
6a35a7b
expand content filtering tests for no_totals
shyba Mar 15, 2021
cd66f7e
if not no_totals, use default page size
shyba Mar 15, 2021
d855e6c
move elasticsearch things into its own module
shyba Mar 15, 2021
ef97c9b
torba-server -> hub
shyba Mar 15, 2021
d09663c
remove flush call
shyba Mar 17, 2021
7f5d88e
remove dead/broken/unused API
shyba Mar 19, 2021
d47cf40
add reader.py for test_sqldb tests
shyba Mar 19, 2021
7df4cc4
fixes from review
shyba Mar 24, 2021
c6372ea
hub->lbry-hub
shyba Mar 24, 2021
5235a15
add prog name to sync arg parser
shyba Mar 24, 2021
11 changes: 11 additions & 0 deletions .github/workflows/main.yml
@@ -37,6 +37,17 @@ jobs:
- blockchain
- other
steps:
- name: Configure sysctl limits
run: |
sudo swapoff -a
sudo sysctl -w vm.swappiness=1
sudo sysctl -w fs.file-max=262144
sudo sysctl -w vm.max_map_count=262144

- name: Runs Elasticsearch
uses: elastic/elastic-github-actions/elasticsearch@master
with:
stack-version: 7.6.0
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
6 changes: 4 additions & 2 deletions docker/Dockerfile.wallet_server
@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM debian:10-slim

ARG user=lbry
ARG db_dir=/database
@@ -13,7 +13,9 @@ RUN apt-get update && \
wget \
tar unzip \
build-essential \
python3 \
pkg-config \
libleveldb-dev \
python3.7 \
python3-dev \
python3-pip \
python3-wheel \
48 changes: 26 additions & 22 deletions docker/docker-compose-wallet-server.yml
@@ -1,36 +1,40 @@
version: "3"

volumes:
lbrycrd:
wallet_server:
es01:

services:
lbrycrd:
image: lbry/lbrycrd:${LBRYCRD_TAG:-latest-release}
restart: always
ports: # accessible from host
- "9246:9246" # rpc port
expose: # internal to docker network. also this doesn't do anything. its for documentation only.
- "9245" # node-to-node comms port
volumes:
- "lbrycrd:/data/.lbrycrd"
environment:
- RUN_MODE=default
# Curently not snapshot provided
#- SNAPSHOT_URL=${LBRYCRD_SNAPSHOT_URL-https://lbry.com/snapshot/blockchain}
- RPC_ALLOW_IP=0.0.0.0/0
wallet_server:
depends_on:
- es01
image: lbry/wallet-server:${WALLET_SERVER_TAG:-latest-release}
depends_on:
- lbrycrd
restart: always
network_mode: host
ports:
- "50001:50001" # rpc port
- "50005:50005" # websocket port
#- "2112:2112" # uncomment to enable prometheus
- "2112:2112" # uncomment to enable prometheus
volumes:
- "wallet_server:/database"
env_file: [/home/lbry/wallet-server-env]
environment:
# Curently not snapshot provided
# - SNAPSHOT_URL=${WALLET_SERVER_SNAPSHOT_URL-https://lbry.com/snapshot/wallet}
- DAEMON_URL=http://lbry:lbry@lbrycrd:9245
- DAEMON_URL=http://lbry:lbry@127.0.0.1:9245
- TCP_PORT=50001
- PROMETHEUS_PORT=2112
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=4096
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms8g -Xmx8g" # no more than 32, remember to disable swap
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es01:/usr/share/elasticsearch/data
ports:
- 127.0.0.1:9200:9200
4 changes: 3 additions & 1 deletion docker/wallet_server_entrypoint.sh
@@ -20,4 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
rm "$filename"
fi

/home/lbry/.local/bin/torba-server "$@"
/home/lbry/.local/bin/lbry-hub-elastic-sync /database/claims.db
echo 'starting server'
/home/lbry/.local/bin/lbry-hub "$@"
81 changes: 35 additions & 46 deletions lbry/schema/result.py
@@ -13,57 +13,45 @@
BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED)


def set_reference(reference, claim_hash, rows):
if claim_hash:
for txo in rows:
if claim_hash == txo['claim_hash']:
reference.tx_hash = txo['txo_hash'][:32]
reference.nout = struct.unpack('<I', txo['txo_hash'][32:])[0]
reference.height = txo['height']
return
def set_reference(reference, txo_row):
if txo_row:
reference.tx_hash = txo_row['txo_hash'][:32]
reference.nout = struct.unpack('<I', txo_row['txo_hash'][32:])[0]
reference.height = txo_row['height']


class Censor:

__slots__ = 'streams', 'channels', 'limit_claims_per_channel', 'censored', 'claims_in_channel', 'total'
NOT_CENSORED = 0
SEARCH = 1
RESOLVE = 2

def __init__(self, streams: dict = None, channels: dict = None, limit_claims_per_channel: int = None):
self.streams = streams or {}
self.channels = channels or {}
self.limit_claims_per_channel = limit_claims_per_channel # doesn't count as censored
__slots__ = 'censor_type', 'censored'

def __init__(self, censor_type):
self.censor_type = censor_type
self.censored = {}
self.claims_in_channel = {}
self.total = 0

def is_censored(self, row):
return (row.get('censor_type') or self.NOT_CENSORED) >= self.censor_type

def apply(self, rows):
return [row for row in rows if not self.censor(row)]

def censor(self, row) -> bool:
was_censored = False
for claim_hash, lookup in (
(row['claim_hash'], self.streams),
(row['claim_hash'], self.channels),
(row['channel_hash'], self.channels),
(row['reposted_claim_hash'], self.streams),
(row['reposted_claim_hash'], self.channels)):
censoring_channel_hash = lookup.get(claim_hash)
if censoring_channel_hash:
was_censored = True
self.censored.setdefault(censoring_channel_hash, 0)
self.censored[censoring_channel_hash] += 1
break
if was_censored:
self.total += 1
if not was_censored and self.limit_claims_per_channel is not None and row['channel_hash']:
self.claims_in_channel.setdefault(row['channel_hash'], 0)
self.claims_in_channel[row['channel_hash']] += 1
if self.claims_in_channel[row['channel_hash']] > self.limit_claims_per_channel:
return True
return was_censored

def to_message(self, outputs: OutputsMessage, extra_txo_rows):
outputs.blocked_total = self.total
if self.is_censored(row):
censoring_channel_hash = row['censoring_channel_hash']
self.censored.setdefault(censoring_channel_hash, set())
self.censored[censoring_channel_hash].add(row['tx_hash'])
return True
return False

def to_message(self, outputs: OutputsMessage, extra_txo_rows: dict):
for censoring_channel_hash, count in self.censored.items():
blocked = outputs.blocked.add()
blocked.count = count
set_reference(blocked.channel, censoring_channel_hash, extra_txo_rows)
blocked.count = len(count)
set_reference(blocked.channel, extra_txo_rows.get(censoring_channel_hash))
outputs.blocked_total += len(count)


class Outputs:
@@ -168,6 +156,7 @@ def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked=None)

@classmethod
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes:
extra_txo_rows = {row['claim_hash']: row for row in extra_txo_rows}
page = OutputsMessage()
page.offset = offset
if total is not None:
@@ -176,12 +165,12 @@ def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censo
blocked.to_message(page, extra_txo_rows)
for row in txo_rows:
cls.row_to_message(row, page.txos.add(), extra_txo_rows)
for row in extra_txo_rows:
for row in extra_txo_rows.values():
cls.row_to_message(row, page.extra_txos.add(), extra_txo_rows)
return page.SerializeToString()

@classmethod
def row_to_message(cls, txo, txo_message, extra_txo_rows):
def row_to_message(cls, txo, txo_message, extra_row_dict: dict):
if isinstance(txo, Exception):
txo_message.error.text = txo.args[0]
if isinstance(txo, ValueError):
@@ -190,7 +179,7 @@ def row_to_message(cls, txo, txo_message, extra_txo_rows):
txo_message.error.code = ErrorMessage.NOT_FOUND
elif isinstance(txo, ResolveCensoredError):
txo_message.error.code = ErrorMessage.BLOCKED
set_reference(txo_message.error.blocked.channel, txo.censor_hash, extra_txo_rows)
set_reference(txo_message.error.blocked.channel, extra_row_dict.get(txo.censor_hash))
return
txo_message.tx_hash = txo['txo_hash'][:32]
txo_message.nout, = struct.unpack('<I', txo['txo_hash'][32:])
@@ -213,5 +202,5 @@ def row_to_message(cls, txo, txo_message, extra_txo_rows):
txo_message.claim.trending_mixed = txo['trending_mixed']
txo_message.claim.trending_local = txo['trending_local']
txo_message.claim.trending_global = txo['trending_global']
set_reference(txo_message.claim.channel, txo['channel_hash'], extra_txo_rows)
set_reference(txo_message.claim.repost, txo['reposted_claim_hash'], extra_txo_rows)
set_reference(txo_message.claim.channel, extra_row_dict.get(txo['channel_hash']))
set_reference(txo_message.claim.repost, extra_row_dict.get(txo['reposted_claim_hash']))
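
A minimal sketch of how the reworked Censor above might be exercised; the row dict shape is inferred from the fields the new code reads, so treat it as an assumption rather than the hub's actual row format.

from lbry.schema.result import Censor

censor = Censor(Censor.SEARCH)
row = {
    'censor_type': Censor.SEARCH,              # marked as search-censored by the filtering step
    'censoring_channel_hash': b'\x01' * 20,    # channel responsible for the filter/block
    'tx_hash': b'\x02' * 32,
}

visible = censor.apply([row])                   # drops the row and records it per censoring channel
assert visible == []
assert len(censor.censored[b'\x01' * 20]) == 1  # one censored tx_hash recorded for that channel
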
8 changes: 8 additions & 0 deletions lbry/schema/url.py
@@ -55,6 +55,14 @@ class PathSegment(NamedTuple):
def normalized(self):
return normalize_name(self.name)

@property
def is_shortid(self):
return self.claim_id is not None and len(self.claim_id) < 40
Review comment (Member): can use a constant instead of 40 (see the sketch after this file's diff)

@property
def is_fullid(self):
return self.claim_id is not None and len(self.claim_id) == 40

def to_dict(self):
q = {'name': self.name}
if self.claim_id is not None:
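
One possible shape for the reviewer's suggestion above; the constant name is hypothetical, and 40 is the hex length of a 20-byte claim id.

CLAIM_ID_HEX_LENGTH = 40  # hypothetical constant name

@property
def is_shortid(self):
    return self.claim_id is not None and len(self.claim_id) < CLAIM_ID_HEX_LENGTH

@property
def is_fullid(self):
    return self.claim_id is not None and len(self.claim_id) == CLAIM_ID_HEX_LENGTH
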
3 changes: 0 additions & 3 deletions lbry/wallet/network.py
@@ -417,9 +417,6 @@ def unsubscribe_address(self, address):
def get_server_features(self):
return self.rpc('server.features', (), restricted=True)

def get_claims_by_ids(self, claim_ids):
return self.rpc('blockchain.claimtrie.getclaimsbyids', claim_ids)

def resolve(self, urls, session_override=None):
return self.rpc('blockchain.claimtrie.resolve', urls, False, session_override)

7 changes: 6 additions & 1 deletion lbry/wallet/orchstr8/node.py
@@ -11,6 +11,7 @@
from binascii import hexlify
from typing import Type, Optional
import urllib.request
from uuid import uuid4

import lbry
from lbry.wallet.server.server import Server
@@ -187,7 +188,9 @@ async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
'SESSION_TIMEOUT': str(self.session_timeout),
'MAX_QUERY_WORKERS': '0',
'INDIVIDUAL_TAG_INDEXES': '',
'RPC_PORT': self.rpc_port
'RPC_PORT': self.rpc_port,
'ES_INDEX_PREFIX': uuid4().hex,
'ES_MODE': 'writer',
Review comment (Member): is this currently used?

}
if extraconf:
conf.update(extraconf)
@@ -199,6 +202,8 @@ async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):

async def stop(self, cleanup=True):
try:
await self.server.db.search_index.delete_index()
await self.server.db.search_index.stop()
await self.server.stop()
finally:
cleanup and self.cleanup()
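
The uuid4-based ES_INDEX_PREFIX above presumably gives each spawned test server its own index namespace, so concurrent runs do not collide and stop() can delete only its own index. A tiny illustration of the idea; the "_claims" suffix is an assumption.

from uuid import uuid4

index_prefix = uuid4().hex               # unique per wallet-server instance under test
claims_index = f'{index_prefix}_claims'  # assumed naming; created on start, deleted on stop
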
3 changes: 3 additions & 0 deletions lbry/wallet/rpc/util.py
@@ -32,10 +32,13 @@

# other_params: None means cannot be called with keyword arguments only
# any means any name is good
from functools import lru_cache
Review comment (Member): should lbry.utils.LRUCache be used here instead? functools.lru_cache has a RLock (a standalone illustration follows this file's diff)


SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '
'required_names other_names')


@lru_cache(256)
def signature_info(func):
params = inspect.signature(func).parameters
min_args = max_args = 0
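
For context on the review note above: caching signature_info means inspect.signature runs once per handler function instead of on every RPC dispatch. A standalone illustration of the pattern, not the project's actual handlers.

import inspect
from functools import lru_cache


@lru_cache(256)
def signature_info_sketch(func):
    # Computed once per function object, then served from the cache on later calls.
    return inspect.signature(func).parameters


def example_handler(height: int, verbose: bool = False):
    return height, verbose


first = signature_info_sketch(example_handler)
assert signature_info_sketch(example_handler) is first  # second lookup is a cache hit
print(signature_info_sketch.cache_info())               # hits=1, misses=1, maxsize=256, currsize=1
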
39 changes: 17 additions & 22 deletions lbry/wallet/server/block_processor.py
@@ -5,7 +5,6 @@
from typing import Optional
from prometheus_client import Gauge, Histogram
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
@@ -215,6 +214,8 @@ async def check_and_advance_blocks(self, raw_blocks):
if hprevs == chain:
start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
if self.sql:
await self.db.search_index.claim_consumer(self.sql.claim_producer())
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
@@ -228,6 +229,9 @@ async def check_and_advance_blocks(self, raw_blocks):
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set():
if self.sql:
await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
self.sql.filtered_streams, self.sql.filtered_channels)
await self.notifications.on_block(self.touched, self.height)
self.touched = set()
elif hprevs[0] != chain[0]:
@@ -282,7 +286,6 @@ def flush_backup():
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)

await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
except:
@@ -651,7 +654,11 @@ async def _process_prefetched_blocks(self):
self.reorg_count = 0
else:
blocks = self.prefetcher.get_prefetched_blocks()
await self.check_and_advance_blocks(blocks)
try:
await self.check_and_advance_blocks(blocks)
except Exception:
self.logger.exception("error while processing txs")
raise

async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
@@ -782,15 +789,17 @@ def __init__(self, *args, **kwargs):
self.timer = Timer('BlockProcessor')

def advance_blocks(self, blocks):
self.sql.begin()
if self.sql:
self.sql.begin()
try:
self.timer.run(super().advance_blocks, blocks)
except:
self.logger.exception(f'Error while advancing transaction in new block.')
raise
finally:
self.sql.commit()
if self.db.first_sync and self.height == self.daemon.cached_height():
if self.sql:
self.sql.commit()
if self.sql and self.db.first_sync and self.height == self.daemon.cached_height():
self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES')
if self.env.individual_tag_indexes:
self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
@@ -799,22 +808,8 @@ def advance_blocks(self, blocks):
def advance_txs(self, height, txs, header, block_hash):
timer = self.timer.sub_timers['advance_blocks']
undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
if self.sql:
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
self.timer.show(height=height)
return undo

def _checksig(self, value, address):
try:
claim_dict = Claim.from_bytes(value)
cert_id = claim_dict.signing_channel_hash
if not self.should_validate_signatures:
return cert_id
if cert_id:
cert_claim = self.db.get_claim_info(cert_id)
if cert_claim:
certificate = Claim.from_bytes(cert_claim.value)
claim_dict.validate_signature(address, certificate)
return cert_id
except Exception:
pass
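
The claim_consumer(self.sql.claim_producer()) call in block_processor.py above suggests the ES sync is a producer/consumer loop that feeds bulk actions into the index. A generic sketch of that pattern with the elasticsearch async helpers; the index name, document shape, and error handling are assumptions, not the hub's actual implementation.

from typing import AsyncIterator, Dict, Iterable

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk  # requires elasticsearch[async]


async def claim_producer_sketch(claims: Iterable[Dict]) -> AsyncIterator[Dict]:
    # One bulk action per touched claim; the field names here are placeholders.
    for claim in claims:
        yield {
            '_op_type': 'index',
            '_index': 'claims',                           # assumed index name
            '_id': claim['claim_hash'].hex(),
            '_source': {'name': claim['name'], 'height': claim['height']},
        }


async def claim_consumer_sketch(es: AsyncElasticsearch, producer: AsyncIterator[Dict]) -> None:
    # Stream the producer straight into ES; log failures instead of raising,
    # matching the spirit of the "log indexing errors" commit above.
    async for ok, item in async_streaming_bulk(es, producer, raise_on_error=False):
        if not ok:
            print('indexing error:', item)
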