Skip to content

Commit

Permalink
✨ GetCandles
Browse files Browse the repository at this point in the history
  • Loading branch information
MateoGreil committed May 7, 2024
1 parent 79092d3 commit 4e08aa7
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 25 deletions.
6 changes: 6 additions & 0 deletions internal/protocols/stream/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ type Request struct {
Command string `json:"command"`
StreamSessionId string `json:"streamSessionId"`
}

type GetCandlesRequest struct {
Command string `json:"command"`
StreamSessionId string `json:"streamSessionId"`
Symbol string `json:"symbol"`
}
27 changes: 27 additions & 0 deletions internal/protocols/stream/response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
package stream

type Response struct {
Command string `json:"command"`
}

type KeepAliveResponse struct {
Command string `json:"command"`
Data KeepAliveData `json:"data"`
}

type KeepAliveData struct {
Timestamp int `json:"timestamp"`
}

type ResponseCandle struct {
Command string `json:"response"`
Data Candle `json:"data"`
}

// TODO: Move it to a common package (stream and socket use it)
type Candle struct {
Close float64 `json:"close"`
Ctm int64 `json:"ctm"`
CtmString string `json:"ctmString"`
High float64 `json:"high"`
Low float64 `json:"low"`
Open float64 `json:"open"`
QuoteId int `json:"quoteId"`
Symbol string `json:"symbol"`
Vol float64 `json:"vol"`
}
13 changes: 13 additions & 0 deletions internal/protocols/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package xapi

type Candle struct {
Close float64 `json:"close"`
Ctm int64 `json:"ctm"`
CtmString string `json:"ctmString"`
High float64 `json:"high"`
Low float64 `json:"low"`
Open float64 `json:"open"`
QuoteId int `json:"quoteId"`
Symbol string `json:"symbol"`
Vol float64 `json:"vol"`
}
97 changes: 78 additions & 19 deletions xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import (
"fmt"
"time"

socket "github.com/MateoGreil/xapi-go/internal/protocols/socket"
stream "github.com/MateoGreil/xapi-go/internal/protocols/stream"
"github.com/MateoGreil/xapi-go/internal/protocols/socket"
"github.com/MateoGreil/xapi-go/internal/protocols/stream"
"github.com/gorilla/websocket"
)

type client struct {
conn *websocket.Conn
streamConn *websocket.Conn
streamSessionId string
conn *websocket.Conn
streamConn *websocket.Conn
streamSessionId string
socketMessageChannel chan interface{}
streamMessageChannel chan interface{}
CandlesChannel chan stream.Candle
}

const (
Expand Down Expand Up @@ -48,33 +51,58 @@ func NewClient(userId string, password string, connectionType string) (*client,
if err != nil {
return nil, err
}
getKeepAlive(conn, streamSessionId)
getKeepAlive(streamConn, streamSessionId)

c := &client{
conn: conn,
streamConn: streamConn,
streamSessionId: streamSessionId,
conn: conn,
streamConn: streamConn,
streamSessionId: streamSessionId,
socketMessageChannel: make(chan interface{}),
streamMessageChannel: make(chan interface{}),
CandlesChannel: make(chan stream.Candle),
}
go c.pingSocket()
go c.pingStream()
go c.listenStream()
go c.socketWriteJSON()
go c.streamWriteJSON()

return c, nil
}

func (c *client) SubscribeCandles(symbol string) {
request := stream.GetCandlesRequest{
Command: "getCandles",
StreamSessionId: c.streamSessionId,
Symbol: symbol,
}
c.streamMessageChannel <- request
}

func (c *client) listenStream() {
for {
_, msg, err := c.streamConn.ReadMessage()
_, message, err := c.streamConn.ReadMessage()
if err != nil {
fmt.Println(err.Error())
}
fmt.Println("Stream message:", msg)
response := stream.Response{}
err = json.Unmarshal(msg, &response)
err = json.Unmarshal(message, &response)
if err != nil {
fmt.Printf("message: %s\n", message)
fmt.Println(err.Error())
}
switch response.Command {
case "candle":
responseCandle := stream.ResponseCandle{}
err = json.Unmarshal(message, &responseCandle)
if err != nil {
fmt.Println(err.Error())
}
c.CandlesChannel <- responseCandle.Data
case "keepAlive":
fmt.Printf("keepAlive received\n")
default:
fmt.Printf("Unknown stream message: %s\n", message)
}
}
}
Expand All @@ -85,12 +113,12 @@ func (c *client) pingSocket() {
Command: "ping",
Arguments: nil,
}
c.conn.WriteJSON(request)
response := socket.Response{}
err := c.conn.ReadJSON(&response)
if err != nil {
fmt.Println(err.Error())
}
c.socketMessageChannel <- request
// response := socket.Response{}
// err := c.conn.ReadJSON(&response)
// if err != nil {
// fmt.Println(err.Error())
// }
time.Sleep(pingInterval)
}
}
Expand All @@ -101,11 +129,27 @@ func (c *client) pingStream() {
Command: "ping",
StreamSessionId: c.streamSessionId,
}
c.streamConn.WriteJSON(request)
c.streamMessageChannel <- request
time.Sleep(pingInterval)
}
}

func (c *client) streamWriteJSON() {
for {
message := <-c.streamMessageChannel
c.streamConn.WriteJSON(message)
fmt.Printf("messageStream: %+v\n", message)
}
}

func (c *client) socketWriteJSON() {
for {
message := <-c.socketMessageChannel
c.conn.WriteJSON(message)
fmt.Printf("messageSocket: %+v\n", message)
}
}

func login(conn *websocket.Conn, userId string, password string) (string, error) {
request := socket.Request{
Command: "login",
Expand Down Expand Up @@ -133,4 +177,19 @@ func getKeepAlive(conn *websocket.Conn, streamSessionId string) {
StreamSessionId: streamSessionId,
}
conn.WriteJSON(keepAliveReq)
_, message, err := conn.ReadMessage()
if err != nil {
// TODO: Handle errors
fmt.Println(err.Error())
}
response := stream.KeepAliveResponse{}
err = json.Unmarshal(message, &response)
if err != nil {
// TODO: Handle errors
fmt.Println(err.Error())
}
if response.Command != "keepAlive" {
// TODO: Handle errors
fmt.Println(err.Error())
}
}
28 changes: 22 additions & 6 deletions xapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@ import (
"fmt"
"os"
"testing"
"time"
)

func TestNewClient(t *testing.T) {
_, err := NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo")
if err != nil {
t.Error(err)
}

_, err = NewClient(os.Getenv("XAPI_USER_ID"), "wrong-password", "demo")
_, err := NewClient(os.Getenv("XAPI_USER_ID"), "wrong-password", "demo")
if err.Error() != "userPasswordCheck: Invalid login or password" {
t.Error(err)
}
Expand All @@ -22,4 +18,24 @@ func TestNewClient(t *testing.T) {
fmt.Println(err)
t.Error(err)
}

_, err = NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo")
if err != nil {
t.Error(err)
}
}

func TestSuscribeCandles(t *testing.T) {
xapiClient, err := NewClient(os.Getenv("XAPI_USER_ID"), os.Getenv("XAPI_PASSWORD"), "demo")
if err != nil {
t.Error(err)
}

xapiClient.SubscribeCandles("EURUSD")
select {
case candle := <-xapiClient.CandlesChannel:
fmt.Printf("%+v\n", candle)
case <-time.After(2 * time.Minute):
t.Error("Did not receive candles")
}
}

0 comments on commit 4e08aa7

Please sign in to comment.