diff --git a/README.md b/README.md index 7983e7073..54964ca61 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,7 @@ The following strategies are available **out of the box** with Kelp: - mirror ([source](plugins/mirrorStrategy.go)): - - **What:** mirrors an orderbook from another exchange by placing the same orders on Stellar after including a [spread][spread]. _Note: covering your trades on the backing exchange is not currently supported out-of-the-box_. + - **What:** mirrors an orderbook from another exchange by placing the same orders on Stellar after including a [spread][spread]. - **Why:** To [hedge][hedge] your position on another exchange whenever a trade is executed to reduce inventory risk while keeping a spread - **Who:** Anyone who wants to reduce inventory risk and also has the capacity to take on a higher operational overhead in maintaining the bot system. - **Complexity:** Advanced diff --git a/api/exchange.go b/api/exchange.go index 38d94b6f5..9b5dc4c07 100644 --- a/api/exchange.go +++ b/api/exchange.go @@ -30,7 +30,9 @@ type TradesResult struct { } // TradeHistoryResult is the result of a GetTradeHistory call +// this should be the same object as TradesResult but it's a separate object for backwards compatibility type TradeHistoryResult struct { + Cursor interface{} Trades []model.Trade } @@ -39,6 +41,32 @@ type TickerAPI interface { GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]Ticker, error) } +// FillTracker knows how to track fills against open orders +type FillTracker interface { + GetPair() (pair *model.TradingPair) + // TrackFills should be executed in a new thread + TrackFills() error + RegisterHandler(handler FillHandler) + NumHandlers() uint8 +} + +// FillHandler is invoked by the FillTracker (once registered) anytime an order is filled +type FillHandler interface { + HandleFill(trade model.Trade) error +} + +// TradeFetcher is the common method between FillTrackable and exchange +// temporarily extracted out from TradeAPI so SDEX has the flexibility to only implement this rather than exchange and FillTrackable +type TradeFetcher interface { + GetTradeHistory(maybeCursorStart interface{}, maybeCursorEnd interface{}) (*TradeHistoryResult, error) +} + +// FillTrackable enables any implementing exchange to support fill tracking +type FillTrackable interface { + TradeFetcher + GetLatestTradeCursor() (interface{}, error) +} + // TradeAPI is the interface we use as a generic API for trading on any crypto exchange type TradeAPI interface { GetAssetConverter() *model.AssetConverter @@ -47,7 +75,7 @@ type TradeAPI interface { GetTrades(pair *model.TradingPair, maybeCursor interface{}) (*TradesResult, error) - GetTradeHistory(maybeCursorStart interface{}, maybeCursorEnd interface{}) (*TradeHistoryResult, error) + TradeFetcher GetOpenOrders() (map[model.TradingPair][]model.OpenOrder, error) diff --git a/api/level.go b/api/level.go index 0172bdc31..4184b8507 100644 --- a/api/level.go +++ b/api/level.go @@ -11,4 +11,5 @@ type Level struct { // LevelProvider returns the levels for the given center price, which controls the spread and number of levels type LevelProvider interface { GetLevels(maxAssetBase float64, maxAssetQuote float64) ([]Level, error) + GetFillHandlers() ([]FillHandler, error) } diff --git a/api/strategy.go b/api/strategy.go index bf5baf20e..7e66cfb0f 100644 --- a/api/strategy.go +++ b/api/strategy.go @@ -12,6 +12,7 @@ type Strategy interface { PreUpdate(maxAssetA float64, maxAssetB float64, trustA float64, trustB float64) error UpdateWithOps(buyingAOffers []horizon.Offer, sellingAOffers []horizon.Offer) ([]build.TransactionMutator, error) PostUpdate() error + GetFillHandlers() ([]FillHandler, error) } // SideStrategy represents a strategy on a single side of the orderbook @@ -20,4 +21,5 @@ type SideStrategy interface { PreUpdate(maxAssetA float64, maxAssetB float64, trustA float64, trustB float64) error UpdateWithOps(offers []horizon.Offer) (ops []build.TransactionMutator, newTopOffer *model.Number, e error) PostUpdate() error + GetFillHandlers() ([]FillHandler, error) } diff --git a/cmd/exchanges.go b/cmd/exchanges.go index ab446b039..36be56e79 100644 --- a/cmd/exchanges.go +++ b/cmd/exchanges.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "sort" "github.com/interstellar/kelp/plugins" @@ -16,20 +15,22 @@ var exchanagesCmd = &cobra.Command{ func init() { exchanagesCmd.Run = func(ccmd *cobra.Command, args []string) { - fmt.Printf(" Exchange\tDescription\n") + fmt.Printf(" Exchange\t\tSupports Trading\tDescription\n") fmt.Printf(" --------------------------------------------------------------------------------\n") exchanges := plugins.Exchanges() for _, name := range sortedExchangeKeys(exchanges) { - fmt.Printf(" %-14s%s\n", name, exchanges[name]) + fmt.Printf(" %-14s\t%v\t\t\t%s\n", name, exchanges[name].TradeEnabled, exchanges[name].Description) } } } -func sortedExchangeKeys(m map[string]string) []string { - keys := []string{} - for name := range m { - keys = append(keys, name) +func sortedExchangeKeys(m map[string]plugins.ExchangeContainer) []string { + keys := make([]string, len(m)) + for k, v := range m { + if len(keys[v.SortOrder]) > 0 && keys[v.SortOrder] != k { + panic(fmt.Errorf("invalid sort order specified for strategies, SortOrder that was repeated: %d", v.SortOrder)) + } + keys[v.SortOrder] = k } - sort.Strings(keys) return keys } diff --git a/cmd/terminate.go b/cmd/terminate.go index f02dac45d..d379b16f5 100644 --- a/cmd/terminate.go +++ b/cmd/terminate.go @@ -4,6 +4,7 @@ import ( "log" "net/http" + "github.com/interstellar/kelp/model" "github.com/interstellar/kelp/plugins" "github.com/interstellar/kelp/support/utils" "github.com/interstellar/kelp/terminator" @@ -51,6 +52,8 @@ func init() { -1, // not needed here -1, // not needed here false, + nil, // not needed here + map[model.Asset]horizon.Asset{}, ) terminator := terminator.MakeTerminator(client, sdex, *configFile.TradingAccount, configFile.TickIntervalSeconds, configFile.AllowInactiveMinutes) // --- end initialization of objects ---- diff --git a/cmd/trade.go b/cmd/trade.go index a4637c074..103d957eb 100644 --- a/cmd/trade.go +++ b/cmd/trade.go @@ -137,6 +137,17 @@ func init() { // --- start initialization of objects ---- threadTracker := multithreading.MakeThreadTracker() + assetBase := botConfig.AssetBase() + assetQuote := botConfig.AssetQuote() + tradingPair := &model.TradingPair{ + Base: model.Asset(utils.Asset2CodeString(assetBase)), + Quote: model.Asset(utils.Asset2CodeString(assetQuote)), + } + + sdexAssetMap := map[model.Asset]horizon.Asset{ + tradingPair.Base: assetBase, + tradingPair.Quote: assetQuote, + } sdex := plugins.MakeSDEX( client, botConfig.SourceSecretSeed, @@ -148,12 +159,12 @@ func init() { *operationalBuffer, *operationalBufferNonNativePct, *simMode, + tradingPair, + sdexAssetMap, ) - assetBase := botConfig.AssetBase() - assetQuote := botConfig.AssetQuote() dataKey := model.MakeSortedBotKey(assetBase, assetQuote) - strat, e := plugins.MakeStrategy(sdex, &assetBase, &assetQuote, *strategy, *stratConfigPath) + strat, e := plugins.MakeStrategy(sdex, &assetBase, &assetQuote, *strategy, *stratConfigPath, *simMode) if e != nil { log.Println() log.Println(e) @@ -196,6 +207,34 @@ func init() { } }() } + + if botConfig.FillTrackerSleepMillis != 0 { + fillTracker := plugins.MakeFillTracker(tradingPair, threadTracker, sdex, botConfig.FillTrackerSleepMillis) + fillLogger := plugins.MakeFillLogger() + fillTracker.RegisterHandler(fillLogger) + strategyFillHandlers, e := strat.GetFillHandlers() + if e != nil { + log.Println() + log.Printf("problem encountered while instantiating the fill tracker: %s\n", e) + deleteAllOffersAndExit(botConfig, client, sdex) + } + if strategyFillHandlers != nil { + for _, h := range strategyFillHandlers { + fillTracker.RegisterHandler(h) + } + } + + log.Printf("Starting fill tracker with %d handlers\n", fillTracker.NumHandlers()) + go func() { + e := fillTracker.TrackFills() + if e != nil { + log.Println() + log.Printf("problem encountered while running the fill tracker: %s\n", e) + // we want to delete all the offers and exit here because we don't want the bot to run if fill tracking isn't working + deleteAllOffersAndExit(botConfig, client, sdex) + } + }() + } // --- end initialization of services --- log.Println("Starting the trader bot...") diff --git a/examples/configs/trader/sample_mirror.cfg b/examples/configs/trader/sample_mirror.cfg index 586344822..30d020161 100644 --- a/examples/configs/trader/sample_mirror.cfg +++ b/examples/configs/trader/sample_mirror.cfg @@ -35,3 +35,11 @@ VOLUME_DIVIDE_BY=500.0 # spread % we should maintain per level between the mirrored exchange and SDEX (0 < spread < 1.0). This moves the price away from the center price on SDEX so we can cover the position on the external exchange, i.e. if this value is > 0 then the spread you provide on SDEX will be more than the spread on the exchange you are mirroring. # in this example the spread is 0.5% PER_LEVEL_SPREAD=0.005 + +# set to true if you want the bot to offset your trades onto the backing exchange to realize the per_level_spread against each trade +# requires you to specify the EXCHANGE_API_KEYS below +#OFFSET_TRADES=true +# you can use multiple API keys to overcome rate limit concerns +#[[EXCHANGE_API_KEYS]] +#KEY="" +#SECRET="" \ No newline at end of file diff --git a/examples/configs/trader/sample_trader.cfg b/examples/configs/trader/sample_trader.cfg index 06f8362b3..024a9d2f9 100644 --- a/examples/configs/trader/sample_trader.cfg +++ b/examples/configs/trader/sample_trader.cfg @@ -26,6 +26,8 @@ TICK_INTERVAL_SECONDS=300 # example: use 0 if you want to delete all offers on any error. # example: use 2 if you want to tolerate 2 continuous update cycle with errors, i.e. three continuous update cycles with errors will delete all offers. DELETE_CYCLES_THRESHOLD=0 +# how many milliseconds to sleep before checking for fills again, a value of 0 disables fill tracking +FILL_TRACKER_SLEEP_MILLIS=0 # the url for your horizon instance. If this url contains the string "test" then the bot assumes it is using the test network. HORIZON_URL="https://horizon-testnet.stellar.org" diff --git a/glide.lock b/glide.lock index aeb380d29..69b8b37c5 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 9591697dc68984d4841e1aa71ff0aaca4b202d99f85e7e9f47c445f3a5ab4420 -updated: 2018-11-13T20:46:54.414207737-08:00 +hash: ae83d6a72fa620aca2edba83aa180580ecf8d4ffa1d6c96212e10492a857f5db +updated: 2018-11-30T16:56:13.813823058-08:00 imports: - name: cloud.google.com/go version: 793297ec250352b0ece46e103381a0fc3dab95a1 @@ -42,7 +42,7 @@ imports: - name: github.com/manucorporat/sse version: ee05b128a739a0fb76c7ebd3ae4810c1de808d6d - name: github.com/nikhilsaraf/go-tools - version: bcfd18c97e1cdae3aa432337bb17d3927c3e4fc0 + version: 19004f22be08c82a22e679726ca22853c65919ae subpackages: - multithreading - name: github.com/nullstyle/go-xdr diff --git a/glide.yaml b/glide.yaml index da63c4f8c..3e817081c 100644 --- a/glide.yaml +++ b/glide.yaml @@ -20,4 +20,4 @@ import: - package: github.com/lechengfan/googleauth version: 7595ba02fbce171759c10d69d96e4cd898d1fa93 - package: github.com/nikhilsaraf/go-tools - version: bcfd18c97e1cdae3aa432337bb17d3927c3e4fc0 + version: 19004f22be08c82a22e679726ca22853c65919ae diff --git a/model/assets.go b/model/assets.go index bd5800d13..1740f212a 100644 --- a/model/assets.go +++ b/model/assets.go @@ -67,7 +67,7 @@ func makeAssetConverter(asset2String map[Asset]string) *AssetConverter { func (c AssetConverter) ToString(a Asset) (string, error) { s, ok := c.asset2String[a] if !ok { - return "", errors.New("could not recognize Asset: " + string(a)) + return fmt.Sprintf("missing[%s]", string(a)), nil } return s, nil } diff --git a/model/dates.go b/model/dates.go index 90654929a..ba9624767 100644 --- a/model/dates.go +++ b/model/dates.go @@ -1,6 +1,9 @@ package model -import "fmt" +import ( + "fmt" + "time" +) // Timestamp is millis since epoch type Timestamp int64 @@ -11,6 +14,11 @@ func MakeTimestamp(ts int64) *Timestamp { return ×tamp } +// MakeTimestampFromTime creates a new Timestamp +func MakeTimestampFromTime(t time.Time) *Timestamp { + return MakeTimestamp(t.UnixNano() / int64(time.Millisecond)) +} + func (t *Timestamp) String() string { return fmt.Sprintf("%d", t.AsInt64()) } diff --git a/model/orderbook.go b/model/orderbook.go index 96ebb30a5..60d20d17b 100644 --- a/model/orderbook.go +++ b/model/orderbook.go @@ -25,6 +25,14 @@ func (a OrderAction) IsSell() bool { return a == OrderActionSell } +// Reverse returns the opposite action +func (a OrderAction) Reverse() OrderAction { + if a.IsSell() { + return OrderActionBuy + } + return OrderActionSell +} + // String is the stringer function func (a OrderAction) String() string { if a == OrderActionBuy { @@ -96,13 +104,18 @@ type Order struct { // String is the stringer function func (o Order) String() string { - return fmt.Sprintf("Order[pair=%s, action=%s, type=%s, price=%s, vol=%s, ts=%d]", + tsString := "" + if o.Timestamp != nil { + tsString = fmt.Sprintf("%d", o.Timestamp.AsInt64()) + } + + return fmt.Sprintf("Order[pair=%s, action=%s, type=%s, price=%s, vol=%s, ts=%s]", o.Pair, o.OrderAction, o.OrderType, o.Price.AsString(), o.Volume.AsString(), - o.Timestamp.AsInt64(), + tsString, ) } @@ -202,7 +215,7 @@ type Trade struct { } func (t Trade) String() string { - return fmt.Sprintf("Trades[txid: %s, ts: %s, pair: %s, action: %s, type: %s, price: %s, volume: %s, cost: %s, fee: %s]", + return fmt.Sprintf("Trade[txid: %s, ts: %s, pair: %s, action: %s, type: %s, counterPrice: %s, baseVolume: %s, counterCost: %s, fee: %s]", utils.CheckedString(t.TransactionID), utils.CheckedString(t.Timestamp), *t.Pair, diff --git a/plugins/balancedLevelProvider.go b/plugins/balancedLevelProvider.go index 49166783b..7673a6c8f 100644 --- a/plugins/balancedLevelProvider.go +++ b/plugins/balancedLevelProvider.go @@ -202,3 +202,8 @@ func (p *balancedLevelProvider) getLevel(maxAssetBase float64, maxAssetQuote flo } return level, nil } + +// GetFillHandlers impl +func (p *balancedLevelProvider) GetFillHandlers() ([]api.FillHandler, error) { + return nil, nil +} diff --git a/plugins/ccxtExchange.go b/plugins/ccxtExchange.go index b9797f448..0de65373a 100644 --- a/plugins/ccxtExchange.go +++ b/plugins/ccxtExchange.go @@ -18,10 +18,11 @@ type ccxtExchange struct { delimiter string api *sdk.Ccxt precision int8 + simMode bool } // makeCcxtExchange is a factory method to make an exchange using the CCXT interface -func makeCcxtExchange(ccxtBaseURL string, exchangeName string) (api.Exchange, error) { +func makeCcxtExchange(ccxtBaseURL string, exchangeName string, simMode bool) (api.Exchange, error) { c, e := sdk.MakeInitializedCcxtExchange(ccxtBaseURL, exchangeName) if e != nil { return nil, fmt.Errorf("error making a ccxt exchange: %s", e) @@ -32,6 +33,7 @@ func makeCcxtExchange(ccxtBaseURL string, exchangeName string) (api.Exchange, er delimiter: "/", api: c, precision: utils.SdexPrecision, + simMode: simMode, }, nil } diff --git a/plugins/ccxtExchange_test.go b/plugins/ccxtExchange_test.go index a85e32fba..5fedd66db 100644 --- a/plugins/ccxtExchange_test.go +++ b/plugins/ccxtExchange_test.go @@ -17,7 +17,7 @@ func TestGetTickerPrice_Ccxt(t *testing.T) { for _, exchangeName := range supportedExchanges { t.Run(exchangeName, func(t *testing.T) { - testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName) + testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName, false) if !assert.NoError(t, e) { return } @@ -44,7 +44,7 @@ func TestGetOrderBook_Ccxt(t *testing.T) { for _, exchangeName := range supportedExchanges { t.Run(exchangeName, func(t *testing.T) { - testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName) + testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName, false) if !assert.NoError(t, e) { return } @@ -77,7 +77,7 @@ func TestGetTrades_Ccxt(t *testing.T) { for _, exchangeName := range supportedExchanges { t.Run(exchangeName, func(t *testing.T) { - testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName) + testCcxtExchange, e := makeCcxtExchange("http://localhost:3000", exchangeName, false) if !assert.NoError(t, e) { return } diff --git a/plugins/composeStrategy.go b/plugins/composeStrategy.go index 0606d8fe1..0347d7702 100644 --- a/plugins/composeStrategy.go +++ b/plugins/composeStrategy.go @@ -103,3 +103,24 @@ func (s *composeStrategy) UpdateWithOps( func (s *composeStrategy) PostUpdate() error { return nil } + +// GetFillHandlers impl +func (s *composeStrategy) GetFillHandlers() ([]api.FillHandler, error) { + buyFillHandlers, e := s.buyStrat.GetFillHandlers() + if e != nil { + return nil, fmt.Errorf("error while getting fill handlers for buy side") + } + sellFillHandlers, e := s.sellStrat.GetFillHandlers() + if e != nil { + return nil, fmt.Errorf("error while getting fill handlers for sell side") + } + + handlers := []api.FillHandler{} + if buyFillHandlers != nil { + handlers = append(handlers, buyFillHandlers...) + } + if sellFillHandlers != nil { + handlers = append(handlers, sellFillHandlers...) + } + return handlers, nil +} diff --git a/plugins/deleteSideStrategy.go b/plugins/deleteSideStrategy.go index 1d433b657..311f71c87 100644 --- a/plugins/deleteSideStrategy.go +++ b/plugins/deleteSideStrategy.go @@ -57,3 +57,8 @@ func (s *deleteSideStrategy) UpdateWithOps(offers []horizon.Offer) (ops []build. func (s *deleteSideStrategy) PostUpdate() error { return nil } + +// GetFillHandlers impl +func (s *deleteSideStrategy) GetFillHandlers() ([]api.FillHandler, error) { + return nil, nil +} diff --git a/plugins/factory.go b/plugins/factory.go index f310f542e..e3e193cba 100644 --- a/plugins/factory.go +++ b/plugins/factory.go @@ -10,13 +10,22 @@ import ( "github.com/stellar/go/support/config" ) +// strategyFactoryData is a data container that has all the information needed to make a strategy +type strategyFactoryData struct { + sdex *SDEX + assetBase *horizon.Asset + assetQuote *horizon.Asset + stratConfigPath string + simMode bool +} + // StrategyContainer contains the strategy factory method along with some metadata type StrategyContainer struct { SortOrder uint8 Description string NeedsConfig bool Complexity string - makeFn func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) + makeFn func(strategyFactoryData strategyFactoryData) (api.Strategy, error) } // strategies is a map of all the strategies available @@ -26,12 +35,12 @@ var strategies = map[string]StrategyContainer{ Description: "Creates buy and sell offers based on a reference price with a pre-specified liquidity depth", NeedsConfig: true, Complexity: "Beginner", - makeFn: func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) { + makeFn: func(strategyFactoryData strategyFactoryData) (api.Strategy, error) { var cfg buySellConfig - err := config.Read(stratConfigPath, &cfg) - utils.CheckConfigError(cfg, err, stratConfigPath) + err := config.Read(strategyFactoryData.stratConfigPath, &cfg) + utils.CheckConfigError(cfg, err, strategyFactoryData.stratConfigPath) utils.LogConfig(cfg) - s, e := makeBuySellStrategy(sdex, assetBase, assetQuote, &cfg) + s, e := makeBuySellStrategy(strategyFactoryData.sdex, strategyFactoryData.assetBase, strategyFactoryData.assetQuote, &cfg) if e != nil { return nil, fmt.Errorf("makeFn failed: %s", e) } @@ -43,12 +52,12 @@ var strategies = map[string]StrategyContainer{ Description: "Mirrors an orderbook from another exchange by placing the same orders on Stellar", NeedsConfig: true, Complexity: "Advanced", - makeFn: func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) { + makeFn: func(strategyFactoryData strategyFactoryData) (api.Strategy, error) { var cfg mirrorConfig - err := config.Read(stratConfigPath, &cfg) - utils.CheckConfigError(cfg, err, stratConfigPath) + err := config.Read(strategyFactoryData.stratConfigPath, &cfg) + utils.CheckConfigError(cfg, err, strategyFactoryData.stratConfigPath) utils.LogConfig(cfg) - s, e := makeMirrorStrategy(sdex, assetBase, assetQuote, &cfg) + s, e := makeMirrorStrategy(strategyFactoryData.sdex, strategyFactoryData.assetBase, strategyFactoryData.assetQuote, &cfg, strategyFactoryData.simMode) if e != nil { return nil, fmt.Errorf("makeFn failed: %s", e) } @@ -60,12 +69,12 @@ var strategies = map[string]StrategyContainer{ Description: "Creates sell offers based on a reference price with a pre-specified liquidity depth", NeedsConfig: true, Complexity: "Beginner", - makeFn: func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) { + makeFn: func(strategyFactoryData strategyFactoryData) (api.Strategy, error) { var cfg sellConfig - err := config.Read(stratConfigPath, &cfg) - utils.CheckConfigError(cfg, err, stratConfigPath) + err := config.Read(strategyFactoryData.stratConfigPath, &cfg) + utils.CheckConfigError(cfg, err, strategyFactoryData.stratConfigPath) utils.LogConfig(cfg) - s, e := makeSellStrategy(sdex, assetBase, assetQuote, &cfg) + s, e := makeSellStrategy(strategyFactoryData.sdex, strategyFactoryData.assetBase, strategyFactoryData.assetQuote, &cfg) if e != nil { return nil, fmt.Errorf("makeFn failed: %s", e) } @@ -77,12 +86,12 @@ var strategies = map[string]StrategyContainer{ Description: "Dynamically prices two tokens based on their relative demand", NeedsConfig: true, Complexity: "Intermediate", - makeFn: func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) { + makeFn: func(strategyFactoryData strategyFactoryData) (api.Strategy, error) { var cfg balancedConfig - err := config.Read(stratConfigPath, &cfg) - utils.CheckConfigError(cfg, err, stratConfigPath) + err := config.Read(strategyFactoryData.stratConfigPath, &cfg) + utils.CheckConfigError(cfg, err, strategyFactoryData.stratConfigPath) utils.LogConfig(cfg) - return makeBalancedStrategy(sdex, assetBase, assetQuote, &cfg), nil + return makeBalancedStrategy(strategyFactoryData.sdex, strategyFactoryData.assetBase, strategyFactoryData.assetQuote, &cfg), nil }, }, "delete": StrategyContainer{ @@ -90,8 +99,8 @@ var strategies = map[string]StrategyContainer{ Description: "Deletes all orders for the configured orderbook", NeedsConfig: false, Complexity: "Beginner", - makeFn: func(sdex *SDEX, assetBase *horizon.Asset, assetQuote *horizon.Asset, stratConfigPath string) (api.Strategy, error) { - return makeDeleteStrategy(sdex, assetBase, assetQuote), nil + makeFn: func(strategyFactoryData strategyFactoryData) (api.Strategy, error) { + return makeDeleteStrategy(strategyFactoryData.sdex, strategyFactoryData.assetBase, strategyFactoryData.assetQuote), nil }, }, } @@ -103,13 +112,20 @@ func MakeStrategy( assetQuote *horizon.Asset, strategy string, stratConfigPath string, + simMode bool, ) (api.Strategy, error) { log.Printf("Making strategy: %s\n", strategy) if strat, ok := strategies[strategy]; ok { if strat.NeedsConfig && stratConfigPath == "" { return nil, fmt.Errorf("the '%s' strategy needs a config file", strategy) } - s, e := strat.makeFn(sdex, assetBase, assetQuote, stratConfigPath) + s, e := strat.makeFn(strategyFactoryData{ + sdex: sdex, + assetBase: assetBase, + assetQuote: assetQuote, + stratConfigPath: stratConfigPath, + simMode: simMode, + }) if e != nil { return nil, fmt.Errorf("cannot make '%s' strategy: %s", strategy, e) } @@ -124,44 +140,64 @@ func Strategies() map[string]StrategyContainer { return strategies } -type exchangeContainer struct { - description string - makeFn func() (api.Exchange, error) +// exchangeFactoryData is a data container that has all the information needed to make an exchange +type exchangeFactoryData struct { + simMode bool + apiKeys []api.ExchangeAPIKey +} + +// ExchangeContainer contains the exchange factory method along with some metadata +type ExchangeContainer struct { + SortOrder uint8 + Description string + TradeEnabled bool + makeFn func(exchangeFactoryData exchangeFactoryData) (api.Exchange, error) } // exchanges is a map of all the exchange integrations available -var exchanges = map[string]exchangeContainer{ - "kraken": exchangeContainer{ - description: "Kraken is a popular centralized cryptocurrency exchange (https://www.kraken.com/)", - makeFn: func() (api.Exchange, error) { - apiKey := api.ExchangeAPIKey{Key: "", Secret: ""} - return makeKrakenExchange([]api.ExchangeAPIKey{apiKey}) +var exchanges = map[string]ExchangeContainer{ + "kraken": ExchangeContainer{ + SortOrder: 0, + Description: "Kraken is a popular centralized cryptocurrency exchange (https://www.kraken.com/)", + TradeEnabled: true, + makeFn: func(exchangeFactoryData exchangeFactoryData) (api.Exchange, error) { + return makeKrakenExchange(exchangeFactoryData.apiKeys, exchangeFactoryData.simMode) }, }, - "ccxt-binance": exchangeContainer{ - description: "Binance is a popular centralized cryptocurrency exchange (via ccxt-rest) - partial implementation", - makeFn: func() (api.Exchange, error) { - return makeCcxtExchange("http://localhost:3000", "binance") + "ccxt-binance": ExchangeContainer{ + SortOrder: 1, + Description: "Binance is a popular centralized cryptocurrency exchange (via ccxt-rest)", + TradeEnabled: false, + makeFn: func(exchangeFactoryData exchangeFactoryData) (api.Exchange, error) { + return makeCcxtExchange("http://localhost:3000", "binance", exchangeFactoryData.simMode) }, }, - "ccxt-poloniex": exchangeContainer{ - description: "Poloniex is a popular centralized cryptocurrency exchange (via ccxt-rest) - partial implementation", - makeFn: func() (api.Exchange, error) { - return makeCcxtExchange("http://localhost:3000", "poloniex") + "ccxt-poloniex": ExchangeContainer{ + SortOrder: 2, + Description: "Poloniex is a popular centralized cryptocurrency exchange (via ccxt-rest)", + TradeEnabled: false, + makeFn: func(exchangeFactoryData exchangeFactoryData) (api.Exchange, error) { + return makeCcxtExchange("http://localhost:3000", "poloniex", exchangeFactoryData.simMode) }, }, - "ccxt-bittrex": exchangeContainer{ - description: "Bittrex is a popular centralized cryptocurrency exchange (via ccxt-rest) - partial implementation", - makeFn: func() (api.Exchange, error) { - return makeCcxtExchange("http://localhost:3000", "bittrex") + "ccxt-bittrex": ExchangeContainer{ + SortOrder: 3, + Description: "Bittrex is a popular centralized cryptocurrency exchange (via ccxt-rest)", + TradeEnabled: false, + makeFn: func(exchangeFactoryData exchangeFactoryData) (api.Exchange, error) { + return makeCcxtExchange("http://localhost:3000", "bittrex", exchangeFactoryData.simMode) }, }, } // MakeExchange is a factory method to make an exchange based on a given type -func MakeExchange(exchangeType string) (api.Exchange, error) { +func MakeExchange(exchangeType string, simMode bool) (api.Exchange, error) { if exchange, ok := exchanges[exchangeType]; ok { - x, e := exchange.makeFn() + exchangeAPIKey := api.ExchangeAPIKey{Key: "", Secret: ""} + x, e := exchange.makeFn(exchangeFactoryData{ + simMode: simMode, + apiKeys: []api.ExchangeAPIKey{exchangeAPIKey}, + }) if e != nil { return nil, fmt.Errorf("error when making the '%s' exchange: %s", exchangeType, e) } @@ -171,11 +207,31 @@ func MakeExchange(exchangeType string) (api.Exchange, error) { return nil, fmt.Errorf("invalid exchange type: %s", exchangeType) } -// Exchanges returns the list of exchanges along with the description -func Exchanges() map[string]string { - m := make(map[string]string, len(exchanges)) - for name := range exchanges { - m[name] = exchanges[name].description +// MakeTradingExchange is a factory method to make an exchange based on a given type +func MakeTradingExchange(exchangeType string, apiKeys []api.ExchangeAPIKey, simMode bool) (api.Exchange, error) { + if exchange, ok := exchanges[exchangeType]; ok { + if !exchange.TradeEnabled { + return nil, fmt.Errorf("trading is not enabled on this exchange: %s", exchangeType) + } + + if len(apiKeys) == 0 { + return nil, fmt.Errorf("cannot make trading exchange, apiKeys mising") + } + + x, e := exchange.makeFn(exchangeFactoryData{ + simMode: simMode, + apiKeys: apiKeys, + }) + if e != nil { + return nil, fmt.Errorf("error when making the '%s' exchange: %s", exchangeType, e) + } + return x, nil } - return m + + return nil, fmt.Errorf("invalid exchange type: %s", exchangeType) +} + +// Exchanges returns the list of exchanges +func Exchanges() map[string]ExchangeContainer { + return exchanges } diff --git a/plugins/fillLogger.go b/plugins/fillLogger.go new file mode 100644 index 000000000..2e5304ddf --- /dev/null +++ b/plugins/fillLogger.go @@ -0,0 +1,24 @@ +package plugins + +import ( + "log" + + "github.com/interstellar/kelp/api" + "github.com/interstellar/kelp/model" +) + +// FillLogger is a FillHandler that logs fills +type FillLogger struct{} + +var _ api.FillHandler = &FillLogger{} + +// MakeFillLogger is a factory method +func MakeFillLogger() api.FillHandler { + return &FillLogger{} +} + +// HandleFill impl. +func (f *FillLogger) HandleFill(trade model.Trade) error { + log.Printf("received fill: %s\n", trade) + return nil +} diff --git a/plugins/fillTracker.go b/plugins/fillTracker.go new file mode 100644 index 000000000..3fd8c3b0a --- /dev/null +++ b/plugins/fillTracker.go @@ -0,0 +1,113 @@ +package plugins + +import ( + "fmt" + "log" + "time" + + "github.com/interstellar/kelp/api" + "github.com/interstellar/kelp/model" + "github.com/nikhilsaraf/go-tools/multithreading" +) + +// FillTracker tracks fills +type FillTracker struct { + pair *model.TradingPair + threadTracker *multithreading.ThreadTracker + fillTrackable api.FillTrackable + fillTrackerSleepMillis uint32 + + // uninitialized + handlers []api.FillHandler +} + +// enforce FillTracker implementing api.FillTracker +var _ api.FillTracker = &FillTracker{} + +// MakeFillTracker impl. +func MakeFillTracker( + pair *model.TradingPair, + threadTracker *multithreading.ThreadTracker, + fillTrackable api.FillTrackable, + fillTrackerSleepMillis uint32, +) api.FillTracker { + return &FillTracker{ + pair: pair, + threadTracker: threadTracker, + fillTrackable: fillTrackable, + fillTrackerSleepMillis: fillTrackerSleepMillis, + } +} + +// GetPair impl +func (f *FillTracker) GetPair() (pair *model.TradingPair) { + return f.pair +} + +// TrackFills impl +func (f *FillTracker) TrackFills() error { + // get the last cursor so we only start querying from the current position + lastCursor, e := f.fillTrackable.GetLatestTradeCursor() + if e != nil { + return fmt.Errorf("error while getting last trade: %s", e) + } + log.Printf("got latest trade cursor from where to start tracking fills: %v\n", lastCursor) + + ech := make(chan error, len(f.handlers)) + for { + select { + case e := <-ech: + return fmt.Errorf("caught an error when tracking fills: %s", e) + default: + // do nothing + } + + tradeHistoryResult, e := f.fillTrackable.GetTradeHistory(lastCursor, nil) + if e != nil { + return fmt.Errorf("error when fetching trades: %s", e) + } + + if len(tradeHistoryResult.Trades) > 0 { + // use a single goroutine so we handle trades sequentially and also respect the handler sequence + f.threadTracker.TriggerGoroutine(func(inputs []interface{}) { + ech := inputs[0].(chan error) + defer handlePanic(ech) + + handlers := inputs[1].([]api.FillHandler) + trades := inputs[2].([]model.Trade) + for _, t := range trades { + for _, h := range handlers { + e := h.HandleFill(t) + if e != nil { + ech <- fmt.Errorf("error in a fill handler: %s", e) + // we do NOT want to exit from the goroutine immediately after encountering an error + // because we want to give all handlers a chance to get called for each trade + } + } + } + }, []interface{}{ech, f.handlers, tradeHistoryResult.Trades}) + } + + lastCursor = tradeHistoryResult.Cursor + time.Sleep(time.Duration(f.fillTrackerSleepMillis) * time.Millisecond) + } +} + +func handlePanic(ech chan error) { + if r := recover(); r != nil { + e := r.(error) + + log.Printf("handling panic by passing onto error channel: %s\n", e) + ech <- e + } +} + +// RegisterHandler impl +func (f *FillTracker) RegisterHandler(handler api.FillHandler) { + f.handlers = append(f.handlers, handler) +} + +// NumHandlers impl +func (f *FillTracker) NumHandlers() uint8 { + return uint8(len(f.handlers)) +} diff --git a/plugins/krakenExchange.go b/plugins/krakenExchange.go index bb8148f8c..8c55069b7 100644 --- a/plugins/krakenExchange.go +++ b/plugins/krakenExchange.go @@ -48,7 +48,7 @@ func (m asset2Address2Key) getKey(asset model.Asset, address string) (string, er // makeKrakenExchange is a factory method to make the kraken exchange // TODO 2, should take in config file for withdrawalKeys mapping -func makeKrakenExchange(apiKeys []api.ExchangeAPIKey) (api.Exchange, error) { +func makeKrakenExchange(apiKeys []api.ExchangeAPIKey, isSimulated bool) (api.Exchange, error) { if len(apiKeys) == 0 || len(apiKeys) > math.MaxUint8 { return nil, fmt.Errorf("invalid number of apiKeys: %d", len(apiKeys)) } @@ -65,8 +65,9 @@ func makeKrakenExchange(apiKeys []api.ExchangeAPIKey) (api.Exchange, error) { apis: krakenAPIs, apiNextIndex: 0, delimiter: "", - withdrawKeys: asset2Address2Key{}, precision: 8, + withdrawKeys: asset2Address2Key{}, + isSimulated: isSimulated, }, nil } @@ -86,13 +87,14 @@ func (k *krakenExchange) AddOrder(order *model.Order) (*model.TransactionID, err return nil, e } + if k.isSimulated { + log.Printf("not adding order to Kraken in simulation mode, order=%s\n", *order) + return nil, nil + } + args := map[string]string{ "price": order.Price.AsString(), } - // validate should not be present if it's false, otherwise Kraken treats it as true - if k.isSimulated { - args["validate"] = "true" - } resp, e := k.nextAPI().AddOrder( pairStr, order.OrderAction.String(), @@ -113,9 +115,6 @@ func (k *krakenExchange) AddOrder(order *model.Order) (*model.TransactionID, err return nil, fmt.Errorf("there was more than 1 transctionId: %s", resp.TransactionIds) } - if k.isSimulated { - return nil, nil - } return nil, fmt.Errorf("no transactionIds returned from order creation") } @@ -347,6 +346,7 @@ func (k *krakenExchange) getTradeHistory(maybeCursorStart *int64, maybeCursorEnd Cost: model.MustNumberFromString(_cost, k.precision), Fee: model.MustNumberFromString(_fee, k.precision), }) + res.Cursor = _time } return &res, nil } diff --git a/plugins/krakenExchange_test.go b/plugins/krakenExchange_test.go index ea94c9946..7b7d95f12 100644 --- a/plugins/krakenExchange_test.go +++ b/plugins/krakenExchange_test.go @@ -148,6 +148,10 @@ func TestGetTradeHistory(t *testing.T) { return } + if !assert.NotNil(t, tradeHistoryResult.Cursor) { + return + } + assert.Fail(t, "force fail") } diff --git a/plugins/mirrorStrategy.go b/plugins/mirrorStrategy.go index 00713fa27..8b573ce0f 100644 --- a/plugins/mirrorStrategy.go +++ b/plugins/mirrorStrategy.go @@ -1,6 +1,7 @@ package plugins import ( + "fmt" "log" "github.com/interstellar/kelp/api" @@ -10,14 +11,32 @@ import ( "github.com/stellar/go/clients/horizon" ) +type exchangeAPIKeysToml []struct { + Key string `valid:"-" toml:"KEY"` + Secret string `valid:"-" toml:"SECRET"` +} + +func (t *exchangeAPIKeysToml) toExchangeAPIKeys() []api.ExchangeAPIKey { + apiKeys := []api.ExchangeAPIKey{} + for _, apiKey := range *t { + apiKeys = append(apiKeys, api.ExchangeAPIKey{ + Key: apiKey.Key, + Secret: apiKey.Secret, + }) + } + return apiKeys +} + // mirrorConfig contains the configuration params for this strategy type mirrorConfig struct { - Exchange string `valid:"-" toml:"EXCHANGE"` - ExchangeBase string `valid:"-" toml:"EXCHANGE_BASE"` - ExchangeQuote string `valid:"-" toml:"EXCHANGE_QUOTE"` - OrderbookDepth int32 `valid:"-" toml:"ORDERBOOK_DEPTH"` - VolumeDivideBy float64 `valid:"-" toml:"VOLUME_DIVIDE_BY"` - PerLevelSpread float64 `valid:"-" toml:"PER_LEVEL_SPREAD"` + Exchange string `valid:"-" toml:"EXCHANGE"` + ExchangeBase string `valid:"-" toml:"EXCHANGE_BASE"` + ExchangeQuote string `valid:"-" toml:"EXCHANGE_QUOTE"` + OrderbookDepth int32 `valid:"-" toml:"ORDERBOOK_DEPTH"` + VolumeDivideBy float64 `valid:"-" toml:"VOLUME_DIVIDE_BY"` + PerLevelSpread float64 `valid:"-" toml:"PER_LEVEL_SPREAD"` + OffsetTrades bool `valid:"-" toml:"OFFSET_TRADES"` + ExchangeAPIKeys exchangeAPIKeysToml `valid:"-" toml:"EXCHANGE_API_KEYS"` } // String impl. @@ -33,16 +52,30 @@ type mirrorStrategy struct { quoteAsset *horizon.Asset config *mirrorConfig tradeAPI api.TradeAPI + offsetTrades bool } -// ensure this implements Strategy +// ensure this implements api.Strategy var _ api.Strategy = &mirrorStrategy{} +// ensure this implements api.FillHandler +var _ api.FillHandler = &mirrorStrategy{} + // makeMirrorStrategy is a factory method -func makeMirrorStrategy(sdex *SDEX, baseAsset *horizon.Asset, quoteAsset *horizon.Asset, config *mirrorConfig) (api.Strategy, error) { - exchange, e := MakeExchange(config.Exchange) - if e != nil { - return nil, e +func makeMirrorStrategy(sdex *SDEX, baseAsset *horizon.Asset, quoteAsset *horizon.Asset, config *mirrorConfig, simMode bool) (api.Strategy, error) { + var exchange api.Exchange + var e error + if config.OffsetTrades { + exchangeAPIKeys := config.ExchangeAPIKeys.toExchangeAPIKeys() + exchange, e = MakeTradingExchange(config.Exchange, exchangeAPIKeys, simMode) + if e != nil { + return nil, e + } + } else { + exchange, e = MakeExchange(config.Exchange, simMode) + if e != nil { + return nil, e + } } orderbookPair := &model.TradingPair{ @@ -56,6 +89,7 @@ func makeMirrorStrategy(sdex *SDEX, baseAsset *horizon.Asset, quoteAsset *horizo quoteAsset: quoteAsset, config: config, tradeAPI: api.TradeAPI(exchange), + offsetTrades: config.OffsetTrades, }, nil } @@ -261,3 +295,32 @@ func (s *mirrorStrategy) doModifyOffer( func (s *mirrorStrategy) PostUpdate() error { return nil } + +// GetFillHandlers impl +func (s *mirrorStrategy) GetFillHandlers() ([]api.FillHandler, error) { + if s.offsetTrades { + return []api.FillHandler{s}, nil + } + return nil, nil +} + +// HandleFill impl +func (s *mirrorStrategy) HandleFill(trade model.Trade) error { + newOrder := model.Order{ + Pair: s.orderbookPair, // we want to offset trades on the backing exchange so use the backing exchange's trading pair + OrderAction: trade.OrderAction.Reverse(), + OrderType: model.OrderTypeLimit, + Price: trade.Price, + Volume: trade.Volume, + Timestamp: nil, + } + + log.Printf("mirror strategy is going to offset the trade from the primary exchange (transactionID=%s) onto the backing exchange with the order: %s\n", trade.TransactionID, newOrder) + transactionID, e := s.tradeAPI.AddOrder(&newOrder) + if e != nil { + return fmt.Errorf("error when offsetting trade (%s): %s", newOrder, e) + } + + log.Printf("...mirror strategy successfully offset the trade from the primary exchange (transactionID=%s) onto the backing exchange (transactionID=%s) with the order %s\n", trade.TransactionID, transactionID, newOrder) + return nil +} diff --git a/plugins/priceFeed.go b/plugins/priceFeed.go index 3f00c72d0..64c5abfe9 100644 --- a/plugins/priceFeed.go +++ b/plugins/priceFeed.go @@ -20,7 +20,7 @@ func MakePriceFeed(feedType string, url string) (api.PriceFeed, error) { case "exchange": // [0] = exchangeType, [1] = base, [2] = quote urlParts := strings.Split(url, "/") - exchange, e := MakeExchange(urlParts[0]) + exchange, e := MakeExchange(urlParts[0], true) if e != nil { return nil, fmt.Errorf("cannot make priceFeed because of an error when making the '%s' exchange: %s", urlParts[0], e) } diff --git a/plugins/sdex.go b/plugins/sdex.go index be09c47e6..7abcec3e0 100644 --- a/plugins/sdex.go +++ b/plugins/sdex.go @@ -4,8 +4,12 @@ import ( "fmt" "log" "math" + "reflect" "strconv" + "strings" + "github.com/interstellar/kelp/api" + "github.com/interstellar/kelp/model" "github.com/interstellar/kelp/support/utils" "github.com/nikhilsaraf/go-tools/multithreading" "github.com/pkg/errors" @@ -16,6 +20,10 @@ import ( const baseReserve = 0.5 const baseFee = 0.0000100 const maxLumenTrust = math.MaxFloat64 +const maxPageLimit = 200 + +// TODO we need a reasonable value for the resolution here (currently arbitrary 300000 from a test in horizon) +const fetchTradesResolution = 300000 // SDEX helps with building and submitting transactions to the Stellar network type SDEX struct { @@ -29,6 +37,8 @@ type SDEX struct { operationalBuffer float64 operationalBufferNonNativePct float64 simMode bool + pair *model.TradingPair + assetMap map[model.Asset]horizon.Asset // this is needed until we fully address putting SDEX behind the Exchange interface // uninitialized seqNum uint64 @@ -66,6 +76,8 @@ func MakeSDEX( operationalBuffer float64, operationalBufferNonNativePct float64, simMode bool, + pair *model.TradingPair, + assetMap map[model.Asset]horizon.Asset, ) *SDEX { sdex := &SDEX{ API: api, @@ -77,7 +89,9 @@ func MakeSDEX( threadTracker: threadTracker, operationalBuffer: operationalBuffer, operationalBufferNonNativePct: operationalBufferNonNativePct, - simMode: simMode, + simMode: simMode, + pair: pair, + assetMap: assetMap, } log.Printf("Using network passphrase: %s\n", sdex.Network.Passphrase) @@ -384,9 +398,9 @@ func (sdex *SDEX) SubmitOps(ops []build.TransactionMutator, asyncCallback func(h // submit if !sdex.simMode { log.Println("submitting tx XDR to network (async)") - sdex.threadTracker.TriggerGoroutine(func() { + sdex.threadTracker.TriggerGoroutine(func(inputs []interface{}) { sdex.submit(txeB64, asyncCallback) - }) + }, nil) } else { log.Println("not submitting tx XDR to network in simulation mode, calling asyncCallback with empty hash value") sdex.invokeAsyncCallback(asyncCallback, "", nil) @@ -447,9 +461,9 @@ func (sdex *SDEX) invokeAsyncCallback(asyncCallback func(hash string, e error), return } - sdex.threadTracker.TriggerGoroutine(func() { + sdex.threadTracker.TriggerGoroutine(func(inputs []interface{}) { asyncCallback(hash, e) - }) + }, nil) } func (sdex *SDEX) logLiabilities(asset horizon.Asset, assetStr string) { @@ -602,3 +616,185 @@ func (sdex *SDEX) _liabilities(asset horizon.Asset, otherAsset horizon.Asset) (* sdex.cachedLiabilities[asset] = liabilities return &liabilities, &pairLiabilities, nil } + +func (sdex *SDEX) pair2Assets() (baseAsset horizon.Asset, quoteAsset horizon.Asset, e error) { + var ok bool + baseAsset, ok = sdex.assetMap[sdex.pair.Base] + if !ok { + return horizon.Asset{}, horizon.Asset{}, fmt.Errorf("unexpected error, base asset was not found in sdex.assetMap") + } + + quoteAsset, ok = sdex.assetMap[sdex.pair.Quote] + if !ok { + return horizon.Asset{}, horizon.Asset{}, fmt.Errorf("unexpected error, quote asset was not found in sdex.assetMap") + } + + return baseAsset, quoteAsset, nil +} + +// enforce SDEX implementing api.FillTrackable +var _ api.FillTrackable = &SDEX{} + +// GetTradeHistory fetches trades for the trading account bound to this instance of SDEX +func (sdex *SDEX) GetTradeHistory(maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + baseAsset, quoteAsset, e := sdex.pair2Assets() + if e != nil { + return nil, fmt.Errorf("error while convertig pair to base and quote asset: %s", e) + } + + var cursorStart string + if maybeCursorStart != nil { + var ok bool + cursorStart, ok = maybeCursorStart.(string) + if !ok { + return nil, fmt.Errorf("could not convert maybeCursorStart to string, type=%s, maybeCursorStart=%v", reflect.TypeOf(maybeCursorStart), maybeCursorStart) + } + } + var cursorEnd string + if maybeCursorEnd != nil { + var ok bool + cursorEnd, ok = maybeCursorEnd.(string) + if !ok { + return nil, fmt.Errorf("could not convert maybeCursorEnd to string, type=%s, maybeCursorEnd=%v", reflect.TypeOf(maybeCursorEnd), maybeCursorEnd) + } + } + + trades := []model.Trade{} + for { + tradesPage, e := sdex.API.LoadTrades(baseAsset, quoteAsset, 0, fetchTradesResolution, horizon.Cursor(cursorStart), horizon.Order(horizon.OrderAsc), horizon.Limit(maxPageLimit)) + if e != nil { + if strings.Contains(e.Error(), "Rate limit exceeded") { + // return normally, we will continue loading trades in the next call from where we left off + return &api.TradeHistoryResult{ + Cursor: cursorStart, + Trades: trades, + }, nil + } + return nil, fmt.Errorf("error while fetching trades in SDEX (cursor=%s): %s", cursorStart, e) + } + + if len(tradesPage.Embedded.Records) == 0 { + return &api.TradeHistoryResult{ + Cursor: cursorStart, + Trades: trades, + }, nil + } + + updatedResult, hitCursorEnd, e := sdex.tradesPage2TradeHistoryResult(baseAsset, quoteAsset, tradesPage, cursorEnd) + if e != nil { + return nil, fmt.Errorf("error converting tradesPage2TradesResult: %s", e) + } + cursorStart = updatedResult.Cursor.(string) + trades = append(trades, updatedResult.Trades...) + + if hitCursorEnd { + return &api.TradeHistoryResult{ + Cursor: cursorStart, + Trades: trades, + }, nil + } + } +} + +func (sdex *SDEX) getOrderAction(baseAsset horizon.Asset, quoteAsset horizon.Asset, trade horizon.Trade) *model.OrderAction { + if trade.BaseAccount != sdex.TradingAccount && trade.CounterAccount != sdex.TradingAccount { + return nil + } + + tradeBaseAsset := utils.Native + if trade.BaseAssetType != utils.Native { + tradeBaseAsset = trade.BaseAssetCode + ":" + trade.BaseAssetIssuer + } + tradeQuoteAsset := utils.Native + if trade.CounterAssetType != utils.Native { + tradeQuoteAsset = trade.CounterAssetCode + ":" + trade.CounterAssetIssuer + } + sdexBaseAsset := utils.Asset2String(baseAsset) + sdexQuoteAsset := utils.Asset2String(quoteAsset) + + // compare the base and quote asset on the trade to what we are using as our base and quote + // then compare whether it was the base or the quote that was the seller + actionSell := model.OrderActionSell + actionBuy := model.OrderActionBuy + if sdexBaseAsset == tradeBaseAsset && sdexQuoteAsset == tradeQuoteAsset { + if trade.BaseIsSeller { + return &actionSell + } + return &actionBuy + } else if sdexBaseAsset == tradeQuoteAsset && sdexQuoteAsset == tradeBaseAsset { + if trade.BaseIsSeller { + return &actionBuy + } + return &actionSell + } else { + return nil + } +} + +// returns tradeHistoryResult, hitCursorEnd, and any error +func (sdex *SDEX) tradesPage2TradeHistoryResult(baseAsset horizon.Asset, quoteAsset horizon.Asset, tradesPage horizon.TradesPage, cursorEnd string) (*api.TradeHistoryResult, bool, error) { + var cursor string + trades := []model.Trade{} + + for _, t := range tradesPage.Embedded.Records { + orderAction := sdex.getOrderAction(baseAsset, quoteAsset, t) + if orderAction == nil { + // we have encountered a trade that is different from the base and quote asset for our trading account + continue + } + + vol, e := model.NumberFromString(t.BaseAmount, utils.SdexPrecision) + if e != nil { + return nil, false, fmt.Errorf("could not convert baseAmount to model.Number: %s", e) + } + floatPrice := float64(t.Price.N) / float64(t.Price.D) + + trades = append(trades, model.Trade{ + Order: model.Order{ + Pair: sdex.pair, + OrderAction: *orderAction, + OrderType: model.OrderTypeLimit, + Price: model.NumberFromFloat(floatPrice, utils.SdexPrecision), + Volume: vol, + Timestamp: model.MakeTimestampFromTime(t.LedgerCloseTime), + }, + TransactionID: model.MakeTransactionID(t.ID), + Cost: model.NumberFromFloat(floatPrice*vol.AsFloat(), utils.SdexPrecision), + Fee: model.NumberFromFloat(baseFee, utils.SdexPrecision), + }) + + cursor = t.PT + if cursor == cursorEnd { + return &api.TradeHistoryResult{ + Cursor: cursor, + Trades: trades, + }, true, nil + } + } + + return &api.TradeHistoryResult{ + Cursor: cursor, + Trades: trades, + }, false, nil +} + +// GetLatestTradeCursor impl. +func (sdex *SDEX) GetLatestTradeCursor() (interface{}, error) { + baseAsset, quoteAsset, e := sdex.pair2Assets() + if e != nil { + return nil, fmt.Errorf("error while convertig pair to base and quote asset: %s", e) + } + + tradesPage, e := sdex.API.LoadTrades(baseAsset, quoteAsset, 0, fetchTradesResolution, horizon.Order(horizon.OrderDesc), horizon.Limit(1)) + if e != nil { + return nil, fmt.Errorf("error while fetching latest trade cursor in SDEX: %s", e) + } + + records := tradesPage.Embedded.Records + if len(records) == 0 { + // we want to use nil as the latest trade cursor if there are no trades + return nil, nil + } + + return records[0].PT, nil +} diff --git a/plugins/sellSideStrategy.go b/plugins/sellSideStrategy.go index 0e08385f7..b766b2239 100644 --- a/plugins/sellSideStrategy.go +++ b/plugins/sellSideStrategy.go @@ -485,3 +485,8 @@ func (s *sellSideStrategy) placeOrderWithRetry( return true, nil, fmt.Errorf("error: (programmer?) unable to place offer with the new (reduced) selling and buying amounts, oldSellingAmount=%.7f, newSellingAmount=%.7f, oldBuyingAmount=%.7f, newBuyingAmount=%.7f", incrementalSellAmount, newSellingAmount, incrementalBuyAmount, newBuyingAmount) } + +// GetFillHandlers impl +func (s *sellSideStrategy) GetFillHandlers() ([]api.FillHandler, error) { + return s.levelsProvider.GetFillHandlers() +} diff --git a/plugins/staticSpreadLevelProvider.go b/plugins/staticSpreadLevelProvider.go index 38c292b3c..c7f16045e 100644 --- a/plugins/staticSpreadLevelProvider.go +++ b/plugins/staticSpreadLevelProvider.go @@ -89,3 +89,8 @@ func (p *staticSpreadLevelProvider) GetLevels(maxAssetBase float64, maxAssetQuot } return levels, nil } + +// GetFillHandlers impl +func (p *staticSpreadLevelProvider) GetFillHandlers() ([]api.FillHandler, error) { + return nil, nil +} diff --git a/support/utils/functions.go b/support/utils/functions.go index 62a358549..0156f4fb7 100644 --- a/support/utils/functions.go +++ b/support/utils/functions.go @@ -116,6 +116,14 @@ func Asset2String(asset horizon.Asset) string { return fmt.Sprintf("%s:%s", asset.Code, asset.Issuer) } +// Asset2CodeString extracts the code out of a horizon.Asset +func Asset2CodeString(asset horizon.Asset) string { + if asset.Type == Native { + return "XLM" + } + return asset.Code +} + // String2Asset converts a code:issuer to a horizon.Asset func String2Asset(code string, issuer string) horizon.Asset { if code == "XLM" { diff --git a/trader/config.go b/trader/config.go index ff9a3edad..b5c3e1277 100644 --- a/trader/config.go +++ b/trader/config.go @@ -13,23 +13,24 @@ const XLM = "XLM" // BotConfig represents the configuration params for the bot type BotConfig struct { - SourceSecretSeed string `valid:"-" toml:"SOURCE_SECRET_SEED"` - TradingSecretSeed string `valid:"-" toml:"TRADING_SECRET_SEED"` - AssetCodeA string `valid:"-" toml:"ASSET_CODE_A"` - IssuerA string `valid:"-" toml:"ISSUER_A"` - AssetCodeB string `valid:"-" toml:"ASSET_CODE_B"` - IssuerB string `valid:"-" toml:"ISSUER_B"` - TickIntervalSeconds int32 `valid:"-" toml:"TICK_INTERVAL_SECONDS"` - DeleteCyclesThreshold int64 `valid:"-" toml:"DELETE_CYCLES_THRESHOLD"` - HorizonURL string `valid:"-" toml:"HORIZON_URL"` - AlertType string `valid:"-" toml:"ALERT_TYPE"` - AlertAPIKey string `valid:"-" toml:"ALERT_API_KEY"` - MonitoringPort uint16 `valid:"-" toml:"MONITORING_PORT"` - MonitoringTLSCert string `valid:"-" toml:"MONITORING_TLS_CERT"` - MonitoringTLSKey string `valid:"-" toml:"MONITORING_TLS_KEY"` - GoogleClientID string `valid:"-" toml:"GOOGLE_CLIENT_ID"` - GoogleClientSecret string `valid:"-" toml:"GOOGLE_CLIENT_SECRET"` - AcceptableEmails string `valid:"-" toml:"ACCEPTABLE_GOOGLE_EMAILS"` + SourceSecretSeed string `valid:"-" toml:"SOURCE_SECRET_SEED"` + TradingSecretSeed string `valid:"-" toml:"TRADING_SECRET_SEED"` + AssetCodeA string `valid:"-" toml:"ASSET_CODE_A"` + IssuerA string `valid:"-" toml:"ISSUER_A"` + AssetCodeB string `valid:"-" toml:"ASSET_CODE_B"` + IssuerB string `valid:"-" toml:"ISSUER_B"` + TickIntervalSeconds int32 `valid:"-" toml:"TICK_INTERVAL_SECONDS"` + DeleteCyclesThreshold int64 `valid:"-" toml:"DELETE_CYCLES_THRESHOLD"` + FillTrackerSleepMillis uint32 `valid:"-" toml:"FILL_TRACKER_SLEEP_MILLIS"` + HorizonURL string `valid:"-" toml:"HORIZON_URL"` + AlertType string `valid:"-" toml:"ALERT_TYPE"` + AlertAPIKey string `valid:"-" toml:"ALERT_API_KEY"` + MonitoringPort uint16 `valid:"-" toml:"MONITORING_PORT"` + MonitoringTLSCert string `valid:"-" toml:"MONITORING_TLS_CERT"` + MonitoringTLSKey string `valid:"-" toml:"MONITORING_TLS_KEY"` + GoogleClientID string `valid:"-" toml:"GOOGLE_CLIENT_ID"` + GoogleClientSecret string `valid:"-" toml:"GOOGLE_CLIENT_SECRET"` + AcceptableEmails string `valid:"-" toml:"ACCEPTABLE_GOOGLE_EMAILS"` tradingAccount *string sourceAccount *string // can be nil