From 91e8710dc55b6981bba72d693eebe97a9ad6a7b8 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sun, 24 Jul 2022 17:11:00 +0800 Subject: [PATCH 01/10] timeout http & grpc & dubbo v1.0 --- pkg/common/constant/filter.go | 4 ++++ pkg/common/grpc/manager.go | 22 +++++++++++++++++++++- pkg/common/http/manager.go | 17 ++++++++++++++++- pkg/common/http/manager_test.go | 1 + pkg/filter/http/dubboproxy/dubbo.go | 16 +++++++++++++++- pkg/filter/http/grpcproxy/grpc.go | 9 +++++++-- pkg/filter/http/httpproxy/routerfilter.go | 7 ++++++- pkg/model/http.go | 3 +++ 8 files changed, 73 insertions(+), 6 deletions(-) diff --git a/pkg/common/constant/filter.go b/pkg/common/constant/filter.go index d818644db..14eda1c10 100644 --- a/pkg/common/constant/filter.go +++ b/pkg/common/constant/filter.go @@ -17,6 +17,8 @@ package constant +import "time" + var ( Default403Body = []byte("403 for bidden") Default404Body = []byte("404 page not found") @@ -40,4 +42,6 @@ const ( LogDataBuffer = 5000 // console Console = "console" + + DefaultReqTimeout = 10 * time.Second ) diff --git a/pkg/common/grpc/manager.go b/pkg/common/grpc/manager.go index bcd3a2ed2..eb0f27ab2 100644 --- a/pkg/common/grpc/manager.go +++ b/pkg/common/grpc/manager.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "net" stdHttp "net/http" + "time" ) import ( @@ -37,6 +38,7 @@ import ( ) import ( + "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router" "github.com/apache/dubbo-go-pixiu/pkg/logger" @@ -49,12 +51,14 @@ type GrpcConnectionManager struct { filter.EmptyNetworkFilter config *model.GRPCConnectionManagerConfig routerCoordinator *router2.RouterCoordinator + timeout time.Duration } // CreateGrpcConnectionManager create grpc connection manager func CreateGrpcConnectionManager(hcmc *model.GRPCConnectionManagerConfig) *GrpcConnectionManager { hcm := &GrpcConnectionManager{config: hcmc} hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) + hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return hcm } @@ -79,7 +83,11 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp return } - newReq := r.Clone(context.Background()) + ctx := context.Background() + // timeout + ctx, cancel := context.WithTimeout(ctx, gcm.timeout) + defer cancel() + newReq := r.Clone(ctx) newReq.URL.Scheme = "http" newReq.URL.Host = endpoint.Address.GetAddress() @@ -153,3 +161,15 @@ func copyHeader(dst, src stdHttp.Header) { } } } + +func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { + if currentV == "" { + return defaultV + } else { + if duration, err := time.ParseDuration(currentV); err != nil { + return defaultV + } else { + return duration + } + } +} diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go index dc09679a3..cb50e93ae 100644 --- a/pkg/common/http/manager.go +++ b/pkg/common/http/manager.go @@ -23,6 +23,7 @@ import ( "io/ioutil" stdHttp "net/http" "sync" + "time" ) import ( @@ -47,6 +48,7 @@ type HttpConnectionManager struct { routerCoordinator *router2.RouterCoordinator filterManager *filter.FilterManager pool sync.Pool + timeout time.Duration } // CreateHttpConnectionManager create http connection manager @@ -58,6 +60,7 @@ func CreateHttpConnectionManager(hcmc *model.HttpConnectionManagerConfig) *HttpC hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) hcm.filterManager = filter.NewFilterManager(hcmc.HTTPFilters) hcm.filterManager.Load() + hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return hcm } @@ -84,7 +87,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp hc.Writer = w hc.Request = r hc.Reset() - + hc.Timeout = hcm.timeout err := hcm.Handle(hc) if err != nil { logger.Errorf("ServeHTTP %v", err) @@ -165,3 +168,15 @@ func (hcm *HttpConnectionManager) findRoute(hc *pch.HttpContext) error { hc.RouteEntry(ra) return nil } + +func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { + if currentV == "" { + return defaultV + } else { + if duration, err := time.ParseDuration(currentV); err != nil { + return defaultV + } else { + return duration + } + } +} diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go index 684dd2dd2..3aa36635b 100644 --- a/pkg/common/http/manager_test.go +++ b/pkg/common/http/manager_test.go @@ -124,6 +124,7 @@ func TestCreateHttpConnectionManager(t *testing.T) { ServerName: "test_http_dubbo", GenerateRequestID: false, IdleTimeoutStr: "100", + TimeoutStr: "10s", } hcm := CreateHttpConnectionManager(&hcmc) diff --git a/pkg/filter/http/dubboproxy/dubbo.go b/pkg/filter/http/dubboproxy/dubbo.go index e358d24c9..e21ead417 100644 --- a/pkg/filter/http/dubboproxy/dubbo.go +++ b/pkg/filter/http/dubboproxy/dubbo.go @@ -25,11 +25,13 @@ import ( "net/http" "reflect" "strings" + "time" ) import ( "dubbo.apache.org/dubbo-go/v3/common" dubboConstant "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/dubbo" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" hessian "github.com/apache/dubbo-go-hessian2" @@ -225,7 +227,19 @@ func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { invoc.SetReply(&resp) invCtx := context.Background() - result := invoker.Invoke(invCtx, invoc) + + resultCh := make(chan protocol.Result) + go func() { + resultCh <- invoker.Invoke(invCtx, invoc) + }() + var result protocol.Result + timer := time.NewTimer(hc.Timeout) + select { + case result = <-resultCh: + case <-timer.C: + timer.Stop() + return filter.Stop + } result.SetAttachments(invoc.Attachments()) if result.Error() != nil { diff --git a/pkg/filter/http/grpcproxy/grpc.go b/pkg/filter/http/grpcproxy/grpc.go index 398da1666..1992af9be 100644 --- a/pkg/filter/http/grpcproxy/grpc.go +++ b/pkg/filter/http/grpcproxy/grpc.go @@ -26,6 +26,7 @@ import ( stdHttp "net/http" "strings" "sync" + "time" ) import ( @@ -111,7 +112,8 @@ type ( Config struct { DescriptorSourceStrategy DescriptorSourceStrategy `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy" default:"auto"` Path string `yaml:"path" json:"path"` - Rules []*Rule `yaml:"rules" json:"rules"` //nolint + Rules []*Rule `yaml:"rules" json:"rules"` //nolint + Timeout time.Duration `yaml:"timeout" json:"timeout"` //nolint } Rule struct { @@ -191,7 +193,10 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists")) return filter.Stop } - + // timeout for Dial and Invoke + var cancel context.CancelFunc + c.Ctx, cancel = context.WithTimeout(c.Ctx, c.Timeout) + defer cancel() ep := e.Address.GetAddress() p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")] diff --git a/pkg/filter/http/httpproxy/routerfilter.go b/pkg/filter/http/httpproxy/routerfilter.go index 3767440d3..c2ff81c32 100644 --- a/pkg/filter/http/httpproxy/routerfilter.go +++ b/pkg/filter/http/httpproxy/routerfilter.go @@ -119,7 +119,12 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { } req.Header = r.Header - resp, err := (&http3.Client{Transport: f.transport}).Do(req) + cli := &http3.Client{ + Transport: f.transport, + Timeout: hc.Timeout, //从HttpContext中获取超时时间 + } + + resp, err := cli.Do(req) if err != nil { panic(err) } diff --git a/pkg/model/http.go b/pkg/model/http.go index 1b016066a..e9fd97ac8 100644 --- a/pkg/model/http.go +++ b/pkg/model/http.go @@ -32,17 +32,20 @@ type HttpConnectionManagerConfig struct { ServerName string `yaml:"server_name" json:"server_name" mapstructure:"server_name"` IdleTimeoutStr string `yaml:"idle_timeout" json:"idle_timeout" mapstructure:"idle_timeout"` GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` } // GRPCConnectionManagerConfig type GRPCConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` } // DubboProxyConnectionManagerConfig type DubboProxyConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` DubboFilters []*DubboFilter `yaml:"dubbo_filters" json:"dubbo_filters" mapstructure:"dubbo_filters"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` } // HTTPFilter http filter From 3de97cfb063e6499f3771a6f8c76eb91043f5a8b Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sun, 31 Jul 2022 12:17:44 +0800 Subject: [PATCH 02/10] =?UTF-8?q?timeout=20triple=20=E5=92=8C=20=E5=85=B6?= =?UTF-8?q?=E4=BB=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/filter/event/event.go | 18 +++++++++++++++++- pkg/filter/http/grpcproxy/grpc.go | 4 +--- pkg/filter/http/remote/call.go | 27 +++++++++++++++++++++------ pkg/filter/seata/filter.go | 16 +++++++++++++++- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go index 15355f5f2..f41022844 100644 --- a/pkg/filter/event/event.go +++ b/pkg/filter/event/event.go @@ -20,6 +20,7 @@ package event import ( "fmt" sdkhttp "net/http" + "time" ) import ( @@ -72,7 +73,22 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { mqClient := mq.NewSingletonMQClient(*f.cfg) req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI()) - resp, err := mqClient.Call(req) + + var resp interface{} + var err error + respCh := make(chan struct{}) + go func() { + resp, err = mqClient.Call(req) + close(respCh) + }() + select { + case <-respCh: + case <-time.After(ctx.Timeout): + logger.Errorf("[dubbo-go-pixiu] event client call timeout err!") + return filter.Stop + } + + //resp, err := mqClient.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err) ctx.SendLocalReply(sdkhttp.StatusInternalServerError, []byte(fmt.Sprintf("event client call err:%v", err))) diff --git a/pkg/filter/http/grpcproxy/grpc.go b/pkg/filter/http/grpcproxy/grpc.go index 1992af9be..d3d86336d 100644 --- a/pkg/filter/http/grpcproxy/grpc.go +++ b/pkg/filter/http/grpcproxy/grpc.go @@ -194,9 +194,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { return filter.Stop } // timeout for Dial and Invoke - var cancel context.CancelFunc - c.Ctx, cancel = context.WithTimeout(c.Ctx, c.Timeout) - defer cancel() + c.Ctx, _ = context.WithTimeout(c.Ctx, c.Timeout) ep := e.Address.GetAddress() p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")] diff --git a/pkg/filter/http/remote/call.go b/pkg/filter/http/remote/call.go index d40fcc021..55e5aae53 100644 --- a/pkg/filter/http/remote/call.go +++ b/pkg/filter/http/remote/call.go @@ -24,6 +24,7 @@ import ( "os" "strconv" "strings" + "time" ) import ( @@ -43,9 +44,9 @@ import ( ) const ( - open = iota - close - all + OPEN = iota + CLOSE + ALL ) const ( @@ -99,7 +100,7 @@ func (factory *FilterFactory) Apply() error { } level := mockLevel(mock) if level < 0 || level > 2 { - level = close + level = CLOSE } factory.conf.Level = level // must init it at apply function @@ -125,7 +126,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { api := c.GetAPI() - if (f.conf.Level == open && api.Mock) || (f.conf.Level == all) { + if (f.conf.Level == OPEN && api.Mock) || (f.conf.Level == ALL) { c.SourceResp = &contexthttp.ErrResponse{ Message: "mock success", } @@ -140,7 +141,21 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { } req := client.NewReq(c.Request.Context(), c.Request, *api) - resp, err := cli.Call(req) + + var resp interface{} + respCh := make(chan struct{}) + go func() { + resp, err = cli.Call(req) + close(respCh) + }() + + select { + case <-respCh: + case <-time.After(c.Timeout): + logger.Errorf("[dubbo-go-pixiu] client call timeout err!") + return filter.Stop + } + //resp, err := cli.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err) c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("client call err: %s", err))) diff --git a/pkg/filter/seata/filter.go b/pkg/filter/seata/filter.go index 2103e99ab..f4ba98a04 100644 --- a/pkg/filter/seata/filter.go +++ b/pkg/filter/seata/filter.go @@ -20,6 +20,7 @@ package seata import ( netHttp "net/http" "strings" + "time" ) import ( @@ -33,6 +34,7 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/logger" ) const ( @@ -144,7 +146,19 @@ func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { tccResource, exists := f.tccResources[strings.ToLower(path)] if exists { - f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) + respCh := make(chan struct{}) + go func() { + f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) + close(respCh) + }() + + select { + case <-respCh: + case <-time.After(ctx.Timeout): + logger.Errorf("[dubbo-go-pixiu] seata handle http1 timeout err!") + return filter.Stop + } + //f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) } return filter.Continue } From 7a0f3f45821e373dc190620d492be2826a600518 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sun, 14 Aug 2022 19:13:53 +0800 Subject: [PATCH 03/10] feat:timeout optimization --- pkg/client/client.go | 2 +- pkg/client/dubbo/dubbo.go | 2 ++ pkg/client/http/http.go | 3 +-- pkg/client/mq/config.go | 9 ++++---- pkg/client/mq/kafka_facade.go | 1 + pkg/client/mq/mq.go | 5 ++++- pkg/client/request.go | 2 ++ pkg/client/triple/triple.go | 8 ++++++- pkg/common/constant/filter.go | 20 ++++++++++++++++-- pkg/common/extension/filter/filter_chain.go | 4 ++++ pkg/common/grpc/manager.go | 19 +---------------- pkg/common/http/manager.go | 17 +-------------- pkg/context/http/context.go | 4 +++- pkg/filter/auth/jwt/jwt.go | 2 +- pkg/filter/authority/authority.go | 1 + pkg/filter/cors/cors.go | 2 ++ pkg/filter/csrf/csrf.go | 1 + pkg/filter/event/event.go | 21 ++++--------------- pkg/filter/header/header.go | 1 + pkg/filter/http/apiconfig/api_config.go | 1 + pkg/filter/http/dubboproxy/dubbo.go | 19 ++++------------- pkg/filter/http/grpcproxy/grpc.go | 12 ++++++----- pkg/filter/http/httpproxy/routerfilter.go | 3 ++- pkg/filter/http/loadbalancer/loadbalancer.go | 1 + pkg/filter/http/proxyrewrite/rewrite.go | 1 + pkg/filter/http/remote/call.go | 20 +++--------------- pkg/filter/network/dubboproxy/plugin.go | 1 + .../network/grpcconnectionmanager/plugin.go | 1 + .../network/httpconnectionmanager/plugin.go | 1 + pkg/filter/seata/filter.go | 16 +------------- pkg/filter/seata/transaction.go | 5 +++-- pkg/model/http.go | 4 ++++ samples/dubbogo/simple/body/pixiu/conf.yaml | 1 + .../dubbogo/simple/body/server/app/user.go | 2 ++ samples/dubbogo/simple/triple/pixiu/conf.yaml | 1 + samples/http/simple/pixiu/conf.yaml | 1 + 36 files changed, 95 insertions(+), 119 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index f07b5a49f..a290179fc 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -22,7 +22,7 @@ type Client interface { // Apply to init client Apply() error - // Close close the clinet + // Close close the client Close() error // Call invoke the downstream service. diff --git a/pkg/client/dubbo/dubbo.go b/pkg/client/dubbo/dubbo.go index e2a9774de..f5581f3ec 100644 --- a/pkg/client/dubbo/dubbo.go +++ b/pkg/client/dubbo/dubbo.go @@ -209,6 +209,8 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) { span.SetAttributes(attribute.Key(spanTagValues).String(string(finalValues))) defer span.End() ctx := context.WithValue(req.Context, constant.TracingRemoteSpanCtx, trace.SpanFromContext(req.Context).SpanContext()) + ctx, cancel := context.WithTimeout(ctx, req.Timeout) + defer cancel() rst, err := gs.Invoke(ctx, method, types, vals) if err != nil { return nil, err diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go index b16989f40..32eb54262 100644 --- a/pkg/client/http/http.go +++ b/pkg/client/http/http.go @@ -22,7 +22,6 @@ import ( "net/url" "strings" "sync" - "time" ) import ( @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) { newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body) newReq.Header = params.Header - httpClient := &http.Client{Timeout: 5 * time.Second} + httpClient := &http.Client{Timeout: req.Timeout} tr := otel.Tracer(traceNameHTTPClient) _, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient)) diff --git a/pkg/client/mq/config.go b/pkg/client/mq/config.go index c9e7d3d68..5cf0a32b9 100644 --- a/pkg/client/mq/config.go +++ b/pkg/client/mq/config.go @@ -40,10 +40,11 @@ type ( } KafkaProducerConfig struct { - Brokers []string `yaml:"brokers" json:"brokers"` - ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"` - Metadata Metadata `yaml:"metadata" json:"metadata"` - Producer Producer `yaml:"producer" json:"producer"` + Brokers []string `yaml:"brokers" json:"brokers"` + ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"` + Metadata Metadata `yaml:"metadata" json:"metadata"` + Producer Producer `yaml:"producer" json:"producer"` + Timeout time.Duration `yaml:"timeout" json:"timeout"` } Metadata struct { diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go index 591d82ea7..b4375756e 100644 --- a/pkg/client/mq/kafka_facade.go +++ b/pkg/client/mq/kafka_facade.go @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e c.Metadata.Retry.Max = config.Metadata.Retry.Max c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes + c.Producer.Timeout = config.Timeout if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go index f08e6e8de..3355640d6 100644 --- a/pkg/client/mq/mq.go +++ b/pkg/client/mq/mq.go @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) { ctx := context.Background() switch config.MqType { case constant.MQTypeKafka: + config.KafkaProducerConfig.Timeout = config.Timeout pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig) if err != nil { return nil, err @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) { consumerFacadeMap.Store(cReq.ConsumerGroup, facade) if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok { cf := f.(ConsumerFacade) - err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup)) + ctx, cancel := context.WithTimeout(c.ctx, req.Timeout) + defer cancel() + err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup)) if err != nil { facade.Stop() consumerFacadeMap.Delete(cReq.ConsumerGroup) diff --git a/pkg/client/request.go b/pkg/client/request.go index 39f01429b..b4b198216 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -20,6 +20,7 @@ package client import ( "context" "net/http" + "time" ) import ( @@ -32,6 +33,7 @@ type Request struct { Context context.Context IngressRequest *http.Request API router.API + Timeout time.Duration } // NewReq create a request diff --git a/pkg/client/triple/triple.go b/pkg/client/triple/triple.go index d540fed5b..2f9cce961 100644 --- a/pkg/client/triple/triple.go +++ b/pkg/client/triple/triple.go @@ -23,6 +23,7 @@ import ( "net/url" "strings" "sync" + "time" ) import ( @@ -34,6 +35,7 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/client" + "github.com/apache/dubbo-go-pixiu/pkg/logger" ) // InitDefaultTripleClient init default dubbo client @@ -89,7 +91,11 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) { } meta := make(map[string][]string) reqData, _ := ioutil.ReadAll(req.IngressRequest.Body) - call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta)) + logger.Debugf("[dubbo-go-pixiu] client timeout val %v", req.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), req.Timeout) + defer cancel() + time.Sleep(100 * time.Nanosecond) + call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta)) if err != nil { return "", errors.Errorf("call triple server error = %s", err) } diff --git a/pkg/common/constant/filter.go b/pkg/common/constant/filter.go index 14eda1c10..612d18930 100644 --- a/pkg/common/constant/filter.go +++ b/pkg/common/constant/filter.go @@ -17,7 +17,10 @@ package constant -import "time" +import ( + "fmt" + "time" +) var ( Default403Body = []byte("403 for bidden") @@ -43,5 +46,18 @@ const ( // console Console = "console" - DefaultReqTimeout = 10 * time.Second + DefaultReqTimeout = 1 * time.Nanosecond ) + +func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { + fmt.Printf("timeout parse %s : %d", currentV, defaultV) + if currentV == "" { + return defaultV + } else { + if duration, err := time.ParseDuration(currentV); err != nil { + return defaultV + } else { + return duration + } + } +} diff --git a/pkg/common/extension/filter/filter_chain.go b/pkg/common/extension/filter/filter_chain.go index 682d32bcc..dce533418 100644 --- a/pkg/common/extension/filter/filter_chain.go +++ b/pkg/common/extension/filter/filter_chain.go @@ -59,7 +59,11 @@ func (c *defaultFilterChain) AppendEncodeFilters(f ...HttpEncodeFilter) { } func (c *defaultFilterChain) OnDecode(ctx *http.HttpContext) { + for ; c.decodeFiltersIndex < len(c.decodeFilters); c.decodeFiltersIndex++ { + + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout :%v", ctx.Timeout) + //logger.Debugf("[dubbo-go-pixiu] client filter :%v", c.decodeFilters[c.decodeFiltersIndex]) filterStatus := c.decodeFilters[c.decodeFiltersIndex].Decode(ctx) switch filterStatus { diff --git a/pkg/common/grpc/manager.go b/pkg/common/grpc/manager.go index eb0f27ab2..d5cc01db7 100644 --- a/pkg/common/grpc/manager.go +++ b/pkg/common/grpc/manager.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net" stdHttp "net/http" - "time" ) import ( @@ -38,7 +37,6 @@ import ( ) import ( - "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router" "github.com/apache/dubbo-go-pixiu/pkg/logger" @@ -51,14 +49,12 @@ type GrpcConnectionManager struct { filter.EmptyNetworkFilter config *model.GRPCConnectionManagerConfig routerCoordinator *router2.RouterCoordinator - timeout time.Duration } // CreateGrpcConnectionManager create grpc connection manager func CreateGrpcConnectionManager(hcmc *model.GRPCConnectionManagerConfig) *GrpcConnectionManager { hcm := &GrpcConnectionManager{config: hcmc} hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) - hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return hcm } @@ -82,10 +78,9 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster")) return } - ctx := context.Background() // timeout - ctx, cancel := context.WithTimeout(ctx, gcm.timeout) + ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout) defer cancel() newReq := r.Clone(ctx) newReq.URL.Scheme = "http" @@ -161,15 +156,3 @@ func copyHeader(dst, src stdHttp.Header) { } } } - -func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { - if currentV == "" { - return defaultV - } else { - if duration, err := time.ParseDuration(currentV); err != nil { - return defaultV - } else { - return duration - } - } -} diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go index cb50e93ae..a0dd568dd 100644 --- a/pkg/common/http/manager.go +++ b/pkg/common/http/manager.go @@ -23,7 +23,6 @@ import ( "io/ioutil" stdHttp "net/http" "sync" - "time" ) import ( @@ -48,7 +47,6 @@ type HttpConnectionManager struct { routerCoordinator *router2.RouterCoordinator filterManager *filter.FilterManager pool sync.Pool - timeout time.Duration } // CreateHttpConnectionManager create http connection manager @@ -60,7 +58,6 @@ func CreateHttpConnectionManager(hcmc *model.HttpConnectionManagerConfig) *HttpC hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) hcm.filterManager = filter.NewFilterManager(hcmc.HTTPFilters) hcm.filterManager.Load() - hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return hcm } @@ -87,7 +84,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp hc.Writer = w hc.Request = r hc.Reset() - hc.Timeout = hcm.timeout + hc.Timeout = hcm.config.Timeout err := hcm.Handle(hc) if err != nil { logger.Errorf("ServeHTTP %v", err) @@ -168,15 +165,3 @@ func (hcm *HttpConnectionManager) findRoute(hc *pch.HttpContext) error { hc.RouteEntry(ra) return nil } - -func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { - if currentV == "" { - return defaultV - } else { - if duration, err := time.ParseDuration(currentV); err != nil { - return defaultV - } else { - return duration - } - } -} diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go index 6e7d7198c..ac71a8fb2 100644 --- a/pkg/context/http/context.go +++ b/pkg/context/http/context.go @@ -205,7 +205,9 @@ func (hc *HttpContext) LocalReply() bool { // API sets the API to http context func (hc *HttpContext) API(api router.API) { - hc.Timeout = api.Timeout + if hc.Timeout > api.Timeout { + hc.Timeout = api.Timeout + } hc.Api = &api } diff --git a/pkg/filter/auth/jwt/jwt.go b/pkg/filter/auth/jwt/jwt.go index e7e74b72e..71323ccd0 100644 --- a/pkg/filter/auth/jwt/jwt.go +++ b/pkg/filter/auth/jwt/jwt.go @@ -87,7 +87,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout jwt :%v", ctx.Timeout) path := ctx.Request.RequestURI router := false diff --git a/pkg/filter/authority/authority.go b/pkg/filter/authority/authority.go index dbb977271..205b73ded 100644 --- a/pkg/filter/authority/authority.go +++ b/pkg/filter/authority/authority.go @@ -71,6 +71,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout authority :%v", c.Timeout) for _, r := range f.cfg.Rules { item := c.GetClientIP() if r.Limit == App { diff --git a/pkg/filter/cors/cors.go b/pkg/filter/cors/cors.go index 1c20a919e..211ec654d 100644 --- a/pkg/filter/cors/cors.go +++ b/pkg/filter/cors/cors.go @@ -18,6 +18,7 @@ package cors import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" stdHttp "net/http" ) @@ -79,6 +80,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { + logger.Debugf("[dubbo-go-pixiu] client Before Api timout cors :%v", ctx.Timeout) f.handleCors(ctx) return filter.Continue } diff --git a/pkg/filter/csrf/csrf.go b/pkg/filter/csrf/csrf.go index c4b615b55..f3838236d 100644 --- a/pkg/filter/csrf/csrf.go +++ b/pkg/filter/csrf/csrf.go @@ -81,6 +81,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", ctx.Timeout) ctx.Request.Header.Set(csrfSecret, f.cfg.Secret) if inMethod(f.cfg.IgnoreMethods, ctx.Request.Method) { diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go index f41022844..242bbc414 100644 --- a/pkg/filter/event/event.go +++ b/pkg/filter/event/event.go @@ -20,7 +20,6 @@ package event import ( "fmt" sdkhttp "net/http" - "time" ) import ( @@ -71,24 +70,12 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout event :%v", ctx.Timeout) + f.cfg.Timeout = ctx.Timeout mqClient := mq.NewSingletonMQClient(*f.cfg) req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI()) - - var resp interface{} - var err error - respCh := make(chan struct{}) - go func() { - resp, err = mqClient.Call(req) - close(respCh) - }() - select { - case <-respCh: - case <-time.After(ctx.Timeout): - logger.Errorf("[dubbo-go-pixiu] event client call timeout err!") - return filter.Stop - } - - //resp, err := mqClient.Call(req) + req.Timeout = ctx.Timeout + resp, err := mqClient.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err) ctx.SendLocalReply(sdkhttp.StatusInternalServerError, []byte(fmt.Sprintf("event client call err:%v", err))) diff --git a/pkg/filter/header/header.go b/pkg/filter/header/header.go index ffbe38ed0..0ca7c6304 100644 --- a/pkg/filter/header/header.go +++ b/pkg/filter/header/header.go @@ -74,6 +74,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", hc.Timeout) api := hc.GetAPI() headers := api.Headers if len(headers) == 0 { diff --git a/pkg/filter/http/apiconfig/api_config.go b/pkg/filter/http/apiconfig/api_config.go index 52b3b8a6a..7ec41b387 100644 --- a/pkg/filter/http/apiconfig/api_config.go +++ b/pkg/filter/http/apiconfig/api_config.go @@ -113,6 +113,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus { + logger.Debugf("[dubbo-go-pixiu] client Before Api timout apiconfig :%v", ctx.Timeout) req := ctx.Request v, err := f.apiService.MatchAPI(req.URL.Path, fc.HTTPVerb(req.Method)) if err != nil { diff --git a/pkg/filter/http/dubboproxy/dubbo.go b/pkg/filter/http/dubboproxy/dubbo.go index e21ead417..e5521adbe 100644 --- a/pkg/filter/http/dubboproxy/dubbo.go +++ b/pkg/filter/http/dubboproxy/dubbo.go @@ -25,13 +25,11 @@ import ( "net/http" "reflect" "strings" - "time" ) import ( "dubbo.apache.org/dubbo-go/v3/common" dubboConstant "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/dubbo" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" hessian "github.com/apache/dubbo-go-hessian2" @@ -100,6 +98,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *pixiuHttp.HttpContext, cha // Decode handle http request to dubbo direct generic call and return http response func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { + logger.Debugf("[dubbo-go-pixiu] client Before dubboproxy timout dubbo :%v", hc.Timeout) rEntry := hc.GetRouteEntry() if rEntry == nil { logger.Info("[dubbo-go-pixiu] http not match route") @@ -227,19 +226,9 @@ func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { invoc.SetReply(&resp) invCtx := context.Background() - - resultCh := make(chan protocol.Result) - go func() { - resultCh <- invoker.Invoke(invCtx, invoc) - }() - var result protocol.Result - timer := time.NewTimer(hc.Timeout) - select { - case result = <-resultCh: - case <-timer.C: - timer.Stop() - return filter.Stop - } + invCtx, cancel := context.WithTimeout(invCtx, hc.Timeout) + defer cancel() + result := invoker.Invoke(invCtx, invoc) result.SetAttachments(invoc.Attachments()) if result.Error() != nil { diff --git a/pkg/filter/http/grpcproxy/grpc.go b/pkg/filter/http/grpcproxy/grpc.go index d3d86336d..28b111cc0 100644 --- a/pkg/filter/http/grpcproxy/grpc.go +++ b/pkg/filter/http/grpcproxy/grpc.go @@ -179,6 +179,7 @@ func getServiceAndMethod(path string) (string, string) { // Decode use the default http to grpc transcoding strategy https://cloud.google.com/endpoints/docs/grpc/transcoding func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { + logger.Debugf("[dubbo-go-pixiu] client Before grpc timout grpc :%v", c.Timeout) svc, mth := getServiceAndMethod(c.GetUrl()) var clientConn *grpc.ClientConn @@ -194,7 +195,8 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { return filter.Stop } // timeout for Dial and Invoke - c.Ctx, _ = context.WithTimeout(c.Ctx, c.Timeout) + ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout) + defer cancel() ep := e.Address.GetAddress() p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")] @@ -205,7 +207,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { clientConn, ok = p.Get().(*grpc.ClientConn) if !ok || clientConn == nil { // TODO(Kenway): Support Credential and TLS - clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials())) + clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil || clientConn == nil { logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader) c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err)))) @@ -214,14 +216,14 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { } // get DescriptorSource, contain file and reflection - source, err := f.descriptor.getDescriptorSource(context.WithValue(c.Ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg) + source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg) if err != nil { logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err) c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API")) return filter.Stop } //put DescriptorSource concurrent, del if no need - c.Ctx = context.WithValue(c.Ctx, ct.ContextKey(DescriptorSourceKey), source) + ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source) dscp, err := source.FindSymbol(svc) if err != nil { @@ -260,7 +262,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { // metadata in grpc has the same feature in http md := mapHeaderToMetadata(c.AllHeaders()) - ctx := metadata.NewOutgoingContext(c.Ctx, md) + ctx = metadata.NewOutgoingContext(ctx, md) md = metadata.MD{} t := metadata.MD{} diff --git a/pkg/filter/http/httpproxy/routerfilter.go b/pkg/filter/http/httpproxy/routerfilter.go index c2ff81c32..6fb904e5d 100644 --- a/pkg/filter/http/httpproxy/routerfilter.go +++ b/pkg/filter/http/httpproxy/routerfilter.go @@ -80,6 +80,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { + logger.Debugf("[dubbo-go-pixiu] client Before Api timout routerfilter :%v", hc.Timeout) rEntry := hc.GetRouteEntry() if rEntry == nil { panic("no route entry") @@ -121,7 +122,7 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { cli := &http3.Client{ Transport: f.transport, - Timeout: hc.Timeout, //从HttpContext中获取超时时间 + Timeout: hc.Timeout, } resp, err := cli.Do(req) diff --git a/pkg/filter/http/loadbalancer/loadbalancer.go b/pkg/filter/http/loadbalancer/loadbalancer.go index a93b5f04f..e476fce5e 100644 --- a/pkg/filter/http/loadbalancer/loadbalancer.go +++ b/pkg/filter/http/loadbalancer/loadbalancer.go @@ -75,6 +75,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout loadbalancer :%v", c.Timeout) allInstances := strings.Split(c.GetAPI().IntegrationRequest.HTTPBackendConfig.URL, ",") idx := rand.Int31n(int32(len(allInstances))) c.Api.IntegrationRequest.HTTPBackendConfig.URL = allInstances[idx] diff --git a/pkg/filter/http/proxyrewrite/rewrite.go b/pkg/filter/http/proxyrewrite/rewrite.go index f8ab76204..836724d3a 100644 --- a/pkg/filter/http/proxyrewrite/rewrite.go +++ b/pkg/filter/http/proxyrewrite/rewrite.go @@ -96,6 +96,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { + //logger.Debugf("[dubbo-go-pixiu] client Before Api timout weite :%v", c.Timeout) url := c.GetUrl() newUrl := f.uriRegex.ReplaceAllString(url, f.replace) diff --git a/pkg/filter/http/remote/call.go b/pkg/filter/http/remote/call.go index 55e5aae53..1a42e0b95 100644 --- a/pkg/filter/http/remote/call.go +++ b/pkg/filter/http/remote/call.go @@ -24,7 +24,6 @@ import ( "os" "strconv" "strings" - "time" ) import ( @@ -116,7 +115,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { - + logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", c.Timeout) if f.conf.Dpc.AutoResolve { if err := f.resolve(c); err != nil { c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err))) @@ -141,21 +140,8 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { } req := client.NewReq(c.Request.Context(), c.Request, *api) - - var resp interface{} - respCh := make(chan struct{}) - go func() { - resp, err = cli.Call(req) - close(respCh) - }() - - select { - case <-respCh: - case <-time.After(c.Timeout): - logger.Errorf("[dubbo-go-pixiu] client call timeout err!") - return filter.Stop - } - //resp, err := cli.Call(req) + req.Timeout = c.Timeout + resp, err := cli.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err) c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("client call err: %s", err))) diff --git a/pkg/filter/network/dubboproxy/plugin.go b/pkg/filter/network/dubboproxy/plugin.go index cb033b2ba..4a69c0b4a 100644 --- a/pkg/filter/network/dubboproxy/plugin.go +++ b/pkg/filter/network/dubboproxy/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create dubbo networkfilter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) if !ok { panic("CreateFilter occur some exception for the type is not suitable one.") } diff --git a/pkg/filter/network/grpcconnectionmanager/plugin.go b/pkg/filter/network/grpcconnectionmanager/plugin.go index c6002cf88..0976678b0 100644 --- a/pkg/filter/network/grpcconnectionmanager/plugin.go +++ b/pkg/filter/network/grpcconnectionmanager/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create grpc network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.GRPCConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return grpc.CreateGrpcConnectionManager(hcmc), nil } diff --git a/pkg/filter/network/httpconnectionmanager/plugin.go b/pkg/filter/network/httpconnectionmanager/plugin.go index ce90f374c..e319feb0a 100644 --- a/pkg/filter/network/httpconnectionmanager/plugin.go +++ b/pkg/filter/network/httpconnectionmanager/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create http network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.HttpConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return http.CreateHttpConnectionManager(hcmc), nil } diff --git a/pkg/filter/seata/filter.go b/pkg/filter/seata/filter.go index f4ba98a04..2103e99ab 100644 --- a/pkg/filter/seata/filter.go +++ b/pkg/filter/seata/filter.go @@ -20,7 +20,6 @@ package seata import ( netHttp "net/http" "strings" - "time" ) import ( @@ -34,7 +33,6 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" "github.com/apache/dubbo-go-pixiu/pkg/context/http" - "github.com/apache/dubbo-go-pixiu/pkg/logger" ) const ( @@ -146,19 +144,7 @@ func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { tccResource, exists := f.tccResources[strings.ToLower(path)] if exists { - respCh := make(chan struct{}) - go func() { - f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) - close(respCh) - }() - - select { - case <-respCh: - case <-time.After(ctx.Timeout): - logger.Errorf("[dubbo-go-pixiu] seata handle http1 timeout err!") - return filter.Stop - } - //f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) + f.branchRegisterResult = f.handleHttp1BranchRegister(ctx, tccResource) } return filter.Continue } diff --git a/pkg/filter/seata/transaction.go b/pkg/filter/seata/transaction.go index 60397db5b..90e01f3ec 100644 --- a/pkg/filter/seata/transaction.go +++ b/pkg/filter/seata/transaction.go @@ -130,8 +130,9 @@ func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *T ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("encode request context failed, %v", err))) return false } - - branchID, err := f.branchRegister(ctx.Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") + Ctx, cancel := context.WithTimeout(ctx.Ctx, ctx.Timeout) + defer cancel() + branchID, err := f.branchRegister(Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") if err != nil { logger.Errorf("branch transaction register failed, xid: %s, err: %v", xid, err) ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("branch transaction register failed, %v", err))) diff --git a/pkg/model/http.go b/pkg/model/http.go index e9fd97ac8..6f0cb8a45 100644 --- a/pkg/model/http.go +++ b/pkg/model/http.go @@ -19,6 +19,7 @@ package model import ( "github.com/mitchellh/mapstructure" + "time" ) import ( @@ -33,12 +34,14 @@ type HttpConnectionManagerConfig struct { IdleTimeoutStr string `yaml:"idle_timeout" json:"idle_timeout" mapstructure:"idle_timeout"` GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // GRPCConnectionManagerConfig type GRPCConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // DubboProxyConnectionManagerConfig @@ -46,6 +49,7 @@ type DubboProxyConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` DubboFilters []*DubboFilter `yaml:"dubbo_filters" json:"dubbo_filters" mapstructure:"dubbo_filters"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // HTTPFilter http filter diff --git a/samples/dubbogo/simple/body/pixiu/conf.yaml b/samples/dubbogo/simple/body/pixiu/conf.yaml index 1d582030c..3be28fa23 100644 --- a/samples/dubbogo/simple/body/pixiu/conf.yaml +++ b/samples/dubbogo/simple/body/pixiu/conf.yaml @@ -24,6 +24,7 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: + timeout : "2ns" route_config: routes: - match: diff --git a/samples/dubbogo/simple/body/server/app/user.go b/samples/dubbogo/simple/body/server/app/user.go index 6df7c480f..8e0dadb96 100644 --- a/samples/dubbogo/simple/body/server/app/user.go +++ b/samples/dubbogo/simple/body/server/app/user.go @@ -235,6 +235,7 @@ func (u *UserProvider) GetUserByNameAndAge(ctx context.Context, name string, age // UpdateUser update by user struct, my be another struct, PX config POST or PUT. func (u *UserProvider) UpdateUser(ctx context.Context, user *User) (bool, error) { fmt.Printf("Req UpdateUser data: %#v \n", user) + time.Sleep(1 * time.Millisecond) r, ok := cache.GetByName(user.Name) if ok { if user.ID != "" { @@ -250,6 +251,7 @@ func (u *UserProvider) UpdateUser(ctx context.Context, user *User) (bool, error) // UpdateUserByName update by user struct, my be another struct, PX config POST or PUT. func (u *UserProvider) UpdateUserByName(ctx context.Context, name string, user *User) (bool, error) { + time.Sleep(1 * time.Millisecond) fmt.Printf("Req UpdateUserByName data: %#v \n", user) r, ok := cache.GetByName(name) if ok { diff --git a/samples/dubbogo/simple/triple/pixiu/conf.yaml b/samples/dubbogo/simple/triple/pixiu/conf.yaml index ea45ccb15..0f6f0c805 100644 --- a/samples/dubbogo/simple/triple/pixiu/conf.yaml +++ b/samples/dubbogo/simple/triple/pixiu/conf.yaml @@ -29,6 +29,7 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: + timeout : "2ns" route_config: routes: - match: diff --git a/samples/http/simple/pixiu/conf.yaml b/samples/http/simple/pixiu/conf.yaml index 6b3898111..8e8e179a1 100644 --- a/samples/http/simple/pixiu/conf.yaml +++ b/samples/http/simple/pixiu/conf.yaml @@ -29,6 +29,7 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: + timeout: "2ns" route_config: routes: - match: From 7c10cdf6b4c5c592b0fe9f65493398fe4d05269f Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sat, 27 Aug 2022 22:17:26 +0800 Subject: [PATCH 04/10] feat:timeout dubbo --- pkg/client/dubbo/dubbo.go | 10 +++++++--- pkg/client/triple/triple.go | 2 -- pkg/common/constant/filter.go | 2 +- pkg/common/extension/filter/filter_chain.go | 4 ---- pkg/common/http/manager_test.go | 1 - pkg/filter/auth/jwt/jwt.go | 2 +- pkg/filter/authority/authority.go | 1 - pkg/filter/cors/cors.go | 2 -- pkg/filter/csrf/csrf.go | 1 - pkg/filter/event/event.go | 1 - pkg/filter/header/header.go | 1 - pkg/filter/http/apiconfig/api_config.go | 1 - pkg/filter/http/dubboproxy/dubbo.go | 3 --- pkg/filter/http/grpcproxy/grpc.go | 1 - pkg/filter/http/httpproxy/routerfilter.go | 1 - pkg/filter/http/loadbalancer/loadbalancer.go | 1 - pkg/filter/http/proxyrewrite/rewrite.go | 1 - pkg/filter/http/remote/call.go | 1 - samples/dubbogo/simple/body/pixiu/conf.yaml | 1 - samples/dubbogo/simple/body/server/app/user.go | 2 -- samples/http/simple/pixiu/conf.yaml | 1 - 21 files changed, 9 insertions(+), 31 deletions(-) diff --git a/pkg/client/dubbo/dubbo.go b/pkg/client/dubbo/dubbo.go index f5581f3ec..3de1667ab 100644 --- a/pkg/client/dubbo/dubbo.go +++ b/pkg/client/dubbo/dubbo.go @@ -52,6 +52,7 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/client" + cst "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/config" "github.com/apache/dubbo-go-pixiu/pkg/logger" ) @@ -209,8 +210,6 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) { span.SetAttributes(attribute.Key(spanTagValues).String(string(finalValues))) defer span.End() ctx := context.WithValue(req.Context, constant.TracingRemoteSpanCtx, trace.SpanFromContext(req.Context).SpanContext()) - ctx, cancel := context.WithTimeout(ctx, req.Timeout) - defer cancel() rst, err := gs.Invoke(ctx, method, types, vals) if err != nil { return nil, err @@ -299,7 +298,6 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge useNacosRegister = true } } - refConf := dg.ReferenceConfig{ InterfaceName: irequest.Interface, Cluster: constant.ClusterKeyFailover, @@ -316,6 +314,12 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge refConf.Retries = irequest.DubboBackendConfig.Retries } + if dc.dubboProxyConfig.Timeout != nil { + refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr + } else { + refConf.RequestTimeout = cst.DefaultReqTimeout.String() + } + logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout) dc.lock.Lock() defer dc.lock.Unlock() diff --git a/pkg/client/triple/triple.go b/pkg/client/triple/triple.go index 2f9cce961..e25de88e7 100644 --- a/pkg/client/triple/triple.go +++ b/pkg/client/triple/triple.go @@ -35,7 +35,6 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/client" - "github.com/apache/dubbo-go-pixiu/pkg/logger" ) // InitDefaultTripleClient init default dubbo client @@ -91,7 +90,6 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) { } meta := make(map[string][]string) reqData, _ := ioutil.ReadAll(req.IngressRequest.Body) - logger.Debugf("[dubbo-go-pixiu] client timeout val %v", req.Timeout) ctx, cancel := context.WithTimeout(context.Background(), req.Timeout) defer cancel() time.Sleep(100 * time.Nanosecond) diff --git a/pkg/common/constant/filter.go b/pkg/common/constant/filter.go index 612d18930..3de0419f9 100644 --- a/pkg/common/constant/filter.go +++ b/pkg/common/constant/filter.go @@ -46,7 +46,7 @@ const ( // console Console = "console" - DefaultReqTimeout = 1 * time.Nanosecond + DefaultReqTimeout = 10 * time.Second ) func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { diff --git a/pkg/common/extension/filter/filter_chain.go b/pkg/common/extension/filter/filter_chain.go index dce533418..682d32bcc 100644 --- a/pkg/common/extension/filter/filter_chain.go +++ b/pkg/common/extension/filter/filter_chain.go @@ -59,11 +59,7 @@ func (c *defaultFilterChain) AppendEncodeFilters(f ...HttpEncodeFilter) { } func (c *defaultFilterChain) OnDecode(ctx *http.HttpContext) { - for ; c.decodeFiltersIndex < len(c.decodeFilters); c.decodeFiltersIndex++ { - - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout :%v", ctx.Timeout) - //logger.Debugf("[dubbo-go-pixiu] client filter :%v", c.decodeFilters[c.decodeFiltersIndex]) filterStatus := c.decodeFilters[c.decodeFiltersIndex].Decode(ctx) switch filterStatus { diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go index 3aa36635b..684dd2dd2 100644 --- a/pkg/common/http/manager_test.go +++ b/pkg/common/http/manager_test.go @@ -124,7 +124,6 @@ func TestCreateHttpConnectionManager(t *testing.T) { ServerName: "test_http_dubbo", GenerateRequestID: false, IdleTimeoutStr: "100", - TimeoutStr: "10s", } hcm := CreateHttpConnectionManager(&hcmc) diff --git a/pkg/filter/auth/jwt/jwt.go b/pkg/filter/auth/jwt/jwt.go index 71323ccd0..e7e74b72e 100644 --- a/pkg/filter/auth/jwt/jwt.go +++ b/pkg/filter/auth/jwt/jwt.go @@ -87,7 +87,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout jwt :%v", ctx.Timeout) + path := ctx.Request.RequestURI router := false diff --git a/pkg/filter/authority/authority.go b/pkg/filter/authority/authority.go index 205b73ded..dbb977271 100644 --- a/pkg/filter/authority/authority.go +++ b/pkg/filter/authority/authority.go @@ -71,7 +71,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout authority :%v", c.Timeout) for _, r := range f.cfg.Rules { item := c.GetClientIP() if r.Limit == App { diff --git a/pkg/filter/cors/cors.go b/pkg/filter/cors/cors.go index 211ec654d..1c20a919e 100644 --- a/pkg/filter/cors/cors.go +++ b/pkg/filter/cors/cors.go @@ -18,7 +18,6 @@ package cors import ( - "github.com/apache/dubbo-go-pixiu/pkg/logger" stdHttp "net/http" ) @@ -80,7 +79,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before Api timout cors :%v", ctx.Timeout) f.handleCors(ctx) return filter.Continue } diff --git a/pkg/filter/csrf/csrf.go b/pkg/filter/csrf/csrf.go index f3838236d..c4b615b55 100644 --- a/pkg/filter/csrf/csrf.go +++ b/pkg/filter/csrf/csrf.go @@ -81,7 +81,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", ctx.Timeout) ctx.Request.Header.Set(csrfSecret, f.cfg.Secret) if inMethod(f.cfg.IgnoreMethods, ctx.Request.Method) { diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go index 242bbc414..c6516e77d 100644 --- a/pkg/filter/event/event.go +++ b/pkg/filter/event/event.go @@ -70,7 +70,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout event :%v", ctx.Timeout) f.cfg.Timeout = ctx.Timeout mqClient := mq.NewSingletonMQClient(*f.cfg) req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI()) diff --git a/pkg/filter/header/header.go b/pkg/filter/header/header.go index 0ca7c6304..ffbe38ed0 100644 --- a/pkg/filter/header/header.go +++ b/pkg/filter/header/header.go @@ -74,7 +74,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", hc.Timeout) api := hc.GetAPI() headers := api.Headers if len(headers) == 0 { diff --git a/pkg/filter/http/apiconfig/api_config.go b/pkg/filter/http/apiconfig/api_config.go index 7ec41b387..52b3b8a6a 100644 --- a/pkg/filter/http/apiconfig/api_config.go +++ b/pkg/filter/http/apiconfig/api_config.go @@ -113,7 +113,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before Api timout apiconfig :%v", ctx.Timeout) req := ctx.Request v, err := f.apiService.MatchAPI(req.URL.Path, fc.HTTPVerb(req.Method)) if err != nil { diff --git a/pkg/filter/http/dubboproxy/dubbo.go b/pkg/filter/http/dubboproxy/dubbo.go index e5521adbe..e358d24c9 100644 --- a/pkg/filter/http/dubboproxy/dubbo.go +++ b/pkg/filter/http/dubboproxy/dubbo.go @@ -98,7 +98,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *pixiuHttp.HttpContext, cha // Decode handle http request to dubbo direct generic call and return http response func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before dubboproxy timout dubbo :%v", hc.Timeout) rEntry := hc.GetRouteEntry() if rEntry == nil { logger.Info("[dubbo-go-pixiu] http not match route") @@ -226,8 +225,6 @@ func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { invoc.SetReply(&resp) invCtx := context.Background() - invCtx, cancel := context.WithTimeout(invCtx, hc.Timeout) - defer cancel() result := invoker.Invoke(invCtx, invoc) result.SetAttachments(invoc.Attachments()) diff --git a/pkg/filter/http/grpcproxy/grpc.go b/pkg/filter/http/grpcproxy/grpc.go index 28b111cc0..bbb373c57 100644 --- a/pkg/filter/http/grpcproxy/grpc.go +++ b/pkg/filter/http/grpcproxy/grpc.go @@ -179,7 +179,6 @@ func getServiceAndMethod(path string) (string, string) { // Decode use the default http to grpc transcoding strategy https://cloud.google.com/endpoints/docs/grpc/transcoding func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before grpc timout grpc :%v", c.Timeout) svc, mth := getServiceAndMethod(c.GetUrl()) var clientConn *grpc.ClientConn diff --git a/pkg/filter/http/httpproxy/routerfilter.go b/pkg/filter/http/httpproxy/routerfilter.go index 6fb904e5d..6331f6c69 100644 --- a/pkg/filter/http/httpproxy/routerfilter.go +++ b/pkg/filter/http/httpproxy/routerfilter.go @@ -80,7 +80,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before Api timout routerfilter :%v", hc.Timeout) rEntry := hc.GetRouteEntry() if rEntry == nil { panic("no route entry") diff --git a/pkg/filter/http/loadbalancer/loadbalancer.go b/pkg/filter/http/loadbalancer/loadbalancer.go index e476fce5e..a93b5f04f 100644 --- a/pkg/filter/http/loadbalancer/loadbalancer.go +++ b/pkg/filter/http/loadbalancer/loadbalancer.go @@ -75,7 +75,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout loadbalancer :%v", c.Timeout) allInstances := strings.Split(c.GetAPI().IntegrationRequest.HTTPBackendConfig.URL, ",") idx := rand.Int31n(int32(len(allInstances))) c.Api.IntegrationRequest.HTTPBackendConfig.URL = allInstances[idx] diff --git a/pkg/filter/http/proxyrewrite/rewrite.go b/pkg/filter/http/proxyrewrite/rewrite.go index 836724d3a..f8ab76204 100644 --- a/pkg/filter/http/proxyrewrite/rewrite.go +++ b/pkg/filter/http/proxyrewrite/rewrite.go @@ -96,7 +96,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { - //logger.Debugf("[dubbo-go-pixiu] client Before Api timout weite :%v", c.Timeout) url := c.GetUrl() newUrl := f.uriRegex.ReplaceAllString(url, f.replace) diff --git a/pkg/filter/http/remote/call.go b/pkg/filter/http/remote/call.go index 1a42e0b95..a66675a6a 100644 --- a/pkg/filter/http/remote/call.go +++ b/pkg/filter/http/remote/call.go @@ -115,7 +115,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { - logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", c.Timeout) if f.conf.Dpc.AutoResolve { if err := f.resolve(c); err != nil { c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err))) diff --git a/samples/dubbogo/simple/body/pixiu/conf.yaml b/samples/dubbogo/simple/body/pixiu/conf.yaml index 3be28fa23..1d582030c 100644 --- a/samples/dubbogo/simple/body/pixiu/conf.yaml +++ b/samples/dubbogo/simple/body/pixiu/conf.yaml @@ -24,7 +24,6 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: - timeout : "2ns" route_config: routes: - match: diff --git a/samples/dubbogo/simple/body/server/app/user.go b/samples/dubbogo/simple/body/server/app/user.go index 8e0dadb96..6df7c480f 100644 --- a/samples/dubbogo/simple/body/server/app/user.go +++ b/samples/dubbogo/simple/body/server/app/user.go @@ -235,7 +235,6 @@ func (u *UserProvider) GetUserByNameAndAge(ctx context.Context, name string, age // UpdateUser update by user struct, my be another struct, PX config POST or PUT. func (u *UserProvider) UpdateUser(ctx context.Context, user *User) (bool, error) { fmt.Printf("Req UpdateUser data: %#v \n", user) - time.Sleep(1 * time.Millisecond) r, ok := cache.GetByName(user.Name) if ok { if user.ID != "" { @@ -251,7 +250,6 @@ func (u *UserProvider) UpdateUser(ctx context.Context, user *User) (bool, error) // UpdateUserByName update by user struct, my be another struct, PX config POST or PUT. func (u *UserProvider) UpdateUserByName(ctx context.Context, name string, user *User) (bool, error) { - time.Sleep(1 * time.Millisecond) fmt.Printf("Req UpdateUserByName data: %#v \n", user) r, ok := cache.GetByName(name) if ok { diff --git a/samples/http/simple/pixiu/conf.yaml b/samples/http/simple/pixiu/conf.yaml index 8e8e179a1..6b3898111 100644 --- a/samples/http/simple/pixiu/conf.yaml +++ b/samples/http/simple/pixiu/conf.yaml @@ -29,7 +29,6 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: - timeout: "2ns" route_config: routes: - match: From 61d72d4cb6348d71d131909815264306189bc6f5 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sat, 27 Aug 2022 22:20:41 +0800 Subject: [PATCH 05/10] feat:timeout dubbo --- samples/dubbogo/simple/triple/pixiu/conf.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/dubbogo/simple/triple/pixiu/conf.yaml b/samples/dubbogo/simple/triple/pixiu/conf.yaml index 0f6f0c805..ea45ccb15 100644 --- a/samples/dubbogo/simple/triple/pixiu/conf.yaml +++ b/samples/dubbogo/simple/triple/pixiu/conf.yaml @@ -29,7 +29,6 @@ static_resources: filters: - name: dgp.filter.httpconnectionmanager config: - timeout : "2ns" route_config: routes: - match: From fe35c80886c8bf301e3026c61dc52c5f5c7db0ea Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sat, 27 Aug 2022 22:47:55 +0800 Subject: [PATCH 06/10] feat : timeout --- pkg/client/client.go | 2 +- pkg/client/dubbo/dubbo.go | 8 +++++++- pkg/client/http/http.go | 3 +-- pkg/client/mq/config.go | 9 +++++---- pkg/client/mq/kafka_facade.go | 1 + pkg/client/mq/mq.go | 5 ++++- pkg/client/request.go | 2 ++ pkg/client/triple/triple.go | 6 +++++- pkg/common/constant/filter.go | 20 +++++++++++++++++++ pkg/common/grpc/manager.go | 7 +++++-- pkg/common/http/manager.go | 2 +- pkg/context/http/context.go | 4 +++- pkg/filter/event/event.go | 2 ++ pkg/filter/http/grpcproxy/grpc.go | 16 +++++++++------ pkg/filter/http/httpproxy/routerfilter.go | 7 ++++++- pkg/filter/http/remote/call.go | 12 +++++------ pkg/filter/network/dubboproxy/plugin.go | 1 + .../network/grpcconnectionmanager/plugin.go | 1 + .../network/httpconnectionmanager/plugin.go | 1 + pkg/filter/seata/transaction.go | 5 +++-- pkg/model/http.go | 7 +++++++ 21 files changed, 92 insertions(+), 29 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index f07b5a49f..a290179fc 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -22,7 +22,7 @@ type Client interface { // Apply to init client Apply() error - // Close close the clinet + // Close close the client Close() error // Call invoke the downstream service. diff --git a/pkg/client/dubbo/dubbo.go b/pkg/client/dubbo/dubbo.go index e2a9774de..3de1667ab 100644 --- a/pkg/client/dubbo/dubbo.go +++ b/pkg/client/dubbo/dubbo.go @@ -52,6 +52,7 @@ import ( import ( "github.com/apache/dubbo-go-pixiu/pkg/client" + cst "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/config" "github.com/apache/dubbo-go-pixiu/pkg/logger" ) @@ -297,7 +298,6 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge useNacosRegister = true } } - refConf := dg.ReferenceConfig{ InterfaceName: irequest.Interface, Cluster: constant.ClusterKeyFailover, @@ -314,6 +314,12 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge refConf.Retries = irequest.DubboBackendConfig.Retries } + if dc.dubboProxyConfig.Timeout != nil { + refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr + } else { + refConf.RequestTimeout = cst.DefaultReqTimeout.String() + } + logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout) dc.lock.Lock() defer dc.lock.Unlock() diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go index b16989f40..32eb54262 100644 --- a/pkg/client/http/http.go +++ b/pkg/client/http/http.go @@ -22,7 +22,6 @@ import ( "net/url" "strings" "sync" - "time" ) import ( @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) { newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body) newReq.Header = params.Header - httpClient := &http.Client{Timeout: 5 * time.Second} + httpClient := &http.Client{Timeout: req.Timeout} tr := otel.Tracer(traceNameHTTPClient) _, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient)) diff --git a/pkg/client/mq/config.go b/pkg/client/mq/config.go index c9e7d3d68..5cf0a32b9 100644 --- a/pkg/client/mq/config.go +++ b/pkg/client/mq/config.go @@ -40,10 +40,11 @@ type ( } KafkaProducerConfig struct { - Brokers []string `yaml:"brokers" json:"brokers"` - ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"` - Metadata Metadata `yaml:"metadata" json:"metadata"` - Producer Producer `yaml:"producer" json:"producer"` + Brokers []string `yaml:"brokers" json:"brokers"` + ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"` + Metadata Metadata `yaml:"metadata" json:"metadata"` + Producer Producer `yaml:"producer" json:"producer"` + Timeout time.Duration `yaml:"timeout" json:"timeout"` } Metadata struct { diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go index 591d82ea7..b4375756e 100644 --- a/pkg/client/mq/kafka_facade.go +++ b/pkg/client/mq/kafka_facade.go @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e c.Metadata.Retry.Max = config.Metadata.Retry.Max c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes + c.Producer.Timeout = config.Timeout if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go index f08e6e8de..3355640d6 100644 --- a/pkg/client/mq/mq.go +++ b/pkg/client/mq/mq.go @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) { ctx := context.Background() switch config.MqType { case constant.MQTypeKafka: + config.KafkaProducerConfig.Timeout = config.Timeout pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig) if err != nil { return nil, err @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) { consumerFacadeMap.Store(cReq.ConsumerGroup, facade) if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok { cf := f.(ConsumerFacade) - err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup)) + ctx, cancel := context.WithTimeout(c.ctx, req.Timeout) + defer cancel() + err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup)) if err != nil { facade.Stop() consumerFacadeMap.Delete(cReq.ConsumerGroup) diff --git a/pkg/client/request.go b/pkg/client/request.go index 39f01429b..b4b198216 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -20,6 +20,7 @@ package client import ( "context" "net/http" + "time" ) import ( @@ -32,6 +33,7 @@ type Request struct { Context context.Context IngressRequest *http.Request API router.API + Timeout time.Duration } // NewReq create a request diff --git a/pkg/client/triple/triple.go b/pkg/client/triple/triple.go index d540fed5b..e25de88e7 100644 --- a/pkg/client/triple/triple.go +++ b/pkg/client/triple/triple.go @@ -23,6 +23,7 @@ import ( "net/url" "strings" "sync" + "time" ) import ( @@ -89,7 +90,10 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) { } meta := make(map[string][]string) reqData, _ := ioutil.ReadAll(req.IngressRequest.Body) - call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta)) + ctx, cancel := context.WithTimeout(context.Background(), req.Timeout) + defer cancel() + time.Sleep(100 * time.Nanosecond) + call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta)) if err != nil { return "", errors.Errorf("call triple server error = %s", err) } diff --git a/pkg/common/constant/filter.go b/pkg/common/constant/filter.go index d818644db..3de0419f9 100644 --- a/pkg/common/constant/filter.go +++ b/pkg/common/constant/filter.go @@ -17,6 +17,11 @@ package constant +import ( + "fmt" + "time" +) + var ( Default403Body = []byte("403 for bidden") Default404Body = []byte("404 page not found") @@ -40,4 +45,19 @@ const ( LogDataBuffer = 5000 // console Console = "console" + + DefaultReqTimeout = 10 * time.Second ) + +func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { + fmt.Printf("timeout parse %s : %d", currentV, defaultV) + if currentV == "" { + return defaultV + } else { + if duration, err := time.ParseDuration(currentV); err != nil { + return defaultV + } else { + return duration + } + } +} diff --git a/pkg/common/grpc/manager.go b/pkg/common/grpc/manager.go index bcd3a2ed2..d5cc01db7 100644 --- a/pkg/common/grpc/manager.go +++ b/pkg/common/grpc/manager.go @@ -78,8 +78,11 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster")) return } - - newReq := r.Clone(context.Background()) + ctx := context.Background() + // timeout + ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout) + defer cancel() + newReq := r.Clone(ctx) newReq.URL.Scheme = "http" newReq.URL.Host = endpoint.Address.GetAddress() diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go index eb7e66b2b..fc977d27c 100644 --- a/pkg/common/http/manager.go +++ b/pkg/common/http/manager.go @@ -85,7 +85,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp hc.Writer = w hc.Request = r hc.Reset() - + hc.Timeout = hcm.config.Timeout err := hcm.Handle(hc) if err != nil { logger.Errorf("ServeHTTP %v", err) diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go index 4b87123df..544c4aef7 100644 --- a/pkg/context/http/context.go +++ b/pkg/context/http/context.go @@ -209,7 +209,9 @@ func (hc *HttpContext) LocalReply() bool { // API sets the API to http context func (hc *HttpContext) API(api router.API) { - hc.Timeout = api.Timeout + if hc.Timeout > api.Timeout { + hc.Timeout = api.Timeout + } hc.Api = &api } diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go index 15355f5f2..c6516e77d 100644 --- a/pkg/filter/event/event.go +++ b/pkg/filter/event/event.go @@ -70,8 +70,10 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { + f.cfg.Timeout = ctx.Timeout mqClient := mq.NewSingletonMQClient(*f.cfg) req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI()) + req.Timeout = ctx.Timeout resp, err := mqClient.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err) diff --git a/pkg/filter/http/grpcproxy/grpc.go b/pkg/filter/http/grpcproxy/grpc.go index 398da1666..bbb373c57 100644 --- a/pkg/filter/http/grpcproxy/grpc.go +++ b/pkg/filter/http/grpcproxy/grpc.go @@ -26,6 +26,7 @@ import ( stdHttp "net/http" "strings" "sync" + "time" ) import ( @@ -111,7 +112,8 @@ type ( Config struct { DescriptorSourceStrategy DescriptorSourceStrategy `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy" default:"auto"` Path string `yaml:"path" json:"path"` - Rules []*Rule `yaml:"rules" json:"rules"` //nolint + Rules []*Rule `yaml:"rules" json:"rules"` //nolint + Timeout time.Duration `yaml:"timeout" json:"timeout"` //nolint } Rule struct { @@ -191,7 +193,9 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists")) return filter.Stop } - + // timeout for Dial and Invoke + ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout) + defer cancel() ep := e.Address.GetAddress() p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")] @@ -202,7 +206,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { clientConn, ok = p.Get().(*grpc.ClientConn) if !ok || clientConn == nil { // TODO(Kenway): Support Credential and TLS - clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials())) + clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil || clientConn == nil { logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader) c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err)))) @@ -211,14 +215,14 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { } // get DescriptorSource, contain file and reflection - source, err := f.descriptor.getDescriptorSource(context.WithValue(c.Ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg) + source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg) if err != nil { logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err) c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API")) return filter.Stop } //put DescriptorSource concurrent, del if no need - c.Ctx = context.WithValue(c.Ctx, ct.ContextKey(DescriptorSourceKey), source) + ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source) dscp, err := source.FindSymbol(svc) if err != nil { @@ -257,7 +261,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus { // metadata in grpc has the same feature in http md := mapHeaderToMetadata(c.AllHeaders()) - ctx := metadata.NewOutgoingContext(c.Ctx, md) + ctx = metadata.NewOutgoingContext(ctx, md) md = metadata.MD{} t := metadata.MD{} diff --git a/pkg/filter/http/httpproxy/routerfilter.go b/pkg/filter/http/httpproxy/routerfilter.go index 3767440d3..6331f6c69 100644 --- a/pkg/filter/http/httpproxy/routerfilter.go +++ b/pkg/filter/http/httpproxy/routerfilter.go @@ -119,7 +119,12 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus { } req.Header = r.Header - resp, err := (&http3.Client{Transport: f.transport}).Do(req) + cli := &http3.Client{ + Transport: f.transport, + Timeout: hc.Timeout, + } + + resp, err := cli.Do(req) if err != nil { panic(err) } diff --git a/pkg/filter/http/remote/call.go b/pkg/filter/http/remote/call.go index d40fcc021..a66675a6a 100644 --- a/pkg/filter/http/remote/call.go +++ b/pkg/filter/http/remote/call.go @@ -43,9 +43,9 @@ import ( ) const ( - open = iota - close - all + OPEN = iota + CLOSE + ALL ) const ( @@ -99,7 +99,7 @@ func (factory *FilterFactory) Apply() error { } level := mockLevel(mock) if level < 0 || level > 2 { - level = close + level = CLOSE } factory.conf.Level = level // must init it at apply function @@ -115,7 +115,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c } func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { - if f.conf.Dpc.AutoResolve { if err := f.resolve(c); err != nil { c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err))) @@ -125,7 +124,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { api := c.GetAPI() - if (f.conf.Level == open && api.Mock) || (f.conf.Level == all) { + if (f.conf.Level == OPEN && api.Mock) || (f.conf.Level == ALL) { c.SourceResp = &contexthttp.ErrResponse{ Message: "mock success", } @@ -140,6 +139,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus { } req := client.NewReq(c.Request.Context(), c.Request, *api) + req.Timeout = c.Timeout resp, err := cli.Call(req) if err != nil { logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err) diff --git a/pkg/filter/network/dubboproxy/plugin.go b/pkg/filter/network/dubboproxy/plugin.go index cb033b2ba..4a69c0b4a 100644 --- a/pkg/filter/network/dubboproxy/plugin.go +++ b/pkg/filter/network/dubboproxy/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create dubbo networkfilter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) if !ok { panic("CreateFilter occur some exception for the type is not suitable one.") } diff --git a/pkg/filter/network/grpcconnectionmanager/plugin.go b/pkg/filter/network/grpcconnectionmanager/plugin.go index c6002cf88..0976678b0 100644 --- a/pkg/filter/network/grpcconnectionmanager/plugin.go +++ b/pkg/filter/network/grpcconnectionmanager/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create grpc network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.GRPCConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return grpc.CreateGrpcConnectionManager(hcmc), nil } diff --git a/pkg/filter/network/httpconnectionmanager/plugin.go b/pkg/filter/network/httpconnectionmanager/plugin.go index ce90f374c..e319feb0a 100644 --- a/pkg/filter/network/httpconnectionmanager/plugin.go +++ b/pkg/filter/network/httpconnectionmanager/plugin.go @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string { // CreateFilter create http network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.HttpConnectionManagerConfig) + hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return http.CreateHttpConnectionManager(hcmc), nil } diff --git a/pkg/filter/seata/transaction.go b/pkg/filter/seata/transaction.go index 60397db5b..90e01f3ec 100644 --- a/pkg/filter/seata/transaction.go +++ b/pkg/filter/seata/transaction.go @@ -130,8 +130,9 @@ func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *T ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("encode request context failed, %v", err))) return false } - - branchID, err := f.branchRegister(ctx.Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") + Ctx, cancel := context.WithTimeout(ctx.Ctx, ctx.Timeout) + defer cancel() + branchID, err := f.branchRegister(Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") if err != nil { logger.Errorf("branch transaction register failed, xid: %s, err: %v", xid, err) ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("branch transaction register failed, %v", err))) diff --git a/pkg/model/http.go b/pkg/model/http.go index 1b016066a..6f0cb8a45 100644 --- a/pkg/model/http.go +++ b/pkg/model/http.go @@ -19,6 +19,7 @@ package model import ( "github.com/mitchellh/mapstructure" + "time" ) import ( @@ -32,17 +33,23 @@ type HttpConnectionManagerConfig struct { ServerName string `yaml:"server_name" json:"server_name" mapstructure:"server_name"` IdleTimeoutStr string `yaml:"idle_timeout" json:"idle_timeout" mapstructure:"idle_timeout"` GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // GRPCConnectionManagerConfig type GRPCConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // DubboProxyConnectionManagerConfig type DubboProxyConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` DubboFilters []*DubboFilter `yaml:"dubbo_filters" json:"dubbo_filters" mapstructure:"dubbo_filters"` + TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` + Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` } // HTTPFilter http filter From 0b50853daa4fc90ba98a6900788d1b851d22a3cc Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Wed, 14 Sep 2022 11:57:53 +0800 Subject: [PATCH 07/10] feat : fix conflict --- pixiu/pkg/client/triple/triple.go | 1 - pixiu/pkg/common/constant/filter.go | 14 -------- .../pkg/common/util/stringutil/stringutil.go | 13 +++++++ pixiu/pkg/filter/network/dubboproxy/plugin.go | 3 +- .../network/grpcconnectionmanager/plugin.go | 3 +- .../network/httpconnectionmanager/plugin.go | 3 +- pixiu/pkg/filter/seata/transaction.go | 34 +++++++++---------- 7 files changed, 36 insertions(+), 35 deletions(-) diff --git a/pixiu/pkg/client/triple/triple.go b/pixiu/pkg/client/triple/triple.go index ac627d2a6..b64ae8969 100644 --- a/pixiu/pkg/client/triple/triple.go +++ b/pixiu/pkg/client/triple/triple.go @@ -23,7 +23,6 @@ import ( "net/url" "strings" "sync" - "time" ) import ( diff --git a/pixiu/pkg/common/constant/filter.go b/pixiu/pkg/common/constant/filter.go index 3de0419f9..73b5badd4 100644 --- a/pixiu/pkg/common/constant/filter.go +++ b/pixiu/pkg/common/constant/filter.go @@ -18,7 +18,6 @@ package constant import ( - "fmt" "time" ) @@ -48,16 +47,3 @@ const ( DefaultReqTimeout = 10 * time.Second ) - -func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { - fmt.Printf("timeout parse %s : %d", currentV, defaultV) - if currentV == "" { - return defaultV - } else { - if duration, err := time.ParseDuration(currentV); err != nil { - return defaultV - } else { - return duration - } - } -} diff --git a/pixiu/pkg/common/util/stringutil/stringutil.go b/pixiu/pkg/common/util/stringutil/stringutil.go index 22a266260..a7b19a2ab 100644 --- a/pixiu/pkg/common/util/stringutil/stringutil.go +++ b/pixiu/pkg/common/util/stringutil/stringutil.go @@ -21,6 +21,7 @@ import ( perrors "github.com/pkg/errors" "net" "strings" + "time" ) import ( @@ -106,3 +107,15 @@ func GetIPAndPort(address string) ([]*net.TCPAddr, error) { return tcpAddr, nil } + +func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration { + if currentV == "" { + return defaultV + } else { + if duration, err := time.ParseDuration(currentV); err != nil { + return defaultV + } else { + return duration + } + } +} diff --git a/pixiu/pkg/filter/network/dubboproxy/plugin.go b/pixiu/pkg/filter/network/dubboproxy/plugin.go index f87fac0bb..a003e5dbf 100644 --- a/pixiu/pkg/filter/network/dubboproxy/plugin.go +++ b/pixiu/pkg/filter/network/dubboproxy/plugin.go @@ -20,6 +20,7 @@ package dubboproxy import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter" + "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" ) @@ -44,7 +45,7 @@ func (p *Plugin) Kind() string { // CreateFilter create dubbo networkfilter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig) - hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) + hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) if !ok { panic("CreateFilter occur some exception for the type is not suitable one.") } diff --git a/pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go b/pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go index 8331108cf..20ce5689a 100644 --- a/pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go +++ b/pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/grpc" + "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" ) @@ -44,7 +45,7 @@ func (p *Plugin) Kind() string { // CreateFilter create grpc network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.GRPCConnectionManagerConfig) - hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) + hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return grpc.CreateGrpcConnectionManager(hcmc), nil } diff --git a/pixiu/pkg/filter/network/httpconnectionmanager/plugin.go b/pixiu/pkg/filter/network/httpconnectionmanager/plugin.go index 66ac72307..3c0758ef4 100644 --- a/pixiu/pkg/filter/network/httpconnectionmanager/plugin.go +++ b/pixiu/pkg/filter/network/httpconnectionmanager/plugin.go @@ -21,6 +21,7 @@ import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/http" + "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" ) @@ -44,7 +45,7 @@ func (p *Plugin) Kind() string { // CreateFilter create http network filter func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) { hcmc := config.(*model.HttpConnectionManagerConfig) - hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) + hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout) return http.CreateHttpConnectionManager(hcmc), nil } diff --git a/pixiu/pkg/filter/seata/transaction.go b/pixiu/pkg/filter/seata/transaction.go index da13707e7..6b049f529 100644 --- a/pixiu/pkg/filter/seata/transaction.go +++ b/pixiu/pkg/filter/seata/transaction.go @@ -78,31 +78,31 @@ func (f *Filter) handleHttp1GlobalEnd(ctx *http.HttpContext) { } // handleHttp1BranchRegister return bool, represent whether continue -func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *TCCResource) bool { - xid := ctx.Request.Header.Get(XID) +func (f *Filter) handleHttp1BranchRegister(hctx *http.HttpContext, tccResource *TCCResource) bool { + xid := hctx.Request.Header.Get(XID) if xid == "" { logger.Error("failed to get xid from request header") - ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to get xid from request header")) + hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to get xid from request header")) return false } - bodyBytes, err := io.ReadAll(ctx.Request.Body) + bodyBytes, err := io.ReadAll(hctx.Request.Body) if err != nil { logger.Error(err) - ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to retrieve request body")) + hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte("failed to retrieve request body")) return false } requestContext := &RequestContext{ ActionContext: make(map[string]string), - Headers: ctx.Request.Header.Clone(), + Headers: hctx.Request.Header.Clone(), Body: bodyBytes, - Trailers: ctx.Request.Trailer.Clone(), + Trailers: hctx.Request.Trailer.Clone(), } // Once read body, the body sawEOF will be true, then send request will have no body - ctx.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + hctx.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) - rEntry := ctx.GetRouteEntry() + rEntry := hctx.GetRouteEntry() if rEntry == nil { panic("no route entry") } @@ -112,14 +112,14 @@ func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *T clusterManager := server.GetClusterManager() endpoint := clusterManager.PickEndpoint(clusterName) if endpoint == nil { - ctx.SendLocalReply(netHttp.StatusServiceUnavailable, []byte("cluster not found endpoint")) + hctx.SendLocalReply(netHttp.StatusServiceUnavailable, []byte("cluster not found endpoint")) return false } requestContext.ActionContext[VarHost] = endpoint.Address.GetAddress() requestContext.ActionContext[CommitRequestPath] = tccResource.CommitRequestPath requestContext.ActionContext[RollbackRequestPath] = tccResource.RollbackRequestPath - queryString := ctx.Request.URL.RawQuery + queryString := hctx.Request.URL.RawQuery if queryString != "" { requestContext.ActionContext[VarQueryString] = queryString } @@ -127,19 +127,19 @@ func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *T data, err := requestContext.Encode() if err != nil { logger.Errorf("encode request context failed, request context: %v, err: %v", requestContext, err) - ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("encode request context failed, %v", err))) + hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("encode request context failed, %v", err))) return false } - Ctx, cancel := context.WithTimeout(ctx.Ctx, ctx.Timeout) + ctx, cancel := context.WithTimeout(hctx.Ctx, hctx.Timeout) defer cancel() - branchID, err := f.branchRegister(Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") + branchID, err := f.branchRegister(ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "") if err != nil { logger.Errorf("branch transaction register failed, xid: %s, err: %v", xid, err) - ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("branch transaction register failed, %v", err))) + hctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("branch transaction register failed, %v", err))) return false } - ctx.Params[XID] = xid - ctx.Params[BranchID] = strconv.FormatInt(branchID, 10) + hctx.Params[XID] = xid + hctx.Params[BranchID] = strconv.FormatInt(branchID, 10) return true } From b44077ed05638172253083e653c3ebc1439a3bc2 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Sat, 17 Sep 2022 01:00:50 +0800 Subject: [PATCH 08/10] feat : fix ut --- pixiu/pkg/config/conf_test.yaml | 1 + pixiu/pkg/config/config_load_test.go | 1 + pixiu/pkg/model/http.go | 6 +++--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pixiu/pkg/config/conf_test.yaml b/pixiu/pkg/config/conf_test.yaml index 170906c65..eb3970cb0 100644 --- a/pixiu/pkg/config/conf_test.yaml +++ b/pixiu/pkg/config/conf_test.yaml @@ -51,6 +51,7 @@ static_resources: config: server_name: "test_http_dubbo" generate_request_id: false + timeout: "10s" config: idle_timeout: 5s read_timeout: 5s diff --git a/pixiu/pkg/config/config_load_test.go b/pixiu/pkg/config/config_load_test.go index 890cb51e1..a8527bcc7 100644 --- a/pixiu/pkg/config/config_load_test.go +++ b/pixiu/pkg/config/config_load_test.go @@ -75,6 +75,7 @@ func TestMain(m *testing.M) { ServerName: "test_http_dubbo", GenerateRequestID: false, IdleTimeoutStr: "100", + TimeoutStr: "10s", } var inInterface map[string]interface{} diff --git a/pixiu/pkg/model/http.go b/pixiu/pkg/model/http.go index 3c2d224ac..c0617e536 100644 --- a/pixiu/pkg/model/http.go +++ b/pixiu/pkg/model/http.go @@ -34,14 +34,14 @@ type HttpConnectionManagerConfig struct { IdleTimeoutStr string `yaml:"idle_timeout" json:"idle_timeout" mapstructure:"idle_timeout"` GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` + Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` } // GRPCConnectionManagerConfig type GRPCConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` + Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` } // DubboProxyConnectionManagerConfig @@ -49,7 +49,7 @@ type DubboProxyConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` DubboFilters []*DubboFilter `yaml:"dubbo_filters" json:"dubbo_filters" mapstructure:"dubbo_filters"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:"timeoutDuration" json:"timeoutDuration" mapstructure:"timeoutDuration"` + Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` } // HTTPFilter http filter From d8a1a05440ae91f5d0a7c83de763fbff946a8642 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Tue, 20 Sep 2022 16:03:16 +0800 Subject: [PATCH 09/10] feat : modify struct tag --- pixiu/pkg/model/http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pixiu/pkg/model/http.go b/pixiu/pkg/model/http.go index c0617e536..03dbda623 100644 --- a/pixiu/pkg/model/http.go +++ b/pixiu/pkg/model/http.go @@ -34,14 +34,14 @@ type HttpConnectionManagerConfig struct { IdleTimeoutStr string `yaml:"idle_timeout" json:"idle_timeout" mapstructure:"idle_timeout"` GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` + Timeout time.Duration `yaml:"-" json:"-" mapstructure:"-"` } // GRPCConnectionManagerConfig type GRPCConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` + Timeout time.Duration `yaml:"-" json:"-" mapstructure:"-"` } // DubboProxyConnectionManagerConfig @@ -49,7 +49,7 @@ type DubboProxyConnectionManagerConfig struct { RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` DubboFilters []*DubboFilter `yaml:"dubbo_filters" json:"dubbo_filters" mapstructure:"dubbo_filters"` TimeoutStr string `yaml:"timeout" json:"timeout" mapstructure:"timeout"` - Timeout time.Duration `yaml:",omitempty" json:",omitempty" mapstructure:",omitempty"` + Timeout time.Duration `yaml:"-" json:"-" mapstructure:"-"` } // HTTPFilter http filter From 3e78766e82ca103fea0b8b4b6153fd094d2e75c8 Mon Sep 17 00:00:00 2001 From: CSWYF3634076 <3634076@qq.com> Date: Mon, 26 Sep 2022 18:00:16 +0800 Subject: [PATCH 10/10] feat : modify struct tag --- pixiu/pkg/filter/http/dubboproxy/dubbo.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pixiu/pkg/filter/http/dubboproxy/dubbo.go b/pixiu/pkg/filter/http/dubboproxy/dubbo.go index 04b98cf07..760471e82 100644 --- a/pixiu/pkg/filter/http/dubboproxy/dubbo.go +++ b/pixiu/pkg/filter/http/dubboproxy/dubbo.go @@ -213,6 +213,7 @@ func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus { hc.SendLocalReply(http.StatusServiceUnavailable, bt) return filter.Stop } + var resp interface{} invoc.SetReply(&resp)