diff --git a/docs/sample/dubbo/dubbo-registry.md b/docs/sample/dubbo/dubbo-registry.md index 28202c8e4..8564e88a6 100644 --- a/docs/sample/dubbo/dubbo-registry.md +++ b/docs/sample/dubbo/dubbo-registry.md @@ -8,16 +8,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8881 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/docs/sample/http/http-grpc.md b/docs/sample/http/http-grpc.md index 24b8eb89c..3dfaaf7de 100644 --- a/docs/sample/http/http-grpc.md +++ b/docs/sample/http/http-grpc.md @@ -8,16 +8,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8881 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/docs/sample/http/http-http.md b/docs/sample/http/http-http.md index c854e4b49..19c2e3e39 100644 --- a/docs/sample/http/http-http.md +++ b/docs/sample/http/http-http.md @@ -9,16 +9,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/docs/user/config.md b/docs/user/config.md index 9cb00ab79..9c1c08296 100644 --- a/docs/user/config.md +++ b/docs/user/config.md @@ -16,16 +16,12 @@ This document mainly describes the pixiu config abstraction, there is a example static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/go.mod b/go.mod index 60a16111d..4db1fbc31 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/dubbogo/dubbo-go-pixiu-filter v0.1.4 github.com/dubbogo/go-zookeeper v1.0.3 github.com/dubbogo/gost v1.11.19 - github.com/emirpasic/gods v1.12.0 + github.com/emirpasic/gods v1.12.0 // indirect github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/gin-gonic/gin v1.7.4 github.com/go-playground/assert/v2 v2.0.1 @@ -22,6 +22,7 @@ require ( github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9 github.com/golang-jwt/jwt/v4 v4.1.0 github.com/golang/protobuf v1.5.2 + github.com/gorilla/mux v1.7.3 // indirect github.com/jhump/protoreflect v1.9.0 github.com/mercari/grpc-http-proxy v0.1.2 github.com/mitchellh/mapstructure v1.4.2 @@ -45,6 +46,7 @@ require ( go.opentelemetry.io/otel/trace v1.0.0-RC2 go.uber.org/zap v1.19.1 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 + golang.org/x/net v0.0.0-20211105192438-b53810dc28af google.golang.org/grpc v1.42.0 google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index ebd28f488..61e837d48 100644 --- a/go.sum +++ b/go.sum @@ -439,6 +439,7 @@ github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99/go.mod h1:wJfORR github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go index 884ee898e..1fc0395b8 100644 --- a/pkg/common/constant/key.go +++ b/pkg/common/constant/key.go @@ -19,6 +19,7 @@ package constant const ( HTTPConnectManagerFilter = "dgp.filter.httpconnectionmanager" + GRPCConnectManagerFilter = "dgp.filter.grpcconnectionmanager" HTTPAuthorityFilter = "dgp.filter.http.authority" HTTPProxyFilter = "dgp.filter.http.httpproxy" diff --git a/pkg/common/extension/filter/filter.go b/pkg/common/extension/filter/filter.go index 5c203c16c..d3030e7d2 100644 --- a/pkg/common/extension/filter/filter.go +++ b/pkg/common/extension/filter/filter.go @@ -19,6 +19,7 @@ package filter import ( "fmt" + stdHttp "net/http" ) import ( @@ -83,8 +84,8 @@ type ( // NetworkFilter describe network filter NetworkFilter interface { - // OnData handle the http context from worker - OnData(hc *http.HttpContext) error + // ServeHTTP handle request and response + ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp.Request) } ) diff --git a/pkg/common/grpc/RoundTripper.go b/pkg/common/grpc/RoundTripper.go new file mode 100644 index 000000000..6e44e440c --- /dev/null +++ b/pkg/common/grpc/RoundTripper.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "net/http" +) + +import ( + "golang.org/x/net/http2" +) + +type HttpForwarder struct { + transport *http2.Transport +} + +func (hf *HttpForwarder) Forward(r *http.Request) (*http.Response, error) { + return hf.transport.RoundTrip(r) +} diff --git a/pkg/common/grpc/manager.go b/pkg/common/grpc/manager.go new file mode 100644 index 000000000..cf88cc490 --- /dev/null +++ b/pkg/common/grpc/manager.go @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "context" + "crypto/tls" + "encoding/json" + "io/ioutil" + "net" + stdHttp "net/http" +) + +import ( + "golang.org/x/net/http2" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/common/constant" + router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router" + "github.com/apache/dubbo-go-pixiu/pkg/context/http" + pch "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" + "github.com/apache/dubbo-go-pixiu/pkg/server" +) + +// GrpcConnectionManager network filter for grpc +type GrpcConnectionManager struct { + config *model.GRPCConnectionManagerConfig + routerCoordinator *router2.RouterCoordinator +} + +// CreateGrpcConnectionManager create grpc connection manager +func CreateGrpcConnectionManager(hcmc *model.GRPCConnectionManagerConfig, bs *model.Bootstrap) *GrpcConnectionManager { + hcm := &GrpcConnectionManager{config: hcmc} + hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) + return hcm +} + +// OnData receive data from listener +func (gcm *GrpcConnectionManager) OnData(hc *pch.HttpContext) error { + panic("grpc connection manager OnData function shouldn't be called") +} + +// ServeHTTP handle request and response +func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp.Request) { + + ra, err := gcm.routerCoordinator.RouteByPathAndName(r.RequestURI, r.Method) + if err != nil { + logger.Info("GrpcConnectionManager can't find route %v", err) + w.WriteHeader(stdHttp.StatusNotFound) + if _, err := w.Write(constant.Default404Body); err != nil { + logger.Warnf("WriteWithStatus error %v", err) + } + } + + logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster :%v", ra.Cluster) + + clusterName := ra.Cluster + clusterManager := server.GetClusterManager() + endpoint := clusterManager.PickEndpoint(clusterName) + if endpoint == nil { + bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not found endpoint"}) + w.WriteHeader(stdHttp.StatusServiceUnavailable) + w.Write(bt) + return + } + + newReq := r.Clone(context.Background()) + newReq.URL.Scheme = "http" + newReq.URL.Host = endpoint.Address.GetAddress() + + // todo: need cache? + forwarder := gcm.newHttpForwarder() + res, err := forwarder.Forward(newReq) + + if err != nil { + logger.Info("GrpcConnectionManager forward request error %v", err) + bt, _ := json.Marshal(http.ErrResponse{Message: "pixiu forward error"}) + w.WriteHeader(stdHttp.StatusServiceUnavailable) + w.Write(bt) + return + } + + if err := gcm.response(w, res); err != nil { + logger.Info("GrpcConnectionManager response error %v", err) + } +} + +func (gcm *GrpcConnectionManager) response(w stdHttp.ResponseWriter, res *stdHttp.Response) error { + defer res.Body.Close() + + copyHeader(w.Header(), res.Header) + w.WriteHeader(res.StatusCode) + + bytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return err + } + w.Write(bytes) + + for k, vv := range res.Trailer { + k = stdHttp.TrailerPrefix + k + for _, v := range vv { + w.Header().Add(k, v) + } + } + return nil +} + +func (gcm *GrpcConnectionManager) newHttpForwarder() *HttpForwarder { + transport := &http2.Transport{ + DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + AllowHTTP: true, + } + return &HttpForwarder{transport: transport} +} + +func copyHeader(dst, src stdHttp.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go index 2ecb8d705..ae291b833 100644 --- a/pkg/common/http/manager.go +++ b/pkg/common/http/manager.go @@ -21,7 +21,8 @@ import ( "context" "fmt" "io/ioutil" - "net/http" + stdHttp "net/http" + "sync" ) import ( @@ -44,18 +45,27 @@ type HttpConnectionManager struct { config *model.HttpConnectionManagerConfig routerCoordinator *router2.RouterCoordinator filterManager *filter.FilterManager + pool sync.Pool } // CreateHttpConnectionManager create http connection manager func CreateHttpConnectionManager(hcmc *model.HttpConnectionManagerConfig, bs *model.Bootstrap) *HttpConnectionManager { hcm := &HttpConnectionManager{config: hcmc} - hcm.routerCoordinator = router2.CreateRouterCoordinator(hcmc) + hcm.pool.New = func() interface{} { + return hcm.allocateContext() + } + hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) hcm.filterManager = filter.NewFilterManager(hcmc.HTTPFilters) hcm.filterManager.Load() return hcm } -// OnData receive data from listener +func (ls *HttpConnectionManager) allocateContext() *pch.HttpContext { + return &pch.HttpContext{ + Params: make(map[string]interface{}), + } +} + func (hcm *HttpConnectionManager) OnData(hc *pch.HttpContext) error { hc.Ctx = context.Background() err := hcm.findRoute(hc) @@ -66,6 +76,20 @@ func (hcm *HttpConnectionManager) OnData(hc *pch.HttpContext) error { return nil } +func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp.Request) { + hc := hcm.pool.Get().(*pch.HttpContext) + defer hcm.pool.Put(hc) + + hc.Request = r + hc.Writer = w + hc.Reset() + + err := hcm.OnData(hc) + if err != nil { + logger.Errorf("ServeHTTP %v", err) + } +} + // handleHTTPRequest handle http request func (hcm *HttpConnectionManager) handleHTTPRequest(c *pch.HttpContext) { filterChain := hcm.filterManager.CreateFilterChain(c) @@ -74,7 +98,7 @@ func (hcm *HttpConnectionManager) handleHTTPRequest(c *pch.HttpContext) { defer func() { if err := recover(); err != nil { logger.Warnf("[dubbopixiu go] Occur An Unexpected Err: %+v", err) - c.SendLocalReply(http.StatusInternalServerError, []byte(fmt.Sprintf("Occur An Unexpected Err: %v", err))) + c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("Occur An Unexpected Err: %v", err))) } }() @@ -101,7 +125,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c *pch.HttpContext) { } switch res := c.SourceResp.(type) { - case *http.Response: + case *stdHttp.Response: body, err := ioutil.ReadAll(res.Body) if err != nil { panic(err) @@ -115,13 +139,13 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c *pch.HttpContext) { c.StatusCode(res.StatusCode) c.TargetResp = &client.Response{Data: body} case []byte: - c.StatusCode(http.StatusOK) + c.StatusCode(stdHttp.StatusOK) c.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueTextPlain) c.TargetResp = &client.Response{Data: res} default: //dubbo go generic invoke response := util.NewDubboResponse(res, false) - c.StatusCode(http.StatusOK) + c.StatusCode(stdHttp.StatusOK) c.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueJsonUtf8) c.TargetResp = response } @@ -131,7 +155,7 @@ func (hcm *HttpConnectionManager) findRoute(hc *pch.HttpContext) error { ra, err := hcm.routerCoordinator.Route(hc) if err != nil { hc.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueTextPlain) - hc.SendLocalReply(http.StatusNotFound, constant.Default404Body) + hc.SendLocalReply(stdHttp.StatusNotFound, constant.Default404Body) e := errors.Errorf("Requested URL %s not found", hc.GetUrl()) logger.Debug(e.Error()) diff --git a/pkg/common/router/router.go b/pkg/common/router/router.go index ddbd3ce35..80328f5bd 100644 --- a/pkg/common/router/router.go +++ b/pkg/common/router/router.go @@ -40,9 +40,9 @@ type ( ) // CreateRouterCoordinator create coordinator for http connection manager -func CreateRouterCoordinator(hcmc *model.HttpConnectionManagerConfig) *RouterCoordinator { - rc := &RouterCoordinator{activeConfig: &hcmc.RouteConfig} - if hcmc.RouteConfig.Dynamic { +func CreateRouterCoordinator(routeConfig *model.RouteConfiguration) *RouterCoordinator { + rc := &RouterCoordinator{activeConfig: routeConfig} + if routeConfig.Dynamic { server.GetRouterManager().AddRouterListener(rc) } rc.initTrie() @@ -57,6 +57,13 @@ func (rm *RouterCoordinator) Route(hc *http.HttpContext) (*model.RouteAction, er return rm.activeConfig.Route(hc.Request) } +func (rm *RouterCoordinator) RouteByPathAndName(path, method string) (*model.RouteAction, error) { + rm.rw.RLock() + defer rm.rw.RUnlock() + + return rm.activeConfig.RouteByPathAndMethod(path, method) +} + func getTrieKey(method string, path string, isPrefix bool) string { if isPrefix { if !strings.HasSuffix(path, constant.PathSlash) { diff --git a/pkg/common/router/router_test.go b/pkg/common/router/router_test.go index 697a293e3..faa7eec9d 100644 --- a/pkg/common/router/router_test.go +++ b/pkg/common/router/router_test.go @@ -53,7 +53,7 @@ func TestCreateRouterCoordinator(t *testing.T) { IdleTimeoutStr: "100", } - r := CreateRouterCoordinator(&hcmc) + r := CreateRouterCoordinator(&hcmc.RouteConfig) request, err := http.NewRequest("POST", "http://www.dubbogopixiu.com/api/v1?name=tc", bytes.NewReader([]byte("{\"id\":\"12345\"}"))) assert.NoError(t, err) c := mock.GetMockHTTPContext(request) diff --git a/pkg/config/conf_test.yaml b/pkg/config/conf_test.yaml index 2d4e6fba2..170906c65 100644 --- a/pkg/config/conf_test.yaml +++ b/pkg/config/conf_test.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTPS" address: socket_address: - protocol_type: "HTTPS" address: "0.0.0.0" port: 443 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/pkg/config/config_load.go b/pkg/config/config_load.go index 16d8b5bcb..870e1be88 100644 --- a/pkg/config/config_load.go +++ b/pkg/config/config_load.go @@ -113,10 +113,10 @@ func GetProtocol(cfg *model.Bootstrap) (err error) { return err } for _, l := range cfg.StaticResources.Listeners { - if l.Address.SocketAddress.ProtocolStr == "" { - l.Address.SocketAddress.ProtocolStr = constant.DefaultProtocolType + if l.ProtocolStr == "" { + l.ProtocolStr = constant.DefaultProtocolType } - l.Address.SocketAddress.Protocol = model.ProtocolType(model.ProtocolTypeValue[l.Address.SocketAddress.ProtocolStr]) + l.Protocol = model.ProtocolType(model.ProtocolTypeValue[l.ProtocolStr]) } return nil } diff --git a/pkg/config/config_load_test.go b/pkg/config/config_load_test.go index 94aef4dfe..b83d6bbb4 100644 --- a/pkg/config/config_load_test.go +++ b/pkg/config/config_load_test.go @@ -84,12 +84,12 @@ func TestMain(m *testing.M) { StaticResources: model.StaticResources{ Listeners: []*model.Listener{ { - Name: "net/http", + Name: "net/http", + ProtocolStr: "HTTPS", Address: model.Address{ SocketAddress: model.SocketAddress{ - ProtocolStr: "HTTPS", - Address: "0.0.0.0", - Port: 443, + Address: "0.0.0.0", + Port: 443, }, }, Config: model.HttpConfig{ @@ -97,19 +97,11 @@ func TestMain(m *testing.M) { WriteTimeoutStr: "5s", ReadTimeoutStr: "5s", }, - FilterChains: []model.FilterChain{ - { - FilterChainMatch: model.FilterChainMatch{ - Domains: []string{ - "api.dubbo.com", - "api.pixiu.com", - }, - }, - Filters: []model.Filter{ - { - Name: "dgp.filter.httpconnectionmanager", - Config: inInterface, - }, + FilterChain: model.FilterChain{ + Filters: []model.NetworkFilter{ + { + Name: "dgp.filter.httpconnectionmanager", + Config: inInterface, }, }, }, diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go index bd9c2a9c1..978ce689e 100644 --- a/pkg/context/http/context.go +++ b/pkg/context/http/context.go @@ -63,7 +63,6 @@ type HttpContext struct { SourceResp interface{} HttpConnectionManager model.HttpConnectionManagerConfig - Listener *model.Listener Route *model.RouteAction Api *router.API diff --git a/pkg/filter/network/grpcconnectionmanager/plugin.go b/pkg/filter/network/grpcconnectionmanager/plugin.go new file mode 100644 index 000000000..3e66ac7b4 --- /dev/null +++ b/pkg/filter/network/grpcconnectionmanager/plugin.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpcconnectionmanager + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/common/constant" + "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" + "github.com/apache/dubbo-go-pixiu/pkg/common/grpc" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +const ( + Kind = constant.GRPCConnectManagerFilter +) + +func init() { + filter.RegisterNetworkFilter(&Plugin{}) +} + +type ( + Plugin struct{} +) + +func (p *Plugin) Kind() string { + return Kind +} + +func (hp *Plugin) CreateFilter(config interface{}, bs *model.Bootstrap) (filter.NetworkFilter, error) { + hcmc := config.(*model.GRPCConnectionManagerConfig) + return grpc.CreateGrpcConnectionManager(hcmc, bs), nil +} diff --git a/pkg/filterchain/network_filter_chain.go b/pkg/filterchain/network_filter_chain.go new file mode 100644 index 000000000..dd0460731 --- /dev/null +++ b/pkg/filterchain/network_filter_chain.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filterchain + +import ( + "net/http" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/common/constant" + "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" + "github.com/apache/dubbo-go-pixiu/pkg/common/yaml" + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +type NetworkFilterChain struct { + filtersArray []filter.NetworkFilter + config model.FilterChain +} + +func (fc NetworkFilterChain) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // todo: only one filter will exist for now, needs change when more than one + for _, filter := range fc.filtersArray { + filter.ServeHTTP(w, r) + } +} + +func CreateNetworkFilterChain(config model.FilterChain, bs *model.Bootstrap) *NetworkFilterChain { + filtersArray := make([]filter.NetworkFilter, len(config.Filters)) + // todo: split code block like http filter manager + // todo: only one filter will exist for now, needs change when more than one + for i, f := range config.Filters { + if f.Name == constant.GRPCConnectManagerFilter { + gcmc := &model.GRPCConnectionManagerConfig{} + if err := yaml.ParseConfig(gcmc, f.Config); err != nil { + logger.Error("CreateNetworkFilterChain %s parse config error %s", f.Name, err) + } + p, err := filter.GetNetworkFilterPlugin(constant.GRPCConnectManagerFilter) + if err != nil { + logger.Error("CreateNetworkFilterChain %s getNetworkFilterPlugin error %s", f.Name, err) + } + filter, err := p.CreateFilter(gcmc, bs) + if err != nil { + logger.Error("CreateNetworkFilterChain %s createFilter error %s", f.Name, err) + } + filtersArray[i] = filter + } else if f.Name == constant.HTTPConnectManagerFilter { + hcmc := &model.HttpConnectionManagerConfig{} + if err := yaml.ParseConfig(hcmc, f.Config); err != nil { + logger.Error("CreateNetworkFilterChain parse %s config error %s", f.Name, err) + } + p, err := filter.GetNetworkFilterPlugin(constant.HTTPConnectManagerFilter) + if err != nil { + logger.Error("CreateNetworkFilterChain %s getNetworkFilterPlugin error %s", f.Name, err) + } + filter, err := p.CreateFilter(hcmc, bs) + if err != nil { + logger.Error("CreateNetworkFilterChain %s createFilter error %s", f.Name, err) + } + filtersArray[i] = filter + } + } + + return &NetworkFilterChain{ + filtersArray: filtersArray, + config: config, + } +} diff --git a/pkg/server/listener.go b/pkg/listener/http/http_listener.go similarity index 54% rename from pkg/server/listener.go rename to pkg/listener/http/http_listener.go index b71212caf..5fc477c8d 100644 --- a/pkg/server/listener.go +++ b/pkg/listener/http/http_listener.go @@ -15,83 +15,82 @@ * limitations under the License. */ -package server +package http import ( + "fmt" "log" "net/http" "strconv" - "sync" "time" ) import ( + "github.com/pkg/errors" "golang.org/x/crypto/acme/autocert" ) import ( - "github.com/apache/dubbo-go-pixiu/pkg/common/constant" - "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" - "github.com/apache/dubbo-go-pixiu/pkg/common/yaml" - h "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/filterchain" + "github.com/apache/dubbo-go-pixiu/pkg/listener" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" ) +func init() { + listener.SetListenerServiceFactory(model.ProtocolTypeHTTP, newHttpListenerService) +} + type ( // ListenerService the facade of a listener - ListenerService struct { - cfg *model.Listener - // TODO: just temporary because only one network filter - nf filter.NetworkFilter + HttpListenerService struct { + listener.BaseListenerService srv *http.Server } // DefaultHttpListener DefaultHttpWorker struct { - pool sync.Pool - ls *ListenerService + ls *HttpListenerService } ) -// NewListenerService create listener service -func CreateListenerService(lc *model.Listener, bs *model.Bootstrap) *ListenerService { - hcm := createHttpManager(lc, bs) - return &ListenerService{cfg: lc, nf: *hcm} -} - -func (ls *ListenerService) GetNetworkFilter() filter.NetworkFilter { - return ls.nf +func newHttpListenerService(lc *model.Listener, bs *model.Bootstrap) (listener.ListenerService, error) { + fc := filterchain.CreateNetworkFilterChain(lc.FilterChain, bs) + return &HttpListenerService{ + BaseListenerService: listener.BaseListenerService{ + Config: lc, + FilterChain: fc, + }, + srv: nil, + }, nil } // Start start the listener -func (ls *ListenerService) Start() { - sa := ls.cfg.Address.SocketAddress - switch sa.Protocol { +func (ls *HttpListenerService) Start() error { + switch ls.Config.Protocol { case model.ProtocolTypeHTTP: ls.httpListener() case model.ProtocolTypeHTTPS: ls.httpsListener() default: - panic("unsupported protocol start: " + sa.ProtocolStr) + return errors.New(fmt.Sprintf("unsupported protocol start: %d", ls.Config.Protocol)) } + return nil } -func (ls *ListenerService) httpsListener() { - hl := CreateDefaultHttpWorker(ls) - hl.pool.New = func() interface{} { - return ls.allocateContext() - } +func (ls *HttpListenerService) httpsListener() { + hl := createDefaultHttpWorker(ls) + // user customize http config var hc *model.HttpConfig - hc = model.MapInStruct(ls.cfg) + hc = model.MapInStruct(ls.Config) mux := http.NewServeMux() mux.HandleFunc("/", hl.ServeHTTP) m := &autocert.Manager{ - Cache: autocert.DirCache(ls.cfg.Address.SocketAddress.CertsDir), + Cache: autocert.DirCache(ls.Config.Address.SocketAddress.CertsDir), Prompt: autocert.AcceptTOS, - HostPolicy: autocert.HostWhitelist(ls.cfg.Address.SocketAddress.Domains...), + HostPolicy: autocert.HostWhitelist(ls.Config.Address.SocketAddress.Domains...), } ls.srv = &http.Server{ Addr: ":https", @@ -102,26 +101,23 @@ func (ls *ListenerService) httpsListener() { MaxHeaderBytes: resolveInt2IntProp(hc.MaxHeaderBytes, 1<<20), TLSConfig: m.TLSConfig(), } - autoLs := autocert.NewListener(ls.cfg.Address.SocketAddress.Domains...) + autoLs := autocert.NewListener(ls.Config.Address.SocketAddress.Domains...) logger.Infof("[dubbo-go-server] httpsListener start at : %s", ls.srv.Addr) err := ls.srv.Serve(autoLs) logger.Info("[dubbo-go-server] httpsListener result:", err) } -func (ls *ListenerService) httpListener() { - hl := CreateDefaultHttpWorker(ls) - hl.pool.New = func() interface{} { - return ls.allocateContext() - } +func (ls *HttpListenerService) httpListener() { + hl := createDefaultHttpWorker(ls) // user customize http config var hc *model.HttpConfig - hc = model.MapInStruct(ls.cfg) + hc = model.MapInStruct(ls.Config) mux := http.NewServeMux() mux.HandleFunc("/", hl.ServeHTTP) - sa := ls.cfg.Address.SocketAddress + sa := ls.Config.Address.SocketAddress ls.srv = &http.Server{ Addr: resolveAddress(sa.Address + ":" + strconv.Itoa(sa.Port)), Handler: mux, @@ -136,37 +132,16 @@ func (ls *ListenerService) httpListener() { log.Println(ls.srv.ListenAndServe()) } -func (ls *ListenerService) allocateContext() *h.HttpContext { - return &h.HttpContext{ - Listener: ls.cfg, - Params: make(map[string]interface{}), - } -} - -// NewDefaultHttpListener create http listener -func CreateDefaultHttpWorker(ls *ListenerService) *DefaultHttpWorker { +// createDefaultHttpWorker create http listener +func createDefaultHttpWorker(ls *HttpListenerService) *DefaultHttpWorker { return &DefaultHttpWorker{ - pool: sync.Pool{}, - ls: ls, + ls: ls, } } // ServeHTTP http request entrance. func (s *DefaultHttpWorker) ServeHTTP(w http.ResponseWriter, r *http.Request) { - hc := s.pool.Get().(*h.HttpContext) - defer s.pool.Put(hc) - - hc.Request = r - hc.Writer = w - hc.Reset() - - // now only one filter http_connection_manager, so just get it and call - err := s.ls.nf.OnData(hc) - - if err != nil { - s.pool.Put(hc) - logger.Errorf("ServeHTTP %s", err) - } + s.ls.FilterChain.ServeHTTP(w, r) } func resolveInt2IntProp(currentV, defaultV int) int { @@ -197,34 +172,3 @@ func resolveAddress(addr string) string { return addr } - -func findHttpManager(l *model.Listener) *model.HttpConnectionManagerConfig { - for _, fc := range l.FilterChains { - for _, f := range fc.Filters { - if f.Name == constant.HTTPConnectManagerFilter { - hcmc := &model.HttpConnectionManagerConfig{} - if err := yaml.ParseConfig(hcmc, f.Config); err != nil { - return nil - } - - return hcmc - } - } - } - - return DefaultHttpConnectionManager() -} - -func createHttpManager(lc *model.Listener, bs *model.Bootstrap) *filter.NetworkFilter { - p, err := filter.GetNetworkFilterPlugin(constant.HTTPConnectManagerFilter) - if err != nil { - panic(err) - } - - hcmc := findHttpManager(lc) - hcm, err := p.CreateFilter(hcmc, bs) - if err != nil { - panic(err) - } - return &hcm -} diff --git a/pkg/listener/http2/http2_listener.go b/pkg/listener/http2/http2_listener.go new file mode 100644 index 000000000..5da0dbb7c --- /dev/null +++ b/pkg/listener/http2/http2_listener.go @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http2 + +import ( + "net" + "net/http" + "strconv" +) + +import ( + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/filterchain" + "github.com/apache/dubbo-go-pixiu/pkg/listener" + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +func init() { + listener.SetListenerServiceFactory(model.ProtocolTypeHTTP2, newHttp2ListenerService) +} + +type ( + // Http2ListenerService the facade of a listener + Http2ListenerService struct { + listener.BaseListenerService + listener net.Listener + server *http.Server + } +) + +type handleWrapper struct { + fc *filterchain.NetworkFilterChain +} + +type h2cWrapper struct { + w *handleWrapper + h http.Handler +} + +// ServeHTTP call Handler to handle http request and response. +func (h *h2cWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.h.ServeHTTP(w, r) +} + +// ServeHTTP call FilterChain to handle http request and response. +func (h *handleWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.fc.ServeHTTP(w, r) +} + +func newHttp2ListenerService(lc *model.Listener, bs *model.Bootstrap) (listener.ListenerService, error) { + fc := filterchain.CreateNetworkFilterChain(lc.FilterChain, bs) + return &Http2ListenerService{ + BaseListenerService: listener.BaseListenerService{ + Config: lc, + FilterChain: fc, + }, + listener: nil, + server: nil, + }, nil +} + +// Start start listen +func (ls Http2ListenerService) Start() error { + + sa := ls.Config.Address.SocketAddress + addr := resolveAddress(sa.Address + ":" + strconv.Itoa(sa.Port)) + + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + ls.listener = l + + handlerWrapper := &handleWrapper{ls.FilterChain} + h2s := &http2.Server{} + h := &h2cWrapper{ + w: handlerWrapper, + h: h2c.NewHandler(handlerWrapper, h2s), + } + + ls.server = &http.Server{ + Addr: addr, + Handler: h, + } + + go func() { + if err := ls.server.Serve(ls.listener); err != nil { + logger.Error("Http2ListenerService Start error %s", err) + } + }() + return nil +} + +func resolveAddress(addr string) string { + if addr == "" { + logger.Debug("Addr is undefined. Using port :8080 by default") + return ":8080" + } + + return addr +} diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go new file mode 100644 index 000000000..5b9a76dbb --- /dev/null +++ b/pkg/listener/listener.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package listener + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/filterchain" + "github.com/apache/dubbo-go-pixiu/pkg/model" +) + +var factoryMap = make(map[model.ProtocolType]func(lc *model.Listener, bs *model.Bootstrap) (ListenerService, error), 8) + +type ( + ListenerService interface { + Start() error + } + + BaseListenerService struct { + Config *model.Listener + FilterChain *filterchain.NetworkFilterChain + } +) + +// SetListenerServiceFactory will store the listenerService factory by name +func SetListenerServiceFactory(protocol model.ProtocolType, newRegFunc func(lc *model.Listener, bs *model.Bootstrap) (ListenerService, error)) { + factoryMap[protocol] = newRegFunc +} + +// CreateListenerService create listener service +func CreateListenerService(lc *model.Listener, bs *model.Bootstrap) (ListenerService, error) { + if registry, ok := factoryMap[lc.Protocol]; ok { + reg, err := registry(lc, bs) + if err != nil { + panic("Initialize ListenerService " + lc.Name + "failed due to: " + err.Error()) + } + return reg, nil + } + return nil, errors.New("Registry " + lc.ProtocolStr + " does not support yet") +} diff --git a/pkg/server/listener_test.go b/pkg/listener/listener_test.go similarity index 71% rename from pkg/server/listener_test.go rename to pkg/listener/listener_test.go index b3401f7e6..fe2514d78 100644 --- a/pkg/server/listener_test.go +++ b/pkg/listener/listener_test.go @@ -15,31 +15,16 @@ * limitations under the License. */ -package server +package listener import ( ctxHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http" - "github.com/apache/dubbo-go-pixiu/pkg/model" ) func getTestContext() *ctxHttp.HttpContext { - l := ListenerService{ - cfg: &model.Listener{ - Name: "test", - Address: model.Address{ - SocketAddress: model.SocketAddress{ - Protocol: model.ProtocolTypeHTTP, - Address: "0.0.0.0", - Port: 8888, - }, - }, - FilterChains: []model.FilterChain{}, - }, - } hc := &ctxHttp.HttpContext{ - Listener: l.cfg, - Filters: []ctxHttp.FilterFunc{}, + Filters: []ctxHttp.FilterFunc{}, } hc.Reset() return hc diff --git a/pkg/model/base.go b/pkg/model/base.go index 6503f32a2..ea76b7fa2 100644 --- a/pkg/model/base.go +++ b/pkg/model/base.go @@ -41,20 +41,6 @@ const ( Unknown api.Status = 2 ) -const ( - ProtocolTypeHTTP ProtocolType = 0 + iota // support for 1.0 - ProtocolTypeTCP - ProtocolTypeUDP - ProtocolTypeHTTPS - ProtocolTypeGRPC -) - -const ( - REST_VALUE = "REST" - GRPC_VALUE = "GRPC" - DUBBO_VALUE = "DUBBO" -) - const ( ApiTypeREST api.ApiType = 0 + iota // support for 1.0 ApiTypeGRPC @@ -74,24 +60,6 @@ var ( "Unknown": 2, } - // ProtocolTypeName enum seq to protocol type name - ProtocolTypeName = map[int32]string{ - 0: "HTTP", - 1: "TCP", - 2: "UDP", - 3: "HTTPS", - 4: "GRPC", - } - - // ProtocolTypeValue protocol type name to enum seq - ProtocolTypeValue = map[string]int32{ - "HTTP": 0, - "TCP": 1, - "UDP": 2, - "HTTPS": 3, - "GRPC": 4, - } - ApiTypeName = map[int32]string{ 0: REST_VALUE, 1: GRPC_VALUE, @@ -106,8 +74,6 @@ var ( ) type ( - // ProtocolType protocol type enum - ProtocolType int32 // Address the address Address struct { @@ -119,13 +85,11 @@ type ( // used to tell server where to bind/listen, connect to upstream and find // management servers SocketAddress struct { - ProtocolStr string `yaml:"protocol_type" json:"protocol_type" mapstructure:"protocol_type"` - Protocol ProtocolType `default:"http" yaml:"omitempty" json:"omitempty"` - Address string `default:"0.0.0.0" yaml:"address" json:"address" mapstructure:"address"` - Port int `default:"8881" yaml:"port" json:"port" mapstructure:"port"` - ResolverName string `yaml:"resolver_name" json:"resolver_name" mapstructure:"resolver_name"` - Domains []string `yaml:"domains" json:"domains" mapstructure:"domains"` - CertsDir string `yaml:"certs_dir" json:"certs_dir" mapstructure:"certs_dir"` + Address string `default:"0.0.0.0" yaml:"address" json:"address" mapstructure:"address"` + Port int `default:"8881" yaml:"port" json:"port" mapstructure:"port"` + ResolverName string `yaml:"resolver_name" json:"resolver_name" mapstructure:"resolver_name"` + Domains []string `yaml:"domains" json:"domains" mapstructure:"domains"` + CertsDir string `yaml:"certs_dir" json:"certs_dir" mapstructure:"certs_dir"` } // ConfigSource diff --git a/pkg/model/filter.go b/pkg/model/filter.go index 4214bf6ca..a03abe412 100644 --- a/pkg/model/filter.go +++ b/pkg/model/filter.go @@ -19,12 +19,11 @@ package model // FilterChain filter chain type FilterChain struct { - FilterChainMatch FilterChainMatch `yaml:"filter_chain_match" json:"filter_chain_match" mapstructure:"filter_chain_match"` - Filters []Filter `yaml:"filters" json:"filters" mapstructure:"filters"` + Filters []NetworkFilter `yaml:"filters" json:"filters" mapstructure:"filters"` } -// Filter core struct, filter is extend by user -type Filter struct { +// NetworkFilter core struct, filter is extend by user +type NetworkFilter struct { Name string `yaml:"name" json:"name" mapstructure:"name"` // Name filter name unique Config map[string]interface{} `yaml:"config" json:"config" mapstructure:"config"` // Config filter config } diff --git a/pkg/model/http.go b/pkg/model/http.go index aa944d9f6..05ba60ba5 100644 --- a/pkg/model/http.go +++ b/pkg/model/http.go @@ -34,6 +34,11 @@ type HttpConnectionManagerConfig struct { GenerateRequestID bool `yaml:"generate_request_id" json:"generate_request_id" mapstructure:"generate_request_id"` } +// GRPCConnectionManagerConfig +type GRPCConnectionManagerConfig struct { + RouteConfig RouteConfiguration `yaml:"route_config" json:"route_config" mapstructure:"route_config"` +} + // HTTPFilter http filter type HTTPFilter struct { Name string `yaml:"name" json:"name" mapstructure:"name"` diff --git a/pkg/model/listener.go b/pkg/model/listener.go index 730f00d05..73ea0f61c 100644 --- a/pkg/model/listener.go +++ b/pkg/model/listener.go @@ -17,11 +17,54 @@ package model -// Listener is a server, listener a port -type Listener struct { - Name string `yaml:"name" json:"name" mapstructure:"name"` - Address Address `yaml:"address" json:"address" mapstructure:"address"` +const ( + ProtocolTypeHTTP ProtocolType = 0 + iota // support for 1.0 + ProtocolTypeTCP + ProtocolTypeUDP + ProtocolTypeHTTPS + ProtocolTypeGRPC + ProtocolTypeHTTP2 +) - FilterChains []FilterChain `yaml:"filter_chains" json:"filter_chains" mapstructure:"filter_chains"` - Config interface{} `yaml:"config" json:"config" mapstructure:"config"` -} +const ( + REST_VALUE = "REST" + GRPC_VALUE = "GRPC" + DUBBO_VALUE = "DUBBO" +) + +var ( + // ProtocolTypeName enum seq to protocol type name + ProtocolTypeName = map[int32]string{ + 0: "HTTP", + 1: "TCP", + 2: "UDP", + 3: "HTTPS", + 4: "GRPC", + 5: "HTTP2", + } + + // ProtocolTypeValue protocol type name to enum seq + ProtocolTypeValue = map[string]int32{ + "HTTP": 0, + "TCP": 1, + "UDP": 2, + "HTTPS": 3, + "GRPC": 4, + "HTTP2": 5, + } +) + +type ( + // ProtocolType protocol type enum + ProtocolType int32 + + // Listener is a server, listener a port + Listener struct { + Name string `yaml:"name" json:"name" mapstructure:"name"` + Address Address `yaml:"address" json:"address" mapstructure:"address"` + ProtocolStr string `default:"http" yaml:"protocol_type" json:"protocol_type" mapstructure:"protocol_type"` + Protocol ProtocolType `default:"http" yaml:"omitempty" json:"omitempty"` + FilterChain FilterChain `yaml:"filter_chains" json:"filter_chains" mapstructure:"filter_chains"` + Config interface{} `yaml:"config" json:"config" mapstructure:"config"` + } +) diff --git a/pkg/model/router.go b/pkg/model/router.go index 5314418ca..af8fc6f1b 100644 --- a/pkg/model/router.go +++ b/pkg/model/router.go @@ -71,14 +71,14 @@ type ( } ) -func (rc *RouteConfiguration) Route(req *stdHttp.Request) (*RouteAction, error) { +func (rc *RouteConfiguration) RouteByPathAndMethod(path, method string) (*RouteAction, error) { if rc.RouteTrie.IsEmpty() { return nil, errors.Errorf("router configuration is empty") } - node, _, _ := rc.RouteTrie.Match(stringutil.GetTrieKey(req.Method, req.URL.Path)) + node, _, _ := rc.RouteTrie.Match(stringutil.GetTrieKey(method, path)) if node == nil { - return nil, errors.Errorf("route failed for %s,no rules matched.", stringutil.GetTrieKey(req.Method, req.URL.Path)) + return nil, errors.Errorf("route failed for %s,no rules matched.", stringutil.GetTrieKey(method, path)) } if node.GetBizInfo() == nil { return nil, errors.Errorf("info is nil.please check your configuration.") @@ -86,3 +86,7 @@ func (rc *RouteConfiguration) Route(req *stdHttp.Request) (*RouteAction, error) ret := (node.GetBizInfo()).(RouteAction) return &ret, nil } + +func (rc *RouteConfiguration) Route(req *stdHttp.Request) (*RouteAction, error) { + return rc.RouteByPathAndMethod(req.URL.Path, req.Method) +} diff --git a/pkg/pluginregistry/registry.go b/pkg/pluginregistry/registry.go index 367d39415..df9f33e37 100644 --- a/pkg/pluginregistry/registry.go +++ b/pkg/pluginregistry/registry.go @@ -34,7 +34,10 @@ import ( _ "github.com/apache/dubbo-go-pixiu/pkg/filter/http/proxyrewrite" _ "github.com/apache/dubbo-go-pixiu/pkg/filter/http/remote" _ "github.com/apache/dubbo-go-pixiu/pkg/filter/metric" + _ "github.com/apache/dubbo-go-pixiu/pkg/filter/network/grpcconnectionmanager" _ "github.com/apache/dubbo-go-pixiu/pkg/filter/network/httpconnectionmanager" _ "github.com/apache/dubbo-go-pixiu/pkg/filter/seata" _ "github.com/apache/dubbo-go-pixiu/pkg/filter/tracing" + _ "github.com/apache/dubbo-go-pixiu/pkg/listener/http" + _ "github.com/apache/dubbo-go-pixiu/pkg/listener/http2" ) diff --git a/pkg/server/listener_manager.go b/pkg/server/listener_manager.go index 9bee77d9c..3c433fe25 100644 --- a/pkg/server/listener_manager.go +++ b/pkg/server/listener_manager.go @@ -18,19 +18,26 @@ package server import ( + "github.com/apache/dubbo-go-pixiu/pkg/listener" + "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" ) +// ListenerManager the listener manager type ListenerManager struct { activeListener []*model.Listener - activeListenerService []*ListenerService + activeListenerService []listener.ListenerService } +// CreateDefaultListenerManager create listener manager from config func CreateDefaultListenerManager(bs *model.Bootstrap) *ListenerManager { sl := bs.GetStaticListeners() - var ls []*ListenerService + var ls []listener.ListenerService for _, l := range bs.StaticResources.Listeners { - listener := CreateListenerService(l, bs) + listener, err := listener.CreateListenerService(l, bs) + if err != nil { + logger.Error("CreateDefaultListenerManager %s error: %v", l.Name, err) + } ls = append(ls, listener) } @@ -44,21 +51,15 @@ func (lm *ListenerManager) addOrUpdateListener(l *model.Listener) { lm.activeListener = append(lm.activeListener, l) } +// StartListen make active listener to start listening func (lm *ListenerManager) StartListen() { for _, s := range lm.activeListenerService { - go s.Start() - } -} - -func (lm *ListenerManager) addListenerService(ls *ListenerService) { - lm.activeListenerService = append(lm.activeListenerService, ls) -} - -func (lm *ListenerManager) GetListenerService(name string) *ListenerService { - for i := range lm.activeListenerService { - if lm.activeListenerService[i].cfg.Name == name { - return lm.activeListenerService[i] - } + s := s + go func() { + err := s.Start() + if err != nil { + panic(err) + } + }() } - return nil } diff --git a/samples/dubbogo/http/pixiu/conf.yaml b/samples/dubbogo/http/pixiu/conf.yaml index 24ae301be..e7ddc6a73 100644 --- a/samples/dubbogo/http/pixiu/conf.yaml +++ b/samples/dubbogo/http/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/multi/config/conf.yaml b/samples/dubbogo/multi/config/conf.yaml index e1f0eb8fb..bcb21ed17 100644 --- a/samples/dubbogo/multi/config/conf.yaml +++ b/samples/dubbogo/multi/config/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8882 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/body/pixiu/conf.yaml b/samples/dubbogo/simple/body/pixiu/conf.yaml index 0aa215d7e..1d582030c 100644 --- a/samples/dubbogo/simple/body/pixiu/conf.yaml +++ b/samples/dubbogo/simple/body/pixiu/conf.yaml @@ -21,7 +21,7 @@ static_resources: listeners: - name: "net/http" filter_chains: - - filters: + filters: - name: dgp.filter.httpconnectionmanager config: route_config: diff --git a/samples/dubbogo/simple/csrf/pixiu/conf.yaml b/samples/dubbogo/simple/csrf/pixiu/conf.yaml index ea6eea071..f6b28a55c 100644 --- a/samples/dubbogo/simple/csrf/pixiu/conf.yaml +++ b/samples/dubbogo/simple/csrf/pixiu/conf.yaml @@ -22,14 +22,9 @@ static_resources: - name: "net/http" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/jaeger/pixiu/conf.yaml b/samples/dubbogo/simple/jaeger/pixiu/conf.yaml index 2199759b4..eb85ee5bc 100644 --- a/samples/dubbogo/simple/jaeger/pixiu/conf.yaml +++ b/samples/dubbogo/simple/jaeger/pixiu/conf.yaml @@ -21,7 +21,7 @@ static_resources: listeners: - name: "net/http" filter_chains: - - filters: + filters: - name: dgp.filter.httpconnectionmanager config: route_config: diff --git a/samples/dubbogo/simple/mix/pixiu/conf.yaml b/samples/dubbogo/simple/mix/pixiu/conf.yaml index d241130e7..c5cb2adae 100644 --- a/samples/dubbogo/simple/mix/pixiu/conf.yaml +++ b/samples/dubbogo/simple/mix/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8882 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/proxy/pixiu/conf.yaml b/samples/dubbogo/simple/proxy/pixiu/conf.yaml index a5c89c5dd..be9e70d93 100644 --- a/samples/dubbogo/simple/proxy/pixiu/conf.yaml +++ b/samples/dubbogo/simple/proxy/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8883 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/query/pixiu/conf.yaml b/samples/dubbogo/simple/query/pixiu/conf.yaml index 7f2386cdc..d300d8c07 100644 --- a/samples/dubbogo/simple/query/pixiu/conf.yaml +++ b/samples/dubbogo/simple/query/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8884 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/registry/dist/darwin_amd64/pixiuconf/conf.yaml b/samples/dubbogo/simple/registry/dist/darwin_amd64/pixiuconf/conf.yaml new file mode 100644 index 000000000..1c2bdabd9 --- /dev/null +++ b/samples/dubbogo/simple/registry/dist/darwin_amd64/pixiuconf/conf.yaml @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +static_resources: + listeners: + - name: "net/http" + address: + socket_address: + protocol_type: "HTTP" + address: "0.0.0.0" + port: 8881 + filter_chains: + - filter_chain_match: + domains: + - api.dubbo.com + - api.pixiu.com + filters: + - name: dgp.filter.httpconnectionmanager + config: + route_config: + routes: + - match: + prefix: "/" + route: + cluster: "test-dubbo" + cluster_not_found_response_code: 505 + http_filters: + - name: dgp.filter.http.apiconfig + config: + dynamic: true + dynamic_adapter: test + - name: dgp.filter.http.dubboproxy + config: + dubboProxyConfig: + registries: + "zookeeper": + protocol: "zookeeper" + timeout: "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + timeout_config: + connect_timeout: 5s + request_timeout: 5s + - name: dgp.filter.http.response + config: + server_name: "test_http_dubbo" + generate_request_id: false + config: + idle_timeout: 5s + read_timeout: 5s + write_timeout: 5s + clusters: + - name: "test-dubbo" + lb_policy: "RoundRobin" + registries: + "zookeeper": + timeout: "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + shutdown_config: + timeout: "60s" + step_timeout: "10s" + reject_policy: "immediacy" + adapters: + - id: test + name: dgp.adapter.dubboregistrycenter + config: + registries: + "zookeeper": + protocol: zookeeper + address: "127.0.0.1:2181" + timeout: "5s" diff --git a/samples/dubbogo/simple/registry/dist/darwin_amd64/server/log.yml b/samples/dubbogo/simple/registry/dist/darwin_amd64/server/log.yml new file mode 100644 index 000000000..9330cda17 --- /dev/null +++ b/samples/dubbogo/simple/registry/dist/darwin_amd64/server/log.yml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +level: "debug" +development: true +disableCaller: false +disableStacktrace: false +sampling: +encoding: "console" + +# encoder +encoderConfig: + messageKey: "message" + levelKey: "level" + timeKey: "time" + nameKey: "logger" + callerKey: "caller" + stacktraceKey: "stacktrace" + lineEnding: "" + levelEncoder: "capitalColor" + timeEncoder: "iso8601" + durationEncoder: "seconds" + callerEncoder: "short" + nameEncoder: "" + +outputPaths: + - "stderr" +errorOutputPaths: + - "stderr" +initialFields: diff --git a/samples/dubbogo/simple/registry/dist/darwin_amd64/server/server.yml b/samples/dubbogo/simple/registry/dist/darwin_amd64/server/server.yml new file mode 100644 index 000000000..1fbfa468b --- /dev/null +++ b/samples/dubbogo/simple/registry/dist/darwin_amd64/server/server.yml @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# dubbo server yaml configure file +# application config +dubbo: + application: + name: BDTService + registries: + zk: + protocol: zookeeper + timeout: 3s + address: 127.0.0.1:2181 + protocols: + dubbo: + name: dubbo + port: 20000 + provider: + registry-ids: zk + services: + UserProvider: + group: test + version: 1.0.0 + cluster: test_dubbo + serialization: hessian2 + interface: com.dubbogo.pixiu.UserService diff --git a/samples/dubbogo/simple/registry/pixiu/conf.yaml b/samples/dubbogo/simple/registry/pixiu/conf.yaml index 311274276..f48327bc2 100644 --- a/samples/dubbogo/simple/registry/pixiu/conf.yaml +++ b/samples/dubbogo/simple/registry/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8881 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/triple/pixiu/conf.yaml b/samples/dubbogo/simple/triple/pixiu/conf.yaml index 652310e27..9f5217318 100644 --- a/samples/dubbogo/simple/triple/pixiu/conf.yaml +++ b/samples/dubbogo/simple/triple/pixiu/conf.yaml @@ -20,13 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8881 filter_chains: - - filter_chain_match: filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/dubbogo/simple/uri/conf.yaml b/samples/dubbogo/simple/uri/conf.yaml index af60e205d..2e162b829 100644 --- a/samples/dubbogo/simple/uri/conf.yaml +++ b/samples/dubbogo/simple/uri/conf.yaml @@ -20,9 +20,9 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8885 config: diff --git a/samples/dubbogo/simple/uri/pixiu/conf.yaml b/samples/dubbogo/simple/uri/pixiu/conf.yaml index ecab81dd6..f40bf358a 100644 --- a/samples/dubbogo/simple/uri/pixiu/conf.yaml +++ b/samples/dubbogo/simple/uri/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8885 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/grpc/pixiu/conf.yaml b/samples/grpc/pixiu/conf.yaml new file mode 100644 index 000000000..84fdad573 --- /dev/null +++ b/samples/grpc/pixiu/conf.yaml @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +static_resources: + listeners: + - name: "net/http" + protocol_type: "HTTP2" + address: + socket_address: + address: "0.0.0.0" + port: 8881 + filter_chains: + filters: + - name: dgp.filter.grpcconnectionmanager + config: + route_config: + routes: + - match: + prefix: "/provider.UserProvider/" + route: + cluster: "test-grpc" + cluster_not_found_response_code: 505 + config: + idle_timeout: 5s + read_timeout: 5s + write_timeout: 5s + clusters: + - name: "test-grpc" + lb_policy: "RoundRobin" + endpoints: + - socket_address: + address: 127.0.0.1 + port: 50001 + protocol_type: "GRPC" + shutdown_config: + timeout: "60s" + step_timeout: "10s" + reject_policy: "immediacy" \ No newline at end of file diff --git a/samples/grpc/proto/hello_grpc.pb.go b/samples/grpc/proto/hello_grpc.pb.go new file mode 100644 index 000000000..7403d6e8b --- /dev/null +++ b/samples/grpc/proto/hello_grpc.pb.go @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// protoc --proto_path=. --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative .\hello_grpc.proto + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.15.0--rc2 +// source: hello_grpc.proto + +package proto + +import ( + reflect "reflect" + sync "sync" +) + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetUserRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UserId int32 `protobuf:"varint,1,opt,name=userId,proto3" json:"userId,omitempty"` +} + +func (x *GetUserRequest) Reset() { + *x = GetUserRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_hello_grpc_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetUserRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUserRequest) ProtoMessage() {} + +func (x *GetUserRequest) ProtoReflect() protoreflect.Message { + mi := &file_hello_grpc_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUserRequest.ProtoReflect.Descriptor instead. +func (*GetUserRequest) Descriptor() ([]byte, []int) { + return file_hello_grpc_proto_rawDescGZIP(), []int{0} +} + +func (x *GetUserRequest) GetUserId() int32 { + if x != nil { + return x.UserId + } + return 0 +} + +type GetUserResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + Users []*User `protobuf:"bytes,2,rep,name=users,proto3" json:"users,omitempty"` +} + +func (x *GetUserResponse) Reset() { + *x = GetUserResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_hello_grpc_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetUserResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetUserResponse) ProtoMessage() {} + +func (x *GetUserResponse) ProtoReflect() protoreflect.Message { + mi := &file_hello_grpc_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetUserResponse.ProtoReflect.Descriptor instead. +func (*GetUserResponse) Descriptor() ([]byte, []int) { + return file_hello_grpc_proto_rawDescGZIP(), []int{1} +} + +func (x *GetUserResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *GetUserResponse) GetUsers() []*User { + if x != nil { + return x.Users + } + return nil +} + +type User struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UserId int32 `protobuf:"varint,1,opt,name=userId,proto3" json:"userId,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` +} + +func (x *User) Reset() { + *x = User{} + if protoimpl.UnsafeEnabled { + mi := &file_hello_grpc_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *User) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*User) ProtoMessage() {} + +func (x *User) ProtoReflect() protoreflect.Message { + mi := &file_hello_grpc_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use User.ProtoReflect.Descriptor instead. +func (*User) Descriptor() ([]byte, []int) { + return file_hello_grpc_proto_rawDescGZIP(), []int{2} +} + +func (x *User) GetUserId() int32 { + if x != nil { + return x.UserId + } + return 0 +} + +func (x *User) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +var File_hello_grpc_proto protoreflect.FileDescriptor + +var file_hello_grpc_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x5f, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x22, 0x28, 0x0a, 0x0e, + 0x47, 0x65, 0x74, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, + 0x0a, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, + 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x55, 0x73, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x24, 0x0a, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x2e, 0x55, 0x73, + 0x65, 0x72, 0x52, 0x05, 0x75, 0x73, 0x65, 0x72, 0x73, 0x22, 0x32, 0x0a, 0x04, 0x55, 0x73, 0x65, + 0x72, 0x12, 0x16, 0x0a, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x4e, 0x0a, + 0x0c, 0x55, 0x73, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x3e, 0x0a, + 0x07, 0x47, 0x65, 0x74, 0x55, 0x73, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x76, 0x69, + 0x64, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x2e, 0x47, 0x65, + 0x74, 0x55, 0x73, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3a, 0x5a, + 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, + 0x68, 0x65, 0x2f, 0x64, 0x75, 0x62, 0x62, 0x6f, 0x2d, 0x67, 0x6f, 0x2d, 0x70, 0x69, 0x78, 0x69, + 0x75, 0x2f, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x2f, 0x68, 0x74, 0x74, 0x70, 0x2f, 0x67, + 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_hello_grpc_proto_rawDescOnce sync.Once + file_hello_grpc_proto_rawDescData = file_hello_grpc_proto_rawDesc +) + +func file_hello_grpc_proto_rawDescGZIP() []byte { + file_hello_grpc_proto_rawDescOnce.Do(func() { + file_hello_grpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_hello_grpc_proto_rawDescData) + }) + return file_hello_grpc_proto_rawDescData +} + +var file_hello_grpc_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_hello_grpc_proto_goTypes = []interface{}{ + (*GetUserRequest)(nil), // 0: provider.GetUserRequest + (*GetUserResponse)(nil), // 1: provider.GetUserResponse + (*User)(nil), // 2: provider.User +} +var file_hello_grpc_proto_depIdxs = []int32{ + 2, // 0: provider.GetUserResponse.users:type_name -> provider.User + 0, // 1: provider.UserProvider.GetUser:input_type -> provider.GetUserRequest + 1, // 2: provider.UserProvider.GetUser:output_type -> provider.GetUserResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_hello_grpc_proto_init() } +func file_hello_grpc_proto_init() { + if File_hello_grpc_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_hello_grpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetUserRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_hello_grpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetUserResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_hello_grpc_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*User); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_hello_grpc_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_hello_grpc_proto_goTypes, + DependencyIndexes: file_hello_grpc_proto_depIdxs, + MessageInfos: file_hello_grpc_proto_msgTypes, + }.Build() + File_hello_grpc_proto = out.File + file_hello_grpc_proto_rawDesc = nil + file_hello_grpc_proto_goTypes = nil + file_hello_grpc_proto_depIdxs = nil +} diff --git a/samples/grpc/proto/hello_grpc.proto b/samples/grpc/proto/hello_grpc.proto new file mode 100644 index 000000000..f3d4bcba6 --- /dev/null +++ b/samples/grpc/proto/hello_grpc.proto @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +// protoc --proto_path=. --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative .\hello_grpc.proto +syntax = "proto3"; + +option go_package = "github.com/apache/dubbo-go-pixiu/samples/http/grpc/proto"; + +package provider; + +service UserProvider { + rpc GetUser (GetUserRequest) returns (GetUserResponse); +} + +message GetUserRequest { + int32 userId = 1; +} + +message GetUserResponse { + string message = 1; + repeated User users = 2; +} + +message User { + int32 userId = 1; + string name = 2; +} \ No newline at end of file diff --git a/samples/grpc/proto/hello_grpc_grpc.pb.go b/samples/grpc/proto/hello_grpc_grpc.pb.go new file mode 100644 index 000000000..48b5f4c2e --- /dev/null +++ b/samples/grpc/proto/hello_grpc_grpc.pb.go @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package proto + +import ( + context "context" +) + +import ( + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion7 + +// UserProviderClient is the client API for UserProvider service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type UserProviderClient interface { + GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*GetUserResponse, error) +} + +type userProviderClient struct { + cc grpc.ClientConnInterface +} + +func NewUserProviderClient(cc grpc.ClientConnInterface) UserProviderClient { + return &userProviderClient{cc} +} + +func (c *userProviderClient) GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*GetUserResponse, error) { + out := new(GetUserResponse) + err := c.cc.Invoke(ctx, "/provider.UserProvider/GetUser", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// UserProviderServer is the server API for UserProvider service. +// All implementations must embed UnimplementedUserProviderServer +// for forward compatibility +type UserProviderServer interface { + GetUser(context.Context, *GetUserRequest) (*GetUserResponse, error) + mustEmbedUnimplementedUserProviderServer() +} + +// UnimplementedUserProviderServer must be embedded to have forward compatible implementations. +type UnimplementedUserProviderServer struct { +} + +func (UnimplementedUserProviderServer) GetUser(context.Context, *GetUserRequest) (*GetUserResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetUser not implemented") +} +func (UnimplementedUserProviderServer) mustEmbedUnimplementedUserProviderServer() {} + +// UnsafeUserProviderServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to UserProviderServer will +// result in compilation errors. +type UnsafeUserProviderServer interface { + mustEmbedUnimplementedUserProviderServer() +} + +func RegisterUserProviderServer(s *grpc.Server, srv UserProviderServer) { + s.RegisterService(&_UserProvider_serviceDesc, srv) +} + +func _UserProvider_GetUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetUserRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(UserProviderServer).GetUser(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/provider.UserProvider/GetUser", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(UserProviderServer).GetUser(ctx, req.(*GetUserRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _UserProvider_serviceDesc = grpc.ServiceDesc{ + ServiceName: "provider.UserProvider", + HandlerType: (*UserProviderServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetUser", + Handler: _UserProvider_GetUser_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "hello_grpc.proto", +} diff --git a/samples/grpc/server/app/server.go b/samples/grpc/server/app/server.go new file mode 100644 index 000000000..776d4ccb2 --- /dev/null +++ b/samples/grpc/server/app/server.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "net" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go-pixiu/pkg/logger" + "github.com/apache/dubbo-go-pixiu/samples/http/grpc/proto" +) + +const ( + MsgUserNotFound = "user not found" + MsgUserQuerySuccessfully = "user(s) query successfully" +) + +// Test Cases +// curl http://127.0.0.1:8881/api/v1/provider.UserProvider/GetUser +// curl http://127.0.0.1:8881/api/v1/provider.UserProvider/GetUser -X POST -d '{"userId":1}' + +type server struct { + users map[int32]*proto.User + proto.UnimplementedUserProviderServer +} + +func (s *server) GetUser(ctx context.Context, request *proto.GetUserRequest) (*proto.GetUserResponse, error) { + us := make([]*proto.User, 0) + if request.GetUserId() == 0 { + for i := 1; i <= 2; i++ { + us = append(us, s.users[int32(i)]) + } + } else { + u, ok := s.users[request.GetUserId()] + if !ok { + return &proto.GetUserResponse{Message: MsgUserNotFound}, nil + } + us = append(us, u) + } + return &proto.GetUserResponse{Message: MsgUserQuerySuccessfully, Users: us}, nil +} + +func initUsers(s *server) { + s.users[1] = &proto.User{UserId: 1, Name: "Kenway"} + s.users[2] = &proto.User{UserId: 2, Name: "Ken"} +} + +func main() { + l, err := net.Listen("tcp", ":50001") //nolint:gosec + if err != nil { + panic(err) + } + + s := &server{users: make(map[int32]*proto.User)} + initUsers(s) + + gs := grpc.NewServer() + proto.RegisterUserProviderServer(gs, s) + logger.Info("grpc test server is now running...") + err = gs.Serve(l) + if err != nil { + panic(err) + } +} diff --git a/samples/grpc/test/pixiu_test.go b/samples/grpc/test/pixiu_test.go new file mode 100644 index 000000000..250f75ee1 --- /dev/null +++ b/samples/grpc/test/pixiu_test.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test + +import ( + "context" + "flag" + "log" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +import ( + pb "github.com/apache/dubbo-go-pixiu/samples/http/grpc/proto" +) + +var ( + addr = flag.String("addr", "localhost:8881", "the address to connect to") +) + +func TestGet(t *testing.T) { + flag.Parse() + // Set up a connection to the server. + conn, err := grpc.Dial(*addr, grpc.WithInsecure()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + c := pb.NewUserProviderClient(conn) + + // Contact the server and print out its response. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + r, err := c.GetUser(ctx, &pb.GetUserRequest{UserId: 1}) + assert.NoError(t, err) + assert.Equal(t, "user(s) query successfully", r.Message) +} diff --git a/samples/http/grpc/pixiu/conf.yaml b/samples/http/grpc/pixiu/conf.yaml index 5b6faab12..0489aea5f 100644 --- a/samples/http/grpc/pixiu/conf.yaml +++ b/samples/http/grpc/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8881 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/http/simple/pixiu/conf.yaml b/samples/http/simple/pixiu/conf.yaml index 9eebd8472..ad238c180 100644 --- a/samples/http/simple/pixiu/conf.yaml +++ b/samples/http/simple/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/https/pixiu/conf.yaml b/samples/https/pixiu/conf.yaml index 334dc736c..2515f5a13 100644 --- a/samples/https/pixiu/conf.yaml +++ b/samples/https/pixiu/conf.yaml @@ -20,19 +20,15 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTPS" address: socket_address: - protocol_type: "HTTPS" domains: - "sample.domain.com" - "sample.domain-1.com" - "sample.domain-2.com" certs_dir: $PROJECT_DIR/cert filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/seata/gateway/conf.yaml b/samples/seata/gateway/conf.yaml index 9fca54c4a..dea415351 100644 --- a/samples/seata/gateway/conf.yaml +++ b/samples/seata/gateway/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2046 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: @@ -62,16 +58,12 @@ static_resources: read_timeout: 5s write_timeout: 5s - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2047 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: @@ -105,16 +97,12 @@ static_resources: read_timeout: 5s write_timeout: 5s - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2048 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/seata/sidecar/server_a/conf.yaml b/samples/seata/sidecar/server_a/conf.yaml index dd57eac28..fb24d51bd 100644 --- a/samples/seata/sidecar/server_a/conf.yaml +++ b/samples/seata/sidecar/server_a/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2046 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/seata/sidecar/server_b/conf.yaml b/samples/seata/sidecar/server_b/conf.yaml index 60e75f392..2977ad613 100644 --- a/samples/seata/sidecar/server_b/conf.yaml +++ b/samples/seata/sidecar/server_b/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2047 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/seata/sidecar/server_c/conf.yaml b/samples/seata/sidecar/server_c/conf.yaml index 66cc3d391..81d93ab2e 100644 --- a/samples/seata/sidecar/server_c/conf.yaml +++ b/samples/seata/sidecar/server_c/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 2048 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/samples/springcloud/pixiu/conf.yaml b/samples/springcloud/pixiu/conf.yaml index 897dc9a58..f322eacbf 100644 --- a/samples/springcloud/pixiu/conf.yaml +++ b/samples/springcloud/pixiu/conf.yaml @@ -20,16 +20,12 @@ static_resources: listeners: - name: "net/http" + protocol_type: "HTTP" address: socket_address: - protocol_type: "HTTP" address: "0.0.0.0" port: 8888 filter_chains: - - filter_chain_match: - domains: - - api.dubbo.com - - api.pixiu.com filters: - name: dgp.filter.httpconnectionmanager config: diff --git a/start_integrate_test.sh b/start_integrate_test.sh index f55b57c9a..2d2cc6b70 100755 --- a/start_integrate_test.sh +++ b/start_integrate_test.sh @@ -27,6 +27,8 @@ array+=("samples/dubbogo/http") ##http array+=("samples/http/grpc") array+=("samples/http/simple") +## grpc proxy +array+=("samples/grpc") for((i=0;i<${#array[*]};i++)) do