From d50464571ec2553a3053465e67ad427d47ed74c4 Mon Sep 17 00:00:00 2001 From: wei_lilitw Date: Thu, 18 Apr 2024 17:31:35 +0800 Subject: [PATCH 1/2] feat(hmq):add windows pipe socket.Use the open source project npipe, which nicely encapsulates the operations of windows pipe and returns the connection type net.Conn. --- broker/broker.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++-- broker/config.go | 31 +++++++++++++------------ go.mod | 1 + go.sum | 2 ++ 4 files changed, 77 insertions(+), 17 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index be2cd0c..5266705 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -5,6 +5,7 @@ import ( encJson "encoding/json" "errors" "fmt" + "github.com/natefinch/npipe" "net" "net/http" "os" @@ -165,6 +166,10 @@ func (b *Broker) Start() { if b.config.Port == "" && b.config.UnixFilePath != "" { go b.StartUnixSocketClientListening(b.config.UnixFilePath, true) } + //listen client over windows pipe + if b.config.Port == "" && b.config.UnixFilePath == "" && b.config.WindowsPipeName != "" { + go b.StartPipeSocketListening(b.config.WindowsPipeName, true) + } //listen for cluster if b.config.Cluster.Port != "" { @@ -274,11 +279,11 @@ func (b *Broker) StartClientListening(Tls bool) { } } -func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) { +func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool) { var err error var l net.Listener for { - if UnixSocket { + if unixSocket { if FileExist(socketPath) { if err != nil { log.Error("Remove Unix socketPath ", zap.Error(err)) @@ -328,6 +333,57 @@ func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bo } } +// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows +func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) { + var err error + var ln *npipe.PipeListener + + for { + if usePipe { + fmt.Println(pipeName) + ln, err = npipe.Listen(pipeName) + log.Info("Start Listening client on ", zap.String("pipeName", pipeName)) + } + if err == nil { + break // successfully listening + } + log.Error("Error listening on ", zap.Error(err)) + time.Sleep(1 * time.Second) + } + + tmpDelay := 10 * ACCEPT_MIN_SLEEP + + for { + conn, err := ln.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + log.Error( + "Temporary Client Accept Error(%v), sleeping %dms", + zap.Error(ne), + zap.Duration("sleeping", tmpDelay/time.Millisecond), + ) + + time.Sleep(tmpDelay) + tmpDelay *= 2 + if tmpDelay > ACCEPT_MAX_SLEEP { + tmpDelay = ACCEPT_MAX_SLEEP + } + } else { + log.Error("Accept error", zap.Error(err)) + } + continue + } + + tmpDelay = ACCEPT_MIN_SLEEP + go func() { + err := b.handleConnection(CLIENT, conn) + if err != nil { + conn.Close() + } + }() + } +} + func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port log.Info("Start Listening cluster on ", zap.String("hp", hp)) diff --git a/broker/config.go b/broker/config.go index 9bec4df..3e3782c 100644 --- a/broker/config.go +++ b/broker/config.go @@ -19,21 +19,22 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type Config struct { - Worker int `json:"workerNum"` - HTTPPort string `json:"httpPort"` - Host string `json:"host"` - Port string `json:"port"` - Cluster RouteInfo `json:"cluster"` - Router string `json:"router"` - TlsHost string `json:"tlsHost"` - TlsPort string `json:"tlsPort"` - WsPath string `json:"wsPath"` - WsPort string `json:"wsPort"` - WsTLS bool `json:"wsTLS"` - TlsInfo TLSInfo `json:"tlsInfo"` - Debug bool `json:"debug"` - Plugin Plugins `json:"plugins"` - UnixFilePath string `json:"unixFilePath"` + Worker int `json:"workerNum"` + HTTPPort string `json:"httpPort"` + Host string `json:"host"` + Port string `json:"port"` + Cluster RouteInfo `json:"cluster"` + Router string `json:"router"` + TlsHost string `json:"tlsHost"` + TlsPort string `json:"tlsPort"` + WsPath string `json:"wsPath"` + WsPort string `json:"wsPort"` + WsTLS bool `json:"wsTLS"` + TlsInfo TLSInfo `json:"tlsInfo"` + Debug bool `json:"debug"` + Plugin Plugins `json:"plugins"` + UnixFilePath string `json:"unixFilePath"` + WindowsPipeName string `json:"windowsPipeName"` } type Plugins struct { diff --git a/go.mod b/go.mod index 560501f..d101c9e 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 2ed89a7..7e4693b 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce h1:TqjP/BTDrwN7zP9xyXVuLsMBXYMt6LLYi55PlrIcq8U= +github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:ifHPsLndGGzvgzcaXUvzmt6LxKT4pJ+uzEhtnMt+f7A= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= From 9063c6069c187b0b6e90a4b72b7bf5bcc5d174ec Mon Sep 17 00:00:00 2001 From: wei_lilitw Date: Fri, 19 Apr 2024 15:24:14 +0800 Subject: [PATCH 2/2] feat(broker):Added pipe socket conditions for different systems --- broker/broker.go | 52 ----------------------------- broker/pipe_socket_darwin.go | 11 +++++++ broker/pipe_socket_linux.go | 6 ++++ broker/pipe_socket_windows.go | 61 +++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 52 deletions(-) create mode 100644 broker/pipe_socket_darwin.go create mode 100644 broker/pipe_socket_linux.go create mode 100644 broker/pipe_socket_windows.go diff --git a/broker/broker.go b/broker/broker.go index 5266705..5d6f7c8 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -5,7 +5,6 @@ import ( encJson "encoding/json" "errors" "fmt" - "github.com/natefinch/npipe" "net" "net/http" "os" @@ -333,57 +332,6 @@ func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bo } } -// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows -func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) { - var err error - var ln *npipe.PipeListener - - for { - if usePipe { - fmt.Println(pipeName) - ln, err = npipe.Listen(pipeName) - log.Info("Start Listening client on ", zap.String("pipeName", pipeName)) - } - if err == nil { - break // successfully listening - } - log.Error("Error listening on ", zap.Error(err)) - time.Sleep(1 * time.Second) - } - - tmpDelay := 10 * ACCEPT_MIN_SLEEP - - for { - conn, err := ln.Accept() - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Error( - "Temporary Client Accept Error(%v), sleeping %dms", - zap.Error(ne), - zap.Duration("sleeping", tmpDelay/time.Millisecond), - ) - - time.Sleep(tmpDelay) - tmpDelay *= 2 - if tmpDelay > ACCEPT_MAX_SLEEP { - tmpDelay = ACCEPT_MAX_SLEEP - } - } else { - log.Error("Accept error", zap.Error(err)) - } - continue - } - - tmpDelay = ACCEPT_MIN_SLEEP - go func() { - err := b.handleConnection(CLIENT, conn) - if err != nil { - conn.Close() - } - }() - } -} - func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port log.Info("Start Listening cluster on ", zap.String("hp", hp)) diff --git a/broker/pipe_socket_darwin.go b/broker/pipe_socket_darwin.go new file mode 100644 index 0000000..500a971 --- /dev/null +++ b/broker/pipe_socket_darwin.go @@ -0,0 +1,11 @@ +package broker + +import ( + "fmt" +) + +// StartPipeSocketListening We use the open source npipe library +// to jump over pipe communication in mac +func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) { + fmt.Println("macos system") +} diff --git a/broker/pipe_socket_linux.go b/broker/pipe_socket_linux.go new file mode 100644 index 0000000..36401f0 --- /dev/null +++ b/broker/pipe_socket_linux.go @@ -0,0 +1,6 @@ +package broker + +// StartPipeSocketListening We use the open source npipe library to +// jump over pipe communication in linux +func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) { +} diff --git a/broker/pipe_socket_windows.go b/broker/pipe_socket_windows.go new file mode 100644 index 0000000..b776114 --- /dev/null +++ b/broker/pipe_socket_windows.go @@ -0,0 +1,61 @@ +package broker + +import ( + "fmt" + "github.com/natefinch/npipe" + "go.uber.org/zap" + "net" + "time" +) + +// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows +func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) { + var err error + var ln *npipe.PipeListener + + for { + if usePipe { + fmt.Println(pipeName) + ln, err = npipe.Listen(pipeName) + log.Info("Start Listening client on ", zap.String("pipeName", pipeName)) + } + if err == nil { + break // successfully listening + } + log.Error("Error listening on ", zap.Error(err)) + time.Sleep(1 * time.Second) + } + + tmpDelay := 10 * ACCEPT_MIN_SLEEP + + for { + conn, err := ln.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + log.Error( + "Temporary Client Accept Error(%v), sleeping %dms", + zap.Error(ne), + zap.Duration("sleeping", tmpDelay/time.Millisecond), + ) + + time.Sleep(tmpDelay) + tmpDelay *= 2 + if tmpDelay > ACCEPT_MAX_SLEEP { + tmpDelay = ACCEPT_MAX_SLEEP + } + } else { + log.Error("Accept error", zap.Error(err)) + } + continue + } + + tmpDelay = ACCEPT_MIN_SLEEP + go func() { + err := b.handleConnection(CLIENT, conn) + fmt.Println("handleConnection,", err) + if err != nil { + conn.Close() + } + }() + } +}