Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
fix #113 , 使用smux时会出现无法加载的情况
Browse files Browse the repository at this point in the history
这是由两个问题造成的

问题1:

同时并发请求多个请求时,会出现同时建立两个mux的情况,导致先建立的mux被覆盖;

问题2:

一旦某个stream的连接失败后,代码 会关闭整个session。这是由于 iics无法分辨simplesocks和普通协议造成的。

加一个 isInner 标签即可分辨。
  • Loading branch information
e1732a364fed committed May 22, 2022
1 parent 7e69f17 commit 8fcf747
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 81 deletions.
5 changes: 4 additions & 1 deletion examples/trojan.client.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ insecure = true
utls = true
#lazy = true


# 备注: trojan 也是一样可以应用 ws/grpc/quic 的,具体你只要参考对应示例文件即可,然后把 vlesss 改成 trojans 即可.

# 备注:verysimple 的trojan实现 完全兼容 trojan-go, 包括mux, 因为一样用的是 smux+ simplesocks
# use_mux = true

# verysimple 的trojan实现 完全兼容 trojan-go, 包括mux, 因为一样用的是 smux+ simplesocks
2 changes: 2 additions & 0 deletions iics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type incomingInserverConnState struct {
inServer proxy.Server //可为 nil
defaultClient proxy.Client

isInner bool

inTag string //在inServer为nil时,可用此项确定 inTag
useSniffing bool //在inServer为nil时,可用此项确定 是否使用sniffing

Expand Down
176 changes: 98 additions & 78 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func ListenSer(inServer proxy.Server, defaultOutClient proxy.Client, env *proxy.
// handleNewIncomeConnection 会处理 网络层至高级层的数据,
// 然后将代理层的处理发往 handshakeInserver_and_passToOutClient 函数。
//
// 在 listenSer 中被调用。
// 在 ListenSer 中被调用。
func handleNewIncomeConnection(inServer proxy.Server, defaultClientForThis proxy.Client, thisLocalConnectionInstance net.Conn, env *proxy.RoutingEnv) {

iics := incomingInserverConnState{
Expand Down Expand Up @@ -380,111 +380,117 @@ func handshakeInserver(iics *incomingInserverConnState) (wlc net.Conn, udp_wlc n

wlc, udp_wlc, targetAddr, err = inServer.Handshake(iics.wrappedConn)

if err == nil {
if udp_wlc != nil && inServer.Name() == "socks5" {
// socks5的 udp associate返回的是 clientFutureAddr, 而不是实际客户的第一个请求.
//所以我们要读一次才能进行下一步。
if err != nil {
return
}
if udp_wlc != nil && inServer.Name() == "socks5" {
// socks5的 udp associate返回的是 clientFutureAddr, 而不是实际客户的第一个请求.
//所以我们要读一次才能进行下一步。

firstSocks5RequestData, firstSocks5RequestAddr, err2 := udp_wlc.ReadMsgFrom()
if err2 != nil {
if ce := iics.CanLogWarn("failed in socks5 read"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.Error(err2),
)
}
err = err2
return
firstSocks5RequestData, firstSocks5RequestAddr, err2 := udp_wlc.ReadMsgFrom()
if err2 != nil {
if ce := iics.CanLogWarn("failed in socks5 read"); ce != nil {
ce.Write(
zap.String("handler", inServer.AddrStr()),
zap.Error(err2),
)
}
err = err2
return
}

iics.fallbackFirstBuffer = bytes.NewBuffer(firstSocks5RequestData)

targetAddr = firstSocks5RequestAddr
}

////////////////////////////// 内层mux阶段 /////////////////////////////////////

if muxInt, innerProxyName := inServer.HasInnerMux(); muxInt > 0 {
mh, ok := wlc.(proxy.MuxMarker)
if !ok {
return
}

innerSerConf := proxy.ListenConf{
CommonConf: proxy.CommonConf{
Protocol: innerProxyName,
},
}

innerSer, err2 := proxy.NewServer(&innerSerConf)
if err2 != nil {
if ce := iics.CanLogDebug("mux inner proxy server creation failed"); ce != nil {
ce.Write(zap.Error(err))
}
err = err2
return
}

iics.fallbackFirstBuffer = bytes.NewBuffer(firstSocks5RequestData)
session := inServer.GetServerInnerMuxSession(mh)

targetAddr = firstSocks5RequestAddr
if session == nil {
err = utils.ErrFailed
return
}

////////////////////////////// 内层mux阶段 /////////////////////////////////////
//内层mux要对每一个子连接单独进行 子代理协议握手 以及 outClient的拨号。

if muxInt, innerProxyName := inServer.HasInnerMux(); muxInt > 0 {
if mh, ok := wlc.(proxy.MuxMarker); ok {
go func() {

innerSerConf := proxy.ListenConf{
CommonConf: proxy.CommonConf{
Protocol: innerProxyName,
},
for {
if ce := iics.CanLogDebug("inServer try accept smux stream "); ce != nil {
ce.Write()
}

innerSer, err2 := proxy.NewServer(&innerSerConf)
if err2 != nil {
if ce := iics.CanLogDebug("mux inner proxy server creation failed"); ce != nil {
stream, err := session.AcceptStream()
if err != nil {
if ce := iics.CanLogDebug("mux inServer try accept stream failed"); ce != nil {
ce.Write(zap.Error(err))
}
err = err2
return
}

session := inServer.GetServerInnerMuxSession(mh)

if session == nil {
err = utils.ErrFailed
session.Close()
return
}

//内层mux要对每一个子连接单独进行 子代理协议握手 以及 outClient的拨号。
if ce := iics.CanLogDebug("inServer got inner mux stream"); ce != nil {
ce.Write(zap.String("innerProxyName", innerProxyName))
}

go func() {

for {
if ce := iics.CanLogDebug("inServer try accept smux stream "); ce != nil {
ce.Write()
}
wlc1, udp_wlc1, targetAddr1, err1 := innerSer.Handshake(stream)

stream, err := session.AcceptStream()
if err != nil {
if ce := iics.CanLogDebug("mux inServer try accept stream failed"); ce != nil {
ce.Write(zap.Error(err))
}
if err1 != nil {
if ce := iics.CanLogDebug("inServer mux inner proxy handshake failed"); ce != nil {
ce.Write(zap.Error(err1))
}
newiics := *iics

session.Close()
if !newiics.extractFirstBufFromErr(err1) {
return
}
if ce := iics.CanLogDebug("inServer got inner mux stream"); ce != nil {
ce.Write(zap.String("innerProxyName", innerProxyName))
}

go func() {

wlc1, udp_wlc1, targetAddr1, err1 := innerSer.Handshake(stream)
passToOutClient(newiics, true, wlc1, udp_wlc1, targetAddr1)

if err1 != nil {
if ce := iics.CanLogDebug("inServer mux inner proxy handshake failed"); ce != nil {
ce.Write(zap.Error(err1))
}
newiics := *iics

if !newiics.extractFirstBufFromErr(err1) {
return
}
passToOutClient(newiics, true, wlc1, udp_wlc1, targetAddr1)

} else {

if ce := iics.CanLogDebug("inServer mux stream handshake ok"); ce != nil {
ce.Write(zap.String("targetAddr1", targetAddr1.String()))
}
} else {

passToOutClient(*iics, false, wlc1, udp_wlc1, targetAddr1)
if ce := iics.CanLogDebug("inServer mux stream handshake ok"); ce != nil {
ce.Write(zap.String("targetAddr", targetAddr1.String()))
}

}
newiics := *iics
newiics.isInner = true

}()
passToOutClient(newiics, false, wlc1, udp_wlc1, targetAddr1)

}

}()

err = utils.ErrHandled
return
}
}
}()

err = utils.ErrHandled
return

}

Expand All @@ -494,7 +500,7 @@ func handshakeInserver(iics *incomingInserverConnState) (wlc net.Conn, udp_wlc n
// 本函数 处理inServer的代理层数据,并在试图处理 分流和回落后,将流量导向目标,并开始Copy。
// iics 不使用指针, 因为iics不能公用,因为 在多路复用时 iics.wrappedConn 是会变化的。
//
//被 handleNewIncomeConnection 调用。
//被 handleNewIncomeConnection 和 ListenSer 调用。
func handshakeInserver_and_passToOutClient(iics incomingInserverConnState) {

wlc, udp_wlc, targetAddr, err := handshakeInserver(&iics)
Expand Down Expand Up @@ -953,7 +959,12 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr,
hasInnerMux = true

//先过滤掉 innermux 通道已经建立的情况, 此时我们不必再次外部拨号,而是直接进行内层拨号.

client.Lock()

if client.InnerMuxEstablished() {
client.Unlock()

wrc1, realudp_wrc, result1 := dialInnerProxy(client, wlc, nil, iics, innerProxyName, targetAddr, isudp)

if result1 == 0 {
Expand All @@ -966,11 +977,20 @@ func dialClient(iics incomingInserverConnState, targetAddr netLayer.Addr,
result = result1
return
} else {
if ce := iics.CanLogDebug("mux failed, will redial"); ce != nil {
if ce := iics.CanLogDebug("client inner mux dial innerProxy failed, will redial"); ce != nil {
ce.Write()
}
}

} else {
//在实测时 发现,可能出现并发问题,比如在加载图多的网页时,很容易碰到
//此时如果是两个连接同时 发出,而且 尚未 建立 innerMux,
// 则如果不加锁的话 ,两个连接 会同时 获取到 client.InnerMuxEstablished() 为 false
// 这会导致同时试图拨号 innerMux,而这是错误的
//我们只允许有一个 innerMux 连接存在,如果有多个的话,那么最新拨号的innerMux 会覆盖以前的拨号,
// 导致 以前的 innerMux 成为了 悬垂连接,而且会导致 相关联的请求卡住。

defer client.Unlock()
}
}
}
Expand Down Expand Up @@ -1372,7 +1392,7 @@ func dialClient_andRelay(iics incomingInserverConnState, targetAddr netLayer.Add

//在内层mux时, 不能因为单个传输完毕就关闭整个连接
if innerMuxResult, _ := client.HasInnerMux(); innerMuxResult == 0 {
if iics.shouldCloseInSerBaseConnWhenFinish {
if iics.shouldCloseInSerBaseConnWhenFinish && !iics.isInner {
if iics.baseLocalConn != nil {
defer iics.baseLocalConn.Close()
}
Expand Down
7 changes: 5 additions & 2 deletions proxy/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"io"
"strings"
"sync"

"github.com/e1732a364fed/v2ray_simple/advLayer"
"github.com/e1732a364fed/v2ray_simple/httpLayer"
Expand Down Expand Up @@ -110,6 +111,7 @@ type Base struct {

Innermux *smux.Session //用于存储 client的已拨号的mux连接

sync.Mutex
}

func (b *Base) GetBase() *Base {
Expand Down Expand Up @@ -171,6 +173,7 @@ func (b *Base) Sniffing() bool {
}

func (b *Base) InnerMuxEstablished() bool {

return b.Innermux != nil && !b.Innermux.IsClosed()
}

Expand All @@ -194,14 +197,14 @@ func (*Base) GetServerInnerMuxSession(wlc io.ReadWriteCloser) *smux.Session {
}

func (b *Base) CloseInnerMuxSession() {
if b.Innermux != nil && !b.Innermux.IsClosed() {
if b.InnerMuxEstablished() {
b.Innermux.Close()
b.Innermux = nil
}
}

func (b *Base) GetClientInnerMuxSession(wrc io.ReadWriteCloser) *smux.Session {
if b.Innermux != nil && !b.Innermux.IsClosed() {
if b.InnerMuxEstablished() {
return b.Innermux
} else {
smuxConfig := smux.DefaultConfig()
Expand Down
3 changes: 3 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"net"
"strings"
"sync"
"time"

"github.com/e1732a364fed/v2ray_simple/netLayer"
Expand Down Expand Up @@ -58,6 +59,8 @@ type Client interface {
GetClientInnerMuxSession(wrc io.ReadWriteCloser) *smux.Session
InnerMuxEstablished() bool
CloseInnerMuxSession()

sync.Locker //用于锁定 innerMux
}

type UserClient interface {
Expand Down

0 comments on commit 8fcf747

Please sign in to comment.