Skip to content

Commit

Permalink
[feature] use new block signal to get new transactions
Browse files Browse the repository at this point in the history
- remove the transaction fetching loop entirely to rely only on subscription
for live transactions and token transfer updates
- fetch token transfers history via etherscan API to lift the 100000 blocks
limit on token transfers history

- inbound token transfers are catched via a filter on ethlogs
- outbound token transfers and other transactions are catched by filtering
transaction in current block that have the wallet address as to or from field
  • Loading branch information
yenda committed May 20, 2019
1 parent 3a327c2 commit 820b76e
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 612 deletions.
4 changes: 2 additions & 2 deletions src/status_im/accounts/login/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@
(fx/defn initialize-wallet [cofx]
(fx/merge cofx
(models.wallet/initialize-tokens)
(transactions/initialize)
(ethereum.subscriptions/initialize)
(models.wallet/update-wallet)
(transactions/start-sync)))
(models.wallet/update-wallet)))

(fx/defn user-login [{:keys [db] :as cofx} create-database?]
(let [{:keys [address password]} (accounts.db/credentials cofx)]
Expand Down
2 changes: 0 additions & 2 deletions src/status_im/accounts/logout/core.cljs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns status-im.accounts.logout.core
(:require [re-frame.core :as re-frame]
[status-im.chaos-mode.core :as chaos-mode]
[status-im.ethereum.transactions.core :as transactions]
[status-im.i18n :as i18n]
[status-im.init.core :as init]
[status-im.node.core :as node]
Expand All @@ -13,7 +12,6 @@
(fx/merge cofx
{:keychain/clear-user-password (get-in db [:account/account :address])
:dev-server/stop nil}
(transactions/stop-sync)
(transport/stop-whisper
#(re-frame/dispatch [:accounts.logout/filters-removed]))
(chaos-mode/stop-checking)))
Expand Down
223 changes: 48 additions & 175 deletions src/status_im/ethereum/subscriptions.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,14 @@
(:require [clojure.string :as string]
[re-frame.core :as re-frame]
[status-im.constants :as constants]
[status-im.ethereum.decode :as decode]
[status-im.ethereum.transactions.core :as transactions]
[status-im.native-module.core :as status]
[status-im.utils.ethereum.core :as ethereum]
[status-im.utils.ethereum.tokens :as tokens]
[status-im.utils.fx :as fx]
[status-im.utils.types :as types]
[taoensso.timbre :as log]))

;; NOTE: this is the safe block range that can be
;; queried from infura rpc gateway without getting timeouts
;; determined experimentally by @goranjovic
(def block-query-limit 100000)

(defn get-latest-block [callback]
(status/call-private-rpc
(types/json->clj {:jsonrpc "2.0"
:id 1
:method "eth_blockNumber"
:params []})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result
decode/uint))))))

(defn get-block-by-hash [block-hash callback]
(status/call-private-rpc
(types/json->clj {:jsonrpc "2.0"
:id 1
:method "eth_getBlockByHash"
:params [block-hash false]})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result
(update :number decode/uint)
(update :timestamp decode/uint)))))))

(defn- get-token-transfer-logs
[from-block {:keys [chain-tokens direction from to]} callback]
(status/call-private-rpc
(types/json->clj {:jsonrpc "2.0"
:id 2
:method "eth_getLogs"
:params
[{:address (keys chain-tokens)
:fromBlock from-block
:topics [constants/event-transfer-hash from to]}]})
(fn [response]
(if (string/blank? response)
(log/warn :web3-response-error)
(callback (-> (.parse js/JSON response)
(js->clj :keywordize-keys true)
:result))))))

(fx/defn handle-signal
[cofx {:keys [subscription_id data] :as event}]
(if-let [handler (get-in cofx [:db :ethereum/subscriptions subscription_id])]
Expand All @@ -75,9 +24,37 @@
[{:keys [db]} id handler]
{:db (assoc-in db [:ethereum/subscriptions id] handler)})

(defn keep-user-transactions
[wallet-address transactions]
(keep (fn [{:keys [to from] :as transaction}]
(when-let [direction (cond
(= wallet-address to) :inbound
(= wallet-address from) :outbound)]
(assoc transaction :direction direction)))
transactions))

(fx/defn new-block
[{:keys [db]} block-number]
{:db (assoc-in db [:ethereum/current-block] block-number)})
[{:keys [db] :as cofx} {:keys [number transactions] :as block}]
(when number
(let [{:keys [:account/account :wallet/all-tokens network
:ethereum/current-block]} db
chain (ethereum/network->chain-keyword (get-in account [:networks network]))
chain-tokens (into {} (map (juxt :address identity)
(tokens/tokens-for all-tokens chain)))
wallet-address (ethereum/normalized-address (:address account))
token-contracts-addresses (into #{} (keys chain-tokens))]
(fx/merge cofx
{:db (assoc-in db [:ethereum/current-block] number)
:ethereum.transactions/enrich-transactions-from-new-blocks
{:chain-tokens chain-tokens
:block block
:transactions (keep-user-transactions wallet-address
transactions)}}
(when (or (not current-block)
(not= number (inc current-block)))
;; in case we skipped some blocks or got an uncle, re-fetch history
;; from etherscan
(transactions/initialize))))))

(defn subscribe-signal
[filter params callback]
Expand All @@ -98,139 +75,33 @@
result
callback])))))))

(defn- add-padding [address]
{:pre [(string? address)]}
(str "0x000000000000000000000000" (subs address 2)))

(defn- remove-padding [topic]
{:pre [(string? topic)]}
(str "0x" (subs topic 26)))

