Skip to content

Commit

Permalink
[ASoC 2022] Optimization of Pixiu timeout feature (#475)
Browse files Browse the repository at this point in the history
* timeout http & grpc & dubbo v1.0

* timeout triple 和 其他

* feat:timeout optimization

* feat:timeout dubbo

* feat:timeout dubbo

* feat : timeout

* feat : fix conflict

* feat : fix ut

* feat : modify struct tag

* feat : modify struct tag
  • Loading branch information
CSWYF3634076 authored Oct 4, 2022
1 parent c270f4b commit 36b0bce
Show file tree
Hide file tree
Showing 25 changed files with 110 additions and 43 deletions.
2 changes: 1 addition & 1 deletion pixiu/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Client interface {
// Apply to init client
Apply() error

// Close close the clinet
// Close close the client
Close() error

// Call invoke the downstream service.
Expand Down
7 changes: 7 additions & 0 deletions pixiu/pkg/client/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (

import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/client"
cst "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
)
Expand Down Expand Up @@ -314,6 +315,12 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge
refConf.Retries = irequest.DubboBackendConfig.Retries
}

if dc.dubboProxyConfig.Timeout != nil {
refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr
} else {
refConf.RequestTimeout = cst.DefaultReqTimeout.String()
}
logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout)
dc.lock.Lock()
defer dc.lock.Unlock()

Expand Down
3 changes: 1 addition & 2 deletions pixiu/pkg/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) {

newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body)
newReq.Header = params.Header
httpClient := &http.Client{Timeout: 5 * time.Second}
httpClient := &http.Client{Timeout: req.Timeout}

tr := otel.Tracer(traceNameHTTPClient)
_, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient))
Expand Down
9 changes: 5 additions & 4 deletions pixiu/pkg/client/mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type (
}

KafkaProducerConfig struct {
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

Metadata struct {
Expand Down
1 change: 1 addition & 0 deletions pixiu/pkg/client/mq/kafka_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
c.Producer.Timeout = config.Timeout
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pixiu/pkg/client/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) {
ctx := context.Background()
switch config.MqType {
case constant.MQTypeKafka:
config.KafkaProducerConfig.Timeout = config.Timeout
pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) {
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
facade.Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client
import (
"context"
"net/http"
"time"
)

import (
Expand All @@ -32,6 +33,7 @@ type Request struct {
Context context.Context
IngressRequest *http.Request
API router.API
Timeout time.Duration
}

// NewReq create a request
Expand Down
4 changes: 3 additions & 1 deletion pixiu/pkg/client/triple/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
}
meta := make(map[string][]string)
reqData, _ := io.ReadAll(req.IngressRequest.Body)
call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
defer cancel()
call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
if err != nil {
return "", errors.Errorf("call triple server error = %s", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pixiu/pkg/common/constant/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package constant

import (
"time"
)

var (
Default403Body = []byte("403 for bidden")
Default404Body = []byte("404 page not found")
Expand All @@ -40,4 +44,6 @@ const (
LogDataBuffer = 5000
// console
Console = "console"

DefaultReqTimeout = 10 * time.Second
)
7 changes: 5 additions & 2 deletions pixiu/pkg/common/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster"))
return
}

newReq := r.Clone(context.Background())
ctx := context.Background()
// timeout
ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout)
defer cancel()
newReq := r.Clone(ctx)
newReq.URL.Scheme = "http"
newReq.URL.Host = endpoint.Address.GetAddress()

Expand Down
2 changes: 1 addition & 1 deletion pixiu/pkg/common/http/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
hc.Writer = w
hc.Request = r
hc.Reset()

hc.Timeout = hcm.config.Timeout
err := hcm.Handle(hc)
if err != nil {
logger.Errorf("ServeHTTP %v", err)
Expand Down
13 changes: 13 additions & 0 deletions pixiu/pkg/common/util/stringutil/stringutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
perrors "github.com/pkg/errors"
"net"
"strings"
"time"
)

import (
Expand Down Expand Up @@ -106,3 +107,15 @@ func GetIPAndPort(address string) ([]*net.TCPAddr, error) {

return tcpAddr, nil
}

func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
1 change: 1 addition & 0 deletions pixiu/pkg/config/conf_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static_resources:
config:
server_name: "test_http_dubbo"
generate_request_id: false
timeout: "10s"
config:
idle_timeout: 5s
read_timeout: 5s
Expand Down
1 change: 1 addition & 0 deletions pixiu/pkg/config/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestMain(m *testing.M) {
ServerName: "test_http_dubbo",
GenerateRequestID: false,
IdleTimeoutStr: "100",
TimeoutStr: "10s",
}

var inInterface map[string]interface{}
Expand Down
4 changes: 3 additions & 1 deletion pixiu/pkg/context/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ func (hc *HttpContext) LocalReply() bool {

// API sets the API to http context
func (hc *HttpContext) API(api router.API) {
hc.Timeout = api.Timeout
if hc.Timeout > api.Timeout {
hc.Timeout = api.Timeout
}
hc.Api = &api
}

Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
f.cfg.Timeout = ctx.Timeout
mqClient := mq.NewSingletonMQClient(*f.cfg)
req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI())
req.Timeout = ctx.Timeout
resp, err := mqClient.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err)
Expand Down
1 change: 1 addition & 0 deletions pixiu/pkg/filter/http/dubboproxy/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus {
hc.SendLocalReply(http.StatusServiceUnavailable, bt)
return filter.Stop
}

var resp interface{}
invoc.SetReply(&resp)

Expand Down
16 changes: 10 additions & 6 deletions pixiu/pkg/filter/http/grpcproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
stdHttp "net/http"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -110,7 +111,8 @@ type (
Config struct {
DescriptorSourceStrategy DescriptorSourceStrategy `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy" default:"auto"`
Path string `yaml:"path" json:"path"`
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Timeout time.Duration `yaml:"timeout" json:"timeout"` //nolint
}

Rule struct {
Expand Down Expand Up @@ -190,7 +192,9 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists"))
return filter.Stop
}

// timeout for Dial and Invoke
ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout)
defer cancel()
ep := e.Address.GetAddress()

p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
Expand All @@ -201,7 +205,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err))))
Expand All @@ -210,14 +214,14 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
}

