Skip to content

Commit

Permalink
Remove service package
Browse files Browse the repository at this point in the history
  • Loading branch information
sergystepanov committed Aug 1, 2023
1 parent 7c0a205 commit 7fe3a89
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 89 deletions.
8 changes: 6 additions & 2 deletions cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ func main() {
if log.GetLevel() < logger.InfoLevel {
log.Debug().Msgf("conf: %+v", conf)
}
c := coordinator.New(conf, log)
c, err := coordinator.New(conf, log)
if err != nil {
log.Error().Err(err).Msgf("init fail")
return
}
c.Start()
<-os.ExpectTermination()
if err := c.Stop(); err != nil {
log.Error().Err(err).Msg("service shutdown errors")
log.Error().Err(err).Msg("shutdown fail")
}
}
18 changes: 10 additions & 8 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ func run() {
}

done := os.ExpectTermination()
wrk := worker.New(conf, log, done)
wrk.Start()
w, err := worker.New(conf, log)
if err != nil {
log.Error().Err(err).Msgf("init fail")
return
}
w.Start(done)
<-done
time.Sleep(100 * time.Millisecond)
if err := wrk.Stop(); err != nil {
log.Error().Err(err).Msg("service shutdown errors")
time.Sleep(100 * time.Millisecond) // hack
if err := w.Stop(); err != nil {
log.Error().Err(err).Msg("shutdown fail")
}
}

func main() {
thread.Wrap(run)
}
func main() { thread.Wrap(run) }
50 changes: 40 additions & 10 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package coordinator

import (
"errors"
"fmt"
"html/template"
"net/http"
"strings"
Expand All @@ -10,27 +12,55 @@ import (
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/service"
)

func New(conf config.CoordinatorConfig, log *logger.Logger) (services service.Group) {
type Coordinator struct {
hub *Hub
services [2]runnable
}

type runnable interface {
Run()
Stop() error
}

func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) {
coordinator := &Coordinator{}
lib := games.NewLib(conf.Coordinator.Library, conf.Emulator, log)
lib.Scan()
hub := NewHub(conf, lib, log)
coordinator.hub = NewHub(conf, lib, log)
h, err := NewHTTPServer(conf, log, func(mux *httpx.Mux) *httpx.Mux {
mux.HandleFunc("/ws", hub.handleUserConnection())
mux.HandleFunc("/wso", hub.handleWorkerConnection())
mux.HandleFunc("/ws", coordinator.hub.handleUserConnection())
mux.HandleFunc("/wso", coordinator.hub.handleWorkerConnection())
return mux
})
if err != nil {
log.Error().Err(err).Msg("http server init fail")
return
return nil, fmt.Errorf("http init fail: %w", err)
}
services.Add(hub, h)
coordinator.services[0] = h
if conf.Coordinator.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log))
coordinator.services[1] = monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log)
}
return coordinator, nil
}

func (c *Coordinator) Start() {
for _, s := range c.services {
if s != nil {
s.Run()
}
}
}

func (c *Coordinator) Stop() error {
var err error
for _, s := range c.services {
if s != nil {
err0 := s.Stop()
err = errors.Join(err, err0)
}
}
return
return err
}

func NewHTTPServer(conf config.CoordinatorConfig, log *logger.Logger, fnMux func(*httpx.Mux) *httpx.Mux) (*httpx.Server, error) {
Expand Down
46 changes: 0 additions & 46 deletions pkg/service/service.go

This file was deleted.

68 changes: 45 additions & 23 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
package worker

import (
"errors"
"fmt"
"time"

"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/service"
"github.com/giongto35/cloud-game/v3/pkg/worker/emulator/libretro/manager/remotehttp"
)

type Worker struct {
address string
conf config.WorkerConfig
cord *coordinator
log *logger.Logger
router Router
storage CloudStorage
done chan struct{}
address string
conf config.WorkerConfig
cord *coordinator
log *logger.Logger
router Router
services [2]runnable
storage CloudStorage
}

type runnable interface {
Run()
Stop() error
}

const retry = 10 * time.Second

func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (services service.Group) {
func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) {
if err := remotehttp.CheckCores(conf.Emulator, log); err != nil {
log.Error().Err(err).Msg("cores sync error")
log.Warn().Err(err).Msgf("a Libretro cores sync fail")
}

worker := &Worker{conf: conf, log: log, router: NewRouter()}

h, err := httpx.NewServer(
conf.Worker.GetAddr(),
func(s *httpx.Server) httpx.Handler {
Expand All @@ -36,48 +45,51 @@ func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (serv
})
},
httpx.WithServerConfig(conf.Worker.Server),
// no need just for one route
httpx.HttpsRedirect(false),
httpx.WithPortRoll(true),
httpx.WithZone(conf.Worker.Network.Zone),
httpx.WithLogger(log),
)
if err != nil {
log.Error().Err(err).Msg("http init fail")
return
return nil, fmt.Errorf("http init fail: %w", err)
}
services.Add(h)
worker.address = h.Addr
worker.services[0] = h
if conf.Worker.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Worker.Monitoring, h.GetHost(), log))
worker.services[1] = monitoring.New(conf.Worker.Monitoring, h.GetHost(), log)
}
st, err := GetCloudStorage(conf.Storage.Provider, conf.Storage.Key)
if err != nil {
log.Error().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead")
log.Warn().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead")
}
services.Add(&Worker{address: h.Addr, conf: conf, done: done, log: log, storage: st, router: NewRouter()})
worker.storage = st

return
return worker, nil
}

func (w *Worker) Run() {
func (w *Worker) Start(done chan struct{}) {
for _, s := range w.services {
if s != nil {
s.Run()
}
}
go func() {
remoteAddr := w.conf.Worker.Network.CoordinatorAddress
defer func() {
if w.cord != nil {
w.cord.Disconnect()
}
w.router.Close()
w.log.Debug().Msgf("Service loop end")
}()

for {
select {
case <-w.done:
case <-done:
return
default:
cord, err := newCoordinatorConnection(remoteAddr, w.conf.Worker, w.address, w.log)
if err != nil {
w.log.Error().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry)
w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry)
time.Sleep(retry)
continue
}
Expand All @@ -89,4 +101,14 @@ func (w *Worker) Run() {
}
}()
}
func (w *Worker) Stop() error { return nil }

func (w *Worker) Stop() error {
var err error
for _, s := range w.services {
if s != nil {
err0 := s.Stop()
err = errors.Join(err, err0)
}
}
return err
}

0 comments on commit 7fe3a89

Please sign in to comment.