Skip to content

Commit

Permalink
Broker ingress and Dispatcher now uses the protocol binding instead o…
Browse files Browse the repository at this point in the history
…f json to process events (#751)

* adding documentation dev files and organizing a little bit better the content

* fixed links refs on broker and source readmes + added source diagram image

* fixed copyright headers and added structure for new perf test resources and results

* fixed parallelism annotation in trigger performance readme and resources + recreated perf test images

* added multi consumer broker test cases

* finished source performance tests yamls + renamed generated perf benchmarks images + readme

* using binary protocol in the broker ingress for it to be used as an entry point for the source in the conformance tests, the changes to the broker to support this are going to be made in another commit + created source perf test results and published images, the performance gains while using the binary protocol binding vs json manipulation seems to be significant memorywise

* moved source results to its corresponding releases

* added tests to protocol spec

using ce protocol binding to get msg from a rabbitmq msg delivery

* improved failure handling in dispatcher + skipping tests cause wabbit does not support header on its test channels

excluding adapter from the golangci-lint while wabbit is been removed from the repo

added missing ce attributes to msg headers in ingres

added source to message header so it will be filtered appropriately

added support for timestamp in the ingress formating

trying different approach with cloudevents and filters

* fixed filters and improved msg readbinary method

fixed broker perf test setup + removed unused file + rebased

* fixed adapter's unit tests

removed ununsed header

using the contentype header

avoid using rabbitmq specific headers when translating rabbitmq messages to cloudevents + small refactor on the dispatcher

* Added content type safeguard in case the header is not present on the rabbitmq message + fixed malformed json in e2e test producer

fixing e2e and conformance tests to match expected output

removed unnecesary cast

* now using the obtained body of the request created by post message instead of the one defined by the test

* disabling lint and codecov for the dispatcher

* fixed imports on uncommented tests
  • Loading branch information
gabo1208 authored May 16, 2022
1 parent aaae796 commit 41df0b6
Show file tree
Hide file tree
Showing 27 changed files with 698 additions and 382 deletions.
1 change: 1 addition & 0 deletions .codecov.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ignore:
- "**/zz_generated*.go" # Ignore generated files.
- "pkg/client"
- "pkg/dispatcher" #temporary until wabbit is gone
- "third_party"
- "vendor"
- "test"
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ run:
skip-dirs:
- pkg/client
- pkg/internal/thirdparty
- pkg/dispatcher

linters:
enable:
Expand Down
28 changes: 14 additions & 14 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -186,23 +185,24 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
}

func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, time.Duration, error) {
bytes, err := json.Marshal(event)
if err != nil {
return http.StatusBadRequest, noDuration, fmt.Errorf("failed to marshal event, %w", err)
}

tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
headers := amqp.Table{
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"traceparent": tp,
"tracestate": ts,
"content-type": event.DataContentType(),
"specversion": event.SpecVersion(),
"time": cloudevents.Timestamp{Time: event.Time().UTC()}.String(),
"type": event.Type(),
"source": event.Source(),
"subject": event.Subject(),
"id": event.ID(),
"dataschema": event.DataSchema(),
"traceparent": tp,
"tracestate": ts,
}

for key, val := range event.Extensions() {
headers[key] = val
}

start := time.Now()
dc, err := env.channel.PublishWithDeferredConfirm(
env.ExchangeName,
Expand All @@ -211,11 +211,11 @@ func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, tim
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
MessageId: event.ID(),
ContentType: event.DataContentType(),
Body: event.Data(),
DeliveryMode: amqp.Persistent,
})

