Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Kelp GUI: add support for multitenancy on the backend (closes #682) (#…
Browse files Browse the repository at this point in the history
…688)

* 1 - update /autogenerate endpoint to accept user_data

* 2 - update /start /stop /deleteBot endpoints to include user data

* 3 - update /listBots endpoint with user data

* 4 - update /genBotName with user data

* 5 - update /getBotConfig and /getBotInfo endpoints

* 6 - update /getNewBotConfig endpoint

* 7 - make kelpErrorMap on the backend multi-tenant

* 8 - make bots.go multi-tenant (rename to userBotData.go)

* 9 - add user context for /getBotState
  • Loading branch information
nikhilsaraf authored Apr 6, 2021
1 parent 55762dc commit 6ad2cde
Show file tree
Hide file tree
Showing 33 changed files with 658 additions and 217 deletions.
131 changes: 95 additions & 36 deletions gui/backend/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,44 @@ import (
"github.com/stellar/kelp/support/kelpos"
)

// UserData is the json data passed in to represent a user
type UserData struct {
ID string `json:"id"`
}

// toUser converts to the format understood by kelpos
func (u UserData) toUser() *kelpos.User {
return kelpos.MakeUser(u.ID)
}

// String is the stringer method
func (u UserData) String() string {
return fmt.Sprintf("UserData[ID=%s]", u.ID)
}

// kelpErrorDataForUser tracks errors for a given user
type kelpErrorDataForUser struct {
errorMap map[string]KelpError
lock *sync.Mutex
}

// APIServer is an instance of the API service
type APIServer struct {
kelpBinPath *kelpos.OSPath
botConfigsPath *kelpos.OSPath
botLogsPath *kelpos.OSPath
kos *kelpos.KelpOS
horizonTestnetURI string
horizonPubnetURI string
ccxtRestUrl string
apiTestNet *horizonclient.Client
apiPubNet *horizonclient.Client
disablePubnet bool
noHeaders bool
quitFn func()
metricsTracker *plugins.MetricsTracker
kelpErrorMap map[string]KelpError
kelpErrorMapLock *sync.Mutex
kelpBinPath *kelpos.OSPath
botConfigsPath *kelpos.OSPath
botLogsPath *kelpos.OSPath
kos *kelpos.KelpOS
horizonTestnetURI string
horizonPubnetURI string
ccxtRestUrl string
apiTestNet *horizonclient.Client
apiPubNet *horizonclient.Client
disablePubnet bool
noHeaders bool
quitFn func()
metricsTracker *plugins.MetricsTracker
kelpErrorsByUser map[string]kelpErrorDataForUser
kelpErrorsByUserLock *sync.Mutex

cachedOptionsMetadata metadata
}
Expand All @@ -61,8 +82,6 @@ func MakeAPIServer(
return nil, fmt.Errorf("error while loading options metadata when making APIServer: %s", e)
}

kelpErrorMap := map[string]KelpError{}

return &APIServer{
kelpBinPath: kelpBinPath,
botConfigsPath: botConfigsPath,
Expand All @@ -78,19 +97,42 @@ func MakeAPIServer(
cachedOptionsMetadata: optionsMetadata,
quitFn: quitFn,
metricsTracker: metricsTracker,
kelpErrorMap: kelpErrorMap,
kelpErrorMapLock: &sync.Mutex{},
kelpErrorsByUser: map[string]kelpErrorDataForUser{},
kelpErrorsByUserLock: &sync.Mutex{},
}, nil
}

