Skip to content

Commit

Permalink
added qubic pool connection usage; added flag to disable pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Dec 20, 2024
1 parent 3c0cc99 commit d908ddf
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 38 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/push-docker-dev.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Deploy dev images to GHCR

on:
push:
branches:
- 'dev'

jobs:
push-store-image:
runs-on: ubuntu-latest
steps:
- name: 'Checkout GitHub Action'
uses: actions/checkout@main

- name: 'Login to GitHub Container Registry'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{github.actor}}
password: ${{secrets.GITHUB_TOKEN}}

- name: 'Build Inventory Image'
run: |
docker build . --tag ghcr.io/qubic/go-events:dev
docker push ghcr.io/qubic/go-events:dev
41 changes: 29 additions & 12 deletions app/events-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func run() error {
HttpHost string `conf:"default:0.0.0.0:8000"`
GrpcHost string `conf:"default:0.0.0.0:8001"`
NodeSyncThreshold int `conf:"default:3"`
ChainTickFetchUrl string `conf:"default:http://127.0.0.1:8080/max-tick"`
}
Pool struct {
SingleNodeIP string `conf:"default:127.0.0.1"`
Expand All @@ -48,11 +47,14 @@ func run() error {
IdleTimeout time.Duration `conf:"default:15s"`
}
Qubic struct {
NodePort string `conf:"default:21841"`
StorageFolder string `conf:"default:store"`
ProcessTickTimeout time.Duration `conf:"default:120s"`
NodePort string `conf:"default:21841"`
StorageFolder string `conf:"default:store"`
ConnectionTimeout time.Duration `conf:"default:5s"`
HandlerRequestTimeout time.Duration `conf:"default:5s"`
ProcessTickTimeout time.Duration `conf:"default:120s"`
}
PubSub struct {
Enabled bool `conf:"default:false"`
Addr string `conf:"default:localhost:6379"`
Password string `conf:"default:password"`
}
Expand Down Expand Up @@ -84,13 +86,23 @@ func run() error {
}
log.Printf("main: Config :\n%v\n", out)

connectorConfig := connector.Config{
pfConfig := connector.PoolFetcherConfig{
URL: cfg.Pool.NodeFetcherUrl,
RequestTimeout: cfg.Pool.NodeFetcherTimeout,
}
cConfig := connector.Config{
ConnectionPort: cfg.Qubic.NodePort,
ConnectionTimeout: 5 * time.Second,
HandlerRequestTimeout: 5 * time.Second,
ConnectionTimeout: cfg.Qubic.ConnectionTimeout,
HandlerRequestTimeout: cfg.Qubic.HandlerRequestTimeout,
}
pConfig := connector.PoolConfig{
InitialCap: cfg.Pool.InitialCap,
MaxCap: cfg.Pool.MaxCap,
MaxIdle: cfg.Pool.MaxIdle,
IdleTimeout: cfg.Pool.IdleTimeout,
}

conn, err := connector.NewConnector(cfg.Pool.SingleNodeIP, connectorConfig)
pConn, err := connector.NewPoolConnector(pfConfig, cConfig, pConfig)
if err != nil {
return errors.Wrap(err, "creating connector")
}
Expand All @@ -116,16 +128,21 @@ func run() error {
}
defer db.Close()

redisPubSubClient, err := pubsub.NewRedisPubSub(cfg.PubSub.Addr, cfg.PubSub.Password)
if err != nil {
return errors.Wrap(err, "creating redis pubsub client")
var pubSubClient *pubsub.RedisPubSub
if cfg.PubSub.Enabled {
redisPubSubClient, err := pubsub.NewRedisPubSub(cfg.PubSub.Addr, cfg.PubSub.Password)
if err != nil {
return errors.Wrap(err, "creating redis pubsub client")
}

pubSubClient = redisPubSubClient
}

eventsStore := store.NewStore(db)

var passcode [4]uint64
copy(passcode[:], cfg.Pool.NodePasscode)
proc := processor.NewProcessor(conn, redisPubSubClient, eventsStore, cfg.Qubic.ProcessTickTimeout, passcode)
proc := processor.NewProcessor(pConn, pubSubClient, cfg.PubSub.Enabled, eventsStore, cfg.Qubic.ProcessTickTimeout, passcode)

srv := server.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, eventsStore)
err = srv.Start()
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/cockroachdb/pebble v1.1.2
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0
github.com/pkg/errors v0.9.1
github.com/qubic/go-qubic v0.1.4-0.20241030082827-d70a073f351d
github.com/qubic/go-qubic v0.2.2
github.com/redis/go-redis/v9 v9.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf
google.golang.org/grpc v1.65.0
Expand All @@ -18,7 +18,7 @@ require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.3.8 // indirect
github.com/cloudflare/circl v1.5.0 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
Expand All @@ -41,8 +41,8 @@ require (
github.com/silenceper/pool v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect
)
30 changes: 14 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.8 h1:j+V8jJt09PoeMFIu2uh5JUyEaIHTXVOHslFoLNAKqwI=
github.com/cloudflare/circl v1.3.8/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZFnBQS5QU=
github.com/cloudflare/fourq v0.0.0-20170427000316-8ada258cf9c8 h1:748sGeXXbplK0UVPDLbhh53hejCnvv/u6jn2RPBfyI8=
github.com/cloudflare/fourq v0.0.0-20170427000316-8ada258cf9c8/go.mod h1:13nQglQo5cpucnNY80duyW/6HK+WQ9+dHZ70UzAy6Jw=
github.com/cloudflare/circl v1.5.0 h1:hxIWksrX6XN5a1L2TI/h53AGPhNHoUBo+TD1ms9+pys=
github.com/cloudflare/circl v1.5.0/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs=
github.com/cloudflare/fourq v0.0.0-20240920015215-a8ef7b780d07 h1:A0Btoh92RfhQC5qh15d/gSsPSXoWxax82iwMjc8k+ko=
github.com/cloudflare/fourq v0.0.0-20240920015215-a8ef7b780d07/go.mod h1:13nQglQo5cpucnNY80duyW/6HK+WQ9+dHZ70UzAy6Jw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4=
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
Expand Down Expand Up @@ -232,10 +232,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/qubic/go-qubic v0.1.4-0.20241029121630-0bd3f87f4c75 h1:nM0TTY9XEoPb2d9xUY7t91EEgCTCTlhpup3csQKqdg0=
github.com/qubic/go-qubic v0.1.4-0.20241029121630-0bd3f87f4c75/go.mod h1:6+8pmfKCl5HnZEXM1F8J19nP7gEc5b7bFbhuraN8WnE=
github.com/qubic/go-qubic v0.1.4-0.20241030082827-d70a073f351d h1:5JTqoiC+IenkF8jiqWgv48Zx8KzQAYNq2yAX9+D9h88=
github.com/qubic/go-qubic v0.1.4-0.20241030082827-d70a073f351d/go.mod h1:6+8pmfKCl5HnZEXM1F8J19nP7gEc5b7bFbhuraN8WnE=
github.com/qubic/go-qubic v0.2.2 h1:qPYJWUxINnpgvn+P5JVcFjNCRX43gqLEKv65mjS1b/g=
github.com/qubic/go-qubic v0.2.2/go.mod h1:OqqByAtABECupBpf9pmtG6N+uskGVJwZjhDgyPpHyRc=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down Expand Up @@ -334,8 +332,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -352,8 +350,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -392,17 +390,17 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
14 changes: 9 additions & 5 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ func (e *TickInTheFutureError) Error() string {

type Processor struct {
qubicConnector *connector.Connector
isPubSubEnabled bool
redisPubSubClient *pubsub.RedisPubSub
eventsStore *store.Store
passcode [4]uint64
processTickTimeout time.Duration
}

func NewProcessor(qubicConnector *connector.Connector, redisPubSubClient *pubsub.RedisPubSub, eventsStore *store.Store, processTickTimeout time.Duration, passcode [4]uint64) *Processor {
func NewProcessor(qubicConnector *connector.Connector, redisPubSubClient *pubsub.RedisPubSub, isPubSubEnabled bool, eventsStore *store.Store, processTickTimeout time.Duration, passcode [4]uint64) *Processor {
return &Processor{
qubicConnector: qubicConnector,
isPubSubEnabled: isPubSubEnabled,
redisPubSubClient: redisPubSubClient,
eventsStore: eventsStore,
processTickTimeout: processTickTimeout,
Expand Down Expand Up @@ -83,7 +85,7 @@ func (p *Processor) processOneByOne() error {

eventsClient := events.NewClient(p.qubicConnector)
start := time.Now()
tickEvents, err := eventsClient.GetTickEventsOneByOne(context.Background(), p.passcode, nextTick.TickNumber)
tickEvents, err := eventsClient.GetTickEvents(context.Background(), p.passcode, nextTick.TickNumber)
if err != nil {
return errors.Wrap(err, "getting tick events")
}
Expand All @@ -104,9 +106,11 @@ func (p *Processor) processOneByOne() error {
return errors.Wrapf(err, "processing status for lastTick %+v and nextTick %+v", lastTick, nextTick)
}

err = p.redisPubSubClient.PublishTickEvents(ctx, tickEvents)
if err != nil {
return errors.Wrap(err, "publishing tick events")
if p.isPubSubEnabled {
err = p.redisPubSubClient.PublishTickEvents(ctx, tickEvents)
if err != nil {
return errors.Wrap(err, "publishing tick events")
}
}

return nil
Expand Down

0 comments on commit d908ddf

Please sign in to comment.