Skip to content

Commit

Permalink
Merge branch 'main' into refactor/filter-grpc-status-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrmt authored Oct 8, 2024
2 parents b6422dd + 83de9c8 commit 278f407
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 357 deletions.
38 changes: 23 additions & 15 deletions pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/pkg/gateway/lb/service"
Expand All @@ -56,7 +57,7 @@ func (s *server) aggregationSearch(
f func(ctx context.Context,
fcfg *payload.Search_Config, // Forwarding Config to Agent
vc vald.Client, copts ...grpc.CallOption) (*payload.Search_Response, error),
) (res *payload.Search_Response, err error) {
) (res *payload.Search_Response, attrs []attribute.KeyValue, err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "aggregationSearch"), apiName+"/aggregationSearch")
defer func() {
if span != nil {
Expand Down Expand Up @@ -235,12 +236,13 @@ func (s *server) aggregationSearch(
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
})
attrs = trace.StatusCodeInternal(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeInternal(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
return nil, attrs, err
}
res = aggr.Result()
if num != 0 && len(res.GetResults()) > num {
Expand All @@ -261,12 +263,13 @@ func (s *server) aggregationSearch(
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
}, info.Get(),
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeDeadlineExceeded(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
return nil, attrs, err
}
if 0 < min && len(res.GetResults()) < min {
err = status.WrapWithDeadlineExceeded(
Expand All @@ -281,12 +284,13 @@ func (s *server) aggregationSearch(
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
}, info.Get(),
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeDeadlineExceeded(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
return nil, attrs, err
}
}

Expand All @@ -301,14 +305,15 @@ func (s *server) aggregationSearch(
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
}, info.Get())
attrs = trace.FromGRPCStatus(st.Code(), msg)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
log.Warn(err)
if len(res.GetResults()) == 0 {
return nil, err
return nil, attrs, err
}
}
if num != 0 && len(res.GetResults()) == 0 {
Expand All @@ -324,21 +329,23 @@ func (s *server) aggregationSearch(
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
}, info.Get())
attrs = trace.StatusCodeNotFound(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeNotFound(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
return nil, attrs, err
}

if 0 < min && len(res.GetResults()) < min {
if err == nil {
err = errors.ErrInsuffcientSearchResult
}
attrs = trace.StatusCodeNotFound(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeNotFound(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
err = status.WrapWithNotFound(
Expand All @@ -353,15 +360,16 @@ func (s *server) aggregationSearch(
ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)),
}, info.Get(),
)
attrs = trace.StatusCodeNotFound(err.Error())
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeNotFound(err.Error())...)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
return nil, attrs, err
}
res.RequestId = bcfg.GetRequestId()
return res, nil
return res, attrs, nil
}

// vald standard algorithm.
Expand Down
Loading

0 comments on commit 278f407

Please sign in to comment.