Skip to content

Commit

Permalink
Merge pull request #99 from dongxuny/dongxuny
Browse files Browse the repository at this point in the history
Add gwPort as boot fields
  • Loading branch information
dongxuny authored Oct 24, 2023
2 parents d4d0cc5 + ad37d03 commit 6a2211d
Show file tree
Hide file tree
Showing 6 changed files with 1,238 additions and 75 deletions.
202 changes: 128 additions & 74 deletions boot/grpc_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rookie-ninja/rk-entry/v2/entry"
rkerror "github.com/rookie-ninja/rk-entry/v2/error"
"github.com/rookie-ninja/rk-entry/v2/error"
"github.com/rookie-ninja/rk-entry/v2/middleware"
"github.com/rookie-ninja/rk-entry/v2/middleware/auth"
"github.com/rookie-ninja/rk-entry/v2/middleware/cors"
Expand Down Expand Up @@ -77,6 +77,7 @@ type BootConfig struct {
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Port uint64 `yaml:"port" json:"port"`
GwPort uint64 `yaml:"gwPort" json:"gwPort"`
Enabled bool `yaml:"enabled" json:"enabled"`
EnableReflection bool `yaml:"enableReflection" json:"enableReflection"`
NoRecvMsgSizeLimit bool `yaml:"noRecvMsgSizeLimit" json:"noRecvMsgSizeLimit"`
Expand Down Expand Up @@ -119,6 +120,7 @@ type GrpcEntry struct {
LoggerEntry *rkentry.LoggerEntry `json:"-" yaml:"-"`
EventEntry *rkentry.EventEntry `json:"-" yaml:"-"`
Port uint64 `json:"-" yaml:"-"`
GwPort uint64 `json:"-" yaml:"-"`
TlsConfig *tls.Config `json:"-" yaml:"-"`
TlsConfigInsecure *tls.Config `json:"-" yaml:"-"`
// GRPC related
Expand Down Expand Up @@ -285,6 +287,7 @@ func RegisterGrpcEntryYAML(raw []byte) map[string]rkentry.Entry {
WithLoggerEntry(loggerEntry),
WithEventEntry(eventEntry),
WithPort(element.Port),
WithGwPort(element.GwPort),
WithGrpcDialOptions(grpcDialOptions...),
WithSwEntry(swEntry),
WithDocsEntry(docsEntry),
Expand Down Expand Up @@ -619,8 +622,13 @@ func (entry *GrpcEntry) Bootstrap(ctx context.Context) {
httpHandler = rkgrpccsrf.Interceptor(httpHandler, entry.gwCsrfOptions...)
}

// 20: set http port if missing
if entry.GwPort < 1 {
entry.GwPort = entry.Port
}

entry.HttpServer = &http.Server{
Addr: "0.0.0.0:" + strconv.FormatUint(entry.Port, 10),
Addr: "0.0.0.0:" + strconv.FormatUint(entry.GwPort, 10),
Handler: h2c.NewHandler(httpHandler, &http2.Server{}),
}

Expand All @@ -639,73 +647,109 @@ func (entry *GrpcEntry) Bootstrap(ctx context.Context) {
}

// 20: Start http server
go func(*GrpcEntry) {
// Create inner listener
conn, err := net.Listen("tcp4", ":"+strconv.FormatUint(entry.Port, 10))
if err != nil {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
rkentry.ShutdownWithError(err)
}
if entry.Port == entry.GwPort {
// same port, using cmux
go func(*GrpcEntry) {
// Create inner listener
conn, err := net.Listen("tcp4", ":"+strconv.FormatUint(entry.Port, 10))
if err != nil {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
rkentry.ShutdownWithError(err)
}

// We will use cmux to make grpc and grpc gateway on the same port.
// With cmux, we can init one listener but routes connection based on some rules.
if !entry.IsTlsEnabled() {
// 1: Create a TCP listener with cmux
tcpL := cmux.New(conn)

// 2: If header value of content-type is application/grpc, then it is a grpc request.
// Assign a wrapped listener to grpc connection with cmux
grpcL := tcpL.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))

// 3: Not a grpc connection, we will wrap a http listener.
httpL := tcpL.Match(cmux.HTTP1Fast("PATCH"))

// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcL, logger)
go entry.startHttpServer(httpL, logger)

// 5: Start listener
if err := tcpL.Serve(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
if err != cmux.ErrListenerClosed {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
logger.Error("Error occurs while serving TCP listener.", zap.Error(err))
rkentry.ShutdownWithError(err)
// We will use cmux to make grpc and grpc gateway on the same port.
// With cmux, we can init one listener but routes connection based on some rules.
if !entry.IsTlsEnabled() {
// 1: Create a TCP listener with cmux
tcpL := cmux.New(conn)

// 2: If header value of content-type is application/grpc, then it is a grpc request.
// Assign a wrapped listener to grpc connection with cmux
grpcL := tcpL.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))

// 3: Not a grpc connection, we will wrap a http listener.
httpL := tcpL.Match(cmux.HTTP1Fast("PATCH"))

// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcL, logger)
go entry.startHttpServer(httpL, logger)

// 5: Start listener
if err := tcpL.Serve(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
if err != cmux.ErrListenerClosed {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
logger.Error("Error occurs while serving TCP listener.", zap.Error(err))
rkentry.ShutdownWithError(err)
}
}
}
} else {
// In this case, we will enable tls
// 1: Create a tls listener with tls config
tlsL := cmux.New(tls.NewListener(conn, entry.TlsConfig))

// 2: If header value of content-type is application/grpc, then it is a grpc request.
// Assign a wrapped listener to grpc connection with cmux
grpcL := tlsL.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))

// 3: Not a grpc connection, we will wrap a http listener.
httpL := tlsL.Match(cmux.HTTP1Fast("PATCH"))

// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcL, logger)
go entry.startHttpServer(httpL, logger)

// 5: Start listener
if err := tlsL.Serve(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
if err != cmux.ErrListenerClosed {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
logger.Error("Error occurs while serving TLS listener.", zap.Error(err))
rkentry.ShutdownWithError(err)
} else {
// In this case, we will enable tls
// 1: Create a tls listener with tls config
tlsL := cmux.New(tls.NewListener(conn, entry.TlsConfig))

// 2: If header value of content-type is application/grpc, then it is a grpc request.
// Assign a wrapped listener to grpc connection with cmux
grpcL := tlsL.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))

// 3: Not a grpc connection, we will wrap a http listener.
httpL := tlsL.Match(cmux.HTTP1Fast("PATCH"))

// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcL, logger)
go entry.startHttpServer(httpL, logger)

// 5: Start listener
if err := tlsL.Serve(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
if err != cmux.ErrListenerClosed {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
logger.Error("Error occurs while serving TLS listener.", zap.Error(err))
rkentry.ShutdownWithError(err)
}
}
}
}
}(entry)
}(entry)
} else {
go func(*GrpcEntry) {
// Create inner listener
grpcLis, err := net.Listen("tcp4", ":"+strconv.FormatUint(entry.Port, 10))
if err != nil {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
rkentry.ShutdownWithError(err)
}
gwLis, err := net.Listen("tcp4", ":"+strconv.FormatUint(entry.GwPort, 10))
if err != nil {
entry.bootstrapLogOnce.Do(func() {
entry.EventEntry.FinishWithError(event, err)
})
rkentry.ShutdownWithError(err)
}

// We will use cmux to make grpc and grpc gateway on the same port.
// With cmux, we can init one listener but routes connection based on some rules.
if !entry.IsTlsEnabled() {
// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcLis, logger)
go entry.startHttpServer(gwLis, logger)
} else {
grpcLisTls := tls.NewListener(grpcLis, entry.TlsConfig)
gwLisTls := tls.NewListener(gwLis, entry.TlsConfig)

// 4: Start both of grpc and http server
go entry.startGrpcServer(grpcLisTls, logger)
go entry.startHttpServer(gwLisTls, logger)
}
}(entry)
}

