Skip to content
This repository has been archived by the owner on Jan 24, 2022. It is now read-only.

Commit

Permalink
Improve agent.
Browse files Browse the repository at this point in the history
Fix scoping bugs and generally clean up the port forwarding
implementation. Credits to @xjdrew for TCP connection handling
techniques in github.com/xjdrew/gopf (keepalives, partial connection
close).
  • Loading branch information
cmars committed Nov 22, 2017
1 parent d549388 commit 104e49c
Showing 1 changed file with 64 additions and 62 deletions.
126 changes: 64 additions & 62 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ type Agent struct {
controlPass string
conn *control.Conn
cmd *exec.Cmd
importers []importer
forwarders []*forwarder
}

type importer struct {
config.Import
RemoteAddr string
SocksAddr string
l net.Listener
type forwarder struct {
remoteAddr string
remotePort int
localAddr string
localPort int
dialer proxy.Dialer
l *net.TCPListener
}

func New(cfg *config.Config) (*Agent, error) {
Expand All @@ -77,7 +79,7 @@ Log notice stdout
}
}

var importers []importer
var forwarders []*forwarder
args := []string{
"-f", torrcPath,
"--Log", "notice stderr",
Expand All @@ -90,12 +92,18 @@ Log notice stdout
cmd.Dir = dataDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
dialer, err := proxy.SOCKS5("tcp", cfg.Node.Agent.SocksAddr, nil, proxy.Direct)
if err != nil {
return nil, errors.WithStack(err)
}
for _, remote := range cfg.Node.Remotes {
for _, import_ := range remote.Imports {
importers = append(importers, importer{
RemoteAddr: remote.Address,
Import: import_,
SocksAddr: cfg.Node.Agent.SocksAddr,
forwarders = append(forwarders, &forwarder{
dialer: dialer,
remoteAddr: remote.Address,
remotePort: import_.RemotePort,
localAddr: import_.LocalAddr,
localPort: import_.LocalPort,
})
}
}
Expand All @@ -105,7 +113,7 @@ Log notice stdout
controlAddr: cfg.Node.Agent.ControlAddr,
controlPass: controlPass,
cmd: cmd,
importers: importers,
forwarders: forwarders,
}, nil
}

Expand Down Expand Up @@ -158,7 +166,7 @@ func (a *Agent) Start() error {
return errors.Wrap(err, "control auth failed")
}
a.conn = conn
err = a.startImports()
err = a.startForwarding()
if err != nil {
return errors.Wrap(err, "local imports failed to start")
}
Expand All @@ -167,69 +175,63 @@ func (a *Agent) Start() error {
return errors.Wrap(err, "control connect failed")
}

func (a *Agent) startImports() error {
for _, i := range a.importers {
err := i.start()
func (a *Agent) startForwarding() error {
for i := range a.forwarders {
err := a.forwarders[i].start()
if err != nil {
return errors.WithStack(err)
}
}
return nil
}

func (i *importer) start() error {
var err error
i.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", i.LocalAddr, i.LocalPort))
func (f *forwarder) start() error {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", f.localAddr, f.localPort))
if err != nil {
return errors.WithStack(err)
}
log.Printf("started listener %s:%d", i.LocalAddr, i.LocalPort)
dialer, err := proxy.SOCKS5("tcp", i.SocksAddr, nil, proxy.Direct)
if err != nil {
return errors.WithStack(err)
}
log.Printf("socks dialer ready")
handle := func(c net.Conn) {
log.Printf("handle connection from %s", c.RemoteAddr())
defer c.Close()
remoteConn, err := dialer.Dial("tcp", fmt.Sprintf("%s:%d", i.RemoteAddr, i.RemotePort))
f.l = l.(*net.TCPListener)
go f.accept()
log.Printf("started listener %v", f.l.Addr())
return nil
}

func (f *forwarder) accept() {
for {
c, err := f.l.Accept()
if err != nil {
log.Println(err)
log.Printf("listener exiting on error: %v", err)
return
}
defer remoteConn.Close()
fwd1 := make(chan struct{})
go func() {
_, err := io.Copy(c, remoteConn)
if err != nil {
log.Println(err)
}
close(fwd1)
}()
fwd2 := make(chan struct{})
go func() {
_, err := io.Copy(remoteConn, c)
if err != nil {
log.Println(err)
}
close(fwd2)
}()
select {
case <-fwd1:
case <-fwd2:
}
go f.handleConn(c.(*net.TCPConn))
}
go func() {
for {
c, err := i.l.Accept()
if err != nil {
log.Printf("listener exiting on error: %v", err)
return
}
go handle(c)
}
}()
return nil
}

func (f *forwarder) handleConn(source *net.TCPConn) {
log.Printf("connection from %s", source.RemoteAddr())
source.SetKeepAlive(true)
source.SetKeepAlivePeriod(time.Second * 60)
log.Printf("dialing %s:%d", f.remoteAddr, f.remotePort)
dest, err := f.dialer.Dial("tcp", fmt.Sprintf("%s:%d", f.remoteAddr, f.remotePort))
if err != nil {
log.Println(err)
return
}
destTCP := dest.(*net.TCPConn)
destTCP.SetKeepAlive(true)
destTCP.SetKeepAlivePeriod(time.Second * 60)
go f.forward(source, destTCP)
f.forward(destTCP, source)
}

func (f *forwarder) forward(dest, source *net.TCPConn) {
defer dest.CloseWrite()
defer source.CloseRead()
n, err := io.Copy(dest, source)
if err != nil {
log.Println(err)
}
log.Printf("copied %d bytes %v -> %v", n, source.RemoteAddr(), dest.RemoteAddr())
}

func (a *Agent) Stop() error {
Expand Down

0 comments on commit 104e49c

Please sign in to comment.