Warning
This project is only intended to serve as a simple proof of concept (POC).
Example pipeline to ingest RIS Live BGP messages into ClickHouse using Vector.
- ClickHouse
- Python3
- Vector
Setup tables and materialized view:
clickhouse-client --queries-file ./sql/clickhouse.sql
Ingest and process messages:
python ris_live.py | vector --config-toml ./vector/vector.toml
-- Most recent 20 rows from the past hour whose announcements contain
-- exactly the prefix 1.0.0.0/24, newest first (ties ordered by host).
-- flatten() collapses the nested announcements.prefixes arrays into a single
-- array so has() can test membership across all of them at once.
SELECT
    host,
    timestamp,
    originating_asn,
    path,
    announcements.prefixes,
    announcements.next_hop
FROM ris_live
WHERE timestamp > now() - INTERVAL 1 HOUR
    AND has(flatten(announcements.prefixes), '1.0.0.0/24')
ORDER BY
    timestamp DESC,
    host ASC
LIMIT 20;
-- Number of rows in which at least one announced prefix covers 1.1.1.1.
-- Each entry of the flattened prefix array is treated as a CIDR range and
-- checked with isIPAddressInRange(); no time filter is applied, so the
-- whole table is considered.
SELECT count() AS c
FROM ris_live
WHERE arrayExists(
    cidr -> isIPAddressInRange('1.1.1.1', cidr),
    flatten(announcements.prefixes)
);
-- Top 20 originating ASNs by row count for host rrc12.ripe.net over the
-- last 10 minutes. Rows with originating_asn = 0 are excluded.
SELECT
    originating_asn,
    count() AS c
FROM ris_live
WHERE (originating_asn != 0) AND (host = 'rrc12.ripe.net') AND (timestamp > (now() - toIntervalMinute(10)))
GROUP BY originating_asn
ORDER BY
    c DESC,
    -- Tie-breaker: without it, which ASNs with equal counts make the
    -- LIMIT cut (and in what order) is arbitrary.
    originating_asn ASC
LIMIT 20;
-- Top 20 announced prefixes by occurrence count for host rrc01.ripe.net
-- over the last 10 minutes.
-- The double arrayJoin() unfolds the nested announcements.prefixes arrays
-- so that every individual prefix becomes its own row before grouping.
SELECT
    arrayJoin(arrayJoin(announcements.prefixes)) AS prefix,
    count() AS c
FROM ris_live
WHERE (timestamp > (now() - toIntervalMinute(10))) AND (host = 'rrc01.ripe.net')
GROUP BY prefix
ORDER BY
    c DESC,
    -- Tie-breaker: without it, which prefixes with equal counts make the
    -- LIMIT cut (and in what order) is arbitrary.
    prefix ASC
LIMIT 20;