-
Notifications
You must be signed in to change notification settings - Fork 712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipes #650
Merged
Pipes #650
Changes from 9 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
ac9c011
Pipe plumbing
tomwilkie b77cd3f
Add pipe controls for Docker attach & exec.
tomwilkie 3b2af2a
Terminal UI for pipes
foot fe6e897
UI Review feedback
foot 3a344f1
Backend review feedback
tomwilkie ff37488
Fix infinite loop when closing AppClient.
tomwilkie 50be8c6
Try to reconnect immediately giving a nicer ctrl-d diconnect exp.
foot 6259307
Don't use a global variable to store the pipe client.
tomwilkie 0b1a2ef
Wait for pipes to close in Close().
tomwilkie e0ff3e3
Cleanup our timeouts.
foot 921a7b7
More Terminal-UI and code post review fixes.
foot 8a63615
Add terminal setTimeouts as instance properties instead of globals
foot d93a3e2
Dont deselect node on ESC if there is a controlPipe
davkal 5050bac
Correctly wait for all background goroutines to finish before returni…
tomwilkie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,9 @@ coverage.html | |
# Emacs backup files | ||
*~ | ||
|
||
# ctags files | ||
tags | ||
|
||
# Project specific | ||
.*.uptodate | ||
scope.tar | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
package app | ||
|
||
import ( | ||
"io" | ||
"log" | ||
"net/http" | ||
"sync" | ||
"time" | ||
|
||
"github.com/gorilla/mux" | ||
|
||
"github.com/weaveworks/scope/common/mtime" | ||
"github.com/weaveworks/scope/xfer" | ||
) | ||
|
||
const ( | ||
gcInterval = 30 * time.Second // we check all the pipes every 30s | ||
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute | ||
gcTimeout = 10 * time.Minute // after another 10 minutes, tombstoned pipes are forgotten | ||
) | ||
|
||
// PipeRouter connects incoming and outgoing pipes. | ||
type PipeRouter struct { | ||
sync.Mutex | ||
wait sync.WaitGroup | ||
quit chan struct{} | ||
pipes map[string]*pipe | ||
} | ||
|
||
// for each end of the pipe, we keep a reference count & lastUsedTIme, | ||
// such that we can timeout pipes when either end is inactive. | ||
type end struct { | ||
refCount int | ||
lastUsedTime time.Time | ||
} | ||
|
||
type pipe struct { | ||
ui, probe end | ||
tombstoneTime time.Time | ||
|
||
xfer.Pipe | ||
} | ||
|
||
// RegisterPipeRoutes registers the pipe routes | ||
func RegisterPipeRoutes(router *mux.Router) *PipeRouter { | ||
This comment was marked as abuse.
Sorry, something went wrong.
This comment was marked as abuse.
Sorry, something went wrong. |
||
pipeRouter := &PipeRouter{ | ||
quit: make(chan struct{}), | ||
pipes: map[string]*pipe{}, | ||
} | ||
pipeRouter.wait.Add(1) | ||
go pipeRouter.gcLoop() | ||
router.Methods("GET"). | ||
Path("/api/pipe/{pipeID}"). | ||
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { | ||
uiEnd, _ := p.Ends() | ||
return &p.ui, uiEnd | ||
})) | ||
router.Methods("GET"). | ||
Path("/api/pipe/{pipeID}/probe"). | ||
HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) { | ||
_, probeEnd := p.Ends() | ||
return &p.probe, probeEnd | ||
})) | ||
router.Methods("DELETE", "POST"). | ||
Path("/api/pipe/{pipeID}"). | ||
HandlerFunc(pipeRouter.delete) | ||
return pipeRouter | ||
} | ||
|
||
// Stop stops the pipeRouter | ||
func (pr *PipeRouter) Stop() { | ||
close(pr.quit) | ||
pr.wait.Wait() | ||
} | ||
|
||
func (pr *PipeRouter) gcLoop() { | ||
defer pr.wait.Done() | ||
ticker := time.Tick(gcInterval) | ||
for { | ||
select { | ||
case <-pr.quit: | ||
return | ||
case <-ticker: | ||
} | ||
|
||
pr.timeout() | ||
pr.garbageCollect() | ||
} | ||
} | ||
|
||
func (pr *PipeRouter) timeout() { | ||
pr.Lock() | ||
defer pr.Unlock() | ||
now := mtime.Now() | ||
for id, pipe := range pr.pipes { | ||
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) { | ||
continue | ||
} | ||
|
||
if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) || | ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) { | ||
log.Printf("Timing out pipe %s", id) | ||
pipe.Close() | ||
pipe.tombstoneTime = now | ||
} | ||
} | ||
} | ||
|
||
func (pr *PipeRouter) garbageCollect() { | ||
pr.Lock() | ||
defer pr.Unlock() | ||
now := mtime.Now() | ||
for pipeID, pipe := range pr.pipes { | ||
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout { | ||
delete(pr.pipes, pipeID) | ||
} | ||
} | ||
} | ||
|
||
func (pr *PipeRouter) getOrCreate(id string) (*pipe, bool) { | ||
pr.Lock() | ||
defer pr.Unlock() | ||
p, ok := pr.pipes[id] | ||
if !ok { | ||
log.Printf("Creating pipe id %s", id) | ||
p = &pipe{ | ||
ui: end{lastUsedTime: mtime.Now()}, | ||
probe: end{lastUsedTime: mtime.Now()}, | ||
Pipe: xfer.NewPipe(), | ||
} | ||
pr.pipes[id] = p | ||
} | ||
if p.Closed() { | ||
return nil, false | ||
} | ||
return p, true | ||
} | ||
|
||
func (pr *PipeRouter) retain(id string, pipe *pipe, end *end) bool { | ||
pr.Lock() | ||
defer pr.Unlock() | ||
if pipe.Closed() { | ||
return false | ||
} | ||
end.refCount++ | ||
return true | ||
} | ||
|
||
func (pr *PipeRouter) release(id string, pipe *pipe, end *end) { | ||
pr.Lock() | ||
defer pr.Unlock() | ||
|
||
end.refCount-- | ||
if end.refCount != 0 { | ||
return | ||
} | ||
|
||
if !pipe.Closed() { | ||
end.lastUsedTime = mtime.Now() | ||
} | ||
} | ||
|
||
func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
pipeID := mux.Vars(r)["pipeID"] | ||
pipe, ok := pr.getOrCreate(pipeID) | ||
if !ok { | ||
http.NotFound(w, r) | ||
return | ||
} | ||
|
||
endRef, endIO := endSelector(pipe) | ||
if !pr.retain(pipeID, pipe, endRef) { | ||
http.NotFound(w, r) | ||
return | ||
} | ||
defer pr.release(pipeID, pipe, endRef) | ||
|
||
conn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
log.Printf("Error upgrading to websocket: %v", err) | ||
return | ||
} | ||
defer conn.Close() | ||
|
||
pipe.CopyToWebsocket(endIO, conn) | ||
} | ||
} | ||
|
||
func (pr *PipeRouter) delete(w http.ResponseWriter, r *http.Request) { | ||
pipeID := mux.Vars(r)["pipeID"] | ||
pipe, ok := pr.getOrCreate(pipeID) | ||
if ok && pr.retain(pipeID, pipe, &pipe.ui) { | ||
log.Printf("Closing pipe %s", pipeID) | ||
pipe.Close() | ||
pipe.tombstoneTime = mtime.Now() | ||
pr.release(pipeID, pipe, &pipe.ui) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as abuse.
Sorry, something went wrong.
This comment was marked as abuse.
Sorry, something went wrong.