Skip to content

Commit

Permalink
IWF-415: Improve activity logging
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Dec 13, 2024
1 parent 1bed2dc commit 6f433dc
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 32 deletions.
43 changes: 15 additions & 28 deletions service/api/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"encoding/json"
"github.com/indeedeng/iwf/config"
"net/http"

Expand Down Expand Up @@ -48,7 +47,7 @@ func (h *handler) apiV1WorkflowStart(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowStartPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -65,7 +64,7 @@ func (h *handler) apiV1WorkflowWaitForStateCompletion(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowWaitForStateCompletion(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -82,7 +81,7 @@ func (h *handler) apiV1WorkflowSignal(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowSignalPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -99,7 +98,7 @@ func (h *handler) apiV1WorkflowStop(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowStopPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -116,7 +115,7 @@ func (h *handler) apiV1WorkflowInternalDump(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowDumpPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -133,7 +132,7 @@ func (h *handler) apiV1WorkflowConfigUpdate(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowConfigUpdate(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -150,7 +149,7 @@ func (h *handler) apiV1WorkflowTriggerContinueAsNew(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowTriggerContinueAsNew(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -167,7 +166,7 @@ func (h *handler) apiV1WorkflowSearch(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowSearchPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -184,7 +183,7 @@ func (h *handler) apiV1WorkflowRpc(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowRpcPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -210,7 +209,7 @@ func (h *handler) apiV1WorkflowGetDataObjects(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowGetQueryAttributesPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -227,7 +226,7 @@ func (h *handler) apiV1WorkflowSetDataObjects(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowSetQueryAttributesPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -244,7 +243,7 @@ func (h *handler) apiV1WorkflowGetSearchAttributes(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowGetSearchAttributesPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -261,7 +260,7 @@ func (h *handler) apiV1WorkflowSetSearchAttributes(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

errResp := h.svc.ApiV1WorkflowSetSearchAttributesPost(c.Request.Context(), req)
if errResp != nil {
Expand All @@ -286,7 +285,7 @@ func (h *handler) doApiV1WorkflowGetPost(c *gin.Context, waitIfStillRunning bool
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

var resp *iwfidl.WorkflowGetResponse
var errResp *errors.ErrorAndStatus
Expand All @@ -310,7 +309,7 @@ func (h *handler) apiV1WorkflowReset(c *gin.Context) {
invalidRequestSchema(c)
return
}
h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req)))
h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req)))

resp, errResp := h.svc.ApiV1WorkflowResetPost(c.Request.Context(), req)
if errResp != nil {
Expand Down Expand Up @@ -345,15 +344,3 @@ func invalidRequestSchema(c *gin.Context) {
func (h *handler) processError(c *gin.Context, resp *errors.ErrorAndStatus) {
c.JSON(resp.StatusCode, resp.Error)
}

func (h *handler) toJsonLogging(req any) string {
str, err := json.Marshal(req)
if len(str) > 1000 {
str = str[0:1000]
}
if err != nil {
h.logger.Error("error when serializing request", tag.Error(err), tag.DefaultValue(req))
return ""
}
return string(str)
}
14 changes: 14 additions & 0 deletions service/common/log/truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package log

import "encoding/json"

func ToJsonAndTruncateForLogging(req any) string {
str, err := json.Marshal(req)
if len(str) > 1000 {
str = str[0:1000]
}
if err != nil {
return "Error when serializing request: " + err.Error()
}
return string(str)
}
9 changes: 5 additions & 4 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/event"
"github.com/indeedeng/iwf/service/common/log"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/common/rpc"
"github.com/indeedeng/iwf/service/common/urlautofix"
Expand All @@ -31,7 +32,7 @@ func StateApiWaitUntil(
stateApiWaitUntilStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateStartActivity", "input", input)
logger.Info("StateStartActivity", "input", log.ToJsonAndTruncateForLogging(input))
iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)

svcCfg := env.GetSharedConfig()
Expand Down Expand Up @@ -116,7 +117,7 @@ func StateApiExecute(
stateApiExecuteStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateDecideActivity", "input", input)
logger.Info("StateDecideActivity", "input", log.ToJsonAndTruncateForLogging(input))

iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)
svcCfg := env.GetSharedConfig()
Expand Down Expand Up @@ -330,7 +331,7 @@ func DumpWorkflowInternal(
) (*iwfidl.WorkflowDumpResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("DumpWorkflowInternal", "input", req)
logger.Info("DumpWorkflowInternal", "input", log.ToJsonAndTruncateForLogging(req))

svcCfg := env.GetSharedConfig()
apiAddress := svcCfg.GetApiServiceAddressWithDefault()
Expand Down Expand Up @@ -359,7 +360,7 @@ func InvokeWorkerRpc(
) (*InvokeRpcActivityOutput, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("invoke worker RPC by activity", "input", req)
logger.Info("invoke worker RPC by activity", "input", log.ToJsonAndTruncateForLogging(req))

apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds

Expand Down

0 comments on commit 6f433dc

Please sign in to comment.