// get DescriptorSource, contain file and reflection
source, err := f.descriptor.getDescriptorSource(context.WithValue(c.Ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
if err != nil {
logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err)
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API"))
return filter.Stop
}
//put DescriptorSource concurrent, del if no need
c.Ctx = context.WithValue(c.Ctx, ct.ContextKey(DescriptorSourceKey), source)
ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source)

dscp, err := source.FindSymbol(svc)
if err != nil {
Expand Down Expand Up @@ -256,7 +260,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {

// metadata in grpc has the same feature in http
md := mapHeaderToMetadata(c.AllHeaders())
ctx := metadata.NewOutgoingContext(c.Ctx, md)
ctx = metadata.NewOutgoingContext(ctx, md)

md = metadata.MD{}
t := metadata.MD{}
Expand Down
7 changes: 6 additions & 1 deletion pixiu/pkg/filter/http/httpproxy/routerfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
}
req.Header = r.Header

resp, err := (&http3.Client{Transport: f.transport}).Do(req)
cli := &http3.Client{
Transport: f.transport,
Timeout: hc.Timeout,
}

resp, err := cli.Do(req)
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pixiu/pkg/filter/http/remote/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
)

const (
open = iota
close
all
OPEN = iota
CLOSE
ALL
)

const (
Expand Down Expand Up @@ -99,7 +99,7 @@ func (factory *FilterFactory) Apply() error {
}
level := mockLevel(mock)
if level < 0 || level > 2 {
level = close
level = CLOSE
}
factory.conf.Level = level
// must init it at apply function
Expand All @@ -115,7 +115,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c
}

func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

if f.conf.Dpc.AutoResolve {
if err := f.resolve(c); err != nil {
c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err)))
Expand All @@ -125,7 +124,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

api := c.GetAPI()

if (f.conf.Level == open && api.Mock) || (f.conf.Level == all) {
if (f.conf.Level == OPEN && api.Mock) || (f.conf.Level == ALL) {
c.SourceResp = &contexthttp.ErrResponse{
Message: "mock success",
}
Expand All @@ -140,6 +139,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {
}

req := client.NewReq(c.Request.Context(), c.Request, *api)
req.Timeout = c.Timeout
resp, err := cli.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err)
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/network/dubboproxy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package dubboproxy
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -44,6 +45,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create dubbo networkfilter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig)
hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
if !ok {
panic("CreateFilter occur some exception for the type is not suitable one.")
}
Expand Down
2 changes: 2 additions & 0 deletions pixiu/pkg/filter/network/grpcconnectionmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/grpc"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/util/stringutil"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -44,6 +45,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create grpc network filter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc := config.(*model.GRPCConnectionManagerConfig)
hcmc.Timeout = stringutil.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return grpc.CreateGrpcConnectionManager(hcmc), nil
}

Expand Down
Loading

0 comments on commit 36b0bce

Please sign in to comment.