From 25f61c86f41be47151e18318cb9dd242b4d2f7e6 Mon Sep 17 00:00:00 2001 From: fkarakas Date: Fri, 17 Sep 2021 17:39:12 +0200 Subject: [PATCH] Add decoder endpoint Allow to decode the kafka message body by calling an endpoint --- README.md | 1 + server/config/config.go | 7 +- server/decoder/decoder.go | 7 ++ server/decoder/httpdecoder/http.go | 101 +++++++++++++++++ server/decoder/httpdecoder/http_test.go | 119 ++++++++++++++++++++ server/docker-compose.yml | 2 +- server/go.mod | 6 +- server/go.sum | 22 +++- server/helper/helper.go | 46 +------- server/helper/{http.go => http_action.go} | 39 +++++++ server/helper/kafka.go | 19 +++- server/helper/mock_server.go | 45 ++++++++ server/restapi/routes.go | 4 - server/runner/kafka/kafka.go | 16 ++- server/runner/kafka/watchable_store.go | 5 +- server/runner/kafka/watchable_store_test.go | 2 +- server/store/kafka/watchable.go | 24 +++- server/store/kafka/watchable_test.go | 102 ++++++++++++++++- server/store/rest/rest.go | 18 ++- server/store/rest/rest_test.go | 19 +++- 20 files changed, 529 insertions(+), 75 deletions(-) create mode 100644 server/decoder/decoder.go create mode 100644 server/decoder/httpdecoder/http.go create mode 100644 server/decoder/httpdecoder/http_test.go rename server/helper/{http.go => http_action.go} (69%) create mode 100644 server/helper/mock_server.go diff --git a/README.md b/README.md index 932fe5a..cd36beb 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,7 @@ URL Parameters: | STATIC_FILES_DIR | ../client/build | location of the UI static files for the HTML & js files | | DATA_ROOT_DIR | ./.db | Default location of internal database files | | API_SERVER_ONLY | false | when true, only the rest api is exposed without serving the static files and default route is / (instead of /api) | +| KAFKA_MESSAGE_BODY_DECODER | | set an endpoint for decoding kafka message payload. Post with payload {id:xxx target-topic:yyy ...} | ## Development diff --git a/server/config/config.go b/server/config/config.go index a89aac4..9382d1f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -80,8 +80,13 @@ func APIServerOnly() bool { return getBool("API_SERVER_ONLY", false) } +// URL for the http decoder can start with http:// or not func KafkaMessageBodyDecoder() string { - return getString("KAFKA_MESSAGE_BODY_DECODER", "") + u := strings.ToLower(getString("KAFKA_MESSAGE_BODY_DECODER", "")) + if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") { + return u + } + return "http://" + u } func DataRootDir() string { diff --git a/server/decoder/decoder.go b/server/decoder/decoder.go new file mode 100644 index 0000000..54282ce --- /dev/null +++ b/server/decoder/decoder.go @@ -0,0 +1,7 @@ +package decoder + +import "github.com/etf1/kafka-message-scheduler/schedule" + +type Decoder interface { + Decode(s schedule.Schedule) (schedule.Schedule, error) +} diff --git a/server/decoder/httpdecoder/http.go b/server/decoder/httpdecoder/http.go new file mode 100644 index 0000000..5f86266 --- /dev/null +++ b/server/decoder/httpdecoder/http.go @@ -0,0 +1,101 @@ +package httpdecoder + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/etf1/kafka-message-scheduler-admin/server/store/rest" + "github.com/etf1/kafka-message-scheduler/schedule" + "github.com/etf1/kafka-message-scheduler/schedule/kafka" + log "github.com/sirupsen/logrus" +) + +var ( + ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") +) + +type Decoder struct { + URL string +} + +// Post sends an http post request with the payload and return the response +func (h Decoder) Post(payload interface{}) ([]byte, error) { + jsonStr, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, h.URL, bytes.NewBuffer(jsonStr)) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + + client := &http.Client{ + Timeout: 1 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + log.Println("response status code:", resp.StatusCode) + log.Println("response headers:", resp.Header) + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("request failed: %v %s", resp.StatusCode, string(body)) + } + + return body, nil +} + +// Decode returns a copy of the input schedule, its field 'value' replaced by the response from the http request +func (h Decoder) Decode(s schedule.Schedule) (schedule.Schedule, error) { + switch sch := s.(type) { + case *kafka.Schedule: + if len(sch.Message.Value) == 0 { + return s, nil + } + + data, err := h.Post(sch) + if err != nil { + return s, err + } + + if len(data) != 0 { + sch.Message.Value = data + } + + return sch, nil + case rest.Schedule: + if len(sch.MessageValue) == 0 { + return s, nil + } + + data, err := h.Post(sch) + if err != nil { + return s, err + } + + if len(data) != 0 { + sch.MessageValue = data + } + + return sch, nil + default: + return s, fmt.Errorf("%w: %T", ErrUnknownScheduleType, s) + } +} diff --git a/server/decoder/httpdecoder/http_test.go b/server/decoder/httpdecoder/http_test.go new file mode 100644 index 0000000..becfaf1 --- /dev/null +++ b/server/decoder/httpdecoder/http_test.go @@ -0,0 +1,119 @@ +// INTEGRATION TESTS +package httpdecoder_test + +import ( + "fmt" + "net/http" + "reflect" + "testing" + + confluent "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder/httpdecoder" + "github.com/etf1/kafka-message-scheduler-admin/server/helper" + "github.com/etf1/kafka-message-scheduler/schedule/kafka" +) + +// Rule #1: when received http response code != 200, the kafka message body should be unchanged +func TestHTTPDecoder_not200(t *testing.T) { + helper.VerifyIfSkipIntegrationTests(t) + tests := []struct { + code int + }{ + {http.StatusNotFound}, + {http.StatusBadRequest}, + {http.StatusInternalServerError}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("http_#%v", i), func(t *testing.T) { + func() { + server := helper.MockServer(tt.code, "response") + defer server.Close() + + dec := httpdecoder.Decoder{ + URL: server.URL, + } + + topic := "topic" + sch := kafka.Schedule{ + Message: &confluent.Message{ + TopicPartition: confluent.TopicPartition{ + Topic: &topic, + }, + Key: []byte("video-1"), + Value: []byte("content"), + }, + } + + // create a copy of the schedule + sch2 := helper.CopyKafkaSchedule(sch) + + sch3, err := dec.Decode(&sch) + if err == nil { + t.Error("expected error") + } + + sch4, ok := sch3.(*kafka.Schedule) + if !ok { + t.Errorf("unexpected type: %v", sch3) + } + + if reflect.DeepEqual(*sch4, sch2) == false { + t.Errorf("should be equal") + } + }() + }) + } +} + +// Rule #2: when received http response code == 200 and not empty response, +// the kafka message value should be changed +func TestHTTPDecoder_200(t *testing.T) { + helper.VerifyIfSkipIntegrationTests(t) + + tests := []struct { + response string + expectedMessageValue string + }{ + // message content updated with new content + {"response", "response"}, + // message content should not be changed when empty response + {"", "content"}, + } + + for i, tt := range tests { + t.Run(fmt.Sprintf("case #%v", i+1), func(t *testing.T) { + server := helper.MockServer(200, tt.response) + defer server.Close() + + dec := httpdecoder.Decoder{ + URL: server.URL, + } + + topic := "topic" + sch := kafka.Schedule{ + Message: &confluent.Message{ + TopicPartition: confluent.TopicPartition{ + Topic: &topic, + }, + Key: []byte("video-1"), + Value: []byte("content"), + }, + } + + sch2, err := dec.Decode(&sch) + if err != nil { + t.Error("unexpected error") + } + + sch3, ok := sch2.(*kafka.Schedule) + if !ok { + t.Errorf("unexpected type: %T", sch2) + } + + if string(sch3.Message.Value) != tt.expectedMessageValue { + t.Fatalf("message content not correct: %q", string(sch.Message.Value)) + } + }) + } +} diff --git a/server/docker-compose.yml b/server/docker-compose.yml index 35685cb..0f06e87 100644 --- a/server/docker-compose.yml +++ b/server/docker-compose.yml @@ -45,4 +45,4 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' \ No newline at end of file + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' diff --git a/server/go.mod b/server/go.mod index 73b89cb..4bd736b 100644 --- a/server/go.mod +++ b/server/go.mod @@ -4,19 +4,23 @@ go 1.15 require ( github.com/blevesearch/bleve/v2 v2.0.3 - github.com/blevesearch/bleve_index_api v1.0.0 github.com/confluentinc/confluent-kafka-go v1.5.2 github.com/drhodes/golorem v0.0.0-20160418191928-ecccc744c2d9 github.com/etf1/kafka-message-scheduler v0.0.4-0.20210615142246-56c1d6186d8f github.com/gemnasium/logrus-graylog-hook v2.0.7+incompatible + github.com/google/go-cmp v0.5.5 // indirect github.com/gorilla/mux v1.8.0 github.com/influxdata/influxdb v1.8.5 // indirect + github.com/kr/pretty v0.2.0 // indirect github.com/prometheus/client_golang v1.8.0 github.com/rs/cors v1.7.0 + github.com/sergi/go-diff v1.0.0 github.com/sirupsen/logrus v1.8.1 + github.com/stretchr/testify v1.6.1 // indirect github.com/tevjef/go-runtime-metrics v0.0.0-20170326170900-527a54029307 go.etcd.io/bbolt v1.3.5 golang.org/x/sys v0.0.0-20210304124612-50617c2ba197 // indirect + google.golang.org/protobuf v1.25.0 // indirect ) //replace github.com/etf1/kafka-message-scheduler => /Users/fkarakas/go/src/github.com/etf1/kafka-message-scheduler diff --git a/server/go.sum b/server/go.sum index 680f549..7f8f1b5 100644 --- a/server/go.sum +++ b/server/go.sum @@ -176,6 +176,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= @@ -188,8 +189,10 @@ github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -273,8 +276,9 @@ github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfEN github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -395,6 +399,7 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= +github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -426,8 +431,9 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tevjef/go-runtime-metrics v0.0.0-20170326170900-527a54029307 h1:gfHmITmROhuSzCe1+/zYncE940TURIlOGCQCwfuP6jM= github.com/tevjef/go-runtime-metrics v0.0.0-20170326170900-527a54029307/go.mod h1:DUsG+nVLc17b0zOGba8MjqDv/ZKYJHD83hIX042cuKw= @@ -638,6 +644,7 @@ google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -648,13 +655,17 @@ google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -672,8 +683,9 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/server/helper/helper.go b/server/helper/helper.go index fa37775..60268a8 100644 --- a/server/helper/helper.go +++ b/server/helper/helper.go @@ -4,60 +4,20 @@ import ( "crypto/rand" "fmt" "math/big" - "net" "os" "strings" - "sync" "testing" - "time" lorem "github.com/drhodes/golorem" log "github.com/sirupsen/logrus" ) const ( - MaxRand = 1000000 - LoremMin = 50 - LoremMax = 100 - PortRange = 500 - PortStartRange = 9002 - MaxRetries = 5 + MaxRand = 1000000 + LoremMin = 50 + LoremMax = 100 ) -var ( - mu sync.Mutex -) - -func WaitForHTTPServer(addr string) error { - count := 1 - - for { - timeout := 1 * time.Second - - _, err := net.DialTimeout("tcp", addr, timeout) - if err != nil { - log.Printf("unreachable host %v: %v", addr, err) - } else { - log.Printf("reachable host %v", addr) - return nil - } - - time.Sleep(timeout) - - count++ - if count == MaxRetries { - return fmt.Errorf("unreachable host after %v retries: %v", MaxRetries, addr) - } - } -} - -func NextServerAddr(prefix string) string { - defer mu.Unlock() - mu.Lock() - // TODO: check is the port is available before returning - return fmt.Sprintf("%s:%d", prefix, PortStartRange+RandNumWithMax(PortRange)) -} - func Lipsum() string { return lorem.Paragraph(LoremMin, LoremMax) } diff --git a/server/helper/http.go b/server/helper/http_action.go similarity index 69% rename from server/helper/http.go rename to server/helper/http_action.go index 05d5efc..4146f7c 100644 --- a/server/helper/http.go +++ b/server/helper/http_action.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "net/http" + "sync" "time" log "github.com/sirupsen/logrus" @@ -14,6 +16,13 @@ import ( const ( DefaultTimeout = 5 * time.Second + PortRange = 500 + PortStartRange = 9002 + MaxRetries = 5 +) + +var ( + mu sync.Mutex ) func Get(host, url string, timeout time.Duration) (*http.Response, error) { @@ -76,3 +85,33 @@ func DecodeJSON(host, url string, timeout time.Duration, v interface{}, checkRes return nil } + +func WaitForHTTPServer(addr string) error { + count := 1 + + for { + timeout := 1 * time.Second + + _, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + log.Printf("unreachable host %v: %v", addr, err) + } else { + log.Printf("reachable host %v", addr) + return nil + } + + time.Sleep(timeout) + + count++ + if count == MaxRetries { + return fmt.Errorf("unreachable host after %v retries: %v", MaxRetries, addr) + } + } +} + +func NextServerAddr(prefix string) string { + defer mu.Unlock() + mu.Lock() + // TODO: check is the port is available before returning + return fmt.Sprintf("%s:%d", prefix, PortStartRange+RandNumWithMax(PortRange)) +} diff --git a/server/helper/kafka.go b/server/helper/kafka.go index 19a38f2..c1829a6 100644 --- a/server/helper/kafka.go +++ b/server/helper/kafka.go @@ -152,7 +152,7 @@ func CreateTopics(nbTopic int, nbPartitions []int, prefix string) ([]string, err } for _, result := range results { - log.Printf("%s\n", result) + log.Printf("%s", result) } return topics, nil @@ -300,3 +300,20 @@ func AssertMessagesinTopic(topic string, msgs []*confluent.Message) error { return nil } + +func CopyKafkaSchedule(s kafka_schedule.Schedule) kafka_schedule.Schedule { + s2 := s + msg2 := *s.Message + s2.Message = &msg2 + + return s2 +} + +type KafkaMessageSimpleDecoder struct { + Called int +} + +func (k *KafkaMessageSimpleDecoder) Decode(s schedule.Schedule) (schedule.Schedule, error) { + k.Called++ + return s, nil +} diff --git a/server/helper/mock_server.go b/server/helper/mock_server.go new file mode 100644 index 0000000..05cf7b7 --- /dev/null +++ b/server/helper/mock_server.go @@ -0,0 +1,45 @@ +package helper + +import ( + "fmt" + "net/http" + "net/http/httptest" + + "github.com/sergi/go-diff/diffmatchpatch" + log "github.com/sirupsen/logrus" +) + +type MockedServer struct { + s *httptest.Server + successful int + failed []string +} + +// mockServerForQuery returns a mock server that only responds to a particular query string. +func MockServerForQuery(query string, code int, body string) *MockedServer { + server := &MockedServer{} + + server.s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if query != "" && r.URL.RawQuery != query { + dmp := diffmatchpatch.New() + diffs := dmp.DiffMain(query, r.URL.RawQuery, false) + log.Printf("Query != Expected Query: %s", dmp.DiffPrettyText(diffs)) + server.failed = append(server.failed, r.URL.RawQuery) + http.Error(w, "fail", 999) + return + } + server.successful++ + + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + fmt.Fprint(w, body) + })) + + return server +} + +// Create a mock HTTP Server that will return a response with HTTP code and body. +func MockServer(code int, body string) *httptest.Server { + serv := MockServerForQuery("", code, body) + return serv.s +} diff --git a/server/restapi/routes.go b/server/restapi/routes.go index 7460f1f..d0533bf 100644 --- a/server/restapi/routes.go +++ b/server/restapi/routes.go @@ -183,10 +183,6 @@ func getSchedule(d db.DB) func(w http.ResponseWriter, r *http.Request) { respondWithError(w, err.Error()) return } - fmt.Printf(">>>>>>>> %+v %T", sch, sch) - - a, _ := json.Marshal(sch) - fmt.Println("xxxxxx", string(a)) if len(sch) == 0 { respondWithJSON(w, http.StatusNotFound, nil) diff --git a/server/runner/kafka/kafka.go b/server/runner/kafka/kafka.go index cdc4636..fad04f0 100644 --- a/server/runner/kafka/kafka.go +++ b/server/runner/kafka/kafka.go @@ -10,6 +10,8 @@ import ( "github.com/etf1/kafka-message-scheduler-admin/server/config" "github.com/etf1/kafka-message-scheduler-admin/server/db/blevedb" "github.com/etf1/kafka-message-scheduler-admin/server/db/simple" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder/httpdecoder" "github.com/etf1/kafka-message-scheduler-admin/server/helper" "github.com/etf1/kafka-message-scheduler-admin/server/resolver/schedulers/httpresolver" "github.com/etf1/kafka-message-scheduler-admin/server/runner" @@ -59,6 +61,14 @@ func (r *Runner) Start() error { log.Println(variable[0], "=>", variable[1]) } + var dec decoder.Decoder + + if decoderURL := config.KafkaMessageBodyDecoder(); decoderURL != "" { + dec = httpdecoder.Decoder{ + URL: decoderURL, + } + } + // cold DB resolver := httpresolver.NewResolver(config.SchedulersAddr()) bboltStore, err := bbolt.NewStore(dir + "schedules.bbolt") @@ -67,7 +77,7 @@ func (r *Runner) Start() error { } defer bboltStore.Close() - watchableStore, err := NewWatchableStoreFromResolver(resolver, SchedulesTopics) + watchableStore, err := NewWatchableStoreFromResolver(resolver, SchedulesTopics, dec) if err != nil { return fmt.Errorf("cannot create watchable store: %w", err) } @@ -90,7 +100,7 @@ func (r *Runner) Start() error { } defer historyBboltStore.Close() - historyWatchableStore, err := NewWatchableStoreFromResolver(resolver, HistoryTopic) + historyWatchableStore, err := NewWatchableStoreFromResolver(resolver, HistoryTopic, dec) if err != nil { return fmt.Errorf("cannot create history watchable store: %w", err) } @@ -108,7 +118,7 @@ func (r *Runner) Start() error { // live DB liveDB := simple.DB{ - Store: rest.NewStore(resolver), + Store: rest.NewStore(resolver, dec), } srv := runner.NewServer(coldDB, liveDB, historyDB, resolver) diff --git a/server/runner/kafka/watchable_store.go b/server/runner/kafka/watchable_store.go index e2205f0..0052058 100644 --- a/server/runner/kafka/watchable_store.go +++ b/server/runner/kafka/watchable_store.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder" "github.com/etf1/kafka-message-scheduler-admin/server/resolver/schedulers" "github.com/etf1/kafka-message-scheduler-admin/server/resolver/schedulers/httpresolver" "github.com/etf1/kafka-message-scheduler-admin/server/store/kafka" @@ -68,7 +69,7 @@ func (wr *WatchableStoreFromResolver) updateBuckets() error { type TopicFunc func(s httpresolver.Scheduler) []string -func NewWatchableStoreFromResolver(resolver httpresolver.Resolver, topics TopicFunc) (*WatchableStoreFromResolver, error) { +func NewWatchableStoreFromResolver(resolver httpresolver.Resolver, topics TopicFunc, d decoder.Decoder) (*WatchableStoreFromResolver, error) { wr := &WatchableStoreFromResolver{ resolver: resolver, stopChan: make(chan bool, 1), @@ -76,7 +77,7 @@ func NewWatchableStoreFromResolver(resolver httpresolver.Resolver, topics TopicF topics: topics, } - ws, err := kafka.NewWatchableStore() + ws, err := kafka.NewWatchableStore(d) if err != nil { return wr, err } diff --git a/server/runner/kafka/watchable_store_test.go b/server/runner/kafka/watchable_store_test.go index b862bda..9c1403e 100644 --- a/server/runner/kafka/watchable_store_test.go +++ b/server/runner/kafka/watchable_store_test.go @@ -28,7 +28,7 @@ func TestKafkaWatchableStoreFromResolver_Watch(t *testing.T) { resolver := httpresolver.NewResolver(config.SchedulersAddr()) - kstore, err := kafka.NewWatchableStoreFromResolver(resolver, kafka.DefaultTopics) + kstore, err := kafka.NewWatchableStoreFromResolver(resolver, kafka.DefaultTopics, nil) if err != nil { t.Errorf("failed to create kafka store: %v\n", err) } diff --git a/server/store/kafka/watchable.go b/server/store/kafka/watchable.go index 4e17f77..31d03c3 100644 --- a/server/store/kafka/watchable.go +++ b/server/store/kafka/watchable.go @@ -5,23 +5,27 @@ import ( "reflect" "time" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder" "github.com/etf1/kafka-message-scheduler-admin/server/store" + "github.com/etf1/kafka-message-scheduler/schedule" "github.com/etf1/kafka-message-scheduler/schedule/kafka" log "github.com/sirupsen/logrus" ) type WatchableStore struct { consumers map[string]consumer + dec decoder.Decoder processor } -func NewWatchableStore(buckets ...Bucket) (*WatchableStore, error) { +func NewWatchableStore(dec decoder.Decoder, buckets ...Bucket) (*WatchableStore, error) { p := newProcessor(nil) p.start() ws := WatchableStore{ consumers: make(map[string]consumer), processor: p, + dec: dec, } for _, bucket := range buckets { @@ -104,13 +108,25 @@ func (ws WatchableStore) Watch() (chan store.Event, error) { if len(e.Value) == 0 { evtType = store.DeletedType } + + var sch schedule.Schedule = &kafka.Schedule{ + Message: e.Message, + } + + if ws.dec != nil { + sdec, err := ws.dec.Decode(sch) + if err != nil { + log.Warnf("cannot decode kafka schedule %v: %v", sch.ID(), err) + } else { + sch = sdec + } + } + resultChan <- store.Event{ EventType: evtType, Schedule: store.Schedule{ SchedulerName: e.name, - Schedule: kafka.Schedule{ - Message: e.Message, - }, + Schedule: sch, }, } case storeResetType: diff --git a/server/store/kafka/watchable_test.go b/server/store/kafka/watchable_test.go index 726ee70..51953c0 100644 --- a/server/store/kafka/watchable_test.go +++ b/server/store/kafka/watchable_test.go @@ -27,7 +27,7 @@ func TestKafkaWatchableStore_Watch(t *testing.T) { {"scheduler-1", helper.GetDefaultBootstrapServers(), []string{topics[0]}}, {"scheduler-2", helper.GetDefaultBootstrapServers(), []string{topics[1]}}, } - kstore, err := kafka.NewWatchableStore(buckets...) + kstore, err := kafka.NewWatchableStore(nil, buckets...) if err != nil { t.Errorf("failed to create kafka store: %v\n", err) } @@ -103,7 +103,7 @@ loop: } } -// Rule #1: test AddBuckets, adding bucket after instanciation should work properly +// Rule #2: test AddBuckets, adding bucket after instanciation should work properly // updating bucket's wrong config or adding new bucket, an event StoreResetType should be thrown func TestKafkaWatchableStore_AddBuckets(t *testing.T) { helper.VerifyIfSkipIntegrationTests(t) @@ -121,7 +121,7 @@ func TestKafkaWatchableStore_AddBuckets(t *testing.T) { // fix the bucket config bucket2Fix := kafka.Bucket{"scheduler-2", helper.GetDefaultBootstrapServers(), []string{topics[1]}} - kstore, err := kafka.NewWatchableStore() + kstore, err := kafka.NewWatchableStore(nil) if err != nil { t.Errorf("failed to create kafka store: %v\n", err) } @@ -208,3 +208,99 @@ loop: t.Errorf("unexpected deleted count: %v", deleted) } } + +// Rule #3: when a decoder is specified, it should be used to update message value (body) +func TestKafkaWatchableStore_Decoder(t *testing.T) { + helper.VerifyIfSkipIntegrationTests(t) + + now := time.Now() + + topics, err := helper.CreateTopics(2, []int{1, 1}, "schedules") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + buckets := []kafka.Bucket{ + {"scheduler-1", helper.GetDefaultBootstrapServers(), []string{topics[0]}}, + {"scheduler-2", helper.GetDefaultBootstrapServers(), []string{topics[1]}}, + } + + dec := &helper.KafkaMessageSimpleDecoder{} + kstore, err := kafka.NewWatchableStore(dec, buckets...) + if err != nil { + t.Errorf("failed to create kafka store: %v\n", err) + } + defer kstore.Close() + + msgs := make([]*confluent.Message, 20) + for i := 0; i < 20; i++ { + var value interface{} = "value" + if i%2 == 0 { + value = nil + } + topic := topics[0] + if i >= 10 { + topic = topics[1] + } + msgs[i] = helper.Message(topic, fmt.Sprintf("schedule-%v", i), value, now.Add(1*time.Hour).Unix()) + } + + msgs1 := msgs[:10] + msgs2 := msgs[10:] + + helper.ProduceMessages(msgs1) + helper.ProduceMessages(msgs2) + + err = helper.AssertMessagesinTopic(topics[0], msgs1) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + err = helper.AssertMessagesinTopic(topics[1], msgs2) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + lst, err := kstore.Watch() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + total := 0 + upsert := 0 + deleted := 0 + +loop: + for { + select { + case evt, ok := <-lst: + if !ok { + break + } + total++ + if evt.EventType == store.DeletedType { + deleted++ + } + if evt.EventType == store.UpsertType { + upsert++ + } + case <-time.After(5 * time.Second): + break loop + } + } + + t.Logf("upsert=%v deleted=%v total=%v", upsert, deleted, total) + + if total != 20 { + t.Errorf("unexpected total count: %v", total) + } + if upsert != 10 { + t.Errorf("unexpected upsert count: %v", upsert) + } + if deleted != 10 { + t.Errorf("unexpected deleted count: %v", deleted) + } + if dec.Called != 20 { + t.Errorf("unexpected decoder count: %v", dec.Called) + } +} diff --git a/server/store/rest/rest.go b/server/store/rest/rest.go index f99da73..0cf39ff 100644 --- a/server/store/rest/rest.go +++ b/server/store/rest/rest.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" + "github.com/etf1/kafka-message-scheduler-admin/server/decoder" "github.com/etf1/kafka-message-scheduler-admin/server/helper" "github.com/etf1/kafka-message-scheduler-admin/server/resolver/schedulers/httpresolver" "github.com/etf1/kafka-message-scheduler-admin/server/store" @@ -45,11 +46,13 @@ func (s Schedule) String() string { type HTTPRetriever struct { httpresolver.Resolver + dec decoder.Decoder } -func NewStore(r httpresolver.Resolver) *HTTPRetriever { +func NewStore(r httpresolver.Resolver, dec decoder.Decoder) *HTTPRetriever { return &HTTPRetriever{ Resolver: r, + dec: dec, } } @@ -126,9 +129,20 @@ func (h HTTPRetriever) getSchedules(schedulerName string, filter func(s schedule for _, s := range res { if filter(s) { + var sch schedule.Schedule = s + + if h.dec != nil { + sdec, err := h.dec.Decode(s) + if err != nil { + log.Warnf("cannot decode rest schedule %v: %v", err, s.ID()) + } else { + sch = sdec + } + } + result = append(result, store.Schedule{ SchedulerName: schedulerName, - Schedule: s, + Schedule: sch, }) } } diff --git a/server/store/rest/rest_test.go b/server/store/rest/rest_test.go index ff756a5..f520d0f 100644 --- a/server/store/rest/rest_test.go +++ b/server/store/rest/rest_test.go @@ -1,10 +1,8 @@ // INTEGRATION TESTS // requirements: a running kafka-message-scheduler:mini -// you can start the integration stack with: `make dev.up` -// WARNING: schedules defined in scheduler made be gone when triggered -// you should then restart the scheduler service: `make dev.restart.scheduler` - +// you can start the integration stack with: `make up` +// or `docker-compose -p dev restart scheduler` package rest_test import ( @@ -23,15 +21,18 @@ func getSchedulerName() string { return "localhost" } +// Rule #1: Get should work as expected func TestRest_Get(t *testing.T) { helper.VerifyIfSkipIntegrationTests(t) schedulerName := getSchedulerName() + dec := &helper.KafkaMessageSimpleDecoder{} rstore := rest.NewStore( httpresolver.Resolver{ Hosts: []string{schedulerName}, }, + dec, ) result, err := rstore.Get(schedulerName, "schedule-1") @@ -42,17 +43,24 @@ func TestRest_Get(t *testing.T) { if len(result) == 0 { t.Errorf("unexpected result: %v", result) } + + if dec.Called == 0 { + t.Errorf("unexpected decoder count: %v", dec.Called) + } } +// Rule #2: List should work as expected func TestRest_List(t *testing.T) { helper.VerifyIfSkipIntegrationTests(t) schedulerName := getSchedulerName() + dec := &helper.KafkaMessageSimpleDecoder{} rstore := rest.NewStore( httpresolver.Resolver{ Hosts: []string{schedulerName}, }, + dec, ) // wait for goroutines to be scheduled @@ -76,4 +84,7 @@ func TestRest_List(t *testing.T) { if v := count; v == 0 { t.Errorf("unexpected schedules length: %v", v) } + if dec.Called == 0 { + t.Errorf("unexpected decoder count: %v", dec.Called) + } }