Skip to content

Commit

Permalink
[feature] use subscriptions for tokens
Browse files Browse the repository at this point in the history
- removes fetching of last 100000 blocks of token transfers from
the wallet pull loop
- fetches the last 100000 blocks of token transfers at startup
- replaces pulling by subscriptions to ethlogs for token transfers
  • Loading branch information
yenda committed May 16, 2019
1 parent 4f4cc91 commit f9a126e
Show file tree
Hide file tree
Showing 14 changed files with 236 additions and 254 deletions.
2 changes: 1 addition & 1 deletion STATUS_GO_SHA256
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
## DO NOT EDIT THIS FILE BY HAND. USE `scripts/update-status-go.sh <tag>` instead

0cj202bj2rwfrw327gibj8hj8i94ciyp3hkq2hck9l6711qlhpnb
0049i6znvl45hc651bqyzwgmzlv0fp40maggfjsrv13q5avd0g6d
2 changes: 1 addition & 1 deletion STATUS_GO_VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
## DO NOT EDIT THIS FILE BY HAND. USE `scripts/update-status-go.sh <tag>` instead

v0.25.0-beta.0
v0.25.0-beta.1
2 changes: 1 addition & 1 deletion src/status_im/accounts/login/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[status-im.chaos-mode.core :as chaos-mode]
[status-im.data-store.core :as data-store]
[status-im.ethereum.subscriptions :as ethereum.subscriptions]
[status-im.ethereum.transactions.core :as transactions]
[status-im.fleet.core :as fleet]
[status-im.i18n :as i18n]
[status-im.models.transactions :as transactions]
[status-im.models.wallet :as models.wallet]
[status-im.native-module.core :as status]
[status-im.node.core :as node]
Expand Down
10 changes: 5 additions & 5 deletions src/status_im/accounts/logout/core.cljs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
(ns status-im.accounts.logout.core
(:require [re-frame.core :as re-frame]
[status-im.chaos-mode.core :as chaos-mode]
[status-im.ethereum.transactions.core :as transactions]
[status-im.i18n :as i18n]
[status-im.transport.core :as transport]
[status-im.utils.fx :as fx]
[status-im.models.transactions :as transactions]
[status-im.node.core :as node]
[status-im.init.core :as init]
[status-im.chaos-mode.core :as chaos-mode]))
[status-im.node.core :as node]
[status-im.transport.core :as transport]
[status-im.utils.fx :as fx]))

(fx/defn logout
[{:keys [db] :as cofx}]
Expand Down
203 changes: 188 additions & 15 deletions src/status_im/ethereum/subscriptions.cljs
Original file line number Diff line number Diff line change
@@ -1,32 +1,76 @@
(ns status-im.ethereum.subscriptions
(:require [clojure.string :as string]
[re-frame.core :as re-frame]
[status-im.constants :as constants]
[status-im.ethereum.decode :as decode]
[status-im.native-module.core :as status]
[status-im.utils.ethereum.core :as ethereum]
[status-im.utils.ethereum.tokens :as tokens]
[status-im.utils.fx :as fx]
[status-im.utils.types :as types]
[taoensso.timbre :as log]))

(defn get-block-by-hash [block-hash callback]
;; NOTE: this is the safe block range that can be
;; queried from infura rpc gateway without getting timeouts
;; determined experimentally by @goranjovic
(def block-query-limit 100000)

(defn get-latest-block [callback]
(status/call-private-rpc
(.stringify js/JSON (clj->js {:jsonrpc "2.0"
:id 1
:method "eth_getBlockByHash"
:params [block-hash false]}))
(types/json->clj {:jsonrpc "2.0"
:id 1
:method "eth_blockNumber"
:params []})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result
:number
decode/uint))))))

(defn get-block-by-hash [block-hash callback]
(status/call-private-rpc
(types/json->clj {:jsonrpc "2.0"
:id 1
:method "eth_getBlockByHash"
:params [block-hash false]})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result
(update :number decode/uint)
(update :timestamp decode/uint)))))))

(defn- get-token-transfer-logs
[from-block {:keys [chain-tokens direction from to]} callback]
(status/call-private-rpc
(types/json->clj {:jsonrpc "2.0"
:id 2
:method "eth_getLogs"
:params
[{:address (keys chain-tokens)
:fromBlock from-block
:topics [constants/event-transfer-hash from to]}]})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result))))))

(fx/defn handle-signal
[cofx {:keys [subscription_id data] :as event}]
(if-let [handler (get-in cofx [:db :ethereum/subscriptions subscription_id])]
(handler data)
(log/warn ::unknown-subscription :event event)))

(fx/defn handle-error
[cofx {:keys [subscription_id data] :as event}]
(log/error ::error event))

(fx/defn register-subscription
[{:keys [db]} id handler]
{:db (assoc-in db [:ethereum/subscriptions id] handler)})
Expand All @@ -38,10 +82,10 @@
(defn subscribe-signal
[filter params callback]
(status/call-private-rpc
(.stringify js/JSON (clj->js {:jsonrpc "2.0"
:id 1
:method "eth_subscribeSignal"
:params [filter, params]}))
(types/clj->json {:jsonrpc "2.0"
:id 1
:method "eth_subscribeSignal"
:params [filter params]})
(fn [response]
(if (string/blank? response)
(log/error ::subscription-unknown-error :filter filter :params params)
Expand All @@ -54,22 +98,151 @@
result
callback])))))))

(defn- add-padding [address]
{:pre [(string? address)]}
(str "0x000000000000000000000000" (subs address 2)))