entry.bootstrapLogOnce.Do(func() {
// Print link and logging message
Expand All @@ -714,29 +758,32 @@ func (entry *GrpcEntry) Bootstrap(ctx context.Context) {
scheme = "https"
}

entry.LoggerEntry.Info(fmt.Sprintf("gRPC_port:%d", entry.Port))
entry.LoggerEntry.Info(fmt.Sprintf("gateway_port:%d", entry.GwPort))

if entry.IsSWEnabled() {
entry.LoggerEntry.Info(fmt.Sprintf("SwaggerEntry: %s://localhost:%d%s", scheme, entry.Port, entry.SWEntry.Path))
entry.LoggerEntry.Info(fmt.Sprintf("SwaggerEntry: %s://localhost:%d%s", scheme, entry.GwPort, entry.SWEntry.Path))
}
if entry.IsDocsEnabled() {
entry.LoggerEntry.Info(fmt.Sprintf("DocsEntry: %s://localhost:%d%s", scheme, entry.Port, entry.DocsEntry.Path))
entry.LoggerEntry.Info(fmt.Sprintf("DocsEntry: %s://localhost:%d%s", scheme, entry.GwPort, entry.DocsEntry.Path))
}
if entry.IsPromEnabled() {
entry.LoggerEntry.Info(fmt.Sprintf("PromEntry: %s://localhost:%d%s", scheme, entry.Port, entry.PromEntry.Path))
entry.LoggerEntry.Info(fmt.Sprintf("PromEntry: %s://localhost:%d%s", scheme, entry.GwPort, entry.PromEntry.Path))
}
if entry.IsStaticFileHandlerEnabled() {
entry.LoggerEntry.Info(fmt.Sprintf("StaticFileHandlerEntry: %s://localhost:%d%s", scheme, entry.Port, entry.StaticFileEntry.Path))
entry.LoggerEntry.Info(fmt.Sprintf("StaticFileHandlerEntry: %s://localhost:%d%s", scheme, entry.GwPort, entry.StaticFileEntry.Path))
}
if entry.IsCommonServiceEnabled() {
handlers := []string{
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.Port, entry.CommonServiceEntry.ReadyPath),
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.Port, entry.CommonServiceEntry.AlivePath),
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.Port, entry.CommonServiceEntry.InfoPath),
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.GwPort, entry.CommonServiceEntry.ReadyPath),
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.GwPort, entry.CommonServiceEntry.AlivePath),
fmt.Sprintf("%s://localhost:%d%s", scheme, entry.GwPort, entry.CommonServiceEntry.InfoPath),
}

