Skip to content

Commit

Permalink
update checks for wrapped errors (#3117)
Browse files Browse the repository at this point in the history
* errors.Is()
* extract function isBrokenConnection()
  • Loading branch information
mmetc authored Nov 4, 2024
1 parent 1616991 commit 5752111
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,6 @@ issues:
- errorlint
text: "type switch on error will fail on wrapped errors. Use errors.As to check for specific errors"

- linters:
- errorlint
text: "comparing with .* will fail on wrapped errors. Use errors.Is to check for a specific error"

- linters:
- nosprintfhostport
text: "host:port in url should be constructed with net.JoinHostPort and not directly with fmt.Sprintf"
Expand Down
15 changes: 14 additions & 1 deletion pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (ac *AuthCache) Get(apiKey string) (time.Time, bool) {
ac.mu.RLock()
expiration, exists := ac.APIKeys[apiKey]
ac.mu.RUnlock()

return expiration, exists
}

Expand Down Expand Up @@ -128,6 +129,7 @@ func (w *AppsecSource) UnmarshalConfig(yamlConfig []byte) error {
if w.config.ListenSocket != "" && w.config.ListenAddr == "" {
w.config.Name = w.config.ListenSocket
}

if w.config.ListenSocket == "" {
w.config.Name = fmt.Sprintf("%s%s", w.config.ListenAddr, w.config.Path)
}
Expand All @@ -153,6 +155,7 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
if err != nil {
return fmt.Errorf("unable to parse appsec configuration: %w", err)
}

w.logger = logger
w.metricsLevel = MetricsLevel
w.logger.Tracef("Appsec configuration: %+v", w.config)
Expand Down Expand Up @@ -211,17 +214,20 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
AppsecRuntime: &wrt,
Labels: w.config.Labels,
}

err := runner.Init(appsecCfg.GetDataDir())
if err != nil {
return fmt.Errorf("unable to initialize runner: %w", err)
}

w.AppsecRunners[nbRoutine] = runner
}

w.logger.Infof("Created %d appsec runners", len(w.AppsecRunners))

// We don´t use the wrapper provided by coraza because we want to fully control what happens when a rule match to send the information in crowdsec
w.mux.HandleFunc(w.config.Path, w.appsecHandler)

return nil
}

Expand All @@ -243,17 +249,20 @@ func (w *AppsecSource) OneShotAcquisition(_ context.Context, _ chan types.Event,

func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
w.outChan = out

t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/appsec/live")

w.logger.Infof("%d appsec runner to start", len(w.AppsecRunners))

for _, runner := range w.AppsecRunners {
runner.outChan = out
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/appsec/live/runner")
return runner.Run(t)
})
}

t.Go(func() error {
if w.config.ListenSocket != "" {
w.logger.Infof("creating unix socket %s", w.config.ListenSocket)
Expand All @@ -268,10 +277,11 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
} else {
err = w.server.Serve(listener)
}
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("appsec server failed: %w", err)
}
}

return nil
})
t.Go(func() error {
Expand All @@ -288,6 +298,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
return fmt.Errorf("appsec server failed: %w", err)
}
}

return nil
})
<-t.Dying()
Expand All @@ -297,6 +308,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
w.server.Shutdown(ctx)
return nil
})

return nil
}

Expand Down Expand Up @@ -391,6 +403,7 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) {
logger.Debugf("Response: %+v", appsecResponse)

rw.WriteHeader(statusCode)

body, err := json.Marshal(appsecResponse)
if err != nil {
logger.Errorf("unable to serialize response: %s", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/acquisition/modules/wineventlog/wineventlog_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (w *WinEventLogSource) getXMLEvents(config *winlog.SubscribeConfig, publish
2000, // Timeout in milliseconds to wait.
0, // Reserved. Must be zero.
&returned) // The number of handles in the array that are set by the API.
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
return nil, err
} else if err != nil {
return nil, fmt.Errorf("wevtapi.EvtNext failed: %v", err)
Expand Down Expand Up @@ -188,7 +188,7 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error
}
if status == syscall.WAIT_OBJECT_0 {
renderedEvents, err := w.getXMLEvents(w.evtConfig, publisherCache, subscription, 500)
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
windows.ResetEvent(w.evtConfig.SignalEvent)
} else if err != nil {
w.logger.Errorf("getXMLEvents failed: %v", err)
Expand Down Expand Up @@ -411,7 +411,7 @@ OUTER_LOOP:
return nil
default:
evts, err := w.getXMLEvents(w.evtConfig, publisherCache, handle, 500)
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
log.Info("No more items")
break OUTER_LOOP
} else if err != nil {
Expand Down
28 changes: 15 additions & 13 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,11 @@ type APIServer struct {
consoleConfig *csconfig.ConsoleConfig
}

func recoverFromPanic(c *gin.Context) {
err := recover()
if err == nil {
return
}

// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
brokenPipe := false

func isBrokenConnection(err any) bool {
if ne, ok := err.(*net.OpError); ok {
if se, ok := ne.Err.(*os.SyscallError); ok {
if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
brokenPipe = true
return true
}
}
}
Expand All @@ -79,11 +70,22 @@ func recoverFromPanic(c *gin.Context) {
errors.Is(strErr, errClosedBody) ||
errors.Is(strErr, errHandlerComplete) ||
errors.Is(strErr, errStreamClosed) {
brokenPipe = true
return true
}
}

if brokenPipe {
return false
}

func recoverFromPanic(c *gin.Context) {
err := recover()
if err == nil {
return
}

// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
if isBrokenConnection(err) {
log.Warningf("client %s disconnected: %s", c.ClientIP(), err)
c.Abort()
} else {
Expand Down

0 comments on commit 5752111

Please sign in to comment.