Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2024.3-2025.1]: feature: headers in case of worker error #136

Merged
merged 8 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- asciicheck # Simple linter to check that your code does not contain non-ASCII identifiers
- bodyclose # Checks whether HTTP response body is closed successfully
- dogsled # Checks assignments with too many blank identifiers (e.g. x, _, _, _, := f())
- dupl # Tool for code clone detection
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- errorlint # find code that will cause problems with the error wrapping scheme introduced in Go 1.13
- exhaustive # check exhaustiveness of enum switch statements
- copyloopvar # checks for pointers to enclosing loop variables
- gochecknoglobals # Checks that no globals are present in Go code
- gocognit # Computes and checks the cognitive complexity of functions
- gocritic # The most opinionated Go source code linter
- gocyclo # Computes and checks the cyclomatic complexity of functions
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- revive
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.1

require (
github.com/emicklei/proto v1.13.2
github.com/prometheus/client_golang v1.20.3
github.com/prometheus/client_golang v1.20.4
github.com/roadrunner-server/api/v4 v4.16.0
github.com/roadrunner-server/endure/v2 v2.6.1
github.com/roadrunner-server/errors v1.4.1
Expand All @@ -21,7 +21,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/net v0.29.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.66.2
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
)

Expand All @@ -34,6 +34,7 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
11 changes: 5 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -30,8 +29,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4=
github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI=
github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJoX0=
Expand Down Expand Up @@ -94,8 +93,8 @@ golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,7 @@ golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
Expand Down Expand Up @@ -1596,6 +1597,7 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920183334-c177e329c48b/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
Expand Down
94 changes: 76 additions & 18 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/roadrunner-server/pool/pool/static_pool"
"github.com/roadrunner-server/pool/worker"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
"golang.org/x/net/context"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
Expand All @@ -33,6 +34,8 @@ const (
peerAuthType string = ":peer.auth-type"
delimiter string = "|:|"
apiErr string = "error"
headers string = "headers"
trailers string = "trailers"
)

