
[BEAM-94] Add UnboundedCountingInput#withRate #17

Closed
wants to merge 5 commits into from

Conversation

@tgroh (Member) commented Mar 3, 2016

The period between elements controls the rate at which
UnboundedCountingInput will output elements. This is an aggregate rate
across all instances of the source, and thus elements will not
necessarily be output "smoothly", or within the first period. The
aggregate rate, however, will be approximately equal to the provided
rate.

Add package-private CountingSource.createUnbounded() to expose the
UnboundedCountingSource type. Make UnboundedCountingSource
package-private.
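As a standalone illustration of the aggregate-rate contract described above (a sketch only, not the Beam implementation: all names are hypothetical, and it uses java.time rather than the Joda types Beam uses), the source only emits an element when doing so would keep the count produced so far at or below what the target rate permits:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: a counter that emits only when the aggregate rate so far
// would not exceed the target rate of elementsPerPeriod per period.
class RateLimitedCounter {
    private final long elementsPerPeriod;
    private final Duration period;
    private final Instant firstStarted;
    private long produced = 0;

    RateLimitedCounter(long elementsPerPeriod, Duration period, Instant start) {
        this.elementsPerPeriod = elementsPerPeriod;
        this.period = period;
        this.firstStarted = start;
    }

    // Number of elements the target rate permits by time 'now'.
    long expectedByNow(Instant now) {
        double elapsedMillis = Duration.between(firstStarted, now).toMillis();
        double periods = elapsedMillis / period.toMillis();
        return (long) (periods * elementsPerPeriod);
    }

    // Analogue of advance(): false means "nothing available yet",
    // not end-of-input.
    boolean advance(Instant now) {
        if (produced >= expectedByNow(now)) {
            return false;
        }
        produced++;
        return true;
    }
}
```

Because the check is against the elapsed-time budget rather than a fixed gap, a reader that falls behind can emit a burst to catch up, which is why output is not "smooth" even though the aggregate rate holds.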

@tgroh tgroh force-pushed the rate_limited_counting_source branch 3 times, most recently from a0d056b to cd1d616 Compare March 4, 2016 02:43
@tgroh (Member, Author) commented Mar 4, 2016

R: @dhalperi

The commit that removes Travis notifications must be dropped before merging, but it contains no other changes to the codebase.

@tgroh tgroh force-pushed the rate_limited_counting_source branch 2 times, most recently from fe366ea to f275b10 Compare March 10, 2016 23:36
@dhalperi (Contributor):

This is ready for review, yes?

@tgroh (Member, Author) commented Mar 15, 2016

yes


@tgroh tgroh force-pushed the rate_limited_counting_source branch 2 times, most recently from 87f40b7 to 0c67679 Compare March 21, 2016 16:13
@tgroh tgroh changed the title [BEAM-94] Add UnboundedCountingInput#withPeriod [BEAM-94] Add UnboundedCountingInput#withRate Mar 28, 2016
@tgroh tgroh force-pushed the rate_limited_counting_source branch from 0c67679 to b626813 Compare March 28, 2016 18:24
@tgroh (Member, Author) commented Mar 28, 2016

Additionally: R: @peihe

@@ -327,8 +372,14 @@ public boolean advance() throws IOException {
if (Long.MAX_VALUE - source.stride < current) {
return false;
}
Instant nextTimeToProduce = lastTimeProduced.plus(source.period);
if (Instant.now().isBefore(nextTimeToProduce)) {
return false;
Review comment (Contributor):

Returning false will indicate that no more input is available (based on the advance() javadoc).

Reply (Member, Author):

That's the correct behavior: if we're not yet at or beyond the next time we should produce, we have no input available now, but we may in the future.
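A minimal standalone sketch of the semantics under discussion (hypothetical names, java.time instead of Joda; not the actual reader): for an unbounded reader, advance() returning false signals "no element available right now", and the runner is expected to call advance() again later.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the period-gated advance() check from the diff above.
class PeriodGatedReader {
    private final Duration period;
    private Instant lastTimeProduced;
    private long current = 0;

    PeriodGatedReader(Duration period, Instant start) {
        this.period = period;
        this.lastTimeProduced = start;
    }

    boolean advance(Instant now) {
        Instant nextTimeToProduce = lastTimeProduced.plus(period);
        if (now.isBefore(nextTimeToProduce)) {
            return false;  // nothing yet; the caller should retry later
        }
        lastTimeProduced = now;
        current++;
        return true;
    }

    long getCurrent() { return current; }
}
```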

@peihe (Contributor) commented Mar 29, 2016

Are there better, more consistent names?

  1. CountingInput.withRate(long, Duration)
    It is not clear whether withRate(100, 10) and withRate(50, 5) are the same or not. There is also the issue of dividedBy().
    I suggest something like withMinimalGap(), withMinGap(), withInputGap(), or withElementGap().
  2. CountingSource.withPeriod(Duration)
    "Period" is not clear.
    And I suggest using the same name as CountingInput.

public void testUnboundedSourceWithPeriod() {
Pipeline p = TestPipeline.create();

Duration period = Duration.millis(2);
Review comment (Contributor):

I suspect advance() never returns false, since the period is only 2 millis.

I think making the period higher and numElements lower would exercise more of the code.

Reply (Member, Author):

It almost certainly returns false: without an explicit wait, it should not take 2 seconds on modern hardware to generate 1000 longs. On my machine the testUnboundedSource test runs in about half a second, and this one takes more than 2 seconds.

@tgroh (Member, Author) commented Mar 29, 2016

I think rate is the best name for this, for a couple of reasons. The biggest is that rate is a common term for time-based deltas. The other is that gap suggests a consistent space between outputs, which is not true: the aggregate rate at any given time after source evaluation starts should approach and not exceed the specified rate, but we make no guarantees about how elements are grouped within that range (and in fact cannot, since when the source runs is partially up to the runner's scheduling).

I have changed CountingSource#withPeriod to CountingSource#withRate, in part because it may be impossible to derive an accurate period that satisfies the rate.

.withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
Instant startTime = Instant.now();
p.run();
assertThat(Instant.now().isAfter(startTime.plus(Duration.millis(3000))), is(true));
assertThat(Instant.now().isAfter(startTime.plus(Duration.millis(4000))), is(true));
Review comment (Contributor):

Assign these constants to variables, as in testUnboundedSourceWithRate().

Reply (Member, Author):

Done

@tgroh tgroh force-pushed the rate_limited_counting_source branch from c57ee1b to 9c4d209 Compare March 30, 2016 17:56
@tgroh (Member, Author) commented Mar 30, 2016

Replaced the implementation of "do we have available elements", which is a reasonably significant change to how the decision is made. PTAL.

The rate controls the speed at which UnboundedCountingInput outputs
elements. This is an aggregate rate across all instances of the
source, and thus elements will not necessarily be output "smoothly",
or within the first period. The aggregate rate, however, will be
approximately equal to the provided rate.

Add package-private CountingSource#createUnbounded() to expose the
UnboundedCountingSource type. Make UnboundedCountingSource
package-private.
@tgroh tgroh force-pushed the rate_limited_counting_source branch from 9c4d209 to e2f9688 Compare March 30, 2016 18:48
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
new NowTimestampFn(),
// Elements per period
1L,
Review comment (Contributor):

typically easier to read as

f(
    1L /* elements per period */,
    Duration.ZERO /* period length */,
    ....
)

Reply (Member, Author):

Done

@dhalperi (Contributor):

My guess is this would be easier if CountingSource were first moved into CountingInput; that would reduce some of the extra code you had to write to work around things.

Use the expected produced value at the current time to decide
whether we should output elements.
return backlogBytes;
private Instant timeToEmit(long value) {
long periodForValue = value / source.elementsPerPeriod;
return firstStarted.plus(source.period.multipliedBy(periodForValue));
Review comment (Contributor):

This numeric rounding seems terrible if we have, for example, 100 elements per 100s.

Reply (Member, Author):

Done. Shared the logic between getSplitBacklog and here.

Use the expected value (with doubles for partial period precision) for determining
whether to output elements and the size of the split backlog
Short-circuit expected value in UnboundedCountingReader with zero duration
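A hedged standalone sketch of the revised decision described in these commits (hypothetical names, java.time instead of the Joda types Beam uses): the expected count is kept as a double so partial periods contribute (avoiding the rounding problem noted above for 100 elements per 100s), and both the emit check and the split backlog derive from the same quantity.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: double-valued expected count shared by the emit decision
// and the backlog estimate.
class ExpectedValueRate {
    // Elements the rate permits by 'now'; partial periods count.
    static double expectedValue(long elementsPerPeriod, Duration period,
                                Instant firstStarted, Instant now) {
        if (period.isZero()) {
            return Double.MAX_VALUE;  // short-circuit: no rate limit
        }
        double elapsedMillis = Duration.between(firstStarted, now).toMillis();
        return elementsPerPeriod * (elapsedMillis / period.toMillis());
    }

    // May we emit another element without exceeding the rate?
    static boolean mayEmit(long produced, double expected) {
        return produced < expected;
    }

    // Estimated elements the reader is behind by (the split backlog).
    static long backlogElements(long produced, double expected) {
        return Math.max(0L, (long) (expected - produced));
    }
}
```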
@@ -287,7 +335,8 @@ public UnboundedCountingSource(
}

@Override
public void validate() {}
public void validate() {
}
Review comment (Contributor):

revert?

Reply (Member, Author):

Done.

@dhalperi (Contributor):

LGTM. Let me know if you want any more fixes.

@dhalperi (Contributor):

Merged; no backport.

@asfgit asfgit closed this in 9793fa2 Mar 30, 2016