From ec3872894dda543f12f5de2ce2066f0bd7501c68 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Thu, 23 Sep 2021 17:13:35 +0800 Subject: [PATCH 01/22] add reporter plugins: kafka sarama --- README.md | 6 + sarama/README.md | 18 ++ sarama/doc.go | 18 ++ sarama/example_kafka_test.go | 22 +++ sarama/go.mod | 10 + sarama/go.sum | 225 +++++++++++++++++++++ sarama/internal/tool/os_util.go | 70 +++++++ sarama/kafka.go | 339 ++++++++++++++++++++++++++++++++ sarama/kafka_test.go | 289 +++++++++++++++++++++++++++ 9 files changed, 997 insertions(+) create mode 100644 sarama/README.md create mode 100644 sarama/doc.go create mode 100644 sarama/example_kafka_test.go create mode 100644 sarama/go.mod create mode 100644 sarama/go.sum create mode 100644 sarama/internal/tool/os_util.go create mode 100644 sarama/kafka.go create mode 100644 sarama/kafka_test.go diff --git a/README.md b/README.md index f701391..31f2346 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,12 @@ The plugins of [go2sky](https://github.com/SkyAPM/go2sky) ## Plugin Summary +### Reporter Plugins + +1. Kafka: [sarama](sarama/README.md) + ### Trace Plugins + 1. [http server & client](http/README.md) 1. [gin](gin/README.md) 1. [gear](gear/README.md) @@ -17,5 +22,6 @@ The plugins of [go2sky](https://github.com/SkyAPM/go2sky) 1. [sql](sql/README.md) ### Log Plugins + 1. [logrus](logrus/README.md) 1. [zap](zap/README.md) diff --git a/sarama/README.md b/sarama/README.md new file mode 100644 index 0000000..f6b850a --- /dev/null +++ b/sarama/README.md @@ -0,0 +1,18 @@ +# Go2sky with sarama reporter + +## Installation + +```go +go get -u github.com/SkyAPM/go2sky-plugins/sarama +``` + +## Usage + +```go +r, err := sarama.NewKafkaReporter([]string{"localhost:9092"}) +if err != nil { + log.Fatalf("new kafka reporter error %v \n", err) +} +defer r.Close() +tracer, err := go2sky.NewTracer("example", go2sky.WithReporter(r)) +``` diff --git a/sarama/doc.go b/sarama/doc.go new file mode 100644 index 0000000..2582def --- /dev/null +++ b/sarama/doc.go @@ -0,0 +1,18 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package sarama is a plugin that can be one kafka reporter as go2sky.Reporter. +package sarama diff --git a/sarama/example_kafka_test.go b/sarama/example_kafka_test.go new file mode 100644 index 0000000..6fd756d --- /dev/null +++ b/sarama/example_kafka_test.go @@ -0,0 +1,22 @@ +package sarama + +import ( + "log" + + "github.com/SkyAPM/go2sky" +) + +func ExampleNewKafkaReporter() { + r, err := NewKafkaReporter([]string{"localhost:9092"}) + if err != nil { + log.Fatalf("new kafka reporter error %v \n", err) + } + defer r.Close() + + _, err = go2sky.NewTracer("example", go2sky.WithReporter(r)) + if err != nil { + log.Fatalf("create tracer error %v \n", err) + } + + // Output: +} diff --git a/sarama/go.mod b/sarama/go.mod new file mode 100644 index 0000000..4a5367a --- /dev/null +++ b/sarama/go.mod @@ -0,0 +1,10 @@ +module github.com/SkyAPM/go2sky-plugins/sarama + +go 1.16 + +require ( + github.com/Shopify/sarama v1.29.1 + github.com/SkyAPM/go2sky v1.2.0 + google.golang.org/protobuf v1.27.1 + skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481 +) diff --git a/sarama/go.sum b/sarama/go.sum new file mode 100644 index 0000000..90ba271 --- /dev/null +++ b/sarama/go.sum @@ -0,0 +1,225 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/99designs/gqlgen v0.13.0/go.mod h1:NV130r6f4tpRWuAI+zsrSdooO/eWUv+Gyyoi3rEfXIk= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.29.1 h1:wBAacXbYVLmWieEA/0X/JagDdCZ8NVFOfS6l6+2u5S0= +github.com/Shopify/sarama v1.29.1/go.mod h1:mdtqvCSg8JOxk8PmpTNGyo6wzd4BMm4QXSfDnTXmgkE= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/SkyAPM/go2sky v1.2.0 h1:DTkNf16MVZ/IHC6Bj9kFIoPKodgyXTPLHYca8qMgKbw= +github.com/SkyAPM/go2sky v1.2.0/go.mod h1:LzuySkt/TsQL8FMANZu4BX/6Rn0UfGu5KS/r9TwG/dc= +github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= +github.com/agnivade/levenshtein v1.0.3/go.mod h1:4SFRZbbXWLF4MU1T9Qg0pGgH3Pjs+t6ie5efyrwRJXs= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/trifles v0.0.0-20190318185328-a8d75aae118c/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/go-chi/chi v3.3.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= +github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/matryer/moq v0.0.0-20200106131100-75d0ddfc0007/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/shurcooL/vfsgen v0.0.0-20180121065927-ffb13db8def0/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUdMve7rvxZma+2ZELQeNh88+003LL7Pf/CZ089j8U= +github.com/vektah/gqlparser/v2 v2.1.0/go.mod h1:SyUiHgLATUR8BiYURfTirrTcGpcE+4XkV2se04Px1Ms= +github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20181010134911-4d1c5fb19474/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190515012406-7d7faa4812bd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +skywalking.apache.org/repo/goapi v0.0.0-20210401062122-a049ca15c62d/go.mod h1:S9co6uRVlbQU7PnN7RpEqRtLRCF5n0E9TB7RSmqC5kw= +skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481 h1:K8jQuADJdwsl4+3P6g/nFjRo9ADNhal2MWUW2R4D8Xk= +skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481/go.mod h1:2abOB2LaQEsJLmollzCt5kNfVMWFGKE58905uYzs+sc= +sourcegraph.com/sourcegraph/appdash v0.0.0-20180110180208-2cc67fd64755/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= +sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= diff --git a/sarama/internal/tool/os_util.go b/sarama/internal/tool/os_util.go new file mode 100644 index 0000000..90d0b9b --- /dev/null +++ b/sarama/internal/tool/os_util.go @@ -0,0 +1,70 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package tool + +import ( + "net" + "os" + "runtime" + "strconv" +) + +func ProcessNo() string { + if os.Getpid() > 0 { + return strconv.Itoa(os.Getpid()) + } + return "" +} + +func HostName() string { + if hs, err := os.Hostname(); err == nil { + return hs + } + return "unknown" +} + +func OSName() string { + return runtime.GOOS +} + +func AllIPV4() (ipv4s []string) { + adders, err := net.InterfaceAddrs() + if err != nil { + return + } + + for _, addr := range adders { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + ipv4 := ipNet.IP.String() + if ipv4 == "127.0.0.1" || ipv4 == "localhost" { + continue + } + ipv4s = append(ipv4s, ipv4) + } + } + } + return +} + +func IPV4() string { + ipv4s := AllIPV4() + if len(ipv4s) > 0 { + return ipv4s[0] + } + return "no-hostname" +} diff --git a/sarama/kafka.go b/sarama/kafka.go new file mode 100644 index 0000000..b64ba8a --- /dev/null +++ b/sarama/kafka.go @@ -0,0 +1,339 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package sarama + +import ( + "context" + "log" + "os" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky-plugins/sarama/internal/tool" + "google.golang.org/protobuf/proto" + commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3" + agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + managementv3 "skywalking.apache.org/repo/goapi/collect/management/v3" +) + +const ( + defaultCheckInterval = 20 * time.Second + defaultKafkaLogPrefix = "go2sky-kafka" + topicKeyRegister = "register-" + defaultTopicManagement = "skywalking-managements" + defaultTopicSegment = "skywalking-segments" +) + +type kafkaReporter struct { + c *sarama.Config + producer sarama.AsyncProducer + service string + serviceInstance string + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + instanceProps map[string]string + logger *log.Logger + topicManagement string + topicSegment string + checkInterval time.Duration + // cdsInterval time.Duration + // cdsService *go2sky.ConfigDiscoveryService + // cdsClient configuration.ConfigurationDiscoveryServiceClient +} + +// NewKafkaReporter create a new reporter to send data to kafka. +func NewKafkaReporter(addrs []string, opts ...KafkaReporterOption) (go2sky.Reporter, error) { + r := &kafkaReporter{ + logger: log.New(os.Stderr, defaultKafkaLogPrefix, log.LstdFlags), + checkInterval: defaultCheckInterval, + topicManagement: defaultTopicManagement, + topicSegment: defaultTopicSegment, + } + + for _, o := range opts { + o(r) + } + + p, err := sarama.NewAsyncProducer(addrs, r.c) + if err != nil { + return nil, err + } + r.producer = p + + if r.c.Producer.Return.Errors { + go func() { + for e := range p.Errors() { + r.logger.Printf("send kafka err: %v", e.Err) + } + }() + } + + return r, nil +} + +// KafkaReporterOption allows for functional options to adjust behaviour +// of a kafka reporter to be created by NewKafkaReporter +type KafkaReporterOption func(r *kafkaReporter) + +// WithLogger setup logger for gRPC reporter +func WithKafkaConfig(c *sarama.Config) KafkaReporterOption { + return func(r *kafkaReporter) { + r.c = c + } +} + +// WithKafkaCheckInterval setup service and endpoint registry check interval +func WithKafkaCheckInterval(interval time.Duration) KafkaReporterOption { + return func(r *kafkaReporter) { + r.checkInterval = interval + } +} + +// WithInstanceProps setup service instance properties eg: org=SkyAPM +func WithKafkaInstanceProps(props map[string]string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.instanceProps = props + } +} + +// WithKafkaLogger setup logger for kafka reporter +func WithKafkaLogger(logger *log.Logger) KafkaReporterOption { + return func(r *kafkaReporter) { + r.logger = logger + } +} + +// WithKafkaTopicManagement setup service management topic +func WithKafkaTopicManagement(topicManagement string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.topicManagement = topicManagement + } +} + +// WithKafkaTopicSegment setup service segment topic +func WithKafkaTopicSegment(topicSegment string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.topicSegment = topicSegment + } +} + +func (r *kafkaReporter) Boot(service string, serviceInstance string, cdsWatchers []go2sky.AgentConfigChangeWatcher) { + r.service = service + r.serviceInstance = serviceInstance + r.ctx, r.cancel = context.WithCancel(context.Background()) + r.check() +} + +func (r *kafkaReporter) reportInstanceProperties() error { + props := buildOSInfo() + if r.instanceProps != nil { + for k, v := range r.instanceProps { + props = append(props, &commonv3.KeyStringValuePair{ + Key: k, + Value: v, + }) + } + } + instanceProperties := &managementv3.InstanceProperties{ + Service: r.service, + ServiceInstance: r.serviceInstance, + Properties: props, + } + b, err := proto.Marshal(instanceProperties) + if err != nil { + return err + } + + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicManagement, + Key: sarama.StringEncoder(topicKeyRegister + instanceProperties.ServiceInstance), + Value: sarama.ByteEncoder(b), + } + return nil +} + +func (r *kafkaReporter) check() { + if r.checkInterval < 0 || r.producer == nil { + return + } + r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(r.checkInterval) + defer ticker.Stop() + instancePropertiesSubmitted := false + for { + select { + case <-r.ctx.Done(): + return + case <-ticker.C: + if !instancePropertiesSubmitted { + err := r.reportInstanceProperties() + if err != nil { + r.logger.Printf("report serviceInstance properties error %v", err) + continue + } + instancePropertiesSubmitted = true + } + + instancePingPkg := &managementv3.InstancePingPkg{ + Service: r.service, + ServiceInstance: r.serviceInstance, + } + b, err := proto.Marshal(instancePingPkg) + if err != nil { + r.logger.Printf("send keep alive signal error %v", err) + continue + } + + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicManagement, + Key: sarama.StringEncoder(instancePingPkg.ServiceInstance), + Value: sarama.ByteEncoder(b), + } + } + } + }() +} + +func (r *kafkaReporter) Send(spans []go2sky.ReportedSpan) { + spanSize := len(spans) + if spanSize < 1 { + return + } + rootSpan := spans[spanSize-1] + rootCtx := rootSpan.Context() + segmentObject := &agentv3.SegmentObject{ + TraceId: rootCtx.TraceID, + TraceSegmentId: rootCtx.SegmentID, + Spans: make([]*agentv3.SpanObject, spanSize), + Service: r.service, + ServiceInstance: r.serviceInstance, + } + for i, s := range spans { + spanCtx := s.Context() + segmentObject.Spans[i] = &agentv3.SpanObject{ + SpanId: spanCtx.SpanID, + ParentSpanId: spanCtx.ParentSpanID, + StartTime: s.StartTime(), + EndTime: s.EndTime(), + OperationName: s.OperationName(), + Peer: s.Peer(), + SpanType: s.SpanType(), + SpanLayer: s.SpanLayer(), + ComponentId: s.ComponentID(), + IsError: s.IsError(), + Tags: s.Tags(), + Logs: s.Logs(), + } + srr := make([]*agentv3.SegmentReference, 0) + if i == (spanSize-1) && spanCtx.ParentSpanID > -1 { + srr = append(srr, &agentv3.SegmentReference{ + RefType: agentv3.RefType_CrossThread, + TraceId: spanCtx.TraceID, + ParentTraceSegmentId: spanCtx.ParentSegmentID, + ParentSpanId: spanCtx.ParentSpanID, + ParentService: r.service, + ParentServiceInstance: r.serviceInstance, + }) + } + if len(s.Refs()) > 0 { + for _, tc := range s.Refs() { + srr = append(srr, &agentv3.SegmentReference{ + RefType: agentv3.RefType_CrossProcess, + TraceId: spanCtx.TraceID, + ParentTraceSegmentId: tc.ParentSegmentID, + ParentSpanId: tc.ParentSpanID, + ParentService: tc.ParentService, + ParentServiceInstance: tc.ParentServiceInstance, + ParentEndpoint: tc.ParentEndpoint, + NetworkAddressUsedAtPeer: tc.AddressUsedAtClient, + }) + } + } + segmentObject.Spans[i].Refs = srr + } + + b, err := proto.Marshal(segmentObject) + if err != nil { + r.logger.Printf("reporter segment err %v", err) + return + } + select { + case <-r.ctx.Done(): + r.logger.Printf("reporter segment closed") + return + default: + } + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicSegment, + Key: sarama.StringEncoder(segmentObject.TraceSegmentId), + Value: sarama.ByteEncoder(b), + } +} + +func (r *kafkaReporter) Close() { + r.cancel() + r.wg.Wait() + if err := r.producer.Close(); err != nil { + r.logger.Print(err) + } +} + +func buildOSInfo() (props []*commonv3.KeyStringValuePair) { + processNo := tool.ProcessNo() + if processNo != "" { + kv := &commonv3.KeyStringValuePair{ + Key: "Process No.", + Value: processNo, + } + props = append(props, kv) + } + + hostname := &commonv3.KeyStringValuePair{ + Key: "hostname", + Value: tool.HostName(), + } + props = append(props, hostname) + + language := &commonv3.KeyStringValuePair{ + Key: "language", + Value: "go", + } + props = append(props, language) + + osName := &commonv3.KeyStringValuePair{ + Key: "OS Name", + Value: tool.OSName(), + } + props = append(props, osName) + + ipv4s := tool.AllIPV4() + if len(ipv4s) > 0 { + for _, ipv4 := range ipv4s { + kv := &commonv3.KeyStringValuePair{ + Key: "ipv4", + Value: ipv4, + } + props = append(props, kv) + } + } + return +} diff --git a/sarama/kafka_test.go b/sarama/kafka_test.go new file mode 100644 index 0000000..f2d38e3 --- /dev/null +++ b/sarama/kafka_test.go @@ -0,0 +1,289 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package sarama + +import ( + "context" + "log" + "os" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky/propagation" + "google.golang.org/protobuf/proto" + commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3" + agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + managementv3 "skywalking.apache.org/repo/goapi/collect/management/v3" +) + +const ( + sample = 1 + traceID = "1f2d4bf47bf711eab794acde48001122" + parentSegmentID = "1e7c204a7bf711eab858acde48001122" + parentSpanID = 0 + parentService = "service" + parentServiceInstance = "serviceInstance" + parentEndpoint = "/foo/bar" + addressUsedAtClient = "foo.svc:8787" + + mockService = "service" + mockServiceInstance = "serviceInstance" +) + +var header string + +func init() { + scx := propagation.SpanContext{ + Sample: sample, + TraceID: traceID, + ParentSegmentID: parentSegmentID, + ParentSpanID: parentSpanID, + ParentService: parentService, + ParentServiceInstance: parentServiceInstance, + ParentEndpoint: parentEndpoint, + AddressUsedAtClient: addressUsedAtClient, + } + header = scx.EncodeSW8() +} + +func TestKafkaReporterE2E(t *testing.T) { + r := createKafkaReporter() + tracer, err := go2sky.NewTracer(mockService, go2sky.WithReporter(r), go2sky.WithInstance(mockServiceInstance)) + if err != nil { + t.Error(err) + } + + c := mocks.NewTestConfig() + c.Producer.Return.Successes = true + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + mp.ExpectInputAndSucceed() + r.producer = mp + + entrySpan, ctx, err := tracer.CreateEntrySpan(context.Background(), "/rest/api", func(key string) (string, error) { + return header, nil + }) + if err != nil { + t.Error(err) + } + exitSpan, err := tracer.CreateExitSpan(ctx, "/foo/bar", "foo.svc:8787", func(key, value string) error { + scx := propagation.SpanContext{} + if key == propagation.Header { + err = scx.DecodeSW8(value) + if err != nil { + t.Fatal(err) + } + } + return nil + }) + if err != nil { + t.Error(err) + } + exitSpan.End() + entrySpan.End() + for msg := range r.producer.Successes() { + r.Close() + if msg.Topic != r.topicSegment { + t.Errorf("Excepted kafka topic is %s not %s", r.topicSegment, msg.Topic) + } + v, _ := msg.Value.Encode() + var s agentv3.SegmentObject + if err := proto.Unmarshal(v, &s); err != nil { + t.Fatal(err) + } + if s.TraceId != traceID { + t.Errorf("trace id parse error") + } + if len(s.Spans) == 0 { + t.Error("empty spans") + } + if s.Service != mockService { + t.Error("error are not set service") + } + if s.ServiceInstance != mockServiceInstance { + t.Error("error are not set service instance") + } + } +} + +func TestKafkaReporter_Close(t *testing.T) { + r := createKafkaReporter() + tracer, err := go2sky.NewTracer(mockService, go2sky.WithReporter(r), go2sky.WithInstance(mockServiceInstance)) + if err != nil { + t.Error(err) + } + c := mocks.NewTestConfig() + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + r.producer = mp + + entry, _, err := tracer.CreateEntrySpan(context.Background(), "/close", func(key string) (s string, err error) { + return header, nil + }) + if err != nil { + t.Error(err) + } + r.Close() + entry.End() +} + +func TestKafkaReporterOption(t *testing.T) { + // props + instanceProps := make(map[string]string) + instanceProps["org"] = "SkyAPM" + + // log + logger := log.New(os.Stderr, "WithLogger", log.LstdFlags) + + // kafka config + c := sarama.NewConfig() + + tests := []struct { + name string + option KafkaReporterOption + verifyFunc func(t *testing.T, reporter *kafkaReporter) + }{ + { + name: "with kafka config", + option: WithKafkaConfig(c), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.c != c { + t.Error("error are not set WithKafkaConfig") + } + }, + }, + { + name: "with check interval", + option: WithKafkaCheckInterval(time.Second), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.checkInterval != time.Second { + t.Error("error are not set checkInterval") + } + }, + }, + { + name: "with serviceInstance props", + option: WithKafkaInstanceProps(instanceProps), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + var value string + var ok bool + if value, ok = reporter.instanceProps["org"]; !ok { + t.Error("error are not set instanceProps") + } + if value != "SkyAPM" { + t.Error("error are not set instanceProps") + } + }, + }, + { + name: "with logger", + option: WithKafkaLogger(logger), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.logger != logger { + t.Error("error are not set logger") + } + }, + }, + { + name: "with topic management", + option: WithKafkaTopicManagement("test_management"), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.topicManagement != "test_management" { + t.Error("error are not set WithKafkaTopicManagement") + } + }, + }, + { + name: "with topic segment", + option: WithKafkaTopicSegment("test_segment"), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.topicSegment != "test_segment" { + t.Error("error are not set WithKafkaTopicSegment") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reporter := createKafkaReporter() + tt.option(reporter) + tt.verifyFunc(t, reporter) + }) + } +} + +func TestKafkaReporter_reportInstanceProperties(t *testing.T) { + customProps := make(map[string]string) + customProps["org"] = "SkyAPM" + osProps := buildOSInfo() + for k, v := range customProps { + osProps = append(osProps, &commonv3.KeyStringValuePair{ + Key: k, + Value: v, + }) + } + + reporter := createKafkaReporter() + reporter.service = mockService + reporter.serviceInstance = mockServiceInstance + reporter.instanceProps = customProps + + c := mocks.NewTestConfig() + c.Producer.Return.Successes = true + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + mp.ExpectInputAndSucceed() + reporter.producer = mp + + err := reporter.reportInstanceProperties() + if err != nil { + t.Error() + } + for msg := range reporter.producer.Successes() { + reporter.producer.Close() + if msg.Topic != reporter.topicManagement { + t.Errorf("Excepted kafka topic is %s not %s", reporter.topicManagement, msg.Topic) + } + v, _ := msg.Value.Encode() + var s managementv3.InstanceProperties + if err := proto.Unmarshal(v, &s); err != nil { + t.Fatal(err) + } + if s.Service != mockService { + t.Error("error are not set service") + } + if s.ServiceInstance != mockServiceInstance { + t.Error("error are not set service instance") + } + if len(s.Properties) != len(osProps) { + t.Error("error are not set service Properties") + } + } +} + +func createKafkaReporter() *kafkaReporter { + reporter := &kafkaReporter{ + logger: log.New(os.Stderr, "go2sky", log.LstdFlags), + topicManagement: defaultTopicManagement, + topicSegment: defaultTopicSegment, + } + return reporter +} From bbc2212807619c7a9c576c9b513330b9751c332a Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Fri, 24 Sep 2021 11:24:08 +0800 Subject: [PATCH 02/22] add e2e test to sarama reporter --- sarama/example_kafka_test.go | 16 +++ sarama/test/docker-compose.yml | 77 ++++++++++++ sarama/test/docker/Dockerfile.sarama | 26 ++++ sarama/test/expected.data.yml | 134 +++++++++++++++++++++ sarama/test/expected/service-instance.yml | 42 +++++++ sarama/test/expected/service.yml | 18 +++ sarama/test/expected/trace-info-detail.yml | 34 ++++++ sarama/test/go_sarama_plugin_test.yaml | 64 ++++++++++ sarama/test/server/server.go | 69 +++++++++++ 9 files changed, 480 insertions(+) create mode 100644 sarama/test/docker-compose.yml create mode 100644 sarama/test/docker/Dockerfile.sarama create mode 100644 sarama/test/expected.data.yml create mode 100644 sarama/test/expected/service-instance.yml create mode 100644 sarama/test/expected/service.yml create mode 100644 sarama/test/expected/trace-info-detail.yml create mode 100644 sarama/test/go_sarama_plugin_test.yaml create mode 100644 sarama/test/server/server.go diff --git a/sarama/example_kafka_test.go b/sarama/example_kafka_test.go index 6fd756d..8c32a01 100644 --- a/sarama/example_kafka_test.go +++ b/sarama/example_kafka_test.go @@ -1,3 +1,19 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + package sarama import ( diff --git a/sarama/test/docker-compose.yml b/sarama/test/docker-compose.yml new file mode 100644 index 0000000..34f5c4c --- /dev/null +++ b/sarama/test/docker-compose.yml @@ -0,0 +1,77 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + oap: + image: ghcr.io/apache/skywalking/oap:1730f2c84bbd4da999ec2c74d1c26db31d5a0d24 + expose: + - 11800 + - 12800 + networks: + - e2e + restart: on-failure + environment: + SW_KAFKA_FETCHER_SERVERS: kafka:9092 + healthcheck: + test: ["CMD", "sh", "-c", "nc -zn 127.0.0.1 11800"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + kafka: + condition: service_healthy + + kafka: + image: wurstmeister/kafka:2.12 + networks: + - e2e + expose: + - 9092 + ports: + - 9092:9092 + healthcheck: + test: ["CMD", "nc", "-vz", "localhost", "9092"] + interval: 2s + timeout: 2s + retries: 15 + + sarama: + build: + context: ../ + dockerfile: ./test/docker/Dockerfile.sarama + networks: + - e2e + expose: + - 8081 + ports: + - 8081:8081 + environment: + GOPROXY: "https://goproxy.io" + healthcheck: + test: ["CMD", "curl", "http://127.0.0.1:8081/healthCheck"] + interval: 5s + timeout: 20s + retries: 10 + depends_on: + oap: + condition: service_healthy + kafka: + condition: service_healthy + +networks: + e2e: diff --git a/sarama/test/docker/Dockerfile.sarama b/sarama/test/docker/Dockerfile.sarama new file mode 100644 index 0000000..a2d7c80 --- /dev/null +++ b/sarama/test/docker/Dockerfile.sarama @@ -0,0 +1,26 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM golang:1.16 + +ADD ./sarama /sarama +WORKDIR /sarama + +EXPOSE 8081 + +ENTRYPOINT ["go"] + +CMD ["run", "test/server/server.go"] \ No newline at end of file diff --git a/sarama/test/expected.data.yml b/sarama/test/expected.data.yml new file mode 100644 index 0000000..e3be62a --- /dev/null +++ b/sarama/test/expected.data.yml @@ -0,0 +1,134 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + {{- range .segmentItems }} + {{- if eq .serviceName "kratos-server" }} + - serviceName: kratos-server + segmentSize: {{ gt .segmentSize 0 }} + segments: + {{- range .segments }} + - segmentId: {{ notEmpty .segmentId }} + spans: + {{- range .spans}} + - operationName: /helloworld.Greeter/SayHello + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: RPCFramework + startTime: {{ gt .startTime 0 }} + endTime: {{ gt .endTime 0 }} + componentId: 5010 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + {{- if eq (index .refs 0).networkAddress "kratosserver:8000"}} + tags: + - { key: User-Agent, value: Go-http-client/1.1 } + refs: + - parentEndpoint: /hello + networkAddress: 'kratosserver:8000' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: {{ notEmpty (index .refs 0).parentTraceSegmentId }} + parentServiceInstance: {{ notEmpty (index .refs 0).parentServiceInstance }} + parentService: kratos-client + traceId: {{ notEmpty (index .refs 0).traceId }} + {{- end}} + {{- if eq (index .refs 0).networkAddress "kratosserver:9000" }} + refs: + - parentEndpoint: /hello + networkAddress: 'kratosserver:9000' + refType: CrossProcess + parentSpanId: 2 + parentTraceSegmentId: {{ notEmpty (index .refs 0).parentTraceSegmentId }} + parentServiceInstance: {{ notEmpty (index .refs 0).parentServiceInstance }} + parentService: kratos-client + traceId: {{ notEmpty (index .refs 0).traceId }} + {{- end }} + {{- end}} + {{- end}} + {{- end}} + {{- if eq .serviceName "kratos-client" }} + - serviceName: kratos-client + segmentSize: {{ gt .segmentSize 0 }} + segments: + {{- range .segments }} + - segmentId: {{ notEmpty .segmentId }} + spans: + {{- range .spans}} + {{- if eq .peer "kratosserver:8000"}} + - operationName: /helloworld.Greeter/SayHello + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: RPCFramework + startTime: {{ gt .startTime 0 }} + endTime: {{ gt .endTime 0 }} + componentId: 5010 + isError: false + spanType: Exit + peer: 'kratosserver:8000' + skipAnalysis: false + {{- end}} + {{- if eq .peer "kratosserver:9000"}} + - operationName: /helloworld.Greeter/SayHello + operationId: 0 + parentSpanId: 0 + spanId: 2 + spanLayer: RPCFramework + startTime: {{ gt .startTime 0 }} + endTime: {{ gt .endTime 0 }} + componentId: 5010 + isError: false + spanType: Exit + peer: 'kratosserver:9000' + skipAnalysis: false + {{- end}} + {{- if eq .operationName "/healthCheck" }} + - operationName: /healthCheck + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: RPCFramework + startTime: {{ gt .startTime 0 }} + endTime: {{ gt .endTime 0 }} + componentId: 5010 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + {{- end }} + {{- if eq .operationName "/hello" }} + - operationName: /hello + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: RPCFramework + startTime: {{ gt .startTime 0 }} + endTime: {{ gt .endTime 0 }} + componentId: 5010 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + {{- end }} + {{- end }} + {{- end }} + {{- end }} + {{- end }} +meterItems: [] \ No newline at end of file diff --git a/sarama/test/expected/service-instance.yml b/sarama/test/expected/service-instance.yml new file mode 100644 index 0000000..3190f0e --- /dev/null +++ b/sarama/test/expected/service-instance.yml @@ -0,0 +1,42 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +{{- range .}} +- id: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} + name: {{ notEmpty .name }} + attributes: + {{- range .attributes }} + {{- if eq .name "OS Name" }} + - name: OS Name + value: Linux + {{- end }} + {{- if eq .name "hostname" }} + - name: hostname + value: {{ notEmpty .value }} + {{- end }} + {{- if eq .name "Process No." }} + - name: Process No. + value: "1" + {{- end }} + {{- if eq .name "ipv4s" }} + - name: ipv4s + value: {{ notEmpty .value }} + {{- end }} + {{- end}} + language: go + instanceuuid: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} +{{- end}} \ No newline at end of file diff --git a/sarama/test/expected/service.yml b/sarama/test/expected/service.yml new file mode 100644 index 0000000..de00cea --- /dev/null +++ b/sarama/test/expected/service.yml @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +- id: {{ b64enc "kafka-reporter" }}.1 + name: kafka-reporter + group: "" \ No newline at end of file diff --git a/sarama/test/expected/trace-info-detail.yml b/sarama/test/expected/trace-info-detail.yml new file mode 100644 index 0000000..4f36e72 --- /dev/null +++ b/sarama/test/expected/trace-info-detail.yml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +spans: + {{- range .spans}} + - traceid: {{ notEmpty .traceid }} + segmentid: {{ notEmpty .segmentid }} + spanid: 0 + parentspanid: -1 + refs: [] + servicecode: {{ notEmpty .servicecode }} + serviceinstancename: {{ notEmpty .serviceinstancename }} + starttime: {{ gt .starttime 0 }} + endtime: {{ gt .endtime 0 }} + type: Local + peer: "" + component: "" + iserror: false + layer: "" + tags: [] + logs: [] + {{- end }} \ No newline at end of file diff --git a/sarama/test/go_sarama_plugin_test.yaml b/sarama/test/go_sarama_plugin_test.yaml new file mode 100644 index 0000000..8ac6ea0 --- /dev/null +++ b/sarama/test/go_sarama_plugin_test.yaml @@ -0,0 +1,64 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +setup: + env: compose + file: docker-compose.yml + timeout: 1200 + steps: + - name: install swctl + command: | + if ! command -v swctl &> /dev/null; then + mkdir -p /tmp/skywalking-infra-e2e/bin && cd /tmp/skywalking-infra-e2e + mkdir -p swctl && cd swctl + curl -kLo skywalking-cli.tar.gz https://github.com/apache/skywalking-cli/archive/4d1cb83e24ff58988f4aba0daa50259593b11670.tar.gz + tar -zxf skywalking-cli.tar.gz --strip=1 + utype=$(uname | awk '{print tolower($0)}') + make $utype && mv bin/swctl-*-$utype-amd64 ../bin/swctl + export PATH="$PATH:/tmp/skywalking-infra-e2e/bin" + echo "success to install swctl" + fi + +cleanup: + # always never success failure + on: always + +trigger: + action: http + interval: 3s + times: 5 + url: http://127.0.0.1:${sarama_8081}/info + method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 10 + # the interval between two retries, in millisecond. + interval: 10000 + cases: + # basic check: service list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls + expected: expected/service.yml + + # native management: service instance list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider + expected: expected/service-instance.yml + + # native tracing: trace detail + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') + expected: expected/trace-info-detail.yml diff --git a/sarama/test/server/server.go b/sarama/test/server/server.go new file mode 100644 index 0000000..5c2eabb --- /dev/null +++ b/sarama/test/server/server.go @@ -0,0 +1,69 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +import ( + "context" + "log" + "net/http" + + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky-plugins/sarama" +) + +var kafka = []string{"kafka:9092"} + +const ( + service = "kafka-reporter" + addr = ":8081" +) + +func main() { + // init tracer + re, err := sarama.NewKafkaReporter(kafka) + if err != nil { + log.Fatalf("create kafka reporter error: %v \n", err) + } + + tracer, err := go2sky.NewTracer(service, go2sky.WithReporter(re), go2sky.WithInstance("provider1")) + if err != nil { + log.Fatalf("crate tracer error: %v \n", err) + } + + route := http.NewServeMux() + route.HandleFunc("/healthCheck", func(res http.ResponseWriter, req *http.Request) { + _, _ = res.Write([]byte("success")) + }) + route.HandleFunc("/info", func(res http.ResponseWriter, req *http.Request) { + span, _, err := tracer.CreateLocalSpan( + context.Background(), + go2sky.WithOperationName("info"), + ) + if err != nil { + log.Fatalf("create span error: %v \n", err) + } + defer span.End() + + _, _ = res.Write([]byte("info success")) + }) + + log.Println("start server") + err = http.ListenAndServe(addr, route) + if err != nil { + log.Fatalf("server start error: %v \n", err) + } +} From 647e8a51a0cd6dbe70dda979f556d25621e76770 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Fri, 24 Sep 2021 11:28:00 +0800 Subject: [PATCH 03/22] fix license and add sarama plugin_test --- .github/workflows/plugin_test.yaml | 1 + sarama/test/expected.data.yml | 134 --------------------- sarama/test/expected/service-instance.yml | 25 ++-- sarama/test/expected/service.yml | 15 +-- sarama/test/expected/trace-info-detail.yml | 15 +-- 5 files changed, 29 insertions(+), 161 deletions(-) delete mode 100644 sarama/test/expected.data.yml diff --git a/.github/workflows/plugin_test.yaml b/.github/workflows/plugin_test.yaml index 5caa9e4..8e63d68 100644 --- a/.github/workflows/plugin_test.yaml +++ b/.github/workflows/plugin_test.yaml @@ -42,6 +42,7 @@ jobs: resty/test/go_resty_plugin_test.yaml: resty/** kratos/test/go_kratos_plugin_test.yaml: kratos/** sql/test/sql_plugin_test.yaml: sql/** + sarama/test/go_sarama_plugin_test.yaml: sarama/** PluginsTest: name: Plugin diff --git a/sarama/test/expected.data.yml b/sarama/test/expected.data.yml deleted file mode 100644 index e3be62a..0000000 --- a/sarama/test/expected.data.yml +++ /dev/null @@ -1,134 +0,0 @@ -# -# Copyright 2021 SkyAPM org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -segmentItems: - {{- range .segmentItems }} - {{- if eq .serviceName "kratos-server" }} - - serviceName: kratos-server - segmentSize: {{ gt .segmentSize 0 }} - segments: - {{- range .segments }} - - segmentId: {{ notEmpty .segmentId }} - spans: - {{- range .spans}} - - operationName: /helloworld.Greeter/SayHello - operationId: 0 - parentSpanId: -1 - spanId: 0 - spanLayer: RPCFramework - startTime: {{ gt .startTime 0 }} - endTime: {{ gt .endTime 0 }} - componentId: 5010 - isError: false - spanType: Entry - peer: '' - skipAnalysis: false - {{- if eq (index .refs 0).networkAddress "kratosserver:8000"}} - tags: - - { key: User-Agent, value: Go-http-client/1.1 } - refs: - - parentEndpoint: /hello - networkAddress: 'kratosserver:8000' - refType: CrossProcess - parentSpanId: 1 - parentTraceSegmentId: {{ notEmpty (index .refs 0).parentTraceSegmentId }} - parentServiceInstance: {{ notEmpty (index .refs 0).parentServiceInstance }} - parentService: kratos-client - traceId: {{ notEmpty (index .refs 0).traceId }} - {{- end}} - {{- if eq (index .refs 0).networkAddress "kratosserver:9000" }} - refs: - - parentEndpoint: /hello - networkAddress: 'kratosserver:9000' - refType: CrossProcess - parentSpanId: 2 - parentTraceSegmentId: {{ notEmpty (index .refs 0).parentTraceSegmentId }} - parentServiceInstance: {{ notEmpty (index .refs 0).parentServiceInstance }} - parentService: kratos-client - traceId: {{ notEmpty (index .refs 0).traceId }} - {{- end }} - {{- end}} - {{- end}} - {{- end}} - {{- if eq .serviceName "kratos-client" }} - - serviceName: kratos-client - segmentSize: {{ gt .segmentSize 0 }} - segments: - {{- range .segments }} - - segmentId: {{ notEmpty .segmentId }} - spans: - {{- range .spans}} - {{- if eq .peer "kratosserver:8000"}} - - operationName: /helloworld.Greeter/SayHello - operationId: 0 - parentSpanId: 0 - spanId: 1 - spanLayer: RPCFramework - startTime: {{ gt .startTime 0 }} - endTime: {{ gt .endTime 0 }} - componentId: 5010 - isError: false - spanType: Exit - peer: 'kratosserver:8000' - skipAnalysis: false - {{- end}} - {{- if eq .peer "kratosserver:9000"}} - - operationName: /helloworld.Greeter/SayHello - operationId: 0 - parentSpanId: 0 - spanId: 2 - spanLayer: RPCFramework - startTime: {{ gt .startTime 0 }} - endTime: {{ gt .endTime 0 }} - componentId: 5010 - isError: false - spanType: Exit - peer: 'kratosserver:9000' - skipAnalysis: false - {{- end}} - {{- if eq .operationName "/healthCheck" }} - - operationName: /healthCheck - operationId: 0 - parentSpanId: -1 - spanId: 0 - spanLayer: RPCFramework - startTime: {{ gt .startTime 0 }} - endTime: {{ gt .endTime 0 }} - componentId: 5010 - isError: false - spanType: Entry - peer: '' - skipAnalysis: false - {{- end }} - {{- if eq .operationName "/hello" }} - - operationName: /hello - operationId: 0 - parentSpanId: -1 - spanId: 0 - spanLayer: RPCFramework - startTime: {{ gt .startTime 0 }} - endTime: {{ gt .endTime 0 }} - componentId: 5010 - isError: false - spanType: Entry - peer: '' - skipAnalysis: false - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }} -meterItems: [] \ No newline at end of file diff --git a/sarama/test/expected/service-instance.yml b/sarama/test/expected/service-instance.yml index 3190f0e..a55bbe8 100644 --- a/sarama/test/expected/service-instance.yml +++ b/sarama/test/expected/service-instance.yml @@ -1,19 +1,18 @@ -# Licensed to Apache Software Foundation (ASF) under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Apache Software Foundation (ASF) licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. {{- range .}} - id: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} diff --git a/sarama/test/expected/service.yml b/sarama/test/expected/service.yml index de00cea..d50c80a 100644 --- a/sarama/test/expected/service.yml +++ b/sarama/test/expected/service.yml @@ -1,17 +1,18 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# - id: {{ b64enc "kafka-reporter" }}.1 name: kafka-reporter diff --git a/sarama/test/expected/trace-info-detail.yml b/sarama/test/expected/trace-info-detail.yml index 4f36e72..e650d34 100644 --- a/sarama/test/expected/trace-info-detail.yml +++ b/sarama/test/expected/trace-info-detail.yml @@ -1,17 +1,18 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# spans: {{- range .spans}} From 55336e14e018532d41fb026c32163fd220e9e8fb Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Fri, 24 Sep 2021 14:14:55 +0800 Subject: [PATCH 04/22] rename sarama to kafkareporter --- .github/workflows/plugin_test.yaml | 2 +- README.md | 2 +- {sarama => kafkareporter}/README.md | 6 +-- {sarama => kafkareporter}/doc.go | 2 +- {sarama => kafkareporter}/go.mod | 2 +- {sarama => kafkareporter}/go.sum | 0 .../internal/tool/os_util.go | 0 {sarama => kafkareporter}/kafka.go | 32 ++++++++-------- {sarama => kafkareporter}/kafka_test.go | 14 +++---- .../test/docker-compose.yml | 6 +-- .../test/docker/Dockerfile.kafkareporter | 4 +- .../test/expected/service-instance.yml | 0 .../test/expected/service.yml | 0 .../test/expected/trace-info-detail.yml | 0 .../test/go_kafka_reporter_plugin_test.yaml | 2 +- .../test/server/server.go | 7 ++-- sarama/example_kafka_test.go | 38 ------------------- 17 files changed, 39 insertions(+), 78 deletions(-) rename {sarama => kafkareporter}/README.md (58%) rename {sarama => kafkareporter}/doc.go (96%) rename {sarama => kafkareporter}/go.mod (78%) rename {sarama => kafkareporter}/go.sum (100%) rename {sarama => kafkareporter}/internal/tool/os_util.go (100%) rename {sarama => kafkareporter}/kafka.go (89%) rename {sarama => kafkareporter}/kafka_test.go (96%) rename {sarama => kafkareporter}/test/docker-compose.yml (93%) rename sarama/test/docker/Dockerfile.sarama => kafkareporter/test/docker/Dockerfile.kafkareporter (91%) rename {sarama => kafkareporter}/test/expected/service-instance.yml (100%) rename {sarama => kafkareporter}/test/expected/service.yml (100%) rename {sarama => kafkareporter}/test/expected/trace-info-detail.yml (100%) rename sarama/test/go_sarama_plugin_test.yaml => kafkareporter/test/go_kafka_reporter_plugin_test.yaml (97%) rename {sarama => kafkareporter}/test/server/server.go (93%) delete mode 100644 sarama/example_kafka_test.go diff --git a/.github/workflows/plugin_test.yaml b/.github/workflows/plugin_test.yaml index 8e63d68..bc8d583 100644 --- a/.github/workflows/plugin_test.yaml +++ b/.github/workflows/plugin_test.yaml @@ -42,7 +42,7 @@ jobs: resty/test/go_resty_plugin_test.yaml: resty/** kratos/test/go_kratos_plugin_test.yaml: kratos/** sql/test/sql_plugin_test.yaml: sql/** - sarama/test/go_sarama_plugin_test.yaml: sarama/** + kafkareporter/test/go_kafka_reporter_plugin_test.yaml: kafkareporter/** PluginsTest: name: Plugin diff --git a/README.md b/README.md index 31f2346..a3c04bb 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ The plugins of [go2sky](https://github.com/SkyAPM/go2sky) ### Reporter Plugins -1. Kafka: [sarama](sarama/README.md) +1. [kafkareporter](kafkareporter/README.md) ### Trace Plugins diff --git a/sarama/README.md b/kafkareporter/README.md similarity index 58% rename from sarama/README.md rename to kafkareporter/README.md index f6b850a..c913726 100644 --- a/sarama/README.md +++ b/kafkareporter/README.md @@ -1,15 +1,15 @@ -# Go2sky with sarama reporter +# Go2sky with kafka reporter ## Installation ```go -go get -u github.com/SkyAPM/go2sky-plugins/sarama +go get -u github.com/SkyAPM/go2sky-plugins/kafkareporter ``` ## Usage ```go -r, err := sarama.NewKafkaReporter([]string{"localhost:9092"}) +r, err := kafkareporter.New([]string{"localhost:9092"}) if err != nil { log.Fatalf("new kafka reporter error %v \n", err) } diff --git a/sarama/doc.go b/kafkareporter/doc.go similarity index 96% rename from sarama/doc.go rename to kafkareporter/doc.go index 2582def..28c2b32 100644 --- a/sarama/doc.go +++ b/kafkareporter/doc.go @@ -15,4 +15,4 @@ // // Package sarama is a plugin that can be one kafka reporter as go2sky.Reporter. -package sarama +package kafkareporter diff --git a/sarama/go.mod b/kafkareporter/go.mod similarity index 78% rename from sarama/go.mod rename to kafkareporter/go.mod index 4a5367a..ceca56f 100644 --- a/sarama/go.mod +++ b/kafkareporter/go.mod @@ -1,4 +1,4 @@ -module github.com/SkyAPM/go2sky-plugins/sarama +module github.com/SkyAPM/go2sky-plugins/kafkareporter go 1.16 diff --git a/sarama/go.sum b/kafkareporter/go.sum similarity index 100% rename from sarama/go.sum rename to kafkareporter/go.sum diff --git a/sarama/internal/tool/os_util.go b/kafkareporter/internal/tool/os_util.go similarity index 100% rename from sarama/internal/tool/os_util.go rename to kafkareporter/internal/tool/os_util.go diff --git a/sarama/kafka.go b/kafkareporter/kafka.go similarity index 89% rename from sarama/kafka.go rename to kafkareporter/kafka.go index b64ba8a..92d9543 100644 --- a/sarama/kafka.go +++ b/kafkareporter/kafka.go @@ -14,7 +14,7 @@ // limitations under the License. // -package sarama +package kafkareporter import ( "context" @@ -25,7 +25,7 @@ import ( "github.com/Shopify/sarama" "github.com/SkyAPM/go2sky" - "github.com/SkyAPM/go2sky-plugins/sarama/internal/tool" + "github.com/SkyAPM/go2sky-plugins/kafkareporter/internal/tool" "google.golang.org/protobuf/proto" commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3" agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" @@ -58,8 +58,8 @@ type kafkaReporter struct { // cdsClient configuration.ConfigurationDiscoveryServiceClient } -// NewKafkaReporter create a new reporter to send data to kafka. -func NewKafkaReporter(addrs []string, opts ...KafkaReporterOption) (go2sky.Reporter, error) { +// New create a new reporter to send data to kafka. +func New(addrs []string, opts ...KafkaReporterOption) (go2sky.Reporter, error) { r := &kafkaReporter{ logger: log.New(os.Stderr, defaultKafkaLogPrefix, log.LstdFlags), checkInterval: defaultCheckInterval, @@ -89,46 +89,46 @@ func NewKafkaReporter(addrs []string, opts ...KafkaReporterOption) (go2sky.Repor } // KafkaReporterOption allows for functional options to adjust behaviour -// of a kafka reporter to be created by NewKafkaReporter +// of a kafka reporter to be created by New type KafkaReporterOption func(r *kafkaReporter) -// WithLogger setup logger for gRPC reporter -func WithKafkaConfig(c *sarama.Config) KafkaReporterOption { +// WithConfig setup sarama.Config for kafka reporter +func WithConfig(c *sarama.Config) KafkaReporterOption { return func(r *kafkaReporter) { r.c = c } } -// WithKafkaCheckInterval setup service and endpoint registry check interval -func WithKafkaCheckInterval(interval time.Duration) KafkaReporterOption { +// WithCheckInterval setup service and endpoint registry check interval +func WithCheckInterval(interval time.Duration) KafkaReporterOption { return func(r *kafkaReporter) { r.checkInterval = interval } } // WithInstanceProps setup service instance properties eg: org=SkyAPM -func WithKafkaInstanceProps(props map[string]string) KafkaReporterOption { +func WithInstanceProps(props map[string]string) KafkaReporterOption { return func(r *kafkaReporter) { r.instanceProps = props } } -// WithKafkaLogger setup logger for kafka reporter -func WithKafkaLogger(logger *log.Logger) KafkaReporterOption { +// WithLogger setup logger for kafka reporter +func WithLogger(logger *log.Logger) KafkaReporterOption { return func(r *kafkaReporter) { r.logger = logger } } -// WithKafkaTopicManagement setup service management topic -func WithKafkaTopicManagement(topicManagement string) KafkaReporterOption { +// WithTopicManagement setup service management topic +func WithTopicManagement(topicManagement string) KafkaReporterOption { return func(r *kafkaReporter) { r.topicManagement = topicManagement } } -// WithKafkaTopicSegment setup service segment topic -func WithKafkaTopicSegment(topicSegment string) KafkaReporterOption { +// WithTopicSegment setup service segment topic +func WithTopicSegment(topicSegment string) KafkaReporterOption { return func(r *kafkaReporter) { r.topicSegment = topicSegment } diff --git a/sarama/kafka_test.go b/kafkareporter/kafka_test.go similarity index 96% rename from sarama/kafka_test.go rename to kafkareporter/kafka_test.go index f2d38e3..92dbddc 100644 --- a/sarama/kafka_test.go +++ b/kafkareporter/kafka_test.go @@ -14,7 +14,7 @@ // limitations under the License. // -package sarama +package kafkareporter import ( "context" @@ -162,7 +162,7 @@ func TestKafkaReporterOption(t *testing.T) { }{ { name: "with kafka config", - option: WithKafkaConfig(c), + option: WithConfig(c), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { if reporter.c != c { t.Error("error are not set WithKafkaConfig") @@ -171,7 +171,7 @@ func TestKafkaReporterOption(t *testing.T) { }, { name: "with check interval", - option: WithKafkaCheckInterval(time.Second), + option: WithCheckInterval(time.Second), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { if reporter.checkInterval != time.Second { t.Error("error are not set checkInterval") @@ -180,7 +180,7 @@ func TestKafkaReporterOption(t *testing.T) { }, { name: "with serviceInstance props", - option: WithKafkaInstanceProps(instanceProps), + option: WithInstanceProps(instanceProps), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { var value string var ok bool @@ -194,7 +194,7 @@ func TestKafkaReporterOption(t *testing.T) { }, { name: "with logger", - option: WithKafkaLogger(logger), + option: WithLogger(logger), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { if reporter.logger != logger { t.Error("error are not set logger") @@ -203,7 +203,7 @@ func TestKafkaReporterOption(t *testing.T) { }, { name: "with topic management", - option: WithKafkaTopicManagement("test_management"), + option: WithTopicManagement("test_management"), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { if reporter.topicManagement != "test_management" { t.Error("error are not set WithKafkaTopicManagement") @@ -212,7 +212,7 @@ func TestKafkaReporterOption(t *testing.T) { }, { name: "with topic segment", - option: WithKafkaTopicSegment("test_segment"), + option: WithTopicSegment("test_segment"), verifyFunc: func(t *testing.T, reporter *kafkaReporter) { if reporter.topicSegment != "test_segment" { t.Error("error are not set WithKafkaTopicSegment") diff --git a/sarama/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml similarity index 93% rename from sarama/test/docker-compose.yml rename to kafkareporter/test/docker-compose.yml index 34f5c4c..e968a85 100644 --- a/sarama/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -37,7 +37,7 @@ services: condition: service_healthy kafka: - image: wurstmeister/kafka:2.12 + image: wurstmeister/kafka:2.12-2.4.1 networks: - e2e expose: @@ -50,10 +50,10 @@ services: timeout: 2s retries: 15 - sarama: + kafkareporter: build: context: ../ - dockerfile: ./test/docker/Dockerfile.sarama + dockerfile: ./test/docker/Dockerfile.kafkareporter networks: - e2e expose: diff --git a/sarama/test/docker/Dockerfile.sarama b/kafkareporter/test/docker/Dockerfile.kafkareporter similarity index 91% rename from sarama/test/docker/Dockerfile.sarama rename to kafkareporter/test/docker/Dockerfile.kafkareporter index a2d7c80..1940a93 100644 --- a/sarama/test/docker/Dockerfile.sarama +++ b/kafkareporter/test/docker/Dockerfile.kafkareporter @@ -16,8 +16,8 @@ FROM golang:1.16 -ADD ./sarama /sarama -WORKDIR /sarama +ADD ./kafkareporter /kafkareporter +WORKDIR /kafkareporter EXPOSE 8081 diff --git a/sarama/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml similarity index 100% rename from sarama/test/expected/service-instance.yml rename to kafkareporter/test/expected/service-instance.yml diff --git a/sarama/test/expected/service.yml b/kafkareporter/test/expected/service.yml similarity index 100% rename from sarama/test/expected/service.yml rename to kafkareporter/test/expected/service.yml diff --git a/sarama/test/expected/trace-info-detail.yml b/kafkareporter/test/expected/trace-info-detail.yml similarity index 100% rename from sarama/test/expected/trace-info-detail.yml rename to kafkareporter/test/expected/trace-info-detail.yml diff --git a/sarama/test/go_sarama_plugin_test.yaml b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml similarity index 97% rename from sarama/test/go_sarama_plugin_test.yaml rename to kafkareporter/test/go_kafka_reporter_plugin_test.yaml index 8ac6ea0..f2efcdd 100644 --- a/sarama/test/go_sarama_plugin_test.yaml +++ b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml @@ -40,7 +40,7 @@ trigger: action: http interval: 3s times: 5 - url: http://127.0.0.1:${sarama_8081}/info + url: http://127.0.0.1:${kafkareporter_8081}/info method: GET verify: diff --git a/sarama/test/server/server.go b/kafkareporter/test/server/server.go similarity index 93% rename from sarama/test/server/server.go rename to kafkareporter/test/server/server.go index 5c2eabb..97cd561 100644 --- a/sarama/test/server/server.go +++ b/kafkareporter/test/server/server.go @@ -22,19 +22,18 @@ import ( "net/http" "github.com/SkyAPM/go2sky" - "github.com/SkyAPM/go2sky-plugins/sarama" + "github.com/SkyAPM/go2sky-plugins/kafkareporter" ) -var kafka = []string{"kafka:9092"} - const ( + broker = "kafka:9092" service = "kafka-reporter" addr = ":8081" ) func main() { // init tracer - re, err := sarama.NewKafkaReporter(kafka) + re, err := kafkareporter.New([]string{broker}) if err != nil { log.Fatalf("create kafka reporter error: %v \n", err) } diff --git a/sarama/example_kafka_test.go b/sarama/example_kafka_test.go deleted file mode 100644 index 8c32a01..0000000 --- a/sarama/example_kafka_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// -// Copyright 2021 SkyAPM org -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package sarama - -import ( - "log" - - "github.com/SkyAPM/go2sky" -) - -func ExampleNewKafkaReporter() { - r, err := NewKafkaReporter([]string{"localhost:9092"}) - if err != nil { - log.Fatalf("new kafka reporter error %v \n", err) - } - defer r.Close() - - _, err = go2sky.NewTracer("example", go2sky.WithReporter(r)) - if err != nil { - log.Fatalf("create tracer error %v \n", err) - } - - // Output: -} From 2218acda5cc4c0e660c26ffedb79dc6886856bd7 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Sun, 26 Sep 2021 14:55:33 +0800 Subject: [PATCH 05/22] fix kafkareporter docker-compose.yml context dir --- kafkareporter/test/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index e968a85..7625c2d 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -52,8 +52,8 @@ services: kafkareporter: build: - context: ../ - dockerfile: ./test/docker/Dockerfile.kafkareporter + context: ../../ + dockerfile: ./kafkareporter/test/docker/Dockerfile.kafkareporter networks: - e2e expose: From ddc989f28ed002e92c617bb15c3e69e78663896c Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Sun, 26 Sep 2021 14:57:05 +0800 Subject: [PATCH 06/22] remove GOPROXY in kafkareporter docker-compose.yml --- kafkareporter/test/docker-compose.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 7625c2d..1deda76 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -60,8 +60,6 @@ services: - 8081 ports: - 8081:8081 - environment: - GOPROXY: "https://goproxy.io" healthcheck: test: ["CMD", "curl", "http://127.0.0.1:8081/healthCheck"] interval: 5s From ec6b65181adee79724a63403f97584e6b21ff89d Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Sun, 26 Sep 2021 19:15:57 +0800 Subject: [PATCH 07/22] fix docker-compose.yml in kafkareporter --- kafkareporter/kafka.go | 1 + kafkareporter/test/docker-compose.yml | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/kafkareporter/kafka.go b/kafkareporter/kafka.go index 92d9543..38b57ee 100644 --- a/kafkareporter/kafka.go +++ b/kafkareporter/kafka.go @@ -61,6 +61,7 @@ type kafkaReporter struct { // New create a new reporter to send data to kafka. func New(addrs []string, opts ...KafkaReporterOption) (go2sky.Reporter, error) { r := &kafkaReporter{ + c: sarama.NewConfig(), logger: log.New(os.Stderr, defaultKafkaLogPrefix, log.LstdFlags), checkInterval: defaultCheckInterval, topicManagement: defaultTopicManagement, diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 1deda76..4322a1c 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -14,7 +14,7 @@ # limitations under the License. # -version: '2.1' +version: "2.1" services: oap: @@ -32,9 +32,13 @@ services: interval: 5s timeout: 60s retries: 120 - depends_on: - kafka: - condition: service_healthy + + zookeeper: + image: wurstmeister/zookeeper + networks: + - e2e + ports: + - "2181:2181" kafka: image: wurstmeister/kafka:2.12-2.4.1 @@ -44,11 +48,10 @@ services: - 9092 ports: - 9092:9092 - healthcheck: - test: ["CMD", "nc", "-vz", "localhost", "9092"] - interval: 2s - timeout: 2s - retries: 15 + environment: + KAFKA_ADVERTISED_HOST_NAME: kafka + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 kafkareporter: build: @@ -68,8 +71,6 @@ services: depends_on: oap: condition: service_healthy - kafka: - condition: service_healthy networks: e2e: From b74aad6b911f6d8c38e96801701923a70cc85a0c Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Mon, 27 Sep 2021 13:41:17 +0800 Subject: [PATCH 08/22] fix go_kafka_reporter_plugin_test.yaml --- kafkareporter/test/go_kafka_reporter_plugin_test.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml index f2efcdd..f78aab7 100644 --- a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml +++ b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml @@ -52,13 +52,13 @@ verify: interval: 10000 cases: # basic check: service list - - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls + - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql service ls expected: expected/service.yml # native management: service instance list - - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider + - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql instance list --service-name=e2e-service-provider expected: expected/service-instance.yml # native tracing: trace detail - - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') + - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') expected: expected/trace-info-detail.yml From cf23d19d9ea29ca0f97be6b4287be0047c59cea2 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Mon, 27 Sep 2021 14:11:06 +0800 Subject: [PATCH 09/22] fix ports in kafkareporter/test --- kafkareporter/test/docker-compose.yml | 12 +++++++----- .../test/go_kafka_reporter_plugin_test.yaml | 8 ++++---- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 4322a1c..9d11cd4 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -19,11 +19,13 @@ version: "2.1" services: oap: image: ghcr.io/apache/skywalking/oap:1730f2c84bbd4da999ec2c74d1c26db31d5a0d24 + networks: + - e2e expose: - 11800 - 12800 - networks: - - e2e + ports: + - 12800 restart: on-failure environment: SW_KAFKA_FETCHER_SERVERS: kafka:9092 @@ -38,7 +40,7 @@ services: networks: - e2e ports: - - "2181:2181" + - 2181 kafka: image: wurstmeister/kafka:2.12-2.4.1 @@ -47,7 +49,7 @@ services: expose: - 9092 ports: - - 9092:9092 + - 9092 environment: KAFKA_ADVERTISED_HOST_NAME: kafka KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" @@ -62,7 +64,7 @@ services: expose: - 8081 ports: - - 8081:8081 + - 8081 healthcheck: test: ["CMD", "curl", "http://127.0.0.1:8081/healthCheck"] interval: 5s diff --git a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml index f78aab7..899f26c 100644 --- a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml +++ b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml @@ -40,7 +40,7 @@ trigger: action: http interval: 3s times: 5 - url: http://127.0.0.1:${kafkareporter_8081}/info + url: http://${kafkareporter_host}:${kafkareporter_8081}/info method: GET verify: @@ -52,13 +52,13 @@ verify: interval: 10000 cases: # basic check: service list - - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql service ls + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls expected: expected/service.yml # native management: service instance list - - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql instance list --service-name=e2e-service-provider + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider expected: expected/service-instance.yml # native tracing: trace detail - - query: swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://127.0.0.1:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') expected: expected/trace-info-detail.yml From 1ce6f2f9f9af4dce9d00386c4d62f12a886496de Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Mon, 27 Sep 2021 14:36:46 +0800 Subject: [PATCH 10/22] fix go_kafka_reporter_plugin_test.yaml verify --- kafkareporter/test/go_kafka_reporter_plugin_test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml index 899f26c..d7eb581 100644 --- a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml +++ b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml @@ -56,7 +56,7 @@ verify: expected: expected/service.yml # native management: service instance list - - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=kafka-reporter expected: expected/service-instance.yml # native tracing: trace detail From 481b66e9aa5f92f0e1cff09364ec71cfbfe9ce18 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Mon, 27 Sep 2021 17:07:40 +0800 Subject: [PATCH 11/22] active kafka fetcher in go_kafka_reporter_plugin_test.yaml --- kafkareporter/test/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 9d11cd4..946cb8d 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -28,6 +28,7 @@ services: - 12800 restart: on-failure environment: + SW_KAFKA_FETCHER: default SW_KAFKA_FETCHER_SERVERS: kafka:9092 healthcheck: test: ["CMD", "sh", "-c", "nc -zn 127.0.0.1 11800"] From 942e2d22cefb22c122d7d6032c58ebdf3ef10e93 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Mon, 27 Sep 2021 17:20:32 +0800 Subject: [PATCH 12/22] remove healthy check in kafkareporter/test --- kafkareporter/test/docker-compose.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 946cb8d..29daaa2 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -30,11 +30,6 @@ services: environment: SW_KAFKA_FETCHER: default SW_KAFKA_FETCHER_SERVERS: kafka:9092 - healthcheck: - test: ["CMD", "sh", "-c", "nc -zn 127.0.0.1 11800"] - interval: 5s - timeout: 60s - retries: 120 zookeeper: image: wurstmeister/zookeeper @@ -71,9 +66,6 @@ services: interval: 5s timeout: 20s retries: 10 - depends_on: - oap: - condition: service_healthy networks: e2e: From ad53aebcd258d801d43be1bdaae8ec339c1f0387 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Wed, 29 Sep 2021 14:58:33 +0800 Subject: [PATCH 13/22] create topics in kafkareporter docker-compose.yml --- kafkareporter/test/docker-compose.yml | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml index 29daaa2..857b188 100644 --- a/kafkareporter/test/docker-compose.yml +++ b/kafkareporter/test/docker-compose.yml @@ -30,6 +30,19 @@ services: environment: SW_KAFKA_FETCHER: default SW_KAFKA_FETCHER_SERVERS: kafka:9092 + SW_KAFKA_FETCHER_PARTITIONS: 1 + SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1 + + ui: + image: apache/skywalking-ui:8.5.0 + expose: + - 8080 + ports: + - 8080 + networks: + - e2e + environment: + - SW_OAP_ADDRESS=oap:12800 zookeeper: image: wurstmeister/zookeeper @@ -48,8 +61,8 @@ services: - 9092 environment: KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CREATE_TOPICS: "skywalking-metrics:1:1,skywalking-managements:1:1,skywalking-profilings:1:1,skywalking-segments:1:1,skywalking-meters:1:1" kafkareporter: build: From e09d30789505f9f11c47d55b9aece3f3aea51e8f Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Wed, 29 Sep 2021 18:42:05 +0800 Subject: [PATCH 14/22] add EntrySpan to kafkareporter e2e test --- kafkareporter/test/server/server.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kafkareporter/test/server/server.go b/kafkareporter/test/server/server.go index 97cd561..900b793 100644 --- a/kafkareporter/test/server/server.go +++ b/kafkareporter/test/server/server.go @@ -48,8 +48,16 @@ func main() { _, _ = res.Write([]byte("success")) }) route.HandleFunc("/info", func(res http.ResponseWriter, req *http.Request) { - span, _, err := tracer.CreateLocalSpan( - context.Background(), + ctx := context.Background() + span, ctx, err := tracer.CreateEntrySpan(ctx, "/info", func(key string) (s string, e error) { + return "", nil + }) + if err != nil { + log.Fatalf("create span error: %v \n", err) + } + defer span.End() + span, _, err = tracer.CreateLocalSpan( + ctx, go2sky.WithOperationName("info"), ) if err != nil { From 8472e79edf031249e2209fdaa8d7d3e02e6df4d8 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Wed, 29 Sep 2021 19:04:15 +0800 Subject: [PATCH 15/22] update trace-info-detail in kafkareporter --- kafkareporter/test/expected/trace-info-detail.yml | 11 ++++++----- kafkareporter/test/server/server.go | 8 -------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/kafkareporter/test/expected/trace-info-detail.yml b/kafkareporter/test/expected/trace-info-detail.yml index e650d34..9813bff 100644 --- a/kafkareporter/test/expected/trace-info-detail.yml +++ b/kafkareporter/test/expected/trace-info-detail.yml @@ -21,15 +21,16 @@ spans: spanid: 0 parentspanid: -1 refs: [] - servicecode: {{ notEmpty .servicecode }} - serviceinstancename: {{ notEmpty .serviceinstancename }} + servicecode: kafka-reporter + serviceinstancename: "" starttime: {{ gt .starttime 0 }} endtime: {{ gt .endtime 0 }} - type: Local + endpointname: /info + type: Entry peer: "" - component: "" + component: Unknown iserror: false - layer: "" + layer: Unknown tags: [] logs: [] {{- end }} \ No newline at end of file diff --git a/kafkareporter/test/server/server.go b/kafkareporter/test/server/server.go index 900b793..f80f4b5 100644 --- a/kafkareporter/test/server/server.go +++ b/kafkareporter/test/server/server.go @@ -56,14 +56,6 @@ func main() { log.Fatalf("create span error: %v \n", err) } defer span.End() - span, _, err = tracer.CreateLocalSpan( - ctx, - go2sky.WithOperationName("info"), - ) - if err != nil { - log.Fatalf("create span error: %v \n", err) - } - defer span.End() _, _ = res.Write([]byte("info success")) }) From 913fa3b26cb1fd881f9c557c2511f9d1b926ff5d Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Wed, 29 Sep 2021 19:04:59 +0800 Subject: [PATCH 16/22] remove useless code in kafkareporter --- kafkareporter/test/server/server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafkareporter/test/server/server.go b/kafkareporter/test/server/server.go index f80f4b5..5fafb9a 100644 --- a/kafkareporter/test/server/server.go +++ b/kafkareporter/test/server/server.go @@ -48,8 +48,7 @@ func main() { _, _ = res.Write([]byte("success")) }) route.HandleFunc("/info", func(res http.ResponseWriter, req *http.Request) { - ctx := context.Background() - span, ctx, err := tracer.CreateEntrySpan(ctx, "/info", func(key string) (s string, e error) { + span, _, err := tracer.CreateEntrySpan(context.Background(), "/info", func(key string) (s string, e error) { return "", nil }) if err != nil { From 1bd7926a59882f6200dad234aff8ec0d0447068b Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Thu, 30 Sep 2021 14:36:50 +0800 Subject: [PATCH 17/22] update service-instance.yml in kafkareporter/test --- kafkareporter/test/expected/service-instance.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml index a55bbe8..04ed1d3 100644 --- a/kafkareporter/test/expected/service-instance.yml +++ b/kafkareporter/test/expected/service-instance.yml @@ -21,7 +21,7 @@ {{- range .attributes }} {{- if eq .name "OS Name" }} - name: OS Name - value: Linux + value: linux {{- end }} {{- if eq .name "hostname" }} - name: hostname @@ -29,13 +29,13 @@ {{- end }} {{- if eq .name "Process No." }} - name: Process No. - value: "1" + value: {{ notEmpty .value }} {{- end }} {{- if eq .name "ipv4s" }} - name: ipv4s value: {{ notEmpty .value }} {{- end }} {{- end}} - language: go + language: GO instanceuuid: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} {{- end}} \ No newline at end of file From e20e649a3de233a9c18af0d78cfcb023de339ca1 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Thu, 30 Sep 2021 15:03:35 +0800 Subject: [PATCH 18/22] fix service-instance.yml in kafkareporter/test --- kafkareporter/test/expected/service-instance.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml index 04ed1d3..2d7fa88 100644 --- a/kafkareporter/test/expected/service-instance.yml +++ b/kafkareporter/test/expected/service-instance.yml @@ -33,7 +33,7 @@ {{- end }} {{- if eq .name "ipv4s" }} - name: ipv4s - value: {{ notEmpty .value }} + value: "{{ notEmpty .value }}" {{- end }} {{- end}} language: GO From 17ba8e9a5858e6409307819859b981112a164205 Mon Sep 17 00:00:00 2001 From: zhang-wei Date: Thu, 30 Sep 2021 17:44:19 +0800 Subject: [PATCH 19/22] Update kafkareporter/test/expected/service-instance.yml --- kafkareporter/test/expected/service-instance.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml index 2d7fa88..e2bd484 100644 --- a/kafkareporter/test/expected/service-instance.yml +++ b/kafkareporter/test/expected/service-instance.yml @@ -29,7 +29,7 @@ {{- end }} {{- if eq .name "Process No." }} - name: Process No. - value: {{ notEmpty .value }} + value: "{{ notEmpty .value }}" {{- end }} {{- if eq .name "ipv4s" }} - name: ipv4s From 75712b705600cc993ca1e593de400fdb34bb3e30 Mon Sep 17 00:00:00 2001 From: matianjun1 Date: Fri, 8 Oct 2021 11:13:50 +0800 Subject: [PATCH 20/22] fix serviceinstancename in kafkareporter/test --- kafkareporter/test/expected/service-instance.yml | 2 +- kafkareporter/test/expected/trace-info-detail.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml index e2bd484..1d7aeeb 100644 --- a/kafkareporter/test/expected/service-instance.yml +++ b/kafkareporter/test/expected/service-instance.yml @@ -33,7 +33,7 @@ {{- end }} {{- if eq .name "ipv4s" }} - name: ipv4s - value: "{{ notEmpty .value }}" + value: {{ notEmpty .value }} {{- end }} {{- end}} language: GO diff --git a/kafkareporter/test/expected/trace-info-detail.yml b/kafkareporter/test/expected/trace-info-detail.yml index 9813bff..61a5941 100644 --- a/kafkareporter/test/expected/trace-info-detail.yml +++ b/kafkareporter/test/expected/trace-info-detail.yml @@ -22,7 +22,7 @@ spans: parentspanid: -1 refs: [] servicecode: kafka-reporter - serviceinstancename: "" + serviceinstancename: provider1 starttime: {{ gt .starttime 0 }} endtime: {{ gt .endtime 0 }} endpointname: /info From 1ce91a1e1807db741057127c5bdaa950f3e3ff87 Mon Sep 17 00:00:00 2001 From: zhang-wei Date: Sun, 10 Oct 2021 23:34:31 +0800 Subject: [PATCH 21/22] replace range with contains --- kafkareporter/test/expected/trace-info-detail.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafkareporter/test/expected/trace-info-detail.yml b/kafkareporter/test/expected/trace-info-detail.yml index 61a5941..95eabdd 100644 --- a/kafkareporter/test/expected/trace-info-detail.yml +++ b/kafkareporter/test/expected/trace-info-detail.yml @@ -15,7 +15,7 @@ # spans: - {{- range .spans}} + {{- contains .spans}} - traceid: {{ notEmpty .traceid }} segmentid: {{ notEmpty .segmentid }} spanid: 0 @@ -33,4 +33,4 @@ spans: layer: Unknown tags: [] logs: [] - {{- end }} \ No newline at end of file + {{- end }} \ No newline at end of file From 171c05c6beada730abd353f75290e42227e8bbca Mon Sep 17 00:00:00 2001 From: zhang-wei Date: Sun, 10 Oct 2021 23:37:15 +0800 Subject: [PATCH 22/22] replace range with contains --- kafkareporter/test/expected/service-instance.yml | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml index 1d7aeeb..c10ceb3 100644 --- a/kafkareporter/test/expected/service-instance.yml +++ b/kafkareporter/test/expected/service-instance.yml @@ -14,28 +14,20 @@ # limitations under the License. # -{{- range .}} +{{- contains .}} - id: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} name: {{ notEmpty .name }} attributes: - {{- range .attributes }} - {{- if eq .name "OS Name" }} + {{- contains .attributes }} - name: OS Name value: linux - {{- end }} - {{- if eq .name "hostname" }} - name: hostname value: {{ notEmpty .value }} - {{- end }} - {{- if eq .name "Process No." }} - - name: Process No. + - name: "Process No." value: "{{ notEmpty .value }}" - {{- end }} - {{- if eq .name "ipv4s" }} - name: ipv4s value: {{ notEmpty .value }} {{- end }} - {{- end}} language: GO instanceuuid: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} -{{- end}} \ No newline at end of file +{{- end }} \ No newline at end of file