-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow to decode the kafka message body by calling an endpoint
- Loading branch information
Showing
20 changed files
with
529 additions
and
75 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package decoder | ||
|
||
import "github.com/etf1/kafka-message-scheduler/schedule" | ||
|
||
type Decoder interface { | ||
Decode(s schedule.Schedule) (schedule.Schedule, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package httpdecoder | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/etf1/kafka-message-scheduler-admin/server/store/rest" | ||
"github.com/etf1/kafka-message-scheduler/schedule" | ||
"github.com/etf1/kafka-message-scheduler/schedule/kafka" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
var ( | ||
ErrUnknownScheduleType = fmt.Errorf("unknown schedule type") | ||
) | ||
|
||
type Decoder struct { | ||
URL string | ||
} | ||
|
||
// Post sends an http post request with the payload and return the response | ||
func (h Decoder) Post(payload interface{}) ([]byte, error) { | ||
jsonStr, err := json.Marshal(payload) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, h.URL, bytes.NewBuffer(jsonStr)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json; charset=UTF-8") | ||
|
||
client := &http.Client{ | ||
Timeout: 1 * time.Second, | ||
} | ||
|
||
resp, err := client.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer resp.Body.Close() | ||
|
||
log.Println("response status code:", resp.StatusCode) | ||
log.Println("response headers:", resp.Header) | ||
|
||
body, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return nil, fmt.Errorf("request failed: %v %s", resp.StatusCode, string(body)) | ||
} | ||
|
||
return body, nil | ||
} | ||
|
||
// Decode returns a copy of the input schedule, its field 'value' replaced by the response from the http request | ||
func (h Decoder) Decode(s schedule.Schedule) (schedule.Schedule, error) { | ||
switch sch := s.(type) { | ||
case *kafka.Schedule: | ||
if len(sch.Message.Value) == 0 { | ||
return s, nil | ||
} | ||
|
||
data, err := h.Post(sch) | ||
if err != nil { | ||
return s, err | ||
} | ||
|
||
if len(data) != 0 { | ||
sch.Message.Value = data | ||
} | ||
|
||
return sch, nil | ||
case rest.Schedule: | ||
if len(sch.MessageValue) == 0 { | ||
return s, nil | ||
} | ||
|
||
data, err := h.Post(sch) | ||
if err != nil { | ||
return s, err | ||
} | ||
|
||
if len(data) != 0 { | ||
sch.MessageValue = data | ||
} | ||
|
||
return sch, nil | ||
default: | ||
return s, fmt.Errorf("%w: %T", ErrUnknownScheduleType, s) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// INTEGRATION TESTS | ||
package httpdecoder_test | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"reflect" | ||
"testing" | ||
|
||
confluent "github.com/confluentinc/confluent-kafka-go/kafka" | ||
"github.com/etf1/kafka-message-scheduler-admin/server/decoder/httpdecoder" | ||
"github.com/etf1/kafka-message-scheduler-admin/server/helper" | ||
"github.com/etf1/kafka-message-scheduler/schedule/kafka" | ||
) | ||
|
||
// Rule #1: when received http response code != 200, the kafka message body should be unchanged | ||
func TestHTTPDecoder_not200(t *testing.T) { | ||
helper.VerifyIfSkipIntegrationTests(t) | ||
tests := []struct { | ||
code int | ||
}{ | ||
{http.StatusNotFound}, | ||
{http.StatusBadRequest}, | ||
{http.StatusInternalServerError}, | ||
} | ||
|
||
for i, tt := range tests { | ||
t.Run(fmt.Sprintf("http_#%v", i), func(t *testing.T) { | ||
func() { | ||
server := helper.MockServer(tt.code, "response") | ||
defer server.Close() | ||
|
||
dec := httpdecoder.Decoder{ | ||
URL: server.URL, | ||
} | ||
|
||
topic := "topic" | ||
sch := kafka.Schedule{ | ||
Message: &confluent.Message{ | ||
TopicPartition: confluent.TopicPartition{ | ||
Topic: &topic, | ||
}, | ||
Key: []byte("video-1"), | ||
Value: []byte("content"), | ||
}, | ||
} | ||
|
||
// create a copy of the schedule | ||
sch2 := helper.CopyKafkaSchedule(sch) | ||
|
||
sch3, err := dec.Decode(&sch) | ||
if err == nil { | ||
t.Error("expected error") | ||
} | ||
|
||
sch4, ok := sch3.(*kafka.Schedule) | ||
if !ok { | ||
t.Errorf("unexpected type: %v", sch3) | ||
} | ||
|
||
if reflect.DeepEqual(*sch4, sch2) == false { | ||
t.Errorf("should be equal") | ||
} | ||
}() | ||
}) | ||
} | ||
} | ||
|
||
// Rule #2: when received http response code == 200 and not empty response, | ||
// the kafka message value should be changed | ||
func TestHTTPDecoder_200(t *testing.T) { | ||
helper.VerifyIfSkipIntegrationTests(t) | ||
|
||
tests := []struct { | ||
response string | ||
expectedMessageValue string | ||
}{ | ||
// message content updated with new content | ||
{"response", "response"}, | ||
// message content should not be changed when empty response | ||
{"", "content"}, | ||
} | ||
|
||
for i, tt := range tests { | ||
t.Run(fmt.Sprintf("case #%v", i+1), func(t *testing.T) { | ||
server := helper.MockServer(200, tt.response) | ||
defer server.Close() | ||
|
||
dec := httpdecoder.Decoder{ | ||
URL: server.URL, | ||
} | ||
|
||
topic := "topic" | ||
sch := kafka.Schedule{ | ||
Message: &confluent.Message{ | ||
TopicPartition: confluent.TopicPartition{ | ||
Topic: &topic, | ||
}, | ||
Key: []byte("video-1"), | ||
Value: []byte("content"), | ||
}, | ||
} | ||
|
||
sch2, err := dec.Decode(&sch) | ||
if err != nil { | ||
t.Error("unexpected error") | ||
} | ||
|
||
sch3, ok := sch2.(*kafka.Schedule) | ||
if !ok { | ||
t.Errorf("unexpected type: %T", sch2) | ||
} | ||
|
||
if string(sch3.Message.Value) != tt.expectedMessageValue { | ||
t.Fatalf("message content not correct: %q", string(sch.Message.Value)) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.