type Pool interface {
Expand Down Expand Up @@ -65,6 +68,7 @@ type rpcContext struct {
// Proxy manages GRPC/RoadRunner bridge.
type Proxy struct {
mu *sync.RWMutex
log *zap.Logger
prop propagation.TextMapPropagator
grpcPool Pool
name string
Expand All @@ -75,8 +79,9 @@ type Proxy struct {
}

// NewProxy creates a new service proxy object.
func NewProxy(name string, metadata string, grpcPool Pool, mu *sync.RWMutex, prop propagation.TextMapPropagator) *Proxy {
func NewProxy(name string, metadata string, log *zap.Logger, grpcPool Pool, mu *sync.RWMutex, prop propagation.TextMapPropagator) *Proxy {
return &Proxy{
log: log,
mu: mu,
prop: prop,
grpcPool: grpcPool,
Expand Down Expand Up @@ -147,7 +152,12 @@ func (p *Proxy) methodHandler(method string) func(srv any, ctx context.Context,
}

handler := func(ctx context.Context, req any) (any, error) {
return p.invoke(ctx, method, req.(*codec.RawMessage))
switch r := req.(type) {
case *codec.RawMessage:
return p.invoke(ctx, method, r)
default:
return nil, errors.Errorf("unexpected request type %T", r)
}
}

return interceptor(ctx, in, info, handler)
Expand All @@ -158,6 +168,9 @@ func (p *Proxy) invoke(ctx context.Context, method string, in *codec.RawMessage)
pld := p.getPld()
defer p.putPld(pld)

// experimental grpc API
st := grpc.ServerTransportStreamFromContext(ctx)

err := p.makePayload(ctx, method, in, pld)
if err != nil {
return nil, err
Expand Down Expand Up @@ -188,12 +201,7 @@ func (p *Proxy) invoke(ctx context.Context, method string, in *codec.RawMessage)
return nil, errors.Str("worker empty response")
}

md, err := p.responseMetadata(r)
if err != nil {
return nil, err
}
ctx = metadata.NewIncomingContext(ctx, md)
err = grpc.SetHeader(ctx, md)
err = p.responseMetadata(st, r)
if err != nil {
return nil, err
}
Expand All @@ -202,46 +210,96 @@ func (p *Proxy) invoke(ctx context.Context, method string, in *codec.RawMessage)
}

// responseMetadata extracts metadata from roadrunner response Payload.Context and converts it to metadata.MD
func (p *Proxy) responseMetadata(resp *payload.Payload) (metadata.MD, error) {
var md metadata.MD
func (p *Proxy) responseMetadata(st grpc.ServerTransportStream, resp *payload.Payload) error {
if resp == nil || len(resp.Context) == 0 {
return md, nil
return nil
}

var rpcMetadata map[string]string
err := json.Unmarshal(resp.Context, &rpcMetadata)
if err != nil {
return md, err
return err
}

if len(rpcMetadata) > 0 {
md = metadata.New(rpcMetadata)
// old meta should not be used in response in new API
md := metadata.New(rpcMetadata)

// we assume that if there are no new headers and trailers, an old PHP-GRPC client is used
if len(md.Get(headers)) == 0 && len(md.Get(trailers)) == 0 {
// backward compatibility
_ = st.SetHeader(md)
goto parseErr
}

// New API headers
if len(md.Get(headers)) > 0 {
mdh := make(map[string]any)
err = json.Unmarshal([]byte(md.Get(headers)[0]), &mdh)
if err != nil {
// we don't need to return this error, log it
p.log.Error("error unmarshalling headers", zap.Error(err))
}

for k, v := range mdh {
switch tt := v.(type) {
case string:
_ = st.SetHeader(metadata.Pairs(k, tt))
case int:
_ = st.SetHeader(metadata.Pairs(k, strconv.Itoa(tt)))
default:
p.log.Warn("skipping header with unsupported type", zap.String("key", k), zap.Any("value", v))
}
}
}

// New API trailers
if len(md.Get(trailers)) > 0 {
mdh := make(map[string]any)
err = json.Unmarshal([]byte(md.Get(trailers)[0]), &mdh)
if err != nil {
// we don't need to return this error, log it
p.log.Error("error unmarshalling trailers", zap.Error(err))
}

for k, v := range mdh {
switch tt := v.(type) {
case string:
_ = st.SetTrailer(metadata.Pairs(k, tt))
case int:
_ = st.SetTrailer(metadata.Pairs(k, strconv.Itoa(tt)))
default:
p.log.Warn("skipping header with unsupported type", zap.String("key", k), zap.Any("value", v))
}
}
}

/*
we have an error
actually, if code is OK, status.ErrorProto will be nil
actually if the code is OK, status.ErrorProto will be nil
but, we use this only in case of PHP exception happened

*/
parseErr:
if len(md.Get(apiErr)) > 0 {
st := &spb.Status{}

// get an error
data, err := base64.StdEncoding.DecodeString(md.Get(apiErr)[0])
if err != nil {
return nil, err
return err
}

err = proto.Unmarshal(data, st)
if err != nil {
return nil, err
return err
}

return md, status.ErrorProto(st)
return status.ErrorProto(st)
}
}

return md, nil
return nil
}

// makePayload generates RoadRunner compatible payload based on GRPC message.
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *Plugin) createGRPCserver(interceptors map[string]common.Interceptor) (*
}

for _, service := range services {
px := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto[i], p.gPool, p.mu, p.prop)
px := proxy.NewProxy(fmt.Sprintf("%s.%s", service.Package, service.Name), p.config.Proto[i], p.log.Named(service.Name), p.gPool, p.mu, p.prop)
for _, m := range service.Methods {
px.RegisterMethod(m.Name)
}
Expand Down
22 changes: 11 additions & 11 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ go 1.23
toolchain go1.23.1

require (
github.com/roadrunner-server/config/v5 v5.0.3
github.com/roadrunner-server/config/v5 v5.0.4
github.com/roadrunner-server/endure/v2 v2.6.1
github.com/roadrunner-server/goridge/v3 v3.8.3
github.com/roadrunner-server/grpc/v5 v5.0.0
github.com/roadrunner-server/logger/v5 v5.0.3
github.com/roadrunner-server/metrics/v5 v5.0.3
github.com/roadrunner-server/otel/v5 v5.0.2
github.com/roadrunner-server/resetter/v5 v5.0.4
github.com/roadrunner-server/rpc/v5 v5.0.3
github.com/roadrunner-server/server/v5 v5.1.1
github.com/roadrunner-server/status/v5 v5.0.3
github.com/roadrunner-server/logger/v5 v5.0.4
github.com/roadrunner-server/metrics/v5 v5.0.4
github.com/roadrunner-server/otel/v5 v5.0.3
github.com/roadrunner-server/resetter/v5 v5.0.5
github.com/roadrunner-server/rpc/v5 v5.0.4
github.com/roadrunner-server/server/v5 v5.1.2
github.com/roadrunner-server/status/v5 v5.0.4
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.66.2
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
)

Expand Down Expand Up @@ -48,7 +48,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -59,7 +59,7 @@ require (
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.3 // indirect
github.com/prometheus/client_golang v1.20.4 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
Loading
Loading