Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add live proxy feature #512

Merged
merged 6 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions air_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ clean_on_exit = true
[screen]
clear_on_rebuild = true
keep_scroll = true

# Enable live-reloading on the browser.
[proxy]
enabled = true
proxy_port = 8090
app_port = 8080
12 changes: 9 additions & 3 deletions runner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
Log cfgLog `toml:"log"`
Misc cfgMisc `toml:"misc"`
Screen cfgScreen `toml:"screen"`
Proxy cfgProxy `toml:"proxy"`
}

type cfgBuild struct {
Expand Down Expand Up @@ -96,6 +97,12 @@ type cfgScreen struct {
KeepScroll bool `toml:"keep_scroll"`
}

type cfgProxy struct {
Enabled bool `toml:"enabled"`
ProxyPort int `toml:"proxy_port"`
AppPort int `toml:"app_port"`
}

type sliceTransformer struct{}

func (t sliceTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
Expand Down Expand Up @@ -350,10 +357,9 @@ func (c *Config) killDelay() time.Duration {
// interpret as milliseconds if less than the value of 1 millisecond
if c.Build.KillDelay < time.Millisecond {
return c.Build.KillDelay * time.Millisecond
} else {
// normalize kill delay to milliseconds
return time.Duration(c.Build.KillDelay.Milliseconds()) * time.Millisecond
}
// normalize kill delay to milliseconds
return time.Duration(c.Build.KillDelay.Milliseconds()) * time.Millisecond
}

func (c *Config) binPath() string {
Expand Down
15 changes: 15 additions & 0 deletions runner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// Engine ...
type Engine struct {
config *Config
proxy *Proxy
logger *logger
watcher filenotify.FileWatcher
debugMode bool
Expand Down Expand Up @@ -48,6 +49,7 @@
}
e := Engine{
config: cfg,
proxy: NewProxy(&cfg.Proxy),
logger: logger,
watcher: watcher,
debugMode: debugMode,
Expand Down Expand Up @@ -310,6 +312,11 @@

// Endless loop and never return
func (e *Engine) start() {
if e.config.Proxy.Enabled {
go e.proxy.Run()
e.mainLog("Proxy server listening on http://localhost%s", e.proxy.server.Addr)

Check warning on line 317 in runner/engine.go

View check run for this annotation

Codecov / codecov/patch

runner/engine.go#L316-L317

Added lines #L316 - L317 were not covered by tests
}

e.running = true
firstRunCh := make(chan bool, 1)
firstRunCh <- true
Expand Down Expand Up @@ -535,6 +542,9 @@
cmd, stdout, stderr, _ := e.startCmd(command)
processExit := make(chan struct{})
e.mainDebug("running process pid %v", cmd.Process.Pid)
if e.config.Proxy.Enabled {
e.proxy.Reload()

Check warning on line 546 in runner/engine.go

View check run for this annotation

Codecov / codecov/patch

runner/engine.go#L546

Added line #L546 was not covered by tests
}

wg.Add(1)
atomic.AddUint64(&e.round, 1)
Expand Down Expand Up @@ -579,6 +589,11 @@
e.mainLog("cleaning...")
defer e.mainLog("see you again~")

if e.config.Proxy.Enabled {
e.mainDebug("powering down the proxy...")
e.proxy.Stop()

Check warning on line 594 in runner/engine.go

View check run for this annotation

Codecov / codecov/patch

runner/engine.go#L593-L594

Added lines #L593 - L594 were not covered by tests
}

e.withLock(func() {
close(e.binStopCh)
e.binStopCh = make(chan bool)
Expand Down
3 changes: 3 additions & 0 deletions runner/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,9 @@ func Test(t *testing.T) {
t.Log("testing")
}
`)
if err != nil {
t.Fatal(err)
}
// run sed
// check the file is exist
if _, err := os.Stat(dftTOML); err != nil {
Expand Down
169 changes: 169 additions & 0 deletions runner/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package runner

import (
"bytes"
"errors"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"syscall"
"time"
)

type Reloader interface {
AddSubscriber() *Subscriber
RemoveSubscriber(id int)
Reload()
Stop()
}

type Proxy struct {
server *http.Server
client *http.Client
config *cfgProxy
stream Reloader
}

func NewProxy(cfg *cfgProxy) *Proxy {
p := &Proxy{
config: cfg,
server: &http.Server{
Addr: fmt.Sprintf(":%d", cfg.ProxyPort),
},
client: &http.Client{},
stream: NewProxyStream(),
}
return p
}

func (p *Proxy) Run() {
http.HandleFunc("/", p.proxyHandler)
http.HandleFunc("/internal/reload", p.reloadHandler)
if err := p.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("failed to start proxy server: %v", err)

Check warning on line 46 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L42-L46

Added lines #L42 - L46 were not covered by tests
}
}

func (p *Proxy) Stop() {
p.server.Close()
p.stream.Stop()

Check warning on line 52 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}

func (p *Proxy) Reload() {
p.stream.Reload()

Check warning on line 56 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

func (p *Proxy) injectLiveReload(respBody io.ReadCloser) string {
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(respBody); err != nil {
log.Fatalf("failed to convert request body to bytes buffer, err: %+v\n", err)

Check warning on line 62 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}
original := buf.String()

Check warning on line 64 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L64

Added line #L64 was not covered by tests

// the script will be injected before the end of the body tag. In case the tag is missing, the injection will be skipped without an error to ensure that a page with partial reloads only has at most one injected script.
body := strings.LastIndex(original, "</body>")
if body == -1 {
return original

Check warning on line 69 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L67-L69

Added lines #L67 - L69 were not covered by tests
}

script := fmt.Sprintf(
`<script>new EventSource("http://localhost:%d/internal/reload").onmessage = () => { location.reload() }</script>`,
p.config.ProxyPort,
)
return original[:body] + script + original[body:]

Check warning on line 76 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L72-L76

Added lines #L72 - L76 were not covered by tests
}

func (p *Proxy) proxyHandler(w http.ResponseWriter, r *http.Request) {
appURL := r.URL
appURL.Scheme = "http"
appURL.Host = fmt.Sprintf("localhost:%d", p.config.AppPort)

if err := r.ParseForm(); err != nil {
log.Fatalf("failed to read form data from request, err: %+v\n", err)

Check warning on line 85 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L85

Added line #L85 was not covered by tests
}
var body io.Reader
if len(r.Form) > 0 {
body = strings.NewReader(r.Form.Encode())
} else {
body = r.Body
}
req, err := http.NewRequest(r.Method, appURL.String(), body)
if err != nil {
log.Fatalf("proxy could not create request, err: %+v\n", err)

Check warning on line 95 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L95

Added line #L95 was not covered by tests
}

// Copy the headers from the original request
for name, values := range r.Header {
for _, value := range values {
req.Header.Add(name, value)
}
}
req.Header.Set("X-Forwarded-For", r.RemoteAddr)

// retry on connection refused error since after a file change air will restart the server and it may take a few milliseconds for the server to be up-and-running.
var resp *http.Response
for i := 0; i < 10; i++ {
resp, err = p.client.Do(req)
if err == nil {
break
}
if !errors.Is(err, syscall.ECONNREFUSED) {
log.Fatalf("proxy failed to call %s, err: %+v\n", appURL.String(), err)

Check warning on line 114 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L113-L114

Added lines #L113 - L114 were not covered by tests
}
time.Sleep(100 * time.Millisecond)

Check warning on line 116 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L116

Added line #L116 was not covered by tests
}
defer resp.Body.Close()

// Copy the headers from the proxy response except Content-Length
for k, vv := range resp.Header {
for _, v := range vv {
if k == "Content-Length" {
continue
}
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)

if strings.Contains(resp.Header.Get("Content-Type"), "text/html") {
newPage := p.injectLiveReload(resp.Body)
w.Header().Set("Content-Length", strconv.Itoa((len([]byte(newPage)))))
if _, err := io.WriteString(w, newPage); err != nil {
log.Fatalf("proxy failed injected live reloading script, err: %+v\n", err)

Check warning on line 135 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L132-L135

Added lines #L132 - L135 were not covered by tests
}
} else {
w.Header().Set("Content-Length", resp.Header.Get("Content-Length"))
if _, err := io.Copy(w, resp.Body); err != nil {
log.Fatalf("proxy failed to forward the response body, err: %+v\n", err)

Check warning on line 140 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L140

Added line #L140 was not covered by tests
}
}
}

func (p *Proxy) reloadHandler(w http.ResponseWriter, r *http.Request) {
flusher, err := w.(http.Flusher)
if !err {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return

Check warning on line 149 in runner/proxy.go

View check run for this annotation

Codecov / codecov/patch

runner/proxy.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

sub := p.stream.AddSubscriber()
go func() {
<-r.Context().Done()
p.stream.RemoveSubscriber(sub.id)
}()

w.WriteHeader(http.StatusOK)
flusher.Flush()

for range sub.reloadCh {
fmt.Fprintf(w, "data: reload\n\n")
flusher.Flush()
}
}
50 changes: 50 additions & 0 deletions runner/proxy_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package runner

import (
"sync"
)

type ProxyStream struct {
sync.Mutex
subscribers map[int]*Subscriber
count int
}

type Subscriber struct {
id int
reloadCh chan struct{}
}

func NewProxyStream() *ProxyStream {
return &ProxyStream{subscribers: make(map[int]*Subscriber)}
}

func (stream *ProxyStream) Stop() {
for id := range stream.subscribers {
stream.RemoveSubscriber(id)
}
stream.count = 0
}

func (stream *ProxyStream) AddSubscriber() *Subscriber {
stream.Lock()
defer stream.Unlock()
stream.count++

sub := &Subscriber{id: stream.count, reloadCh: make(chan struct{})}
stream.subscribers[stream.count] = sub
return sub
}

func (stream *ProxyStream) RemoveSubscriber(id int) {
stream.Lock()
defer stream.Unlock()
close(stream.subscribers[id].reloadCh)
delete(stream.subscribers, id)
}

func (stream *ProxyStream) Reload() {
for _, sub := range stream.subscribers {
sub.reloadCh <- struct{}{}
}
}
66 changes: 66 additions & 0 deletions runner/proxy_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package runner

import (
"sync"
"testing"
)

func find(s map[int]*Subscriber, id int) bool {
for _, sub := range s {
if sub.id == id {
return true
}
}
return false
}

func TestProxyStream(t *testing.T) {
stream := NewProxyStream()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_ = stream.AddSubscriber()
}(i)
}
wg.Wait()

if got, exp := len(stream.subscribers), 10; got != exp {
t.Errorf("expected %d but got %d", exp, got)
}

go func() {
stream.Reload()
}()

reloadCount := 0
for _, sub := range stream.subscribers {
wg.Add(1)
go func(sub *Subscriber) {
defer wg.Done()
<-sub.reloadCh
reloadCount++
}(sub)
}
wg.Wait()

if got, exp := reloadCount, 10; got != exp {
t.Errorf("expected %d but got %d", exp, got)
}

stream.RemoveSubscriber(2)
stream.AddSubscriber()
if got, exp := find(stream.subscribers, 2), false; got != exp {
t.Errorf("expected subscriber found to be %t but got %t", exp, got)
}
if got, exp := find(stream.subscribers, 11), true; got != exp {
t.Errorf("expected subscriber found to be %t but got %t", exp, got)
}

stream.Stop()
if got, exp := len(stream.subscribers), 0; got != exp {
t.Errorf("expected %d but got %d", exp, got)
}
}
Loading
Loading