Skip to content

Commit

Permalink
chore: webhook v0 deprecation while defaulting to v1 (#5187)
Browse files Browse the repository at this point in the history
* chore: webhook v0 deprecation while defaulting to v1

* chore: test cases changes, panic upon version mismatch

* chore: fmt, lint issues cleared

* chore: panic on version mismatch without waiting for first event

* chore: integration test cases fixed for v0 deprecation

* chore: commented code cleanup

* chore: unwanted files cleanup
  • Loading branch information
vinayteki95 authored Oct 24, 2024
1 parent ab62155 commit 9d27a0f
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 43 deletions.
18 changes: 2 additions & 16 deletions gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,13 @@ type sourceTransformAdapter interface {
getTransformerURL(sourceType string) (string, error)
}

type v0Adapter struct{}

type v1Adapter struct{}

type V1TransformerEvent struct {
Event json.RawMessage `json:"event"`
Source backendconfig.SourceT `json:"source"`
}

func (v0 *v0Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) {
return body, nil
}

func (v0 *v0Adapter) getTransformerURL(sourceType string) (string, error) {
return getTransformerURL(transformer.V0, sourceType)
}

func (v1 *v1Adapter) getTransformerEvent(authCtx *gwtypes.AuthRequestContext, body []byte) ([]byte, error) {
source := authCtx.Source

Expand All @@ -68,12 +58,8 @@ func (v1 *v1Adapter) getTransformerURL(sourceType string) (string, error) {
}

func newSourceTransformAdapter(version string) sourceTransformAdapter {
switch version {
case "v1":
return &v1Adapter{}
}

return &v0Adapter{}
// V0 Deprecation: this function returns v1 adapter by default, thereby deprecating v0
return &v1Adapter{}
}

func getTransformerURL(version, sourceType string) (string, error) {
Expand Down
19 changes: 0 additions & 19 deletions gateway/webhook/webhookTransformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,6 @@ import (
"github.com/rudderlabs/rudder-server/services/transformer"
)

func TestV0Adapter(t *testing.T) {
v0Adapter := newSourceTransformAdapter(transformer.V0)

t.Run("should return the right url", func(t *testing.T) {
testSrcType := "testSrcType"
testSrcTypeLower := "testsrctype"
url, err := v0Adapter.getTransformerURL(testSrcType)
require.Nil(t, err)
require.True(t, strings.HasSuffix(url, fmt.Sprintf("/%s/sources/%s", transformer.V0, testSrcTypeLower)))
})

t.Run("should return the body as is", func(t *testing.T) {
testBody := []byte("testBody")
retBody, err := v0Adapter.getTransformerEvent(nil, testBody)
require.Equal(t, testBody, retBody)
require.Nil(t, err)
})
}

func TestV1Adapter(t *testing.T) {
t.Run("should return the right url", func(t *testing.T) {
v1Adapter := newSourceTransformAdapter(transformer.V1)
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ func handler(t *testing.T, minioConfig map[string]interface{}, redisAddress stri
srvMux.Get("/features", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"regulations": ["BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA", "ITERABLE", "ENGAGE", "CUSTIFY", "SENDGRID", "SPRIG"]
"regulations": ["BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA", "ITERABLE", "ENGAGE", "CUSTIFY", "SENDGRID", "SPRIG"],
"supportSourceTransformV1": true
}`))
})
return srvMux
Expand Down
4 changes: 3 additions & 1 deletion services/transformer/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var defaultTransformerFeatures = `{
"HS": true
},
"regulations": ["AM"],
"supportSourceTransformV1": true,
}`

func NewFeaturesService(ctx context.Context, config *config.Config, featConfig FeaturesServiceOptions) FeaturesService {
Expand Down Expand Up @@ -73,7 +74,8 @@ func (*noopService) Regulations() []string {
}

func (*noopService) SourceTransformerVersion() string {
return V0
// v0 is deprecated
return V1
}

func (*noopService) TransformerProxyVersion() string {
Expand Down
8 changes: 6 additions & 2 deletions services/transformer/features_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ type featuresService struct {
}

func (t *featuresService) SourceTransformerVersion() string {
// V0 Deprecation: This function will verify if `supportSourceTransformV1` is available and enabled
// if `supportSourceTransformV1` is not enabled, transformer is not compatible and server will panic with appropriate message.
if gjson.GetBytes(t.features, "supportSourceTransformV1").Bool() {
return V1
}

return V0
panic("Webhook source v0 version has been deprecated. This is a breaking change. Upgrade transformer version to greater than 1.50.0 for v1")
}

func (t *featuresService) TransformerProxyVersion() string {
Expand Down Expand Up @@ -116,6 +117,9 @@ func (t *featuresService) makeFeaturesFetchCall() bool {

if res.StatusCode == 200 {
t.features = body

// we are calling this to see if the transformer version is deprecated. if so, we panic.
t.SourceTransformerVersion()
} else if res.StatusCode == 404 {
t.features = json.RawMessage(defaultTransformerFeatures)
}
Expand Down
62 changes: 59 additions & 3 deletions services/transformer/features_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("Transformer features", func() {
}, 2*time.Second, 10*time.Millisecond).Should(BeFalse())
})

It("before features are fetched, SourceTransformerVersion should return v0", func() {
It("before features are fetched, SourceTransformerVersion should return v1(default) because v0 is deprecated", func() {
handler := &featuresService{
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger(),
Expand All @@ -48,7 +48,7 @@ var _ = Describe("Transformer features", func() {
},
}

Expect(handler.SourceTransformerVersion()).To(Equal(V0))
Expect(handler.SourceTransformerVersion()).To(Equal(V1))
})

It("before features are fetched, TransformerProxyVersion should return v0", func() {
Expand Down Expand Up @@ -81,6 +81,7 @@ var _ = Describe("Transformer features", func() {
Expect(handler.RouterTransform("ACTIVE_CAMPAIGN")).To(BeFalse())
Expect(handler.RouterTransform("ALGOLIA")).To(BeFalse())
Expect(handler.Regulations()).To(Equal([]string{"AM"}))
Expect(handler.SourceTransformerVersion()).To(Equal(V1))
})

It("if transformer returns 404, features should be same as defaultTransformerFeatures", func() {
Expand All @@ -103,6 +104,61 @@ var _ = Describe("Transformer features", func() {
Expect(handler.RouterTransform("ALGOLIA")).To(BeFalse())
})

It("If source transform is not v1, it should panic as v0 is deprecated", func() {
defer func() {
if r := recover(); r == nil {
Fail("The function `SourceTransformerVersion()` is supposed to panic. It did not.")
} else {
if err, ok := r.(error); ok {
Expect(err.Error()).To(Equal("Webhook source v0 version has been deprecated. This is a breaking change. Upgrade transformer version to greater than 1.50.0 for v1"))
} else {
Expect(r).To(Equal("Webhook source v0 version has been deprecated. This is a breaking change. Upgrade transformer version to greater than 1.50.0 for v1"))
}
}
}()

mockTransformerResp := `{
"routerTransform": {
"a": true,
"b": true
},
"regulations": ["AM"],
"supportSourceTransformV1": false,
"supportTransformerProxyV1": true
}`
transformerServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(mockTransformerResp))
}))

featConfig := FeaturesServiceOptions{
PollInterval: time.Duration(1),
TransformerURL: transformerServer.URL,
FeaturesRetryMaxAttempts: 1,
}

handler := &featuresService{
features: json.RawMessage(defaultTransformerFeatures),
logger: logger.NewLogger().Child("transformer-features"),
waitChan: make(chan struct{}),
options: featConfig,
client: &http.Client{
Transport: &http.Transport{
DisableKeepAlives: config.Default.GetBool("Transformer.Client.disableKeepAlives", true),
MaxConnsPerHost: config.Default.GetInt("Transformer.Client.maxHTTPConnections", 100),
MaxIdleConnsPerHost: config.Default.GetInt("Transformer.Client.maxHTTPIdleConnections", 10),
IdleConnTimeout: config.Default.GetDuration("Transformer.Client.maxIdleConnDuration", 30, time.Second),
},
Timeout: config.Default.GetDuration("HttpClient.processor.timeout", 30, time.Second),
},
}
handler.syncTransformerFeatureJson(context.TODO())

<-handler.Wait()

handler.SourceTransformerVersion()
})

It("Get should return features fetched from transformer", func() {
mockTransformerResp := `{
"routerTransform": {
Expand Down Expand Up @@ -130,7 +186,7 @@ var _ = Describe("Transformer features", func() {
Expect(handler.RouterTransform("HS")).To(BeFalse())
Expect(handler.RouterTransform("a")).To(BeTrue())
Expect(handler.RouterTransform("b")).To(BeTrue())
Expect(handler.SourceTransformerVersion()).To(Equal(V1))
Expect(handler.SourceTransformerVersion()).To(Equal(V1)) // V1 is default (V0 is deprecated)
Expect(handler.TransformerProxyVersion()).To(Equal(V1))
Expect(handler.Regulations()).To(Equal([]string{"AM"}))
})
Expand Down
2 changes: 1 addition & 1 deletion testhelper/transformertest/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (b *Builder) Build() *httptest.Server {
mux.HandleFunc("/batch", b.routerBatchTransformHandler)

// features
features := []byte(`{"routerTransform": {}}`)
features := []byte(`{"routerTransform": {}, "supportSourceTransformV1": true}`)
for destType := range b.routerTransforms {
features, _ = sjson.SetBytes(features, "routerTransform."+destType, true)
}
Expand Down

0 comments on commit 9d27a0f

Please sign in to comment.