// InitBackend initializes anything required to get the backend ready to serve
func (s *APIServer) InitBackend() error {
// initial load of bots into memory
_, e := s.doListBots()
if e != nil {
return fmt.Errorf("error listing/loading bots: %s", e)
func (s *APIServer) botConfigsPathForUser(userID string) *kelpos.OSPath {
return s.botConfigsPath.Join(userID)
}

func (s *APIServer) botLogsPathForUser(userID string) *kelpos.OSPath {
return s.botLogsPath.Join(userID)
}

func (s *APIServer) kelpErrorsForUser(userID string) kelpErrorDataForUser {
s.kelpErrorsByUserLock.Lock()
defer s.kelpErrorsByUserLock.Unlock()

var kefu kelpErrorDataForUser
if v, ok := s.kelpErrorsByUser[userID]; ok {
kefu = v
} else {
// create new value and insert in map
kefu = kelpErrorDataForUser{
errorMap: map[string]KelpError{},
lock: &sync.Mutex{},
}
s.kelpErrorsByUser[userID] = kefu
}

return kefu
}

// InitBackend initializes anything required to get the backend ready to serve
func (s *APIServer) InitBackend() error {
// do not do an initial load of bots into memory for now since it's based on the user context which we don't have right now
// and we don't want to do it for all users right now
return nil
}

Expand Down Expand Up @@ -177,20 +219,37 @@ func (s *APIServer) writeErrorJson(w http.ResponseWriter, message string) {
w.Write(marshalledJson)
}

func (s *APIServer) addKelpErrorToMap(ke KelpError) {
func (s *APIServer) addKelpErrorToMap(userData UserData, ke KelpError) {
key := ke.UUID

kefu := s.kelpErrorsForUser(userData.ID)
// need to use a lock because we could encounter a "concurrent map writes" error against the map which is being updated by multiple threads
s.kelpErrorMapLock.Lock()
defer s.kelpErrorMapLock.Unlock()
kefu.lock.Lock()
defer kefu.lock.Unlock()

kefu.errorMap[key] = ke
}

s.kelpErrorMap[key] = ke
// removeKelpErrorUserDataIfEmpty removes user error data if the underlying map is empty
func (s *APIServer) removeKelpErrorUserDataIfEmpty(userData UserData) {
// issue with this is that someone can hold a reference to this object when it is empty
// and then we remove from parent map and the other thread will add a value, which would result
// in the object having an entry in the map but being orphaned.
//
// We can get creative with timeouts too but that is all an overoptimizationn
//
// We could resolve this by always holding both the higher level lock and the per-user lock to modify
// values inside a user's error map, but that will slow things down
//
// for now we do not remove Kelp error user data even if empty.

// do nothing
}

func (s *APIServer) writeKelpError(w http.ResponseWriter, kerw KelpErrorResponseWrapper) {
func (s *APIServer) writeKelpError(userData UserData, w http.ResponseWriter, kerw KelpErrorResponseWrapper) {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("writing error: %s\n", kerw.String())
s.addKelpErrorToMap(kerw.KelpError)
s.addKelpErrorToMap(userData, kerw.KelpError)

marshalledJSON, e := json.MarshalIndent(kerw, "", " ")
if e != nil {
Expand Down Expand Up @@ -239,15 +298,15 @@ func (s *APIServer) runKelpCommandBackground(namespace string, cmd string) (*kel
return s.kos.Background(namespace, cmdString)
}

func (s *APIServer) setupOpsDirectory() error {
e := s.kos.Mkdir(s.botConfigsPath)
func (s *APIServer) setupOpsDirectory(userID string) error {
e := s.kos.Mkdir(s.botConfigsPathForUser(userID))
if e != nil {
return fmt.Errorf("error setting up configs directory (%s): %s\n", s.botConfigsPath, e)
return fmt.Errorf("error setting up configs directory (%s): %s", s.botConfigsPathForUser(userID).Native(), e)
}

e = s.kos.Mkdir(s.botLogsPath)
e = s.kos.Mkdir(s.botLogsPathForUser(userID))
if e != nil {
return fmt.Errorf("error setting up logs directory (%s): %s\n", s.botLogsPath, e)
return fmt.Errorf("error setting up logs directory (%s): %s", s.botLogsPathForUser(userID).Native(), e)
}

return nil
Expand Down
42 changes: 33 additions & 9 deletions gui/backend/autogenerate_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
Expand All @@ -28,7 +29,30 @@ var centralizedVolumePrecisionOverride = int8(1)
var centralizedMinBaseVolumeOverride = float64(30.0)
var centralizedMinQuoteVolumeOverride = float64(10.0)

type autogenerateBotRequest struct {
UserData UserData `json:"user_data"`
}

func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {
bodyBytes, e := ioutil.ReadAll(r.Body)
if e != nil {
s.writeErrorJson(w, fmt.Sprintf("error reading request input: %s", e))
return
}
log.Printf("autogenerateBot requestJson: %s\n", string(bodyBytes))

var req autogenerateBotRequest
e = json.Unmarshal(bodyBytes, &req)
if e != nil {
s.writeErrorJson(w, fmt.Sprintf("error unmarshaling json: %s; bodyString = %s", e, string(bodyBytes)))
return
}
userID := req.UserData.ID
if strings.TrimSpace(userID) == "" {
s.writeErrorJson(w, fmt.Sprintf("cannot have empty userID"))
return
}

kp, e := keypair.Random()
if e != nil {
s.writeError(w, fmt.Sprintf("error generating keypair: %s\n", e))
Expand All @@ -37,14 +61,14 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {

// make and register bot, which places it in the initial bot state
bot := model2.MakeAutogeneratedBot()
e = s.kos.RegisterBot(bot)
e = s.kos.BotDataForUser(req.UserData.toUser()).RegisterBot(bot)
if e != nil {
// the bot is not registered at this stage so we don't throw a KelpError here
s.writeError(w, fmt.Sprintf("error registering bot: %s\n", e))
return
}

e = s.setupOpsDirectory()
e = s.setupOpsDirectory(userID)
if e != nil {
// the bot is not registered at this stage so we don't throw a KelpError here
s.writeError(w, fmt.Sprintf("error setting up ops directory: %s\n", e))
Expand All @@ -53,7 +77,7 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {

filenamePair := bot.Filenames()
sampleTrader := s.makeSampleTrader(kp.Seed())
traderFilePath := s.botConfigsPath.Join(filenamePair.Trader)
traderFilePath := s.botConfigsPathForUser(userID).Join(filenamePair.Trader)
log.Printf("writing autogenerated bot config to file: %s\n", traderFilePath.AsString())
e = toml.WriteFile(traderFilePath.Native(), sampleTrader)
if e != nil {
Expand All @@ -63,11 +87,11 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {
}

sampleBuysell := makeSampleBuysell()
strategyFilePath := s.botConfigsPath.Join(filenamePair.Strategy)
strategyFilePath := s.botConfigsPathForUser(userID).Join(filenamePair.Strategy)
log.Printf("writing autogenerated strategy config to file: %s\n", strategyFilePath.AsString())
e = toml.WriteFile(strategyFilePath.Native(), sampleBuysell)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
bot.Name,
time.Now().UTC(),
Expand All @@ -81,7 +105,7 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {
go func() {
e := s.setupTestnetAccount(kp.Address(), kp.Seed(), bot.Name)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
bot.Name,
time.Now().UTC(),
Expand All @@ -91,9 +115,9 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {
return
}

e = s.kos.AdvanceBotState(bot.Name, kelpos.InitState())
e = s.kos.BotDataForUser(req.UserData.toUser()).AdvanceBotState(bot.Name, kelpos.InitState())
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
bot.Name,
time.Now().UTC(),
Expand All @@ -106,7 +130,7 @@ func (s *APIServer) autogenerateBot(w http.ResponseWriter, r *http.Request) {

botJSON, e := json.Marshal(*bot)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
bot.Name,
time.Now().UTC(),
Expand Down
41 changes: 30 additions & 11 deletions gui/backend/delete_bot.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
package backend

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"

"github.com/stellar/kelp/gui/model2"
"github.com/stellar/kelp/support/kelpos"
)

type deleteBotRequest struct {
UserData UserData `json:"user_data"`
BotName string `json:"bot_name"`
}

func (s *APIServer) deleteBot(w http.ResponseWriter, r *http.Request) {
botName, e := s.parseBotName(r)
bodyBytes, e := ioutil.ReadAll(r.Body)
if e != nil {
s.writeError(w, fmt.Sprintf("error in deleteBot: %s\n", e))
s.writeErrorJson(w, fmt.Sprintf("error when reading request input: %s\n", e))
return
}
var req deleteBotRequest
e = json.Unmarshal(bodyBytes, &req)
if e != nil {
s.writeErrorJson(w, fmt.Sprintf("error unmarshaling json: %s; bodyString = %s", e, string(bodyBytes)))
return
}
if strings.TrimSpace(req.UserData.ID) == "" {
s.writeErrorJson(w, fmt.Sprintf("cannot have empty userID"))
return
}
botName := req.BotName

// only stop bot if current state is running
botState, e := s.doGetBotState(botName)
botState, e := s.doGetBotState(req.UserData, botName)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
botName,
time.Now().UTC(),
Expand All @@ -31,9 +50,9 @@ func (s *APIServer) deleteBot(w http.ResponseWriter, r *http.Request) {
}
log.Printf("current botState: %s\n", botState)
if botState == kelpos.BotStateRunning {
e = s.doStopBot(botName)
e = s.doStopBot(req.UserData, botName)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
botName,
time.Now().UTC(),
Expand All @@ -45,9 +64,9 @@ func (s *APIServer) deleteBot(w http.ResponseWriter, r *http.Request) {
}

for {
botState, e := s.doGetBotState(botName)
botState, e := s.doGetBotState(req.UserData, botName)
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
botName,
time.Now().UTC(),
Expand All @@ -66,14 +85,14 @@ func (s *APIServer) deleteBot(w http.ResponseWriter, r *http.Request) {
}

// unregister bot
s.kos.SafeUnregisterBot(botName)
s.kos.BotDataForUser(req.UserData.toUser()).SafeUnregisterBot(botName)

// delete configs
botPrefix := model2.GetPrefix(botName)
botConfigPath := s.botConfigsPath.Join(botPrefix)
botConfigPath := s.botConfigsPathForUser(req.UserData.ID).Join(botPrefix)
_, e = s.kos.Blocking("rm", fmt.Sprintf("rm %s*", botConfigPath.Unix()))
if e != nil {
s.writeKelpError(w, makeKelpErrorResponseWrapper(
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
botName,
time.Now().UTC(),
Expand Down
Loading

0 comments on commit 6ad2cde

Please sign in to comment.