Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASoC 2022] Optimization of Pixiu timeout feature #475

Merged
merged 16 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Client interface {
// Apply to init client
Apply() error

// Close close the clinet
// Close close the client
Close() error

// Call invoke the downstream service.
Expand Down
8 changes: 7 additions & 1 deletion pkg/client/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (

import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
cst "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)
Expand Down Expand Up @@ -297,7 +298,6 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge
useNacosRegister = true
}
}

refConf := dg.ReferenceConfig{
InterfaceName: irequest.Interface,
Cluster: constant.ClusterKeyFailover,
Expand All @@ -314,6 +314,12 @@ func (dc *Client) create(key string, irequest fc.IntegrationRequest) *generic.Ge
refConf.Retries = irequest.DubboBackendConfig.Retries
}

if dc.dubboProxyConfig.Timeout != nil {
refConf.RequestTimeout = dc.dubboProxyConfig.Timeout.RequestTimeoutStr
} else {
refConf.RequestTimeout = cst.DefaultReqTimeout.String()
}
logger.Debugf("[dubbo-go-pixiu] client dubbo timeout val %v", refConf.RequestTimeout)
dc.lock.Lock()
defer dc.lock.Unlock()

Expand Down
3 changes: 1 addition & 2 deletions pkg/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) {

newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body)
newReq.Header = params.Header
httpClient := &http.Client{Timeout: 5 * time.Second}
httpClient := &http.Client{Timeout: req.Timeout}

tr := otel.Tracer(traceNameHTTPClient)
_, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient))
Expand Down
9 changes: 5 additions & 4 deletions pkg/client/mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type (
}

KafkaProducerConfig struct {
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

Metadata struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/client/mq/kafka_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
c.Producer.Timeout = config.Timeout
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) {
ctx := context.Background()
switch config.MqType {
case constant.MQTypeKafka:
config.KafkaProducerConfig.Timeout = config.Timeout
pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) {
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
facade.Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client
import (
"context"
"net/http"
"time"
)

import (
Expand All @@ -32,6 +33,7 @@ type Request struct {
Context context.Context
IngressRequest *http.Request
API router.API
Timeout time.Duration
}

// NewReq create a request
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/triple/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -89,7 +90,10 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
}
meta := make(map[string][]string)
reqData, _ := ioutil.ReadAll(req.IngressRequest.Body)
call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
defer cancel()
time.Sleep(100 * time.Nanosecond)
call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
if err != nil {
return "", errors.Errorf("call triple server error = %s", err)
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/common/constant/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package constant

import (
"fmt"
"time"
)

var (
Default403Body = []byte("403 for bidden")
Default404Body = []byte("404 page not found")
Expand All @@ -40,4 +45,19 @@ const (
LogDataBuffer = 5000
// console
Console = "console"

DefaultReqTimeout = 10 * time.Second
)

func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
fmt.Printf("timeout parse %s : %d", currentV, defaultV)
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
7 changes: 5 additions & 2 deletions pkg/common/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster"))
return
}

newReq := r.Clone(context.Background())
ctx := context.Background()
// timeout
ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout)
defer cancel()
newReq := r.Clone(ctx)
newReq.URL.Scheme = "http"
newReq.URL.Host = endpoint.Address.GetAddress()

Expand Down
2 changes: 1 addition & 1 deletion pkg/common/http/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
hc.Writer = w
hc.Request = r
hc.Reset()

hc.Timeout = hcm.config.Timeout
err := hcm.Handle(hc)
if err != nil {
logger.Errorf("ServeHTTP %v", err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/context/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ func (hc *HttpContext) LocalReply() bool {

// API sets the API to http context
func (hc *HttpContext) API(api router.API) {
hc.Timeout = api.Timeout
if hc.Timeout > api.Timeout {
hc.Timeout = api.Timeout
}
hc.Api = &api
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/filter/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
f.cfg.Timeout = ctx.Timeout
mqClient := mq.NewSingletonMQClient(*f.cfg)
req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI())
req.Timeout = ctx.Timeout
resp, err := mqClient.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err)
Expand Down
16 changes: 10 additions & 6 deletions pkg/filter/http/grpcproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
stdHttp "net/http"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -111,7 +112,8 @@ type (
Config struct {
DescriptorSourceStrategy DescriptorSourceStrategy `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy" default:"auto"`
Path string `yaml:"path" json:"path"`
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
Timeout time.Duration `yaml:"timeout" json:"timeout"` //nolint
}

Rule struct {
Expand Down Expand Up @@ -191,7 +193,9 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists"))
return filter.Stop
}

// timeout for Dial and Invoke
ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout)
defer cancel()
ep := e.Address.GetAddress()

p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
Expand All @@ -202,7 +206,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err))))
Expand All @@ -211,14 +215,14 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
}

