Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
infra-bot committed Dec 9, 2020
0 parents commit 0af371d
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.idea/
go.sum
.DS_Store
example/go.mod
Empty file added example/access.log
Empty file.
Empty file added example/error.log
Empty file.
39 changes: 39 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"context"
"fmt"
"github.com/qietv/qgrpc"
"google.golang.org/grpc"
)

type demoServer struct{}

func main() {
s, err := qgrpc.New(&qgrpc.Config{
Name: "demo",
Network: "tcp",
Addr: ":8809",
AccessLog: "access.log",
ErrorLog: "error.log",
Interceptor: nil,
}, func(s *grpc.Server) {
demo.RegisterGRPCDemoServer(s, &demoServer{})
})
if err != nil {
panic(fmt.Sprintf("grpc server start fail, %s", err.Error()))
}
defer s.Server.GracefulStop()

//Test server
conn, err := grpc.Dial("localhost:8809", grpc.WithInsecure())
if err != nil {
panic(err.Error())
}

gRPCClient := demoServer.NewGRPCDemoClient(conn)
ret, err := gRPCClient.GetList(context.Background(), &demo.ListReq{
Start: 0,
Limit: 10,
})
}
19 changes: 19 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/qietv/qgrpc

go 1.14

require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/hanskorg/logkit v1.0.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.1 // indirect
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
go.uber.org/atomic v1.6.0 // indirect
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect
google.golang.org/grpc v1.30.0
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
)
116 changes: 116 additions & 0 deletions interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package qgrpc

import (
"context"
"fmt"
"github.com/hanskorg/logkit"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"io"
"net"
"os"
"runtime/debug"
"time"
)

// NewLoggingInterceptor create log interceptor
// format {time}\t{origin}\t{method}\t{http code}\t{status.code}\t{status.message}
func NewLoggingInterceptor(logfile string) grpc.UnaryServerInterceptor {
accessLogger, err := logkit.NewFileLogger(logfile, "", time.Second, uint64(1204*1024*1800), 0)
if err != nil {
println("logger conf fail, %s", err.Error())
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
var (
startTime time.Time
code int32
message string
remoteIp string
ret *status.Status
durTime float32
)
if accessLogger == nil {
return handler(ctx, req)
}
startTime = time.Now()
resp, err = handler(ctx, req)
if err != nil {
ret = status.Convert(err)
code = int32(ret.Code())
message = ret.Message()
}

if pr, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
remoteIp = tcpAddr.IP.String()
} else {
remoteIp = pr.Addr.String()
}
}

if md, ok := metadata.FromIncomingContext(ctx); !ok {
rips := md.Get("x-forward-for")
if len(rips) != 0 {
remoteIp = rips[0]
}
}
if remoteIp == "" {
remoteIp = "-"
}
if message == "" {
message = "-"
}
durTime = float32(time.Since(startTime).Microseconds()) / 1000.0
if _, err := accessLogger.Write([]byte(fmt.Sprintf("%s\t%s\t%s\t%s\t%-.3f\t%d\t%s\n", startTime.Format(time.RFC3339), remoteIp, info.FullMethod, "-", durTime, code, message))); err != nil {
println("write access log fail, %s", err.Error())
}
return resp, err
}
}

// NewRecoveryInterceptor grpc server ServerInterceptor
// create union revovery handler for qietv gRPC sevice
func NewRecoveryInterceptor(logfile string) grpc.UnaryServerInterceptor {
var (
errorLogger io.Writer
err error
startTime time.Time
remoteIp string
)

if logfile != "" {
errorLogger, err = logkit.NewFileLogger(logfile, "", time.Second, uint64(1204*1024*1800), 0)
} else {
errorLogger, err = os.Open("/dev/stderr")
}
if err != nil {
println("logger conf fail, %s", err.Error())
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {

defer func() {
if r := recover(); r != nil {
if _, err := errorLogger.Write([]byte(fmt.Sprintf("%s\t%s\t%s\n fatal ===> %s %s \n===============\n", startTime.Format(time.RFC3339), remoteIp, info.FullMethod, r, debug.Stack()))); err != nil {
println("write access log fail, %s", err.Error())
}
err = status.Errorf(codes.Internal, "fatal err: %+v", r)
}
}()
return handler(ctx, req)
}
}

// NewTracerInterceptor grpc server ServerInterceptor
func NewTracerInterceptor(serviceName string, agentHost string) grpc.UnaryServerInterceptor {
tracer, _, err := NewTracer(serviceName, agentHost)
if err != nil {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
println("tracer setup fail, %s", err.Error())
return handler(ctx, req)
}
}
return serverInterceptor(tracer)
}
42 changes: 42 additions & 0 deletions pkg/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pkg

import (
"encoding/json"
"errors"
"time"
)