if err != nil {
return http.StatusInternalServerError, noDuration, fmt.Errorf("failed to publish message: %w", err)
}
Expand Down
54 changes: 17 additions & 37 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ import (
"github.com/NeowayLabs/wabbit/amqp"
"github.com/NeowayLabs/wabbit/amqptest"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol/http"

"go.uber.org/zap"

"knative.dev/eventing-rabbitmq/pkg/rabbit"
"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
Expand Down Expand Up @@ -256,58 +253,41 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel,

func (a *Adapter) processMessages(wg *sync.WaitGroup, queue <-chan wabbit.Delivery) {
defer wg.Done()

for msg := range queue {
a.logger.Info("Received: ", zap.Any("value", string(msg.Body())))

a.logger.Info("Received: ", zap.String("MessageId", msg.MessageId()))
if err := a.postMessage(msg); err == nil {
a.logger.Info("Successfully sent event to sink")
err = msg.Ack(false)
if err != nil {
a.logger.Error("Sending Ack failed with Delivery Tag")
a.logger.Error("sending Ack failed with Delivery Tag")
}
} else {
a.logger.Error("Sending event to sink failed: ", zap.Error(err))
a.logger.Error("sending event to sink failed: ", zap.Error(err))
err = msg.Nack(false, false)
if err != nil {
a.logger.Error("Sending Nack failed with Delivery Tag")
a.logger.Error("sending Nack failed with Delivery Tag")
}
}
}
}

func (a *Adapter) postMessage(msg wabbit.Delivery) error {
a.logger.Info("url ->" + a.httpMessageSender.Target)
a.logger.Info("target: " + a.httpMessageSender.Target)
req, err := a.httpMessageSender.NewCloudEventRequest(a.context)
if err != nil {
return err
}

var msgBinding binding.Message = NewMessageFromDelivery(msg)

defer func() {
err := msgBinding.Finish(nil)
if err != nil {
a.logger.Error("Something went wrong while trying to finalizing the message", zap.Error(err))
}
}()

// if the msg is a cloudevent send it as it is to http
if msgBinding.ReadEncoding() == binding.EncodingUnknown {
// if the rabbitmq msg is not a cloudevent transform it into one
event := cloudevents.NewEvent()
err = convertToCloudEvent(&event, msg, a)
if err != nil {
a.logger.Error("Error converting RabbitMQ msg to CloudEvent", zap.Error(err))
return err
}

msgBinding = binding.ToMessage(&event)
}

err = http.WriteRequest(a.context, msgBinding, req)
err = rabbit.ConvertMessageToHTTPRequest(
a.context,
a.config.Name,
a.config.Namespace,
a.config.QueueConfig.Name,
msg,
req,
a.logger)
if err != nil {
a.logger.Error(fmt.Sprintf("Error writting event to http, encoding: %s", msgBinding.ReadEncoding()), zap.Error(err))
a.logger.Error("error writing event to http", zap.Error(err))
return err
}

Expand All @@ -323,12 +303,12 @@ func (a *Adapter) postMessage(msg wabbit.Delivery) error {
})

if err != nil {
a.logger.Error("Error while sending the message", zap.Error(err))
a.logger.Error("error while sending the message", zap.Error(err))
return err
}

if res.StatusCode/100 != 2 {
a.logger.Error("Unexpected status code", zap.Int("status code", res.StatusCode))
a.logger.Error("unexpected status code", zap.Int("status code", res.StatusCode))
return fmt.Errorf("%d %s", res.StatusCode, nethttp.StatusText(res.StatusCode))
}

Expand Down
33 changes: 19 additions & 14 deletions pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 The Knative Authors
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,18 +67,17 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
reqBody: `{"test":"test"}`,
withMsgId: true,
reqHeaders: http.Header{
"Ce-Specversion": []string{"1.0"},
"Ce-Source": []string{"example/source.uri"},
"Ce-Testheader": []string{"testHeader"},
"Specversion": []string{"1.0"},
"Source": []string{"example/source.uri"},
"Testheader": []string{"testHeader"},
},
data: map[string]interface{}{
"test": "test",
},
headers: wabbit.Option{
"ce-specversion": "1.0",
"ce-source": "example/source.uri",
"ce-testheader": "testHeader",
"ignore": "ignore",
"specversion": "1.0",
"source": "example/source.uri",
"testheader": "testHeader",
},
isCe: true,
},
Expand Down Expand Up @@ -167,8 +166,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
if err != nil {
t.Errorf("Error unmarshaling wanted request body %s %s", tc.reqBody, err)
}

err = json.Unmarshal([]byte(tc.reqBody), &gotBody)
err = json.Unmarshal(h.body, &gotBody)
if err != nil {
t.Errorf("Error unmarshaling got request body %s %s", h.body, err)
}
Expand All @@ -180,19 +178,26 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
if tc.isCe {
ceHeaders := http.Header{}
for key, value := range h.header {
if strings.HasPrefix(key, "Ce-") {
ceHeaders[key] = value
}
ceHeaders[strings.TrimPrefix(key, "Ce-")] = value
}

if len(ceHeaders) > 0 && len(ceHeaders) != len(tc.reqHeaders) && !reflect.DeepEqual(ceHeaders, tc.reqHeaders) {
if !compareHeaders(tc.reqHeaders, ceHeaders, t) {
t.Errorf("Expected request headers '%s', but got '%s' %s", tc.reqHeaders, ceHeaders, err)
}
}
})
}
}

func compareHeaders(expected, received http.Header, t *testing.T) bool {
for key, val := range expected {
if val2, ok := received[key]; !ok || val[0] != val2[0] {
return false
}
}
return true
}

func TestAdapter_CreateConn(t *testing.T) {
fakeServer := server.NewServer("amqp://localhost:5672/%2f")
err := fakeServer.Start()
Expand Down
135 changes: 0 additions & 135 deletions pkg/adapter/message_test.go

This file was deleted.

Loading

0 comments on commit 41df0b6

Please sign in to comment.