forked from 0xBrian/0xBTCpay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.rb
110 lines (93 loc) · 2.57 KB
/
stream.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
require "faye/websocket"
require "json"
require "redis"
require "./config"
require "./log"
# Contract address used as the `address` filter of the log subscription and
# re-checked (case-insensitively) against every incoming log entry.
ADDR_0xBITCOIN = "0xb6ed7644c69416d67b522e20bc294a9a9b405b31".freeze
# Event topic used as the `topics` filter of the log subscription
# (the keccak-256 hash of the ERC-20 `Transfer(address,address,uint256)` signature).
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef".freeze
# Redis channel name that matching transfer logs are published to.
NEW_TRANSFERS = "new_transfers".freeze # redis queue
# Connects to a JSON-RPC websocket endpoint, subscribes to 0xBitcoin transfer
# logs via `eth_subscribe`, and publishes every matching log entry (as JSON)
# to the NEW_TRANSFERS redis channel. Reconnects after the socket closes.
#
# Must be started from inside a running EventMachine reactor (it uses EM timers).
class Stream
  include Log

  # JSON-RPC protocol version. The JSON-RPC 2.0 specification requires this
  # member to be exactly the *string* "2.0" (not the number 2.0).
  JSONRPC_VERSION = "2.0"
  # Seconds between the periodic `eth_blockNumber` requests sent by #do_pings.
  PING_INTERVAL = 10
  # Seconds to wait after a close before opening a new connection.
  RECONNECT_DELAY = 1

  # @param url [String] websocket URL of the JSON-RPC provider
  def initialize(url:)
    @name = "ST" # for logging
    @url = url
    @redis = Redis.new
  end

  # Opens a new websocket connection and installs the event handlers.
  # Subscription bookkeeping is reset on every call, so a reconnect starts
  # from a clean state.
  def start
    @ws = Faye::WebSocket::Client.new(@url)
    @subs = {}         # provider subscription id => our subscription name
    @waiting_subs = {} # request id => subscription name awaiting confirmation

    @ws.on :open do
      log "connected."
      do_pings
      subscribe
    end

    @ws.on(:message) { |event| process event.data }

    @ws.on :close do |event|
      log "connection closed. #{event.code} #{event.reason}"
      # Stop sending requests on the dead socket; pings resume on the next :open.
      stop_pings
      reconnect
    end
  end

  # Sends the `eth_subscribe` request for transfer logs of the 0xBitcoin
  # contract. The request id is registered as a pending subscription so that
  # the provider's reply can be matched in #process.
  def subscribe
    log "subscribing."
    request_id = create_subscription(:_0xbitcoin_transfers)
    log_filter = { address: ADDR_0xBITCOIN, topics: [TRANSFER_TOPIC] }
    @ws.send rpc("eth_subscribe", ["logs", log_filter], id: request_id)
  end

  # Schedules a fresh #start after RECONNECT_DELAY seconds.
  def reconnect
    EM.add_timer(RECONNECT_DELAY) { start }
  end

  # Allocates a request id and records `name` as awaiting confirmation.
  #
  # @param name [Symbol] our name for the subscription
  # @return [Integer] the request id to use for the `eth_subscribe` call
  def create_subscription(name)
    new_id.tap { |id| @waiting_subs[id] = name }
  end

  # (Re)starts the periodic `eth_blockNumber` request, replacing any timer
  # left over from a previous connection. Presumably this acts as a keepalive.
  def do_pings
    stop_pings
    @timer = EM.add_periodic_timer(PING_INTERVAL) { @ws.send(rpc("eth_blockNumber", [])) }
  end

  # Cancels the periodic ping timer, if one is running.
  def stop_pings
    EM.cancel_timer @timer if @timer
    @timer = nil
  end

  # Handles one incoming websocket message.
  #
  # Replies to pending `eth_subscribe` requests are routed to #on_confirm_sub.
  # `eth_subscription` notifications for the transfer subscription are logged
  # and published to redis; every other message (e.g. ping replies) is ignored.
  #
  # @param response_json [String] raw JSON text of the message
  # @raise [JSON::ParserError] if the message is not valid JSON
  def process(response_json)
    r = JSON.parse(response_json, symbolize_names: true)
    return on_confirm_sub(r) if waiting?(r[:id])
    return unless r[:method] == "eth_subscription"

    sub_id, result = parse(r)
    return unless sub_id == :_0xbitcoin_transfers
    return unless result[:topics].first == TRANSFER_TOPIC # paranoia
    return unless result[:address].casecmp?(ADDR_0xBITCOIN) # paranoia

    log "[#{sub_id.inspect}] #{result.inspect}"
    @redis.publish NEW_TRANSFERS, result.to_json
  end

  # Records the provider-assigned subscription id from an `eth_subscribe`
  # reply and clears the matching pending entry.
  #
  # @param r [Hash] parsed reply (symbol keys)
  # @raise [RuntimeError] if the reply carries no `result`
  def on_confirm_sub(r)
    parity_sub_id = r[:result]
    raise "subscription failure: #{r.inspect}" unless parity_sub_id

    sub_id = @waiting_subs.delete(r[:id])
    @subs[parity_sub_id] = sub_id
    log "subscribed #{sub_id.inspect} (#{parity_sub_id})"
    log "all subscriptions confirmed" if @waiting_subs.empty?
  end

  # @param id [Object] `id` member of an incoming message (may be nil)
  # @return [Boolean] true if `id` belongs to a not-yet-confirmed subscription
  def waiting?(id)
    @waiting_subs.key?(id)
  end

  # Splits an `eth_subscription` notification into our subscription name
  # (nil if the provider's subscription id is unknown) and its payload.
  #
  # @param r [Hash] parsed notification (symbol keys)
  # @return [Array] two elements: subscription name (Symbol or nil) and result
  def parse(r)
    params = r[:params]
    [@subs[params[:subscription]], params[:result]]
  end

  # @return [Integer] next request id; starts at 1 and increases by 1 per call
  def new_id
    @id_ = (@id_ || 0) + 1
  end

  # Builds the JSON text of a JSON-RPC request.
  #
  # @param method [String] RPC method name
  # @param params [Array] positional parameters
  # @param id [Integer] request id; a fresh one is allocated when omitted
  # @return [String] serialized request
  def rpc(method, params, id: new_id)
    { method: method, params: params, id: id, jsonrpc: JSONRPC_VERSION }.to_json
  end
end
# Entry point: reads the :stream section of config.yml, builds a Stream for the
# configured provider URL and runs it inside the EventMachine reactor.
# An Interrupt (e.g. Ctrl-C) ends the program with a short message.
begin
  stream_settings = Config.load_file("config.yml")[:stream]
  stream = Stream.new(url: stream_settings[:provider])
  EM.run do
    stream.start
  end
rescue Interrupt
  puts "quit."
end