Skip to content

Commit

Permalink
feat(bqstream): add test case for unmarshalling error
Browse files Browse the repository at this point in the history
  • Loading branch information
akashrpo committed Oct 14, 2022
1 parent 3987820 commit d7dee7f
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions services/streammanager/bqstream/bqstreammanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ func TestTimeout(t *testing.T) {
"ProjectId": bqCredentials.ProjectID,
}
destination := backendconfig.DestinationT{Config: config}
opts := common.Opts{Timeout: 1 * time.Microsecond};
opts := common.Opts{Timeout: 1 * time.Microsecond}
producer, err := bqstream.NewProducer(&destination, opts)
if err != nil {
t.Errorf(" %+v", err)
return
}

assert.NotNil(t, producer.Client);
assert.Equal(t, opts, producer.Opts);
assert.NotNil(t, producer.Client)
assert.Equal(t, opts, producer.Opts)

payload := `{
"datasetId": "timeout_test",
Expand Down Expand Up @@ -254,3 +254,26 @@ func TestProduceWithWithSingleRecord(t *testing.T) {
assert.Equal(t, "Success", statusMsg)
assert.NotEmpty(t, respMsg)
}

func TestProduceFailedCase(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := mock_bqstream.NewMockBQClient(ctrl)
producer := &bqstream.BQStreamProducer{Client: mockClient}

// properties -> string
sampleEventJson, _ := json.Marshal(map[string]interface{}{
"datasetId": "bigquery_batching",
"tableId": "Streaming",
"properties": json.RawMessage(`"id"`),
})

var genericRecs []*bqstream.GenericRecord
var genericRec *bqstream.GenericRecord
_ = json.Unmarshal([]byte("\"id\""), &genericRec)
genericRecs = append(genericRecs, genericRec)

statusCode, statusMsg, respMsg := producer.Produce(sampleEventJson, map[string]string{})
assert.Equal(t, 400, statusCode)
assert.Equal(t, "Failure", statusMsg)
assert.Contains(t, respMsg, "error in unmarshalling data")
}

0 comments on commit d7dee7f

Please sign in to comment.