Skip to content

Commit

Permalink
Merge pull request #202 from xinkonglili/weilili
Browse files Browse the repository at this point in the history
fix(broker):Added windows pipe socket communication, compatible with other systems #199
  • Loading branch information
chowyu12 authored Apr 19, 2024
2 parents 2ceb61a + 9063c60 commit c460807
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 17 deletions.
8 changes: 6 additions & 2 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (b *Broker) Start() {
if b.config.Port == "" && b.config.UnixFilePath != "" {
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
}
//listen client over windows pipe
if b.config.Port == "" && b.config.UnixFilePath == "" && b.config.WindowsPipeName != "" {
go b.StartPipeSocketListening(b.config.WindowsPipeName, true)
}

//listen for cluster
if b.config.Cluster.Port != "" {
Expand Down Expand Up @@ -274,11 +278,11 @@ func (b *Broker) StartClientListening(Tls bool) {
}
}

func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) {
func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool) {
var err error
var l net.Listener
for {
if UnixSocket {
if unixSocket {
if FileExist(socketPath) {
if err != nil {
log.Error("Remove Unix socketPath ", zap.Error(err))
Expand Down
31 changes: 16 additions & 15 deletions broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Config struct {
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
WindowsPipeName string `json:"windowsPipeName"`
}

type Plugins struct {
Expand Down
11 changes: 11 additions & 0 deletions broker/pipe_socket_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package broker

import (
"fmt"
)

// StartPipeSocketListening We use the open source npipe library
// to jump over pipe communication in mac
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
fmt.Println("macos system")
}
6 changes: 6 additions & 0 deletions broker/pipe_socket_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package broker

// StartPipeSocketListening We use the open source npipe library to
// jump over pipe communication in linux
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
}
61 changes: 61 additions & 0 deletions broker/pipe_socket_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package broker

import (
"fmt"
"github.com/natefinch/npipe"
"go.uber.org/zap"
"net"
"time"
)

// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener

for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}

tmpDelay := 10 * ACCEPT_MIN_SLEEP

for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)

time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}

tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
fmt.Println("handleConnection,", err)
if err != nil {
conn.Close()
}
}()
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce h1:TqjP/BTDrwN7zP9xyXVuLsMBXYMt6LLYi55PlrIcq8U=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:ifHPsLndGGzvgzcaXUvzmt6LxKT4pJ+uzEhtnMt+f7A=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
Expand Down

0 comments on commit c460807

Please sign in to comment.