diff --git a/.github/workflows/plugin_test.yaml b/.github/workflows/plugin_test.yaml index 3e6979c..6e73c3b 100644 --- a/.github/workflows/plugin_test.yaml +++ b/.github/workflows/plugin_test.yaml @@ -42,6 +42,7 @@ jobs: resty/test/go_resty_plugin_test.yaml: resty/** kratos/test/go_kratos_plugin_test.yaml: kratos/** sql/test/sql_plugin_test.yaml: sql/** + kafkareporter/test/go_kafka_reporter_plugin_test.yaml: kafkareporter/** PluginsTest: name: Plugin diff --git a/README.md b/README.md index f701391..a3c04bb 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,12 @@ The plugins of [go2sky](https://github.com/SkyAPM/go2sky) ## Plugin Summary +### Reporter Plugins + +1. [kafkareporter](kafkareporter/README.md) + ### Trace Plugins + 1. [http server & client](http/README.md) 1. [gin](gin/README.md) 1. [gear](gear/README.md) @@ -17,5 +22,6 @@ The plugins of [go2sky](https://github.com/SkyAPM/go2sky) 1. [sql](sql/README.md) ### Log Plugins + 1. [logrus](logrus/README.md) 1. [zap](zap/README.md) diff --git a/kafkareporter/README.md b/kafkareporter/README.md new file mode 100644 index 0000000..c913726 --- /dev/null +++ b/kafkareporter/README.md @@ -0,0 +1,18 @@ +# Go2sky with kafka reporter + +## Installation + +```go +go get -u github.com/SkyAPM/go2sky-plugins/kafkareporter +``` + +## Usage + +```go +r, err := kafkareporter.New([]string{"localhost:9092"}) +if err != nil { + log.Fatalf("new kafka reporter error %v \n", err) +} +defer r.Close() +tracer, err := go2sky.NewTracer("example", go2sky.WithReporter(r)) +``` diff --git a/kafkareporter/doc.go b/kafkareporter/doc.go new file mode 100644 index 0000000..28c2b32 --- /dev/null +++ b/kafkareporter/doc.go @@ -0,0 +1,18 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package sarama is a plugin that can be one kafka reporter as go2sky.Reporter. +package kafkareporter diff --git a/kafkareporter/go.mod b/kafkareporter/go.mod new file mode 100644 index 0000000..ceca56f --- /dev/null +++ b/kafkareporter/go.mod @@ -0,0 +1,10 @@ +module github.com/SkyAPM/go2sky-plugins/kafkareporter + +go 1.16 + +require ( + github.com/Shopify/sarama v1.29.1 + github.com/SkyAPM/go2sky v1.2.0 + google.golang.org/protobuf v1.27.1 + skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481 +) diff --git a/kafkareporter/go.sum b/kafkareporter/go.sum new file mode 100644 index 0000000..90ba271 --- /dev/null +++ b/kafkareporter/go.sum @@ -0,0 +1,225 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/99designs/gqlgen v0.13.0/go.mod h1:NV130r6f4tpRWuAI+zsrSdooO/eWUv+Gyyoi3rEfXIk= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.29.1 h1:wBAacXbYVLmWieEA/0X/JagDdCZ8NVFOfS6l6+2u5S0= +github.com/Shopify/sarama v1.29.1/go.mod h1:mdtqvCSg8JOxk8PmpTNGyo6wzd4BMm4QXSfDnTXmgkE= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/SkyAPM/go2sky v1.2.0 h1:DTkNf16MVZ/IHC6Bj9kFIoPKodgyXTPLHYca8qMgKbw= +github.com/SkyAPM/go2sky v1.2.0/go.mod h1:LzuySkt/TsQL8FMANZu4BX/6Rn0UfGu5KS/r9TwG/dc= +github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= +github.com/agnivade/levenshtein v1.0.3/go.mod h1:4SFRZbbXWLF4MU1T9Qg0pGgH3Pjs+t6ie5efyrwRJXs= +github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/trifles v0.0.0-20190318185328-a8d75aae118c/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/go-chi/chi v3.3.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= +github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.1/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/matryer/moq v0.0.0-20200106131100-75d0ddfc0007/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/shurcooL/vfsgen v0.0.0-20180121065927-ffb13db8def0/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUdMve7rvxZma+2ZELQeNh88+003LL7Pf/CZ089j8U= +github.com/vektah/gqlparser/v2 v2.1.0/go.mod h1:SyUiHgLATUR8BiYURfTirrTcGpcE+4XkV2se04Px1Ms= +github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20181010134911-4d1c5fb19474/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190515012406-7d7faa4812bd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200114235610-7ae403b6b589/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +skywalking.apache.org/repo/goapi v0.0.0-20210401062122-a049ca15c62d/go.mod h1:S9co6uRVlbQU7PnN7RpEqRtLRCF5n0E9TB7RSmqC5kw= +skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481 h1:K8jQuADJdwsl4+3P6g/nFjRo9ADNhal2MWUW2R4D8Xk= +skywalking.apache.org/repo/goapi v0.0.0-20210820070710-e10b78bbf481/go.mod h1:2abOB2LaQEsJLmollzCt5kNfVMWFGKE58905uYzs+sc= +sourcegraph.com/sourcegraph/appdash v0.0.0-20180110180208-2cc67fd64755/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= +sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= diff --git a/kafkareporter/internal/tool/os_util.go b/kafkareporter/internal/tool/os_util.go new file mode 100644 index 0000000..90d0b9b --- /dev/null +++ b/kafkareporter/internal/tool/os_util.go @@ -0,0 +1,70 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package tool + +import ( + "net" + "os" + "runtime" + "strconv" +) + +func ProcessNo() string { + if os.Getpid() > 0 { + return strconv.Itoa(os.Getpid()) + } + return "" +} + +func HostName() string { + if hs, err := os.Hostname(); err == nil { + return hs + } + return "unknown" +} + +func OSName() string { + return runtime.GOOS +} + +func AllIPV4() (ipv4s []string) { + adders, err := net.InterfaceAddrs() + if err != nil { + return + } + + for _, addr := range adders { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + ipv4 := ipNet.IP.String() + if ipv4 == "127.0.0.1" || ipv4 == "localhost" { + continue + } + ipv4s = append(ipv4s, ipv4) + } + } + } + return +} + +func IPV4() string { + ipv4s := AllIPV4() + if len(ipv4s) > 0 { + return ipv4s[0] + } + return "no-hostname" +} diff --git a/kafkareporter/kafka.go b/kafkareporter/kafka.go new file mode 100644 index 0000000..38b57ee --- /dev/null +++ b/kafkareporter/kafka.go @@ -0,0 +1,340 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package kafkareporter + +import ( + "context" + "log" + "os" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky-plugins/kafkareporter/internal/tool" + "google.golang.org/protobuf/proto" + commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3" + agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + managementv3 "skywalking.apache.org/repo/goapi/collect/management/v3" +) + +const ( + defaultCheckInterval = 20 * time.Second + defaultKafkaLogPrefix = "go2sky-kafka" + topicKeyRegister = "register-" + defaultTopicManagement = "skywalking-managements" + defaultTopicSegment = "skywalking-segments" +) + +type kafkaReporter struct { + c *sarama.Config + producer sarama.AsyncProducer + service string + serviceInstance string + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + instanceProps map[string]string + logger *log.Logger + topicManagement string + topicSegment string + checkInterval time.Duration + // cdsInterval time.Duration + // cdsService *go2sky.ConfigDiscoveryService + // cdsClient configuration.ConfigurationDiscoveryServiceClient +} + +// New create a new reporter to send data to kafka. +func New(addrs []string, opts ...KafkaReporterOption) (go2sky.Reporter, error) { + r := &kafkaReporter{ + c: sarama.NewConfig(), + logger: log.New(os.Stderr, defaultKafkaLogPrefix, log.LstdFlags), + checkInterval: defaultCheckInterval, + topicManagement: defaultTopicManagement, + topicSegment: defaultTopicSegment, + } + + for _, o := range opts { + o(r) + } + + p, err := sarama.NewAsyncProducer(addrs, r.c) + if err != nil { + return nil, err + } + r.producer = p + + if r.c.Producer.Return.Errors { + go func() { + for e := range p.Errors() { + r.logger.Printf("send kafka err: %v", e.Err) + } + }() + } + + return r, nil +} + +// KafkaReporterOption allows for functional options to adjust behaviour +// of a kafka reporter to be created by New +type KafkaReporterOption func(r *kafkaReporter) + +// WithConfig setup sarama.Config for kafka reporter +func WithConfig(c *sarama.Config) KafkaReporterOption { + return func(r *kafkaReporter) { + r.c = c + } +} + +// WithCheckInterval setup service and endpoint registry check interval +func WithCheckInterval(interval time.Duration) KafkaReporterOption { + return func(r *kafkaReporter) { + r.checkInterval = interval + } +} + +// WithInstanceProps setup service instance properties eg: org=SkyAPM +func WithInstanceProps(props map[string]string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.instanceProps = props + } +} + +// WithLogger setup logger for kafka reporter +func WithLogger(logger *log.Logger) KafkaReporterOption { + return func(r *kafkaReporter) { + r.logger = logger + } +} + +// WithTopicManagement setup service management topic +func WithTopicManagement(topicManagement string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.topicManagement = topicManagement + } +} + +// WithTopicSegment setup service segment topic +func WithTopicSegment(topicSegment string) KafkaReporterOption { + return func(r *kafkaReporter) { + r.topicSegment = topicSegment + } +} + +func (r *kafkaReporter) Boot(service string, serviceInstance string, cdsWatchers []go2sky.AgentConfigChangeWatcher) { + r.service = service + r.serviceInstance = serviceInstance + r.ctx, r.cancel = context.WithCancel(context.Background()) + r.check() +} + +func (r *kafkaReporter) reportInstanceProperties() error { + props := buildOSInfo() + if r.instanceProps != nil { + for k, v := range r.instanceProps { + props = append(props, &commonv3.KeyStringValuePair{ + Key: k, + Value: v, + }) + } + } + instanceProperties := &managementv3.InstanceProperties{ + Service: r.service, + ServiceInstance: r.serviceInstance, + Properties: props, + } + b, err := proto.Marshal(instanceProperties) + if err != nil { + return err + } + + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicManagement, + Key: sarama.StringEncoder(topicKeyRegister + instanceProperties.ServiceInstance), + Value: sarama.ByteEncoder(b), + } + return nil +} + +func (r *kafkaReporter) check() { + if r.checkInterval < 0 || r.producer == nil { + return + } + r.wg.Add(1) + go func() { + defer r.wg.Done() + ticker := time.NewTicker(r.checkInterval) + defer ticker.Stop() + instancePropertiesSubmitted := false + for { + select { + case <-r.ctx.Done(): + return + case <-ticker.C: + if !instancePropertiesSubmitted { + err := r.reportInstanceProperties() + if err != nil { + r.logger.Printf("report serviceInstance properties error %v", err) + continue + } + instancePropertiesSubmitted = true + } + + instancePingPkg := &managementv3.InstancePingPkg{ + Service: r.service, + ServiceInstance: r.serviceInstance, + } + b, err := proto.Marshal(instancePingPkg) + if err != nil { + r.logger.Printf("send keep alive signal error %v", err) + continue + } + + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicManagement, + Key: sarama.StringEncoder(instancePingPkg.ServiceInstance), + Value: sarama.ByteEncoder(b), + } + } + } + }() +} + +func (r *kafkaReporter) Send(spans []go2sky.ReportedSpan) { + spanSize := len(spans) + if spanSize < 1 { + return + } + rootSpan := spans[spanSize-1] + rootCtx := rootSpan.Context() + segmentObject := &agentv3.SegmentObject{ + TraceId: rootCtx.TraceID, + TraceSegmentId: rootCtx.SegmentID, + Spans: make([]*agentv3.SpanObject, spanSize), + Service: r.service, + ServiceInstance: r.serviceInstance, + } + for i, s := range spans { + spanCtx := s.Context() + segmentObject.Spans[i] = &agentv3.SpanObject{ + SpanId: spanCtx.SpanID, + ParentSpanId: spanCtx.ParentSpanID, + StartTime: s.StartTime(), + EndTime: s.EndTime(), + OperationName: s.OperationName(), + Peer: s.Peer(), + SpanType: s.SpanType(), + SpanLayer: s.SpanLayer(), + ComponentId: s.ComponentID(), + IsError: s.IsError(), + Tags: s.Tags(), + Logs: s.Logs(), + } + srr := make([]*agentv3.SegmentReference, 0) + if i == (spanSize-1) && spanCtx.ParentSpanID > -1 { + srr = append(srr, &agentv3.SegmentReference{ + RefType: agentv3.RefType_CrossThread, + TraceId: spanCtx.TraceID, + ParentTraceSegmentId: spanCtx.ParentSegmentID, + ParentSpanId: spanCtx.ParentSpanID, + ParentService: r.service, + ParentServiceInstance: r.serviceInstance, + }) + } + if len(s.Refs()) > 0 { + for _, tc := range s.Refs() { + srr = append(srr, &agentv3.SegmentReference{ + RefType: agentv3.RefType_CrossProcess, + TraceId: spanCtx.TraceID, + ParentTraceSegmentId: tc.ParentSegmentID, + ParentSpanId: tc.ParentSpanID, + ParentService: tc.ParentService, + ParentServiceInstance: tc.ParentServiceInstance, + ParentEndpoint: tc.ParentEndpoint, + NetworkAddressUsedAtPeer: tc.AddressUsedAtClient, + }) + } + } + segmentObject.Spans[i].Refs = srr + } + + b, err := proto.Marshal(segmentObject) + if err != nil { + r.logger.Printf("reporter segment err %v", err) + return + } + select { + case <-r.ctx.Done(): + r.logger.Printf("reporter segment closed") + return + default: + } + r.producer.Input() <- &sarama.ProducerMessage{ + Topic: r.topicSegment, + Key: sarama.StringEncoder(segmentObject.TraceSegmentId), + Value: sarama.ByteEncoder(b), + } +} + +func (r *kafkaReporter) Close() { + r.cancel() + r.wg.Wait() + if err := r.producer.Close(); err != nil { + r.logger.Print(err) + } +} + +func buildOSInfo() (props []*commonv3.KeyStringValuePair) { + processNo := tool.ProcessNo() + if processNo != "" { + kv := &commonv3.KeyStringValuePair{ + Key: "Process No.", + Value: processNo, + } + props = append(props, kv) + } + + hostname := &commonv3.KeyStringValuePair{ + Key: "hostname", + Value: tool.HostName(), + } + props = append(props, hostname) + + language := &commonv3.KeyStringValuePair{ + Key: "language", + Value: "go", + } + props = append(props, language) + + osName := &commonv3.KeyStringValuePair{ + Key: "OS Name", + Value: tool.OSName(), + } + props = append(props, osName) + + ipv4s := tool.AllIPV4() + if len(ipv4s) > 0 { + for _, ipv4 := range ipv4s { + kv := &commonv3.KeyStringValuePair{ + Key: "ipv4", + Value: ipv4, + } + props = append(props, kv) + } + } + return +} diff --git a/kafkareporter/kafka_test.go b/kafkareporter/kafka_test.go new file mode 100644 index 0000000..92dbddc --- /dev/null +++ b/kafkareporter/kafka_test.go @@ -0,0 +1,289 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package kafkareporter + +import ( + "context" + "log" + "os" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky/propagation" + "google.golang.org/protobuf/proto" + commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3" + agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + managementv3 "skywalking.apache.org/repo/goapi/collect/management/v3" +) + +const ( + sample = 1 + traceID = "1f2d4bf47bf711eab794acde48001122" + parentSegmentID = "1e7c204a7bf711eab858acde48001122" + parentSpanID = 0 + parentService = "service" + parentServiceInstance = "serviceInstance" + parentEndpoint = "/foo/bar" + addressUsedAtClient = "foo.svc:8787" + + mockService = "service" + mockServiceInstance = "serviceInstance" +) + +var header string + +func init() { + scx := propagation.SpanContext{ + Sample: sample, + TraceID: traceID, + ParentSegmentID: parentSegmentID, + ParentSpanID: parentSpanID, + ParentService: parentService, + ParentServiceInstance: parentServiceInstance, + ParentEndpoint: parentEndpoint, + AddressUsedAtClient: addressUsedAtClient, + } + header = scx.EncodeSW8() +} + +func TestKafkaReporterE2E(t *testing.T) { + r := createKafkaReporter() + tracer, err := go2sky.NewTracer(mockService, go2sky.WithReporter(r), go2sky.WithInstance(mockServiceInstance)) + if err != nil { + t.Error(err) + } + + c := mocks.NewTestConfig() + c.Producer.Return.Successes = true + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + mp.ExpectInputAndSucceed() + r.producer = mp + + entrySpan, ctx, err := tracer.CreateEntrySpan(context.Background(), "/rest/api", func(key string) (string, error) { + return header, nil + }) + if err != nil { + t.Error(err) + } + exitSpan, err := tracer.CreateExitSpan(ctx, "/foo/bar", "foo.svc:8787", func(key, value string) error { + scx := propagation.SpanContext{} + if key == propagation.Header { + err = scx.DecodeSW8(value) + if err != nil { + t.Fatal(err) + } + } + return nil + }) + if err != nil { + t.Error(err) + } + exitSpan.End() + entrySpan.End() + for msg := range r.producer.Successes() { + r.Close() + if msg.Topic != r.topicSegment { + t.Errorf("Excepted kafka topic is %s not %s", r.topicSegment, msg.Topic) + } + v, _ := msg.Value.Encode() + var s agentv3.SegmentObject + if err := proto.Unmarshal(v, &s); err != nil { + t.Fatal(err) + } + if s.TraceId != traceID { + t.Errorf("trace id parse error") + } + if len(s.Spans) == 0 { + t.Error("empty spans") + } + if s.Service != mockService { + t.Error("error are not set service") + } + if s.ServiceInstance != mockServiceInstance { + t.Error("error are not set service instance") + } + } +} + +func TestKafkaReporter_Close(t *testing.T) { + r := createKafkaReporter() + tracer, err := go2sky.NewTracer(mockService, go2sky.WithReporter(r), go2sky.WithInstance(mockServiceInstance)) + if err != nil { + t.Error(err) + } + c := mocks.NewTestConfig() + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + r.producer = mp + + entry, _, err := tracer.CreateEntrySpan(context.Background(), "/close", func(key string) (s string, err error) { + return header, nil + }) + if err != nil { + t.Error(err) + } + r.Close() + entry.End() +} + +func TestKafkaReporterOption(t *testing.T) { + // props + instanceProps := make(map[string]string) + instanceProps["org"] = "SkyAPM" + + // log + logger := log.New(os.Stderr, "WithLogger", log.LstdFlags) + + // kafka config + c := sarama.NewConfig() + + tests := []struct { + name string + option KafkaReporterOption + verifyFunc func(t *testing.T, reporter *kafkaReporter) + }{ + { + name: "with kafka config", + option: WithConfig(c), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.c != c { + t.Error("error are not set WithKafkaConfig") + } + }, + }, + { + name: "with check interval", + option: WithCheckInterval(time.Second), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.checkInterval != time.Second { + t.Error("error are not set checkInterval") + } + }, + }, + { + name: "with serviceInstance props", + option: WithInstanceProps(instanceProps), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + var value string + var ok bool + if value, ok = reporter.instanceProps["org"]; !ok { + t.Error("error are not set instanceProps") + } + if value != "SkyAPM" { + t.Error("error are not set instanceProps") + } + }, + }, + { + name: "with logger", + option: WithLogger(logger), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.logger != logger { + t.Error("error are not set logger") + } + }, + }, + { + name: "with topic management", + option: WithTopicManagement("test_management"), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.topicManagement != "test_management" { + t.Error("error are not set WithKafkaTopicManagement") + } + }, + }, + { + name: "with topic segment", + option: WithTopicSegment("test_segment"), + verifyFunc: func(t *testing.T, reporter *kafkaReporter) { + if reporter.topicSegment != "test_segment" { + t.Error("error are not set WithKafkaTopicSegment") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reporter := createKafkaReporter() + tt.option(reporter) + tt.verifyFunc(t, reporter) + }) + } +} + +func TestKafkaReporter_reportInstanceProperties(t *testing.T) { + customProps := make(map[string]string) + customProps["org"] = "SkyAPM" + osProps := buildOSInfo() + for k, v := range customProps { + osProps = append(osProps, &commonv3.KeyStringValuePair{ + Key: k, + Value: v, + }) + } + + reporter := createKafkaReporter() + reporter.service = mockService + reporter.serviceInstance = mockServiceInstance + reporter.instanceProps = customProps + + c := mocks.NewTestConfig() + c.Producer.Return.Successes = true + c.Producer.Return.Errors = false + mp := mocks.NewAsyncProducer(t, c) + mp.ExpectInputAndSucceed() + reporter.producer = mp + + err := reporter.reportInstanceProperties() + if err != nil { + t.Error() + } + for msg := range reporter.producer.Successes() { + reporter.producer.Close() + if msg.Topic != reporter.topicManagement { + t.Errorf("Excepted kafka topic is %s not %s", reporter.topicManagement, msg.Topic) + } + v, _ := msg.Value.Encode() + var s managementv3.InstanceProperties + if err := proto.Unmarshal(v, &s); err != nil { + t.Fatal(err) + } + if s.Service != mockService { + t.Error("error are not set service") + } + if s.ServiceInstance != mockServiceInstance { + t.Error("error are not set service instance") + } + if len(s.Properties) != len(osProps) { + t.Error("error are not set service Properties") + } + } +} + +func createKafkaReporter() *kafkaReporter { + reporter := &kafkaReporter{ + logger: log.New(os.Stderr, "go2sky", log.LstdFlags), + topicManagement: defaultTopicManagement, + topicSegment: defaultTopicSegment, + } + return reporter +} diff --git a/kafkareporter/test/docker-compose.yml b/kafkareporter/test/docker-compose.yml new file mode 100644 index 0000000..857b188 --- /dev/null +++ b/kafkareporter/test/docker-compose.yml @@ -0,0 +1,84 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: "2.1" + +services: + oap: + image: ghcr.io/apache/skywalking/oap:1730f2c84bbd4da999ec2c74d1c26db31d5a0d24 + networks: + - e2e + expose: + - 11800 + - 12800 + ports: + - 12800 + restart: on-failure + environment: + SW_KAFKA_FETCHER: default + SW_KAFKA_FETCHER_SERVERS: kafka:9092 + SW_KAFKA_FETCHER_PARTITIONS: 1 + SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1 + + ui: + image: apache/skywalking-ui:8.5.0 + expose: + - 8080 + ports: + - 8080 + networks: + - e2e + environment: + - SW_OAP_ADDRESS=oap:12800 + + zookeeper: + image: wurstmeister/zookeeper + networks: + - e2e + ports: + - 2181 + + kafka: + image: wurstmeister/kafka:2.12-2.4.1 + networks: + - e2e + expose: + - 9092 + ports: + - 9092 + environment: + KAFKA_ADVERTISED_HOST_NAME: kafka + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CREATE_TOPICS: "skywalking-metrics:1:1,skywalking-managements:1:1,skywalking-profilings:1:1,skywalking-segments:1:1,skywalking-meters:1:1" + + kafkareporter: + build: + context: ../../ + dockerfile: ./kafkareporter/test/docker/Dockerfile.kafkareporter + networks: + - e2e + expose: + - 8081 + ports: + - 8081 + healthcheck: + test: ["CMD", "curl", "http://127.0.0.1:8081/healthCheck"] + interval: 5s + timeout: 20s + retries: 10 + +networks: + e2e: diff --git a/kafkareporter/test/docker/Dockerfile.kafkareporter b/kafkareporter/test/docker/Dockerfile.kafkareporter new file mode 100644 index 0000000..1940a93 --- /dev/null +++ b/kafkareporter/test/docker/Dockerfile.kafkareporter @@ -0,0 +1,26 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM golang:1.16 + +ADD ./kafkareporter /kafkareporter +WORKDIR /kafkareporter + +EXPOSE 8081 + +ENTRYPOINT ["go"] + +CMD ["run", "test/server/server.go"] \ No newline at end of file diff --git a/kafkareporter/test/expected/service-instance.yml b/kafkareporter/test/expected/service-instance.yml new file mode 100644 index 0000000..c10ceb3 --- /dev/null +++ b/kafkareporter/test/expected/service-instance.yml @@ -0,0 +1,33 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +{{- contains .}} +- id: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} + name: {{ notEmpty .name }} + attributes: + {{- contains .attributes }} + - name: OS Name + value: linux + - name: hostname + value: {{ notEmpty .value }} + - name: "Process No." + value: "{{ notEmpty .value }}" + - name: ipv4s + value: {{ notEmpty .value }} + {{- end }} + language: GO + instanceuuid: {{ b64enc "kafka-reporter" }}.1_{{ b64enc "provider1" }} +{{- end }} \ No newline at end of file diff --git a/kafkareporter/test/expected/service.yml b/kafkareporter/test/expected/service.yml new file mode 100644 index 0000000..d50c80a --- /dev/null +++ b/kafkareporter/test/expected/service.yml @@ -0,0 +1,19 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +- id: {{ b64enc "kafka-reporter" }}.1 + name: kafka-reporter + group: "" \ No newline at end of file diff --git a/kafkareporter/test/expected/trace-info-detail.yml b/kafkareporter/test/expected/trace-info-detail.yml new file mode 100644 index 0000000..95eabdd --- /dev/null +++ b/kafkareporter/test/expected/trace-info-detail.yml @@ -0,0 +1,36 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +spans: + {{- contains .spans}} + - traceid: {{ notEmpty .traceid }} + segmentid: {{ notEmpty .segmentid }} + spanid: 0 + parentspanid: -1 + refs: [] + servicecode: kafka-reporter + serviceinstancename: provider1 + starttime: {{ gt .starttime 0 }} + endtime: {{ gt .endtime 0 }} + endpointname: /info + type: Entry + peer: "" + component: Unknown + iserror: false + layer: Unknown + tags: [] + logs: [] + {{- end }} \ No newline at end of file diff --git a/kafkareporter/test/go_kafka_reporter_plugin_test.yaml b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml new file mode 100644 index 0000000..d7eb581 --- /dev/null +++ b/kafkareporter/test/go_kafka_reporter_plugin_test.yaml @@ -0,0 +1,64 @@ +# +# Copyright 2021 SkyAPM org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +setup: + env: compose + file: docker-compose.yml + timeout: 1200 + steps: + - name: install swctl + command: | + if ! command -v swctl &> /dev/null; then + mkdir -p /tmp/skywalking-infra-e2e/bin && cd /tmp/skywalking-infra-e2e + mkdir -p swctl && cd swctl + curl -kLo skywalking-cli.tar.gz https://github.com/apache/skywalking-cli/archive/4d1cb83e24ff58988f4aba0daa50259593b11670.tar.gz + tar -zxf skywalking-cli.tar.gz --strip=1 + utype=$(uname | awk '{print tolower($0)}') + make $utype && mv bin/swctl-*-$utype-amd64 ../bin/swctl + export PATH="$PATH:/tmp/skywalking-infra-e2e/bin" + echo "success to install swctl" + fi + +cleanup: + # always never success failure + on: always + +trigger: + action: http + interval: 3s + times: 5 + url: http://${kafkareporter_host}:${kafkareporter_8081}/info + method: GET + +verify: + # verify with retry strategy + retry: + # max retry count + count: 10 + # the interval between two retries, in millisecond. + interval: 10000 + cases: + # basic check: service list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls + expected: expected/service.yml + + # native management: service instance list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=kafka-reporter + expected: expected/service-instance.yml + + # native tracing: trace detail + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls|grep -A 5 '/info'|tail -n1|awk -F ' ' '{print $2}') + expected: expected/trace-info-detail.yml diff --git a/kafkareporter/test/server/server.go b/kafkareporter/test/server/server.go new file mode 100644 index 0000000..5fafb9a --- /dev/null +++ b/kafkareporter/test/server/server.go @@ -0,0 +1,67 @@ +// +// Copyright 2021 SkyAPM org +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package main + +import ( + "context" + "log" + "net/http" + + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky-plugins/kafkareporter" +) + +const ( + broker = "kafka:9092" + service = "kafka-reporter" + addr = ":8081" +) + +func main() { + // init tracer + re, err := kafkareporter.New([]string{broker}) + if err != nil { + log.Fatalf("create kafka reporter error: %v \n", err) + } + + tracer, err := go2sky.NewTracer(service, go2sky.WithReporter(re), go2sky.WithInstance("provider1")) + if err != nil { + log.Fatalf("crate tracer error: %v \n", err) + } + + route := http.NewServeMux() + route.HandleFunc("/healthCheck", func(res http.ResponseWriter, req *http.Request) { + _, _ = res.Write([]byte("success")) + }) + route.HandleFunc("/info", func(res http.ResponseWriter, req *http.Request) { + span, _, err := tracer.CreateEntrySpan(context.Background(), "/info", func(key string) (s string, e error) { + return "", nil + }) + if err != nil { + log.Fatalf("create span error: %v \n", err) + } + defer span.End() + + _, _ = res.Write([]byte("info success")) + }) + + log.Println("start server") + err = http.ListenAndServe(addr, route) + if err != nil { + log.Fatalf("server start error: %v \n", err) + } +}