(defn- parse-transaction-entries [timestamp chain-tokens direction transfers]
{:pre [(integer? timestamp)
(map? chain-tokens)
(every? (fn [[k v]] (and (string? k) (map? v))) chain-tokens)
(keyword? direction)
(every? map? transfers)]}
(into {}
(keep identity
(for [transfer transfers]
(when-let [token (->> transfer :address (get chain-tokens))]
(when-not (:nft? token)
[(:transactionHash transfer)
{:block (str (-> transfer :blockNumber ethereum/hex->bignumber))
:hash (:transactionHash transfer)
:symbol (:symbol token)
:from (some-> transfer :topics second remove-padding)
:to (some-> transfer :topics last remove-padding)
:value (-> transfer :data ethereum/hex->bignumber)
:type direction
:gas-price nil
:nonce nil
:data nil
:gas-limit nil
:timestamp (str (* timestamp 1000))
:gas-used nil
;; NOTE(goranjovic) - metadata on the type of token: contains name, symbol, decimas, address.
:token token
;; NOTE(goranjovic) - if an event has been emitted, we can say there was no error
:error? false
;; NOTE(goranjovic) - just a flag we need when we merge this entry with the existing entry in
;; the app, e.g. transaction info with gas details, or a previous transfer entry with old
;; confirmations count.
:transfer true}]))))))

(letfn [(combine-entries [transaction token-transfer]
(merge transaction (select-keys token-transfer [:symbol :from :to :value :type :token :transfer])))
(tx-and-transfer? [tx1 tx2]
(and (not (:transfer tx1)) (:transfer tx2)))
(both-transfer?
[tx1 tx2]
(and (:transfer tx1) (:transfer tx2)))]
(defn- dedupe-transactions [tx1 tx2]
(cond (tx-and-transfer? tx1 tx2) (combine-entries tx1 tx2)
(tx-and-transfer? tx2 tx1) (combine-entries tx2 tx1)
:else tx2)))

(fx/defn new-transactions
[{:keys [db]} transactions]
{:db (update-in db
[:wallet :transactions]
#(merge-with dedupe-transactions % transactions))})

(defn transactions-handler
[{:keys [chain-tokens from to direction]}]
(fn [transfers]
(let [transfers-by-block (group-by :blockHash transfers)]
(doseq [[block-hash block-transfers] transfers-by-block]
(get-block-by-hash
block-hash
(fn [{:keys [timestamp]}]
(let [transactions (parse-transaction-entries timestamp
chain-tokens
direction
block-transfers)]
(when (not-empty transactions)
(re-frame/dispatch [:ethereum.signal/new-transactions
transactions])))))))))

;; Here we are querying event logs for Transfer events.
;;
;; The parameters are as follows:
;; - address - token smart contract address
;; - fromBlock - we need to specify it, since default is latest
;; - topics[0] - hash code of the Transfer event signature
;; - topics[1] - address of token sender with leading zeroes padding up to 32 bytes
;; - topics[2] - address of token sender with leading zeroes padding up to 32 bytes
(defn new-token-transaction-filter
[{:keys [chain-tokens from to] :as args}]
(subscribe-signal
"eth_newFilter"
[{:fromBlock "latest"
:toBlock "latest"
:address (keys chain-tokens)
:topics [constants/event-transfer-hash from to]}]
(transactions-handler args)))
(transactions/inbound-token-transfer-handler chain-tokens)))

(re-frame/reg-fx
:ethereum.subscriptions/token-transactions
(fn [{:keys [address] :as args}]
;; start inbound token transaction subscriptions
;; outbound token transactions are already caught in new blocks filter
(new-token-transaction-filter (merge args
{:direction :inbound
:to address}))))

(defn new-block-filter
[]
(subscribe-signal
"eth_newBlockFilter" []
(fn [[block-hash]]
(get-block-by-hash
(transactions/get-block-by-hash
block-hash
(fn [block]
(when-let [block-number (:number block)]
(re-frame/dispatch [:ethereum.signal/new-block
block-number])))))))

(defn get-from-block
[current-block-number]
(-> current-block-number
(- block-query-limit)
(max 0)
ethereum/int->hex))

(re-frame/reg-fx
:ethereum.subscriptions/token-transactions
(fn [{:keys [address] :as args}]
(let [inbound-args (merge args
{:direction :inbound
:to address})
outbound-args (merge args
{:direction :outbound
:from address})]
;; fetch 2 weeks of history until transactions are persisted
(get-latest-block
(fn [current-block-number]
(let [from-block (get-from-block current-block-number)]
(get-token-transfer-logs from-block inbound-args
(transactions-handler inbound-args))
(get-token-transfer-logs from-block outbound-args
(transactions-handler outbound-args)))))
;; start inbound and outbound token transaction subscriptions
(new-token-transaction-filter inbound-args)
(new-token-transaction-filter outbound-args))))
(re-frame/dispatch [:ethereum.signal/new-block block]))))))

(re-frame/reg-fx
:ethereum.subscriptions/new-block
Expand All @@ -242,7 +113,9 @@
chain (ethereum/network->chain-keyword (get-in account [:networks network]))
chain-tokens (into {} (map (juxt :address identity)
(tokens/tokens-for all-tokens chain)))
padded-address (add-padding (ethereum/normalized-address (:address account)))]
normalized-address (ethereum/normalized-address (:address account))
padded-address (transactions/add-padding normalized-address)]
{:ethereum.subscriptions/new-block nil
:ethereum.subscriptions/token-transactions {:chain-tokens chain-tokens
:address padded-address}}))
:ethereum.subscriptions/token-transactions
{:chain-tokens chain-tokens
:address padded-address}}))
Loading

0 comments on commit 820b76e

Please sign in to comment.