Skip to content

Commit

Permalink
feat(broker):Added pipe socket conditions for different systems
Browse files Browse the repository at this point in the history
  • Loading branch information
wei_lilitw committed Apr 19, 2024
1 parent d504645 commit 9063c60
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 52 deletions.
52 changes: 0 additions & 52 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
encJson "encoding/json"
"errors"
"fmt"
"github.com/natefinch/npipe"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -333,57 +332,6 @@ func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bo
}
}

// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener

for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}

tmpDelay := 10 * ACCEPT_MIN_SLEEP

for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)

time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}

tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
if err != nil {
conn.Close()
}
}()
}
}

func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", zap.String("hp", hp))
Expand Down
11 changes: 11 additions & 0 deletions broker/pipe_socket_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package broker

import (
"fmt"
)

// StartPipeSocketListening We use the open source npipe library
// to jump over pipe communication in mac
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
fmt.Println("macos system")
}
6 changes: 6 additions & 0 deletions broker/pipe_socket_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package broker

// StartPipeSocketListening We use the open source npipe library to
// jump over pipe communication in linux
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
}
61 changes: 61 additions & 0 deletions broker/pipe_socket_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package broker

import (
"fmt"
"github.com/natefinch/npipe"
"go.uber.org/zap"
"net"
"time"
)

// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener

for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}

tmpDelay := 10 * ACCEPT_MIN_SLEEP

for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)

time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}

tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
fmt.Println("handleConnection,", err)
if err != nil {
conn.Close()
}
}()
}
}

0 comments on commit 9063c60

Please sign in to comment.