Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coalesce rows #89

Merged
merged 15 commits into from
Feb 11, 2019
Merged

Coalesce rows #89

merged 15 commits into from
Feb 11, 2019

Conversation

tims
Copy link
Contributor

@tims tims commented Jan 18, 2019

For #88

/wip

@tims
Copy link
Contributor Author

tims commented Jan 18, 2019

@pradithya @woop @budi This is the PR I mentioned to you in chat. Take a look if you're interested in the implementation details. It's still needs a few changes.

eg

  • change the import spec options field to be sourceOptions, but I want to wait for another PR so we have less merge conflicts.
  • group by entityname +entitykey, instead of FeatureRowKey, but this is dependent on removing history from the serving stores first.

@tims
Copy link
Contributor Author

tims commented Jan 18, 2019

/wip
/hold


public class JobOptions implements Options {

private long sampleLimit;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how all these configuration will affect the ingestion?

@pradithya
Copy link
Collaborator

change the import spec options field to be sourceOptions, but I want to wait for another PR so we have less merge conflicts.

Is it #87 ?

group by entityname +entitykey, instead of FeatureRowKey, but this is dependent on removing history from the serving stores first.

is it possible without #87 ?

public class CoalesceFeatureRows extends
PTransform<PCollection<FeatureRow>, PCollection<FeatureRow>> {

private static final Comparator<Timestamp> TIMESTAMP_COMPARATOR = Comparator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use Timestamps.compare(t1, t2) for this

@@ -170,6 +175,12 @@ public void expand() {
ParDo.of(new RoundEventTimestampsDoFn())),
pFeatureRows.getErrors());

if (jobOptions.isCoalesceRowsEnabled()) {
pFeatureRows = pFeatureRows.apply("foo", new CoalescePFeatureRows(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"foo" can be replaced with "Coalesce Feature Row"

return sampleLimit;
}

public void setSampleimit(long sampleLimit) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo on method name

@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

/hold cancel

/assign pradithya please review

@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

I've included not writing to the feature stores as requested.

@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

Related to #87

@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

Biggest change externally is to the ImportSpec.

From


message ImportSpec {
  string type = 1;
  map<string, string> options = 2;
  repeated string entities = 3;
  Schema schema = 4;
}

To:


message ImportSpec {
  string type = 1;
  map<string, string> sourceOptions = 2;
  map<string, string> jobOptions = 5;
  repeated string entities = 3;
  Schema schema = 4;
}

@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

To turn on coalesceRows you need to pass jobOptions: { coalesceRows.enabled: "true" }

You can also set, coalesceRows.delaySeconds, and coalesceRows.timeoutSeconds, but these settings are only relevant for streaming.

The delay indicates how many seconds the watermark must advance before the rows are flushed, default is 10 seconds.
They timeout indicates how many seconds the watermark must advance before the previous state is cleared. The default is 0, which indicates it will never be cleared.

@tims tims mentioned this pull request Jan 27, 2019
@tims
Copy link
Contributor Author

tims commented Jan 27, 2019

should this be on or off by default?

@pradithya
Copy link
Collaborator

pradithya commented Jan 28, 2019

should this be on or off by default?

I think it should be on by default.
What is the behavior when it's off?

Is this in-scope for 0.1.0?

@tims tims self-assigned this Jan 30, 2019
@tims
Copy link
Contributor Author

tims commented Jan 30, 2019

/assign zhilingc

Can you run with some of your existing workloads?

@zhilingc
Copy link
Collaborator

@tims done, I think we're good to merge it in

@zhilingc
Copy link
Collaborator

/approve

@zhilingc zhilingc closed this Feb 10, 2019
@zhilingc
Copy link
Collaborator

sorry, sausage fingers :(

@zhilingc zhilingc reopened this Feb 10, 2019
@feast-ci-bot
Copy link
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: zhilingc

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@tims
Copy link
Contributor Author

tims commented Feb 11, 2019

/lgtm

@feast-ci-bot
Copy link
Collaborator

@tims: you cannot LGTM your own PR.

In response to this:

/lgtm

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@tims
Copy link
Contributor Author

tims commented Feb 11, 2019

boo... @pradithya can you lgtm this?

@pradithya
Copy link
Collaborator

/lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants