Skip to content

Commit

Permalink
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
Browse files Browse the repository at this point in the history
…re.app-warehouse
  • Loading branch information
achettyiitr committed Sep 21, 2023
2 parents ae792d0 + 74f5f11 commit dfbbdd1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 22 deletions.
40 changes: 21 additions & 19 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,10 +1446,17 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
proc.logger.Errorf("Dropping Job since Source not found for sourceId %q: %v", sourceId, sourceError)
continue
}
payloadFunc := ro.Memoize(func() json.RawMessage {
payloadBytes, err := jsonfast.Marshal(singularEvent)
if err != nil {
return nil
}
return payloadBytes
})

if proc.config.enableDedup {
payload, _ := jsonfast.Marshal(singularEvent)
messageSize := int64(len(payload))
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
if ok, previousSize := proc.dedup.Set(dedup.KeyValue{Key: dedupKey, Value: messageSize}); !ok {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
Expand All @@ -1476,30 +1483,21 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
eventParams,
)

payloadFunc := ro.Memoize(func() json.RawMessage {
if proc.transientSources.Apply(source.ID) {
return nil
}
payloadBytes, err := jsonfast.Marshal(singularEvent)
if err != nil {
return nil
}
return payloadBytes
},
)
sourceIsTransient := proc.transientSources.Apply(source.ID)
if proc.config.eventSchemaV2Enabled && // schemas enabled
// source has schemas enabled or if we override schemas for all sources
(source.EventSchemasEnabled || proc.config.eventSchemaV2AllSources) &&
// TODO: could use source.SourceDefinition.Category instead?
commonMetadataFromSingularEvent.SourceJobRunID == "" {
if payload := payloadFunc(); payload != nil {
commonMetadataFromSingularEvent.SourceJobRunID == "" &&
!sourceIsTransient {
if eventPayload := payloadFunc(); eventPayload != nil {
eventSchemaJobs = append(eventSchemaJobs,
&jobsdb.JobT{
UUID: batchEvent.UUID,
UserID: batchEvent.UserID,
Parameters: batchEvent.Parameters,
CustomVal: batchEvent.CustomVal,
EventPayload: payload,
EventPayload: eventPayload,
CreatedAt: time.Now(),
ExpireAt: time.Now(),
WorkspaceId: batchEvent.WorkspaceId,
Expand All @@ -1509,15 +1507,16 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
}

if proc.config.archivalEnabled.Load() &&
commonMetadataFromSingularEvent.SourceJobRunID == "" { // archival enabled
if payload := payloadFunc(); payload != nil {
commonMetadataFromSingularEvent.SourceJobRunID == "" && // archival enabled&&
!sourceIsTransient {
if eventPayload := payloadFunc(); eventPayload != nil {
archivalJobs = append(archivalJobs,
&jobsdb.JobT{
UUID: batchEvent.UUID,
UserID: batchEvent.UserID,
Parameters: batchEvent.Parameters,
CustomVal: batchEvent.CustomVal,
EventPayload: payload,
EventPayload: eventPayload,
CreatedAt: time.Now(),
ExpireAt: time.Now(),
WorkspaceId: batchEvent.WorkspaceId,
Expand All @@ -1540,6 +1539,9 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
jobsdb.Succeeded.State,
types.GATEWAY,
func() json.RawMessage {
if sourceIsTransient {
return []byte(`{}`)
}
if payload := payloadFunc(); payload != nil {
return payload
}
Expand Down
3 changes: 2 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/bugsnag/bugsnag-go/v2"
"net/http"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"

"github.com/bugsnag/bugsnag-go/v2"

"github.com/rudderlabs/rudder-go-kit/filemanager"

_ "go.uber.org/automaxprocs"
Expand Down
3 changes: 2 additions & 1 deletion warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/cenkalti/backoff"
"os"
"sync"
"time"

"github.com/cenkalti/backoff"

"github.com/rudderlabs/rudder-server/services/notifier"

"github.com/rudderlabs/rudder-server/warehouse/encoding"
Expand Down
3 changes: 2 additions & 1 deletion warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package warehouse
import (
"context"
"fmt"
"github.com/ory/dockertest/v3"
"net"
"net/http"
"strconv"
"testing"
"time"

"github.com/ory/dockertest/v3"

"github.com/hashicorp/yamux"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down

0 comments on commit dfbbdd1

Please sign in to comment.