(defn- remove-padding [topic]
{:pre [(string? topic)]}
(str "0x" (subs topic 26)))

(defn- parse-transaction-entries [timestamp chain-tokens direction transfers]
{:pre [(integer? timestamp)
(map? chain-tokens)
(every? (fn [[k v]] (and (string? k) (map? v))) chain-tokens)
(keyword? direction)
(every? map? transfers)]}
(into {}
(keep identity
(for [transfer transfers]
(when-let [token (->> transfer :address (get chain-tokens))]
(when-not (:nft? token)
[(:transactionHash transfer)
{:block (str (-> transfer :blockNumber ethereum/hex->bignumber))
:hash (:transactionHash transfer)
:symbol (:symbol token)
:from (some-> transfer :topics second remove-padding)
:to (some-> transfer :topics last remove-padding)
:value (-> transfer :data ethereum/hex->bignumber)
:type direction
:gas-price nil
:nonce nil
:data nil
:gas-limit nil
:timestamp (str (* timestamp 1000))
:gas-used nil
;; NOTE(goranjovic) - metadata on the type of token: contains name, symbol, decimas, address.
:token token
;; NOTE(goranjovic) - if an event has been emitted, we can say there was no error
:error? false
;; NOTE(goranjovic) - just a flag we need when we merge this entry with the existing entry in
;; the app, e.g. transaction info with gas details, or a previous transfer entry with old
;; confirmations count.
:transfer true}]))))))

(letfn [(combine-entries [transaction token-transfer]
(merge transaction (select-keys token-transfer [:symbol :from :to :value :type :token :transfer])))
(tx-and-transfer? [tx1 tx2]
(and (not (:transfer tx1)) (:transfer tx2)))
(both-transfer?
[tx1 tx2]
(and (:transfer tx1) (:transfer tx2)))]
(defn- dedupe-transactions [tx1 tx2]
(cond (tx-and-transfer? tx1 tx2) (combine-entries tx1 tx2)
(tx-and-transfer? tx2 tx1) (combine-entries tx2 tx1)
:else tx2)))

(fx/defn new-transactions
[{:keys [db]} transactions]
{:db (update-in db
[:wallet :transactions]
#(merge-with dedupe-transactions % transactions))})

(defn transactions-handler
[{:keys [chain-tokens from to direction]}]
(fn [transfers]
(let [transfers-by-block (group-by :blockHash transfers)]
(doseq [[block-hash block-transfers] transfers-by-block]
(get-block-by-hash
block-hash
(fn [{:keys [timestamp]}]
(let [transactions (parse-transaction-entries timestamp
chain-tokens
direction
block-transfers)]
(when (not-empty transactions)
(re-frame/dispatch [:ethereum.signal/new-transactions
transactions])))))))))

;; Here we are querying event logs for Transfer events.
;;
;; The parameters are as follows:
;; - address - token smart contract address
;; - fromBlock - we need to specify it, since default is latest
;; - topics[0] - hash code of the Transfer event signature
;; - topics[1] - address of token sender with leading zeroes padding up to 32 bytes
;; - topics[2] - address of token sender with leading zeroes padding up to 32 bytes
(defn new-token-transaction-filter
[{:keys [chain-tokens from to] :as args}]
(subscribe-signal
"eth_newFilter"
[{:fromBlock "latest"
:toBlock "latest"
:address (keys chain-tokens)
:topics [constants/event-transfer-hash from to]}]
(transactions-handler args)))

(defn new-block-filter
[]
(subscribe-signal
"eth_newBlockFilter" []
(fn [[block-hash]]
(get-block-by-hash
block-hash
(fn [block-number]
(when block-number
(fn [block]
(when-let [block-number (:number block)]
(re-frame/dispatch [:ethereum.signal/new-block
block-number])))))))

(defn get-from-block
[current-block-number]
(-> current-block-number
(- block-query-limit)
(max 0)
ethereum/int->hex))

(re-frame/reg-fx
:ethereum.subscriptions/token-transactions
(fn [{:keys [address] :as args}]
(let [inbound-args (merge args
{:direction :inbound
:to address})
outbound-args (merge args
{:direction :outbound
:from address})]
;; fetch 2 weeks of history until transactions are persisted
(get-latest-block
(fn [current-block-number]
(let [from-block (get-from-block current-block-number)]
(get-token-transfer-logs from-block inbound-args
(transactions-handler inbound-args))
(get-token-transfer-logs from-block outbound-args
(transactions-handler outbound-args)))))
;; start inbound and outbound token transaction subscriptions
(new-token-transaction-filter inbound-args)
(new-token-transaction-filter outbound-args))))

(re-frame/reg-fx
:ethereum.subscriptions/new-block-filter
:ethereum.subscriptions/new-block
new-block-filter)

(fx/defn initialize
[cofx]
{:ethereum.subscriptions/new-block-filter nil})
[{:keys [db] :as cofx}]
(let [{:keys [:account/account :wallet/all-tokens network]} db
chain (ethereum/network->chain-keyword (get-in account [:networks network]))
chain-tokens (into {} (map (juxt :address identity)
(tokens/tokens-for all-tokens chain)))
padded-address (add-padding (ethereum/normalized-address (:address account)))]
{:ethereum.subscriptions/new-block nil
:ethereum.subscriptions/token-transactions {:chain-tokens chain-tokens
:address padded-address}}))
Loading

0 comments on commit f9a126e

Please sign in to comment.