Maintain parent/child shard ordering across shard splits/merges. #155

Merged
harlow merged 1 commit into harlow:master on Jun 6, 2024

Conversation

gram-signal (Contributor)


Kinesis allows clients to rely on an invariant that, for a given partition key, the order of records added to the stream will be maintained. I.e., given an input `pkey=x,val=1  pkey=x,val=2  pkey=x,val=3`, the values `1,2,3` will be seen in that order when processed by clients, so long as clients are careful. It does so by putting all records for a single partition key into a single shard, then maintaining ordering within that shard.
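As a small illustration of that invariant (a sketch assuming the AWS SDK for Go v2; the stream name and helper are hypothetical): writing three records under the same partition key, one after another, routes them all to the same shard in order.

```go
package example

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
)

// putInOrder writes val=1, val=2, val=3 under the same partition key "x".
// Because they share a partition key, all three land in the same shard, and
// consumers that respect shard ordering will observe them as 1, 2, 3.
func putInOrder(ctx context.Context, client *kinesis.Client, stream string) error {
	for _, val := range []string{"1", "2", "3"} {
		out, err := client.PutRecord(ctx, &kinesis.PutRecordInput{
			StreamName:   aws.String(stream),
			PartitionKey: aws.String("x"),
			Data:         []byte("val=" + val),
		})
		if err != nil {
			return err
		}
		fmt.Println("wrote to shard", aws.ToString(out.ShardId))
	}
	return nil
}
```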

However, shards can be split and merged to distribute load better and handle per-shard throughput limits. Kinesis currently does this by (one or many times) splitting a single shard into two or merging two adjacent shards into one. When this occurs, Kinesis still allows for ordering consistency by detailing shard parent/child relationships within its `listShards` outputs. Splitting shard A will create children B and C, both with `ParentShardId=A`. Merging shards A and B into C will create a new shard C with `ParentShardId=A,AdjacentParentShardId=B`. So long as clients fully process all records in parents (including adjacent parents) before processing the new shard, ordering will be maintained.
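For reference, here is a minimal sketch (again assuming the AWS SDK for Go v2; the helper name is hypothetical) of reading those parent pointers back from `ListShards`. Real code would also follow `NextToken` for streams with many shards.

```go
package example

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
)

// printLineage lists shards and prints each shard's parents. A shard created
// by a split has ParentShardId set; a shard created by a merge has both
// ParentShardId and AdjacentParentShardId set.
func printLineage(ctx context.Context, client *kinesis.Client, stream string) error {
	out, err := client.ListShards(ctx, &kinesis.ListShardsInput{
		StreamName: aws.String(stream),
	})
	if err != nil {
		return err
	}
	for _, s := range out.Shards {
		fmt.Printf("shard=%s parent=%s adjacentParent=%s\n",
			aws.ToString(s.ShardId),
			aws.ToString(s.ParentShardId),
			aws.ToString(s.AdjacentParentShardId))
	}
	return nil
}
```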

`kinesis-consumer` currently doesn't do this. Instead, upon the initial (and subsequent) `listShards` calls, all visible shards immediately begin processing. Consider this case, where shards split and then merge, and each shard `X` contains a single record `rX`:

```
time ->
  B
 / \
A   D
 \ /
  C
```

Record `rD` should be processed only after both `rB` and `rC` have been processed, and both `rB` and `rC` should wait for `rA` to be processed. By starting goroutines immediately, any ordering of `{rA,rB,rC,rD}` might occur with the original code.
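One way to picture the required fix is as a dependency wait: give every shard a "done" channel and block a child's processing until its parents' channels are closed. Below is a self-contained sketch of that pattern for the topology above (illustrative only, not the PR's actual code).

```go
package main

import (
	"fmt"
	"sync"
)

// shard mirrors only the fields that matter for ordering.
type shard struct {
	id, parent, adjacentParent string
}

func main() {
	// The split-then-merge topology from the diagram above:
	// A splits into B and C, which later merge into D.
	shards := []shard{
		{id: "A"},
		{id: "B", parent: "A"},
		{id: "C", parent: "A"},
		{id: "D", parent: "B", adjacentParent: "C"},
	}

	// One "done" channel per shard, closed once that shard is fully processed.
	done := make(map[string]chan struct{}, len(shards))
	for _, s := range shards {
		done[s.id] = make(chan struct{})
	}

	var wg sync.WaitGroup
	for _, s := range shards {
		wg.Add(1)
		go func(s shard) {
			defer wg.Done()
			// Block until every parent (including the adjacent parent from a
			// merge) has been fully processed before touching this shard.
			for _, p := range []string{s.parent, s.adjacentParent} {
				if p != "" {
					<-done[p]
				}
			}
			fmt.Println("processing record r" + s.id)
			close(done[s.id]) // release any children waiting on this shard
		}(s)
	}
	wg.Wait()
}
```

Running this always prints `rA` first, then `rB` and `rC` in either order, then `rD` last, which is exactly the constraint described above.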

This PR uses the `AllGroup` as a bookkeeper of fully processed shards, with the `Consumer` calling `CloseShard` once it has finished a shard. `AllGroup` doesn't release a shard for processing until its parents have been fully processed, and the consumer simply processes the shards it receives, as it always has.

This PR creates a new `CloseableGroup` interface, rather than appending to the existing `Group` interface, to maintain backwards compatibility with existing code that may already implement `Group` elsewhere. Other `Group` implementations don't get the ordering described above, but the default `Consumer` does.
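As a rough sketch of the shape this takes (based only on the description above, not a verbatim copy of the merged code; the method name and signature may differ):

```go
package consumer

import "context"

// CloseableGroup is sketched from the PR description: a Group that can also
// be told when the Consumer has fully processed a shard, so it can release
// any child shards waiting on that parent (or adjacent parent).
type CloseableGroup interface {
	// CloseShard marks shardID as fully processed. Implementations such as
	// AllGroup use this to unblock children of the closed shard.
	CloseShard(ctx context.Context, shardID string)
}
```

Presumably the default `Consumer` can check (e.g. via a type assertion) whether its `Group` also satisfies `CloseableGroup` and only then report shard completion, which is how plain `Group` implementations keep working unchanged, just without the parent-first ordering.
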
@harlow (Owner) commented Jun 6, 2024

Nice work, thanks for the PR!

harlow merged commit 6720a01 into harlow:master on Jun 6, 2024
```
@@ -138,6 +144,10 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
	return <-errc
}

func (c *Consumer) scanSingleShard(ctx context.Context, shardID string, fn ScanFunc) error {
```
Collaborator

seems this function is not in use

```
// given partition key's data will be provided to clients in-order,
// but when splits or joins happen, we need to process all parents prior
// to processing children or that ordering guarantee is not maintained.
if waitForCloseChannel(ctx, parent1) && waitForCloseChannel(ctx, parent2) {
```
Collaborator

Here is a bug: even if both parent1 and parent2 are nil, we still send the shard to shardc, and it starts a new goroutine to scan the shard that is already being scanned.

Collaborator

fix #161