type Duration time.Duration

func (d *Duration) UnmarshalText(text []byte) error {
tmp, err := time.ParseDuration(string(text))
if err == nil {
*d = Duration(tmp)
}
return err
}

func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).String())
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
*d = Duration(time.Duration(value))
return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}
*d = Duration(tmp)
return nil
default:
return errors.New("invalid duration")
}
}
151 changes: 151 additions & 0 deletions qgrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package qgrpc

import (
"context"
"fmt"
"github.com/qietv/qgrpc/pkg"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"net"
"sync"
"time"
)

//gRPC Server config for qietv
type Config struct {
Name string `yaml:"name,omitempty"`
Network string `yaml:"network,omitempty"`
Addr string `yaml:"addr,omitempty"`
Timeout pkg.Duration `yaml:"timeout,omitempty"`
IdleTimeout pkg.Duration `yaml:"idleTimeout,omitempty"`
MaxLifeTime pkg.Duration `yaml:"maxLifeTime,omitempty"`
ForceCloseWait pkg.Duration `yaml:"forceCloseWait,omitempty"`
KeepAliveInterval pkg.Duration `yaml:"keepAliveInterval,omitempty"`
KeepAliveTimeout pkg.Duration `yaml:"keepAliveTimeout,omitempty"`
AccessLog string `yaml:"access,omitempty"`
ErrorLog string `yaml:"error,omitempty"`
Interceptor []map[string]interface{} `yaml:"interceptors,omitempty"`
}

//Server gRPC server for qietv mico-service server
type Server struct {
conf *Config
mu sync.Mutex
*grpc.Server
listener net.TCPListener
}

func (s *Server) Check(ctx context.Context, in *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if in.Service == s.conf.Name {
// check the server overall health status.
return &health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}, nil
}
return &health.HealthCheckResponse{
Status: health.HealthCheckResponse_UNKNOWN,
}, nil
}

func (s *Server) Watch(req *health.HealthCheckRequest, hW health.Health_WatchServer) error {
return nil
}

func Default(registerFunc func(s *grpc.Server)) (s *Server, err error) {
return New(&Config{
Name: "qgrpc",
Network: "tcp",
Addr: ":8808",
Timeout: pkg.Duration(time.Second * 20),
IdleTimeout: 0,
MaxLifeTime: 0,
ForceCloseWait: 0,
KeepAliveInterval: 0,
KeepAliveTimeout: 0,
AccessLog: "./access.log",
ErrorLog: "./error.log",
Interceptor: nil,
}, registerFunc)
}

// NewServer creates a gRPC server for qietv's mico service Server
// err when listen fail
func New(c *Config, registerFunc func(s *grpc.Server)) (s *Server, err error) {
var (
listener net.Listener
)

s = &Server{
Server: grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Duration(c.IdleTimeout),
//MaxConnectionAgeGrace: time.Duration(c.ForceCloseWait),
Time: time.Duration(c.KeepAliveInterval),
Timeout: time.Duration(c.Timeout),
MaxConnectionAge: time.Duration(c.MaxLifeTime),
}),
initInterceptor(c.Name, c.AccessLog, c.ErrorLog, c.Interceptor),
),
}
listener, err = net.Listen(c.Network, c.Addr)
if err != nil {
err = fmt.Errorf("create server fail, %s", err.Error())
return
}
registerFunc(s.Server)
health.RegisterHealthServer(s.Server, s)
go func() {
err = s.Serve(listener)
if err != nil {
err = fmt.Errorf("grpc server fail, %s", err.Error())
}
}()
return
}

func initInterceptor(serviceName, access, error string, interceptors []map[string]interface{}) grpc.ServerOption {
var (
chain []grpc.UnaryServerInterceptor
)
//init interceptor chain
for _, interceptor := range interceptors {
var (
name interface{}
has bool
)
if name, has = interceptor["name"]; !has {
println("qgRPC interceptor conf fail, %+v", interceptor)
continue
}
switch name {
case "trace":
var (
service interface{}
tracer interface{}
ok bool
)
if service, ok = interceptor["service"]; !ok {
if serviceName == "" {
println("qgRPC trace conf fail, %+v", interceptor)
continue
}
service = serviceName
}
if tracer, ok = interceptor["tracer"]; !ok {
println("qgRPC trace conf fail, %+v", interceptor)
continue
}
chain = append(chain, NewTracerInterceptor(service.(string), tracer.(string)))
default:
println("qgRPC interceptor not support, %+v", interceptor)
}
}
if access != "" {
chain = append(chain, NewLoggingInterceptor(access))
}
chain = append(chain, NewRecoveryInterceptor(error))
return grpc.ChainUnaryInterceptor(chain...)
}
Loading

0 comments on commit 0af371d

Please sign in to comment.