Skip to content

Commit

Permalink
[Go SDK] Decrease sampling frequency for streaming jobs to avoid oversampling (#29774)
Browse files Browse the repository at this point in the history
  • Loading branch information
zechenj18 authored Dec 14, 2023
1 parent 1c7d178 commit 52f4fc0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (n *DataSink) StartBundle(ctx context.Context, id string, data DataContext)
// TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
// Right now we just stop datasink collection.
if n.PCol != nil {
atomic.StoreInt64(&n.PCol.elementCount, 0)
atomic.StoreInt64(&n.PCol.bundleElementCount, 0)
n.PCol.resetSize()
}
return nil
Expand All @@ -86,7 +86,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values
// TODO(BEAM-6374): Properly handle the multiplex and flatten cases.
// Right now we just stop datasink collection.
if n.PCol != nil {
atomic.AddInt64(&n.PCol.elementCount, 1)
atomic.AddInt64(&n.PCol.bundleElementCount, 1)
n.PCol.addSize(int64(byteCount))
}
return nil
Expand Down
14 changes: 8 additions & 6 deletions sdks/go/pkg/beam/core/runtime/exec/pcollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type PCollection struct {
elementCoder ElementEncoder
windowCoder WindowEncoder

elementCount int64 // must use atomic operations.
bundleElementCount int64 // must use atomic operations.
pCollectionElementCount int64 // track the total number of elements this instance has processed. Local use only, no concurrent read/write.
sizeMu sync.Mutex
sizeCount, sizeSum, sizeMin, sizeMax int64
dataSampler *DataSampler
Expand All @@ -68,7 +69,7 @@ func (p *PCollection) Up(ctx context.Context) error {

// StartBundle resets collected metrics for this PCollection, and propagates bundle start.
func (p *PCollection) StartBundle(ctx context.Context, id string, data DataContext) error {
atomic.StoreInt64(&p.elementCount, 0)
atomic.StoreInt64(&p.bundleElementCount, 0)
p.nextSampleIdx = 1
p.resetSize()
return MultiStartBundle(ctx, id, data, p.Out)
Expand All @@ -85,8 +86,8 @@ func (w *byteCounter) Write(p []byte) (n int, err error) {

// ProcessElement increments the element count and sometimes takes size samples of the elements.
func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
cur := atomic.AddInt64(&p.elementCount, 1)
if cur == p.nextSampleIdx {
cur := atomic.AddInt64(&p.bundleElementCount, 1)
if cur+p.pCollectionElementCount == p.nextSampleIdx {
// Always encode the first 3 elements. Otherwise...
// We pick the next sampling index based on how large this pcollection already is.
// We don't want to necessarily wait until the pcollection has doubled, so we reduce the range.
Expand All @@ -97,7 +98,7 @@ func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values
if p.nextSampleIdx < 4 {
p.nextSampleIdx++
} else {
p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
p.nextSampleIdx = cur + p.r.Int63n((cur+p.pCollectionElementCount)/10+2) + 1
}

if p.dataSampler == nil {
Expand Down Expand Up @@ -140,6 +141,7 @@ func (p *PCollection) resetSize() {

// FinishBundle propagates bundle termination.
func (p *PCollection) FinishBundle(ctx context.Context) error {
p.pCollectionElementCount += atomic.LoadInt64(&p.bundleElementCount)
return MultiFinishBundle(ctx, p.Out)
}

Expand All @@ -165,7 +167,7 @@ func (p *PCollection) snapshot() PCollectionSnapshot {
defer p.sizeMu.Unlock()
return PCollectionSnapshot{
ID: p.PColID,
ElementCount: atomic.LoadInt64(&p.elementCount),
ElementCount: atomic.LoadInt64(&p.bundleElementCount),
SizeCount: p.sizeCount,
SizeSum: p.sizeSum,
SizeMin: p.sizeMin,
Expand Down

0 comments on commit 52f4fc0

Please sign in to comment.