// get DescriptorSource, contain file and reflection
source, err := f.descriptor.getDescriptorSource(context.WithValue(c.Ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
if err != nil {
logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err)
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API"))
return filter.Stop
}
//put DescriptorSource concurrent, del if no need
c.Ctx = context.WithValue(c.Ctx, ct.ContextKey(DescriptorSourceKey), source)
ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source)

dscp, err := source.FindSymbol(svc)
if err != nil {
Expand Down Expand Up @@ -257,7 +261,7 @@ func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {

// metadata in grpc has the same feature in http
md := mapHeaderToMetadata(c.AllHeaders())
ctx := metadata.NewOutgoingContext(c.Ctx, md)
ctx = metadata.NewOutgoingContext(ctx, md)

md = metadata.MD{}
t := metadata.MD{}
Expand Down
7 changes: 6 additions & 1 deletion pkg/filter/http/httpproxy/routerfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
}
req.Header = r.Header

resp, err := (&http3.Client{Transport: f.transport}).Do(req)
cli := &http3.Client{
Transport: f.transport,
Timeout: hc.Timeout,
}

resp, err := cli.Do(req)
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/filter/http/remote/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
)

const (
open = iota
close
all
OPEN = iota
CLOSE
ALL
)

const (
Expand Down Expand Up @@ -99,7 +99,7 @@ func (factory *FilterFactory) Apply() error {
}
level := mockLevel(mock)
if level < 0 || level > 2 {
level = close
level = CLOSE
}
factory.conf.Level = level
// must init it at apply function
Expand All @@ -115,7 +115,6 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c
}

func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

if f.conf.Dpc.AutoResolve {
if err := f.resolve(c); err != nil {
c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("auto resolve err: %s", err)))
Expand All @@ -125,7 +124,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {

api := c.GetAPI()

if (f.conf.Level == open && api.Mock) || (f.conf.Level == all) {
if (f.conf.Level == OPEN && api.Mock) || (f.conf.Level == ALL) {
c.SourceResp = &contexthttp.ErrResponse{
Message: "mock success",
}
Expand All @@ -140,6 +139,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) filter.FilterStatus {
}

req := client.NewReq(c.Request.Context(), c.Request, *api)
req.Timeout = c.Timeout
resp, err := cli.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/network/dubboproxy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create dubbo networkfilter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc, ok := config.(*model.DubboProxyConnectionManagerConfig)
hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
if !ok {
panic("CreateFilter occur some exception for the type is not suitable one.")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/network/grpcconnectionmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create grpc network filter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc := config.(*model.GRPCConnectionManagerConfig)
hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return grpc.CreateGrpcConnectionManager(hcmc), nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/filter/network/httpconnectionmanager/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (p *Plugin) Kind() string {
// CreateFilter create http network filter
func (p *Plugin) CreateFilter(config interface{}) (filter.NetworkFilter, error) {
hcmc := config.(*model.HttpConnectionManagerConfig)
hcmc.Timeout = constant.ResolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return http.CreateHttpConnectionManager(hcmc), nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/filter/seata/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (f *Filter) handleHttp1BranchRegister(ctx *http.HttpContext, tccResource *T
ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("encode request context failed, %v", err)))
return false
}

branchID, err := f.branchRegister(ctx.Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "")
Ctx, cancel := context.WithTimeout(ctx.Ctx, ctx.Timeout)
defer cancel()
branchID, err := f.branchRegister(Ctx, xid, tccResource.PrepareRequestPath, apis.TCC, data, "")
if err != nil {
logger.Errorf("branch transaction register failed, xid: %s, err: %v", xid, err)
ctx.SendLocalReply(netHttp.StatusInternalServerError, []byte(fmt.Sprintf("branch transaction register failed, %v", err)))
Expand Down
Loading