entry.LoggerEntry.Info(fmt.Sprintf("CommonSreviceEntry: %s", strings.Join(handlers, ", ")))
}
if entry.IsPProfEnabled() {
entry.LoggerEntry.Info(fmt.Sprintf("PProfEntry: %s://localhost:%d%s", scheme, entry.Port, entry.PProfEntry.Path))
entry.LoggerEntry.Info(fmt.Sprintf("PProfEntry: %s://localhost:%d%s", scheme, entry.GwPort, entry.PProfEntry.Path))
}
entry.EventEntry.Finish(event)
})
Expand Down Expand Up @@ -1065,13 +1112,20 @@ func WithEventEntry(logger *rkentry.EventEntry) GrpcEntryOption {
}
}

// WithPort Provide port.
// WithPort Provide grpc port, gateway will use same port if not provided
func WithPort(port uint64) GrpcEntryOption {
return func(entry *GrpcEntry) {
entry.Port = port
}
}

// WithGwPort Provide gateway port, gateway will use same port if not provided
func WithGwPort(gwPort uint64) GrpcEntryOption {
return func(entry *GrpcEntry) {
entry.GwPort = gwPort
}
}

// WithServerOptions Provide grpc.ServerOption.
func WithServerOptions(opts ...grpc.ServerOption) GrpcEntryOption {
return func(entry *GrpcEntry) {
Expand Down
16 changes: 16 additions & 0 deletions example/boot/simple/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:latest AS builder
WORKDIR /build/rk
COPY . /build/rk

RUN go env -w CGO_ENABLED="0" && go build main.go

FROM alpine

ENV WD=/usr/src/rk
WORKDIR $WD

COPY --from=builder /build/rk/main $WD/main
COPY --from=builder /build/rk/boot.yaml $WD/boot.yaml
COPY --from=builder /build/rk/api $WD/api

CMD $WD/main
1 change: 1 addition & 0 deletions example/boot/simple/boot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
grpc:
- name: greeter # Required
port: 8080 # Required
gwPort: 8081 # Optional, default: gateway port will be the same as grpc port if not provided
enabled: true # Required
enableReflection: true # Optional, default: false
enableRkGwOption: true # Optional, default: false
Expand Down
75 changes: 75 additions & 0 deletions example/boot/simple/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
module github.com/rookie-ninja/greeter

go 1.19

replace github.com/rookie-ninja/rk-grpc/v2 => ../../../

require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/rookie-ninja/rk-entry/v2 v2.2.18
github.com/rookie-ninja/rk-grpc/v2 v2.2.18
google.golang.org/grpc v1.58.2
google.golang.org/protobuf v1.31.0
)

require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/improbable-eng/grpc-web v0.15.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/openzipkin/zipkin-go v0.4.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rookie-ninja/rk-logger v1.2.13 // indirect
github.com/rookie-ninja/rk-query v1.2.14 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
go.opentelemetry.io/contrib v1.8.0 // indirect
go.opentelemetry.io/otel v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.8.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.8.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.10.0 // indirect
go.opentelemetry.io/otel/sdk v1.10.0 // indirect
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/ratelimit v0.2.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.6 // indirect
)
Loading

0 comments on commit 6a2211d

Please sign in to comment.