Skip to content

Commit

Permalink
Merge pull request #10 from vearne/dev
Browse files Browse the repository at this point in the history
增加filter按照method过滤grpc请求
  • Loading branch information
vearne authored Oct 27, 2022
2 parents 16a00b2 + 124a73e commit 2b389ab
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 22 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ and printed in the console
./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002"
```

Capture gRPC requests on "127.0.0.1:35001",
keep only requests whose method suffix is Time, and print them in the console
```
./grpcr --input-raw="127.0.0.1:35001" --output-stdout --include-filter-method-match=".*Time$"
```

### The captured request looks like
```
{
Expand Down Expand Up @@ -127,5 +133,5 @@ and [buger/goreplay](https://github.com/buger/goreplay)
* [x] 9)Support for reading GRPC requests from files
* [ ] 10)Support reading GRPC requests from kafka
* [ ] 11)Support for reading GRPC requests from RocketMQ
* [ ] 12)Support custom filter
* [x] 12)Support custom filter
* [ ] 13)support TLS
7 changes: 6 additions & 1 deletion README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ sudo -s
./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002"
```

捕获"127.0.0.1:35001"上的gRPC请求,只保留method后缀为Time的请求,并打印在控制台中
```
./grpcr --input-raw="127.0.0.1:35001" --output-stdout --include-filter-method-match=".*Time$"
```

### 捕获的请求形如
```
{
Expand Down Expand Up @@ -125,5 +130,5 @@ export SIMPLE_LOG_LEVEL=debug
* [x] 9)支持从文件中读取GRPC请求
* [ ] 10)支持从kafka中读取GRPC请求
* [ ] 11)支持从RocketMQ中读取GRPC请求
* [ ] 12)支持自定义filter
* [x] 12)支持自定义filter
* [ ] 13)支持TLS
5 changes: 5 additions & 0 deletions biz/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ import (
func NewFilterChain(settings *config.AppSettings) (filter.Filter, error) {
c := filter.NewFilterChain()
c.AddExcludeFilters(filter.NewMethodExcludeFilter("grpc.reflection"))

if len(settings.IncludeFilterMethodMatch) > 0 {
f := filter.NewMethodMatchIncludeFilter(settings.IncludeFilterMethodMatch)
c.AddIncludeFilter(f)
}
return c, nil
}
2 changes: 2 additions & 0 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type AppSettings struct {
OutputFileMaxAge int `json:"output-file-max-age"`

OutputKafkaConfig plugin.OutputKafkaConfig
// --- filter ---
IncludeFilterMethodMatch string `json:"include-filter-method-match"`
// --- other ---
Codec string `json:"codec"`
}
68 changes: 48 additions & 20 deletions example/search_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"log"
"math/rand"
"time"

pb "github.com/vearne/grpcreplay/example/search_proto"
Expand All @@ -26,35 +27,62 @@ func main() {
}
defer conn.Close()

client := pb.NewSearchServiceClient(conn)
counter := 0
for i := 0; i < 1000000; i++ {
if rand.Intn(1000)%2 == 0 {
counter++
sendSearch(client, counter)
} else {
sendCurrTime(client)
}
time.Sleep(10 * time.Second)
}
}

func sendSearch(client pb.SearchServiceClient, i int) {
// add some headers
md := metadata.New(map[string]string{
"testkey1": "testvalue1",
"testkey2": "testvalue2",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

client := pb.NewSearchServiceClient(conn)
for i := 0; i < 1000000; i++ {
resp, err := client.Search(ctx,
&pb.SearchRequest{
StaffName: "zhangsan",
Age: uint32(i),
Gender: true,
},
)
if err != nil {
statusErr, ok := status.FromError(err)
if ok {
if statusErr.Code() == codes.DeadlineExceeded {
log.Fatalln("client.Search err: deadline")
}
resp, err := client.Search(ctx,
&pb.SearchRequest{
StaffName: "zhangsan",
Age: uint32(i),
Gender: true,
},
)
if err != nil {
statusErr, ok := status.FromError(err)
if ok {
if statusErr.Code() == codes.DeadlineExceeded {
log.Fatalln("client.Search err: deadline")
}

log.Fatalf("client.Search err: %v", err)
}

bt, _ := json.Marshal(resp)
log.Println("resp:", string(bt))
time.Sleep(10 * time.Second)
log.Fatalf("client.Search err: %v", err)
}

bt, _ := json.Marshal(resp)
log.Println("resp:", string(bt))
}

func sendCurrTime(client pb.SearchServiceClient) {
md := metadata.New(map[string]string{
"testkey3": "testvalue3",
"testkey4": "testvalue4",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
resp, err := client.CurrentTime(
ctx,
&pb.TimeRequest{},
)
if err != nil {
log.Fatalf("client.CurrentTime err: %v", err)
}
bt, _ := json.Marshal(resp)
log.Println("resp:", string(bt))
}
28 changes: 28 additions & 0 deletions filter/include_filter.go
Original file line number Diff line number Diff line change
@@ -1 +1,29 @@
package filter

import (
"github.com/vearne/grpcreplay/protocol"
slog "github.com/vearne/simplelog"
"regexp"
)

type MethodMatchIncludeFilter struct {
r *regexp.Regexp
}

func NewMethodMatchIncludeFilter(expr string) *MethodMatchIncludeFilter {
var f MethodMatchIncludeFilter
var err error
f.r, err = regexp.Compile(expr)
if err != nil {
slog.Fatal("expr error:%v", err)
}
return &f
}

// Filter :If ok is true, it means that the message can pass
func (f *MethodMatchIncludeFilter) Filter(msg *protocol.Message) (*protocol.Message, bool) {
if f.r.MatchString(msg.Data.Method) {
return msg, true
}
return nil, false
}
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func init() {
based on the timestamp encoded in their filename`)

flag.StringVar(&settings.Codec, "codec", "simple", "")

flag.StringVar(&settings.IncludeFilterMethodMatch, "include-filter-method-match", "",
`filter requests when the method matches the specified regular expression`)
}

func main() {
Expand Down

0 comments on commit 2b389ab

Please sign in to comment.