Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait handler for rooms #68

Merged
merged 3 commits into from
Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/config/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Room struct {
NekoPrivilegedImages []string
PathPrefix string
Labels []string
WaitEnabled bool

StorageEnabled bool
StorageInternal string
Expand Down Expand Up @@ -93,6 +94,11 @@ func (Room) Init(cmd *cobra.Command) error {
return err
}

cmd.PersistentFlags().Bool("wait_enabled", true, "enable active waiting for the room")
if err := viper.BindPFlag("wait_enabled", cmd.PersistentFlags().Lookup("wait_enabled")); err != nil {
return err
}

// Data

cmd.PersistentFlags().Bool("storage.enabled", true, "whether storage is enabled, where peristent containers data will be stored")
Expand Down Expand Up @@ -199,6 +205,7 @@ func (s *Room) Set() {
s.NekoPrivilegedImages = viper.GetStringSlice("neko_privileged_images")
s.PathPrefix = path.Join("/", path.Clean(viper.GetString("path_prefix")))
s.Labels = viper.GetStringSlice("labels")
s.WaitEnabled = viper.GetBool("wait_enabled")

s.StorageEnabled = viper.GetBool("storage.enabled")
s.StorageInternal = viper.GetString("storage.internal")
Expand Down
54 changes: 52 additions & 2 deletions internal/proxy/lobby.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,41 @@ import (
"github.com/m1k1o/neko-rooms/internal/utils"
)

func RoomNotFound(w http.ResponseWriter, r *http.Request) {
func roomWait(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<script>
(async function() {
document.querySelector(".swal2-loader").style.display = 'block'
document.querySelector(".swal2-loader").style.visibility = 'hidden'

let lastAttempt = (new Date()).getTime()
while(true) {
try {
lastAttempt = (new Date()).getTime()
document.querySelector(".swal2-loader").style.visibility = 'visible'

await fetch("?wait")
location.href = location.href
} catch {
let now = (new Date()).getTime()
let diff = now - lastAttempt

// if the gap between last attempt and now
// is gt 20s, do reconnect immediatly
if ((now - lastAttempt) > 20*1000) {
continue
}

// wait for 10 sec
await new Promise(res => setTimeout(res, 2500))
document.querySelector(".swal2-loader").style.visibility = 'hidden'
await new Promise(res => setTimeout(res, 7500))
}
}
}())
</script>`))
}

func RoomNotFound(w http.ResponseWriter, r *http.Request, waitEnabled bool) {
utils.Swal2Response(w, `
<div class="swal2-header">
<div class="swal2-icon swal2-error">
Expand All @@ -16,11 +50,19 @@ func RoomNotFound(w http.ResponseWriter, r *http.Request) {
</div>
<div class="swal2-content">
<div>The room you are trying to join does not exist.</div>
<div>You can wait on this page until it will be created.</div>
</div>
<div class="swal2-actions">
<div class="swal2-loader" style="display:none;"></div>
</div>
`)

if waitEnabled {
roomWait(w, r)
}
}

func RoomNotRunning(w http.ResponseWriter, r *http.Request) {
func RoomNotRunning(w http.ResponseWriter, r *http.Request, waitEnabled bool) {
utils.Swal2Response(w, `
<div class="swal2-header">
<div class="swal2-icon swal2-warning">
Expand All @@ -30,8 +72,16 @@ func RoomNotRunning(w http.ResponseWriter, r *http.Request) {
</div>
<div class="swal2-content">
<div>The room you are trying to join is not running.</div>
<div>You can wait on this page until it will be started.</div>
</div>
<div class="swal2-actions">
<div class="swal2-loader" style="display:none;"></div>
</div>
`)

if waitEnabled {
roomWait(w, r)
}
}

func RoomNotReady(w http.ResponseWriter, r *http.Request) {
Expand Down
85 changes: 78 additions & 7 deletions internal/proxy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"path"
"strconv"
"strings"
"sync"
Expand All @@ -23,23 +24,34 @@ type entry struct {
handler *httputil.ReverseProxy
}

type wait struct {
subs int
signal chan struct{}
}

type ProxyManagerCtx struct {
logger zerolog.Logger
mu sync.RWMutex
ctx context.Context
cancel func()

waitMu sync.RWMutex
waitChans map[string]*wait
waitEnabled bool

client *dockerClient.Client
instanceName string
handlers *prefixHandler[*entry]
}

func New(client *dockerClient.Client, instanceName string) *ProxyManagerCtx {
func New(client *dockerClient.Client, instanceName string, waitEnabled bool) *ProxyManagerCtx {
return &ProxyManagerCtx{
logger: log.With().Str("module", "proxy").Logger(),
logger: log.With().Str("module", "proxy").Logger(),
waitChans: map[string]*wait{},

client: client,
instanceName: instanceName,
waitEnabled: waitEnabled,
handlers: &prefixHandler[*entry]{},
}
}
Expand Down Expand Up @@ -83,6 +95,17 @@ func (p *ProxyManagerCtx) Start() {
Str("host", host).
Msg("got docker event")

// terminate waiting for any events
if p.waitEnabled {
p.waitMu.Lock()
ch, ok := p.waitChans[path]
if ok {
close(ch.signal)
delete(p.waitChans, path)
}
p.waitMu.Unlock()
}

p.mu.Lock()
switch msg.Action {
case "create":
Expand Down Expand Up @@ -222,28 +245,76 @@ func (p *ProxyManagerCtx) parseLabels(labels map[string]string) (enabled bool, p
return
}

func (p *ProxyManagerCtx) waitForPath(w http.ResponseWriter, r *http.Request, path string) {
p.logger.Debug().Str("path", path).Msg("adding new wait handler")

p.waitMu.Lock()
ch, ok := p.waitChans[path]
if !ok {
ch = &wait{
subs: 1,
signal: make(chan struct{}),
}
p.waitChans[path] = ch
} else {
p.waitChans[path].subs += 1
}
p.waitMu.Unlock()

select {
case <-ch.signal:
w.Write([]byte("ready"))
case <-r.Context().Done():
http.Error(w, r.Context().Err().Error(), http.StatusRequestTimeout)

p.waitMu.Lock()
ch.subs -= 1
if ch.subs <= 0 {
delete(p.waitChans, path)
}
p.waitMu.Unlock()
p.logger.Debug().Str("path", path).Msg("wait handler removed")
case <-p.ctx.Done():
w.Write([]byte("shutdown"))
}
}

func (p *ProxyManagerCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cleanPath := path.Clean(r.URL.Path)

// get proxy by room name
p.mu.RLock()
proxy, prefix, ok := p.handlers.Match(r.URL.Path)
proxy, prefix, ok := p.handlers.Match(cleanPath)
p.mu.RUnlock()

// if room not found
if !ok {
RoomNotFound(w, r)
// blocking until room is created
if r.URL.Query().Has("wait") && p.waitEnabled {
p.waitForPath(w, r, cleanPath)
return
}

RoomNotFound(w, r, p.waitEnabled)
return
}

// redirect to room ending with /
if r.URL.Path == prefix {
r.URL.Path += "/"
if cleanPath == prefix && !strings.HasSuffix(r.URL.Path, "/") {
r.URL.Path = cleanPath + "/"
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
return
}

// if room not running
if !proxy.running {
RoomNotRunning(w, r)
// blocking until room is running
if r.URL.Query().Has("wait") && p.waitEnabled {
p.waitForPath(w, r, cleanPath)
return
}

RoomNotRunning(w, r, p.waitEnabled)
return
}

Expand Down
1 change: 1 addition & 0 deletions neko.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (main *MainCtx) Start() {
main.proxyManager = proxy.New(
client,
main.Configs.Room.InstanceName,
main.Configs.Room.WaitEnabled,
)
main.proxyManager.Start()

Expand Down