Skip to content

Commit

Permalink
feat: snowpipe http compression for insert request
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Dec 5, 2024
1 parent 34a1caf commit b041dd4
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -10,6 +11,9 @@ import (

jsoniter "github.com/json-iterator/go"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
"github.com/rudderlabs/rudder-server/utils/httputil"
)
Expand All @@ -18,6 +22,12 @@ type (
API struct {
clientURL string
requestDoer requestDoer
config struct {
enableCompression config.ValueLoader[bool]
}
stats struct {
insertRequestBodySize stats.Histogram
}
}

requestDoer interface {
Expand All @@ -27,11 +37,17 @@ type (

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func New(clientURL string, requestDoer requestDoer) *API {
return &API{
func New(conf *config.Config, statsFactory stats.Stats, clientURL string, requestDoer requestDoer) *API {
a := &API{
clientURL: clientURL,
requestDoer: requestDoer,
}
a.config.enableCompression = conf.GetReloadableBoolVar(true, "SnowpipeStreaming.enableCompression")
a.stats.insertRequestBodySize = statsFactory.NewTaggedStat("snowpipe_streaming_request_body_size", stats.HistogramType, stats.Tags{
"api": "insert",
})

return a
}

func mustRead(r io.Reader) []byte {
Expand Down Expand Up @@ -135,12 +151,27 @@ func (a *API) Insert(ctx context.Context, channelID string, insertRequest *model
return nil, fmt.Errorf("marshalling insert request: %w", err)
}

enableCompression := a.config.enableCompression.Load()

var r io.Reader = bytes.NewBuffer(reqJSON)
payloadSize := len(reqJSON)
if enableCompression {
r, payloadSize, err = gzippedReader(reqJSON)
if err != nil {
return nil, fmt.Errorf("creating gzip reader: %w", err)
}
}
a.stats.insertRequestBodySize.Observe(float64(payloadSize))

insertURL := a.clientURL + "/channels/" + channelID + "/insert"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, insertURL, bytes.NewBuffer(reqJSON))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, insertURL, r)
if err != nil {
return nil, fmt.Errorf("creating insert request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if enableCompression {
req.Header.Set("Content-Encoding", "gzip")
}

resp, reqErr := a.requestDoer.Do(req)
if reqErr != nil {
Expand Down Expand Up @@ -184,3 +215,15 @@ func (a *API) GetStatus(ctx context.Context, channelID string) (*model.StatusRes
}
return &res, nil
}

// gzippedReader gzip-compresses reqJSON into an in-memory buffer using the
// default gzip writer settings.
//
// It returns a reader positioned at the start of the compressed bytes, the
// number of compressed bytes available from that reader, and any error raised
// while writing to or closing the gzip writer. On error the reader is nil and
// the size is 0.
func gzippedReader(reqJSON []byte) (io.Reader, int, error) {
	compressed := new(bytes.Buffer)
	zw := gzip.NewWriter(compressed)

	_, err := zw.Write(reqJSON)
	if err != nil {
		return nil, 0, fmt.Errorf("writing to gzip writer: %w", err)
	}

	// Close flushes any pending compressed data and writes the gzip footer,
	// so the buffer length is only final after this call succeeds.
	err = zw.Close()
	if err != nil {
		return nil, 0, fmt.Errorf("closing gzip writer: %w", err)
	}

	return compressed, compressed.Len(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats/memstats"

"github.com/rudderlabs/compose-test/compose"
"github.com/rudderlabs/compose-test/testcompose"
"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -33,7 +35,7 @@ type integrationTestConfig struct {
db *sql.DB
namespace string
tableName string
snowpipeAPI *api.API
clientURL string
}

func TestAPIIntegration(t *testing.T) {
Expand All @@ -51,9 +53,10 @@ func TestAPIIntegration(t *testing.T) {
t.Run("Create channel + Get channel + Insert data + Status", func(t *testing.T) {
ctx := context.Background()
testConfig := setupIntegrationTestConfig(t, ctx)
snowpipeAPI := api.New(config.New(), stats.NOP, testConfig.clientURL, http.DefaultClient)

t.Log("Creating channel")
createChannelRes, err := testConfig.snowpipeAPI.CreateChannel(ctx, &model.CreateChannelRequest{
createChannelRes, err := snowpipeAPI.CreateChannel(ctx, &model.CreateChannelRequest{
RudderIdentifier: "1",
Partition: "1",
AccountConfig: model.AccountConfig{
Expand All @@ -78,7 +81,7 @@ func TestAPIIntegration(t *testing.T) {
)

t.Log("Getting channel")
getChannelRes, err := testConfig.snowpipeAPI.GetChannel(ctx, createChannelRes.ChannelID)
getChannelRes, err := snowpipeAPI.GetChannel(ctx, createChannelRes.ChannelID)
require.NoError(t, err)
require.Equal(t, createChannelRes, getChannelRes)

Expand All @@ -93,7 +96,7 @@ func TestAPIIntegration(t *testing.T) {
}

t.Log("Inserting records")
insertRes, err := testConfig.snowpipeAPI.Insert(ctx, createChannelRes.ChannelID, &model.InsertRequest{
insertRes, err := snowpipeAPI.Insert(ctx, createChannelRes.ChannelID, &model.InsertRequest{
Rows: rows,
Offset: "8",
})
Expand All @@ -102,7 +105,7 @@ func TestAPIIntegration(t *testing.T) {

t.Log("Checking status")
require.Eventually(t, func() bool {
statusRes, err := testConfig.snowpipeAPI.GetStatus(ctx, createChannelRes.ChannelID)
statusRes, err := snowpipeAPI.GetStatus(ctx, createChannelRes.ChannelID)
if err != nil {
t.Log("Error getting status:", err)
return false
Expand All @@ -121,6 +124,7 @@ func TestAPIIntegration(t *testing.T) {
t.Run("Create + Delete channel", func(t *testing.T) {
ctx := context.Background()
testConfig := setupIntegrationTestConfig(t, ctx)
snowpipeAPI := api.New(config.New(), stats.NOP, testConfig.clientURL, http.DefaultClient)

t.Log("Creating channel")
createChannelReq := &model.CreateChannelRequest{
Expand All @@ -139,25 +143,104 @@ func TestAPIIntegration(t *testing.T) {
Table: testConfig.tableName,
},
}
createChannelRes1, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq)
createChannelRes1, err := snowpipeAPI.CreateChannel(ctx, createChannelReq)
require.NoError(t, err)
require.True(t, createChannelRes1.Valid)

t.Log("Creating channel again, should return the same channel id")
createChannelRes2, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq)
createChannelRes2, err := snowpipeAPI.CreateChannel(ctx, createChannelReq)
require.NoError(t, err)
require.True(t, createChannelRes2.Valid)
require.Equal(t, createChannelRes1, createChannelRes2)

t.Log("Deleting channel")
err = testConfig.snowpipeAPI.DeleteChannel(ctx, createChannelRes1.ChannelID, true)
err = snowpipeAPI.DeleteChannel(ctx, createChannelRes1.ChannelID, true)
require.NoError(t, err)

t.Log("Creating channel again, should return a new channel id")
createChannelRes3, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq)
createChannelRes3, err := snowpipeAPI.CreateChannel(ctx, createChannelReq)
require.NoError(t, err)
require.NotEqual(t, createChannelRes1.ChannelID, createChannelRes3.ChannelID)
})

t.Run("Compression", func(t *testing.T) {
ctx := context.Background()
testConfig := setupIntegrationTestConfig(t, ctx)

testCases := []struct {
name string
enableCompression bool
payloadSize int
}{
{
name: "Compression enabled",
enableCompression: true,
payloadSize: 378,
},
{
name: "Compression disabled",
enableCompression: false,
payloadSize: 839,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := config.New()
c.Set("SnowpipeStreaming.enableCompression", tc.enableCompression)

statsStore, err := memstats.New()
require.NoError(t, err)

snowpipeAPI := api.New(c, statsStore, testConfig.clientURL, http.DefaultClient)

t.Log("Creating channel")
createChannelRes, err := snowpipeAPI.CreateChannel(ctx, &model.CreateChannelRequest{
RudderIdentifier: "1",
Partition: "1",
AccountConfig: model.AccountConfig{
Account: testConfig.credentials.Account,
User: testConfig.credentials.User,
Role: testConfig.credentials.Role,
PrivateKey: strings.ReplaceAll(testConfig.credentials.PrivateKey, "\n", "\\\\\n"),
PrivateKeyPassphrase: testConfig.credentials.PrivateKeyPassphrase,
},
TableConfig: model.TableConfig{
Database: testConfig.credentials.Database,
Schema: testConfig.namespace,
Table: testConfig.tableName,
},
})
require.NoError(t, err)
require.NotEmpty(t, createChannelRes.ChannelID)
require.True(t, createChannelRes.Valid)
require.False(t, createChannelRes.Deleted)
require.EqualValues(t, whutils.ModelTableSchema{"ACTIVE": "boolean", "AGE": "int", "DOB": "datetime", "EMAIL": "string", "ID": "string", "NAME": "string"},
createChannelRes.SnowpipeSchema,
)

rows := []model.Row{
{"ID": "ID1", "NAME": "Alice Johnson", "EMAIL": "alice.johnson@example.com", "AGE": 28, "ACTIVE": true, "DOB": "1995-06-15T12:30:00Z"},
{"ID": "ID2", "NAME": "Bob Smith", "EMAIL": "bob.smith@example.com", "AGE": 35, "ACTIVE": true, "DOB": "1988-01-20T09:30:00Z"},
{"ID": "ID3", "NAME": "Charlie Brown", "EMAIL": "charlie.brown@example.com", "AGE": 22, "ACTIVE": false, "DOB": "2001-11-05T14:45:00Z"},
{"ID": "ID4", "NAME": "Diana Prince", "EMAIL": "diana.prince@example.com", "AGE": 30, "ACTIVE": true, "DOB": "1993-08-18T08:15:00Z"},
{"ID": "ID5", "NAME": "Eve Adams", "AGE": 45, "ACTIVE": true, "DOB": "1978-03-22T16:50:00Z"}, // -- No email
{"ID": "ID6", "NAME": "Frank Castle", "EMAIL": "frank.castle@example.com", "AGE": 38, "ACTIVE": false, "DOB": "1985-09-14T10:10:00Z"},
{"ID": "ID7", "NAME": "Grace Hopper", "EMAIL": "grace.hopper@example.com", "AGE": 85, "ACTIVE": true, "DOB": "1936-12-09T11:30:00Z"},
}

t.Log("Inserting records")
insertRes, err := snowpipeAPI.Insert(ctx, createChannelRes.ChannelID, &model.InsertRequest{
Rows: rows,
Offset: "8",
})
require.NoError(t, err)
require.Equal(t, &model.InsertResponse{Success: true, Errors: nil}, insertRes)
require.Equal(t, tc.payloadSize, statsStore.Get("snowpipe_streaming_request_body_size", stats.Tags{
"api": "insert",
}).LastValue())
})
}
})
}

func setupIntegrationTestConfig(t *testing.T, ctx context.Context) *integrationTestConfig {
Expand Down Expand Up @@ -199,14 +282,13 @@ func setupIntegrationTestConfig(t *testing.T, ctx context.Context) *integrationT
require.NoError(t, sm.CreateTable(ctx, table, tableSchema))

snowpipeClientsURL := fmt.Sprintf("http://localhost:%d", c.Port("rudder-snowpipe-clients", 9078))
a := api.New(snowpipeClientsURL, http.DefaultClient)

return &integrationTestConfig{
credentials: credentials,
db: sm.DB.DB,
namespace: namespace,
tableName: table,
snowpipeAPI: a,
clientURL: snowpipeClientsURL,
}
}

Expand Down
Loading

0 comments on commit b041dd4

Please sign in to comment.