Skip to content

Commit

Permalink
fix: replay request fails with 400 - request neither has anonymousId …
Browse files Browse the repository at this point in the history
…nor userId (#3911)
  • Loading branch information
atzoum authored Sep 26, 2023
1 parent 61883be commit 85adbbf
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 5 deletions.
1 change: 1 addition & 0 deletions backend-config/replay_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (config *ConfigT) ApplyReplaySources() {
}
newDestination := *d
newDestination.ID = id
newDestination.IsProcessorEnabled = true // processor is always enabled for replay destinations
return &newDestination
}), []*DestinationT{nil})

Expand Down
10 changes: 9 additions & 1 deletion backend-config/replay_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ func TestApplyReplayConfig(t *testing.T) {
},
Destinations: []DestinationT{
{
ID: "d-1",
ID: "d-1",
RevisionID: "rev-1",
IsProcessorEnabled: false,
},
{
ID: "d-2",
RevisionID: "rev-2",
},
},
},
Expand Down Expand Up @@ -56,6 +62,8 @@ func TestApplyReplayConfig(t *testing.T) {
require.Equal(t, map[string]interface{}{}, c.Sources[1].Config)
require.Len(t, c.Sources[1].Destinations, 1)
require.Equal(t, "er-d-1", c.Sources[1].Destinations[0].ID)
require.Equal(t, true, c.Sources[1].Destinations[0].IsProcessorEnabled)
require.Equal(t, "rev-1", c.Sources[1].Destinations[0].RevisionID)
})

t.Run("Invalid Replay Config", func(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,19 @@ type ConfigT struct {

func (c *ConfigT) SourcesMap() map[string]*SourceT {
sourcesMap := make(map[string]*SourceT)
for _, source := range c.Sources {
for i := range c.Sources {
source := c.Sources[i]
sourcesMap[source.ID] = &source
}
return sourcesMap
}

func (c *ConfigT) DestinationsMap() map[string]*DestinationT {
destinationsMap := make(map[string]*DestinationT)
for _, source := range c.Sources {
for _, destination := range source.Destinations {
for i := range c.Sources {
source := c.Sources[i]
for j := range source.Destinations {
destination := source.Destinations[j]
destinationsMap[destination.ID] = &destination
}
}
Expand Down
69 changes: 69 additions & 0 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,66 @@ var _ = Describe("Gateway", func() {
}
}
})

It("should accept valid replay request and store to jobsdb", func() {
handler := gateway.webReplayHandler()

validBody := []byte(fmt.Sprintf(`{"batch":[%s]}`, string(createValidBody("custom-property", "custom-value"))))

c.mockJobsDB.EXPECT().WithStoreSafeTx(
gomock.Any(),
gomock.Any()).Times(1).Do(func(
ctx context.Context,
f func(tx jobsdb.StoreSafeTx) error,
) {
_ = f(jobsdb.EmptyStoreSafeTx())
}).Return(nil)
c.mockJobsDB.
EXPECT().StoreEachBatchRetryInTx(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).
DoAndReturn(
func(
ctx context.Context,
tx jobsdb.StoreSafeTx,
jobBatches [][]*jobsdb.JobT,
) (map[uuid.UUID]string, error) {
c.asyncHelper.ExpectAndNotifyCallbackWithName("jobsdb_store")()
return jobsToEmptyErrors(ctx, tx, jobBatches)
}).
Times(1)

expectHandlerResponse(
handler,
authorizedReplayRequest(
ReplaySourceID,
bytes.NewBuffer(validBody)),
http.StatusOK,
"OK",
"replay",
)

Eventually(
func() bool {
stat := statsStore.Get(
"gateway.write_key_successful_requests",
map[string]string{
"source": "_source",
"sourceID": ReplaySourceID,
"workspaceId": WorkspaceID,
"writeKey": ReplayWriteKey,
"reqType": "replay",
"sourceType": "eventStream",
"sdkVersion": sdkStatTag,
},
)
return stat != nil && stat.LastValue() == float64(1)
},
1*time.Second,
).Should(BeTrue())
})
})

Context("Bots", func() {
Expand Down Expand Up @@ -1373,6 +1433,15 @@ func authorizedRequest(username string, body io.Reader) *http.Request {
return req
}

func authorizedReplayRequest(sourceID string, body io.Reader) *http.Request {
req := unauthorizedRequest(body)
req.Header.Set("X-Rudder-Source-Id", sourceID)
// set anonymousId header to ensure everything goes into same batch
req.Header.Set("AnonymousId", "094985f8-b4eb-43c3-bc8a-e8b75aae9c7c")
req.RemoteAddr = TestRemoteAddressWithPort
return req
}

func expectHandlerResponse(handler http.HandlerFunc, req *http.Request, responseStatus int, responseBody, reqType string) {
var body string
Eventually(func() int {
Expand Down
2 changes: 1 addition & 1 deletion gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
}

gw.requestSizeStat.Observe(float64(len(body)))
if req.reqType != "batch" {
if req.reqType != "batch" && req.reqType != "replay" {
body, err = sjson.SetBytes(body, "type", req.reqType)
if err != nil {
err = errors.New(response.NotRudderEvent)
Expand Down

0 comments on commit 85adbbf

Please sign in to comment.