From c5c5d4bc93f4f7be8596b0e55c4cbe0d6618e841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 5 Aug 2022 01:02:59 +0200 Subject: [PATCH 1/2] add room wait handler. --- internal/proxy/lobby.go | 46 ++++++++++++++++++++++++ internal/proxy/manager.go | 75 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 117 insertions(+), 4 deletions(-) diff --git a/internal/proxy/lobby.go b/internal/proxy/lobby.go index fc81add..f966b58 100644 --- a/internal/proxy/lobby.go +++ b/internal/proxy/lobby.go @@ -6,6 +6,40 @@ import ( "github.com/m1k1o/neko-rooms/internal/utils" ) +func roomWait(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(``)) +} + func RoomNotFound(w http.ResponseWriter, r *http.Request) { utils.Swal2Response(w, `
@@ -16,8 +50,14 @@ func RoomNotFound(w http.ResponseWriter, r *http.Request) {
The room you are trying to join does not exist.
+
You can wait on this page until it will be created.
+
+
+
`) + + roomWait(w, r) } func RoomNotRunning(w http.ResponseWriter, r *http.Request) { @@ -30,8 +70,14 @@ func RoomNotRunning(w http.ResponseWriter, r *http.Request) {
The room you are trying to join is not running.
+
You can wait on this page until it will be started.
+
+
+
`) + + roomWait(w, r) } func RoomNotReady(w http.ResponseWriter, r *http.Request) { diff --git a/internal/proxy/manager.go b/internal/proxy/manager.go index 4c73974..91bddfa 100644 --- a/internal/proxy/manager.go +++ b/internal/proxy/manager.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "path" "strconv" "strings" "sync" @@ -23,12 +24,20 @@ type entry struct { handler *httputil.ReverseProxy } +type wait struct { + subs int + signal chan struct{} +} + type ProxyManagerCtx struct { logger zerolog.Logger mu sync.RWMutex ctx context.Context cancel func() + waitMu sync.RWMutex + waitChans map[string]*wait + client *dockerClient.Client instanceName string handlers *prefixHandler[*entry] @@ -36,7 +45,8 @@ type ProxyManagerCtx struct { func New(client *dockerClient.Client, instanceName string) *ProxyManagerCtx { return &ProxyManagerCtx{ - logger: log.With().Str("module", "proxy").Logger(), + logger: log.With().Str("module", "proxy").Logger(), + waitChans: map[string]*wait{}, client: client, instanceName: instanceName, @@ -83,6 +93,15 @@ func (p *ProxyManagerCtx) Start() { Str("host", host). Msg("got docker event") + // terminate waiting for any events + p.waitMu.Lock() + ch, ok := p.waitChans[path] + if ok { + close(ch.signal) + delete(p.waitChans, path) + } + p.waitMu.Unlock() + p.mu.Lock() switch msg.Action { case "create": @@ -222,27 +241,75 @@ func (p *ProxyManagerCtx) parseLabels(labels map[string]string) (enabled bool, p return } +func (p *ProxyManagerCtx) waitForPath(w http.ResponseWriter, r *http.Request, path string) { + p.logger.Debug().Str("path", path).Msg("adding new wait handler") + + p.waitMu.Lock() + ch, ok := p.waitChans[path] + if !ok { + ch = &wait{ + subs: 1, + signal: make(chan struct{}), + } + p.waitChans[path] = ch + } else { + p.waitChans[path].subs += 1 + } + p.waitMu.Unlock() + + select { + case <-ch.signal: + w.Write([]byte("ready")) + case <-r.Context().Done(): + http.Error(w, r.Context().Err().Error(), http.StatusRequestTimeout) + + p.waitMu.Lock() + ch.subs -= 1 + if ch.subs <= 0 { + delete(p.waitChans, path) + } + p.waitMu.Unlock() + p.logger.Debug().Str("path", path).Msg("wait handler removed") + case <-p.ctx.Done(): + w.Write([]byte("shutdown")) + } +} + func (p *ProxyManagerCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + cleanPath := path.Clean(r.URL.Path) + // get proxy by room name p.mu.RLock() - proxy, prefix, ok := p.handlers.Match(r.URL.Path) + proxy, prefix, ok := p.handlers.Match(cleanPath) p.mu.RUnlock() // if room not found if !ok { + // blocking until room is created + if r.URL.Query().Has("wait") { + p.waitForPath(w, r, cleanPath) + return + } + RoomNotFound(w, r) return } // redirect to room ending with / - if r.URL.Path == prefix { - r.URL.Path += "/" + if cleanPath == prefix && !strings.HasSuffix(r.URL.Path, "/") { + r.URL.Path = cleanPath + "/" http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) return } // if room not running if !proxy.running { + // blocking until room is running + if r.URL.Query().Has("wait") { + p.waitForPath(w, r, cleanPath) + return + } + RoomNotRunning(w, r) return } From 30af83d3b12953e31e0455bc34e4f15f689a0a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 18 Sep 2022 12:31:21 +0200 Subject: [PATCH 2/2] add wait enabled. --- internal/config/room.go | 7 +++++++ internal/proxy/lobby.go | 12 ++++++++---- internal/proxy/manager.go | 30 +++++++++++++++++------------- neko.go | 1 + 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/internal/config/room.go b/internal/config/room.go index b65dbfc..9e302db 100644 --- a/internal/config/room.go +++ b/internal/config/room.go @@ -31,6 +31,7 @@ type Room struct { NekoPrivilegedImages []string PathPrefix string Labels []string + WaitEnabled bool StorageEnabled bool StorageInternal string @@ -93,6 +94,11 @@ func (Room) Init(cmd *cobra.Command) error { return err } + cmd.PersistentFlags().Bool("wait_enabled", true, "enable active waiting for the room") + if err := viper.BindPFlag("wait_enabled", cmd.PersistentFlags().Lookup("wait_enabled")); err != nil { + return err + } + // Data cmd.PersistentFlags().Bool("storage.enabled", true, "whether storage is enabled, where peristent containers data will be stored") @@ -199,6 +205,7 @@ func (s *Room) Set() { s.NekoPrivilegedImages = viper.GetStringSlice("neko_privileged_images") s.PathPrefix = path.Join("/", path.Clean(viper.GetString("path_prefix"))) s.Labels = viper.GetStringSlice("labels") + s.WaitEnabled = viper.GetBool("wait_enabled") s.StorageEnabled = viper.GetBool("storage.enabled") s.StorageInternal = viper.GetString("storage.internal") diff --git a/internal/proxy/lobby.go b/internal/proxy/lobby.go index f966b58..23c294f 100644 --- a/internal/proxy/lobby.go +++ b/internal/proxy/lobby.go @@ -40,7 +40,7 @@ func roomWait(w http.ResponseWriter, r *http.Request) { `)) } -func RoomNotFound(w http.ResponseWriter, r *http.Request) { +func RoomNotFound(w http.ResponseWriter, r *http.Request, waitEnabled bool) { utils.Swal2Response(w, `
@@ -57,10 +57,12 @@ func RoomNotFound(w http.ResponseWriter, r *http.Request) {
`) - roomWait(w, r) + if waitEnabled { + roomWait(w, r) + } } -func RoomNotRunning(w http.ResponseWriter, r *http.Request) { +func RoomNotRunning(w http.ResponseWriter, r *http.Request, waitEnabled bool) { utils.Swal2Response(w, `
@@ -77,7 +79,9 @@ func RoomNotRunning(w http.ResponseWriter, r *http.Request) {
`) - roomWait(w, r) + if waitEnabled { + roomWait(w, r) + } } func RoomNotReady(w http.ResponseWriter, r *http.Request) { diff --git a/internal/proxy/manager.go b/internal/proxy/manager.go index 91bddfa..3890b02 100644 --- a/internal/proxy/manager.go +++ b/internal/proxy/manager.go @@ -35,21 +35,23 @@ type ProxyManagerCtx struct { ctx context.Context cancel func() - waitMu sync.RWMutex - waitChans map[string]*wait + waitMu sync.RWMutex + waitChans map[string]*wait + waitEnabled bool client *dockerClient.Client instanceName string handlers *prefixHandler[*entry] } -func New(client *dockerClient.Client, instanceName string) *ProxyManagerCtx { +func New(client *dockerClient.Client, instanceName string, waitEnabled bool) *ProxyManagerCtx { return &ProxyManagerCtx{ logger: log.With().Str("module", "proxy").Logger(), waitChans: map[string]*wait{}, client: client, instanceName: instanceName, + waitEnabled: waitEnabled, handlers: &prefixHandler[*entry]{}, } } @@ -94,13 +96,15 @@ func (p *ProxyManagerCtx) Start() { Msg("got docker event") // terminate waiting for any events - p.waitMu.Lock() - ch, ok := p.waitChans[path] - if ok { - close(ch.signal) - delete(p.waitChans, path) + if p.waitEnabled { + p.waitMu.Lock() + ch, ok := p.waitChans[path] + if ok { + close(ch.signal) + delete(p.waitChans, path) + } + p.waitMu.Unlock() } - p.waitMu.Unlock() p.mu.Lock() switch msg.Action { @@ -286,12 +290,12 @@ func (p *ProxyManagerCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { // if room not found if !ok { // blocking until room is created - if r.URL.Query().Has("wait") { + if r.URL.Query().Has("wait") && p.waitEnabled { p.waitForPath(w, r, cleanPath) return } - RoomNotFound(w, r) + RoomNotFound(w, r, p.waitEnabled) return } @@ -305,12 +309,12 @@ func (p *ProxyManagerCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { // if room not running if !proxy.running { // blocking until room is running - if r.URL.Query().Has("wait") { + if r.URL.Query().Has("wait") && p.waitEnabled { p.waitForPath(w, r, cleanPath) return } - RoomNotRunning(w, r) + RoomNotRunning(w, r, p.waitEnabled) return } diff --git a/neko.go b/neko.go index da8c7d6..9fb3822 100644 --- a/neko.go +++ b/neko.go @@ -145,6 +145,7 @@ func (main *MainCtx) Start() { main.proxyManager = proxy.New( client, main.Configs.Room.InstanceName, + main.Configs.Room.WaitEnabled, ) main.proxyManager.Start()