diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go index 0f2de99dd2adf..6385098d013ca 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go @@ -425,16 +425,20 @@ func (c *DataChannel) read(ctx context.Context) { continue // we've already closed this cached reader, skip } r.PTransformDone() - if r.Closed() { - // Clean up local bookkeeping. We'll never see another message - // for it again. We have to be careful not to remove the real - // one, because readers may be initialized after we've seen - // the full stream. - delete(cache, id.instID) - } } seenLast = seenLast[:0] // reset for re-use c.mu.Unlock() + // Scan through the cache and check for any closed readers, and evict them from the cache. + // Readers might be closed out of band from the data messages because we received all data + // for all transforms in an instruction before the instruction has even begun. However, we can't + // know this until we receive the Control instruction, which tells us how many transforms + // we need to receive data for. So we check the cache directly every so often and evict closed + // readers. We will never receive data for these instructions again. + for instID, r := range cache { + if r.Closed() { + delete(cache, instID) + } + } } } }