From eba1839ef7a5c529d9da3cae3f9c7ac192b2fc59 Mon Sep 17 00:00:00 2001 From: usingh83 Date: Wed, 5 May 2021 18:44:47 -0600 Subject: [PATCH] # This is a combination of 2 commits. # This is the 1st commit message: # This is a combination of 2 commits. # This is the 1st commit message: Java PreCommit failure fix spotless failure fix Java PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit refix Java_Examples_Dataflow PreCommit fix build failure corrected Spotless check Spotless check reorganizing pipeline delete the unused folder Revert "Delete build.gradle" This reverts commit c39a4e44 Delete build.gradle don't need this file adding comments and java docs, and removing unneeded dependencies. Linting the project and making some stuff private Reorganized and redefined to logic as per standard beam IO structure. Lint the files. Added changes for making the implementation more streamlined and understandable Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: # This is a combination of 15 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #2: # This is a combination of 3 commits. # This is the 1st commit message: Java PreCommit failure fix spotless failure fix Java PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit refix Java_Examples_Dataflow PreCommit fix build failure corrected Spotless check Spotless check reorganizing pipeline delete the unused folder Revert "Delete build.gradle" This reverts commit c39a4e44 Delete build.gradle don't need this file adding comments and java docs, and removing unneeded dependencies. Linting the project and making some stuff private Reorganized and redefined to logic as per standard beam IO structure. Lint the files. Added changes for making the implementation more streamlined and understandable Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: # This is a combination of 15 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #3: # This is a combination of 16 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #16: Java PreCommit assign nullable correctly Java PreCommit assign nullable correctly spotless failure fix Java PreCommit failure fix correcting the if checks cleaning up and adding readme spotless fixed readme fixed and compileJava fix compileJava fix compileJava fix now spotless fix now Java PreCommi fix Java PreCommit fix # This is a combination of 16 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #16: Java PreCommit assign nullable correctly Java PreCommit assign nullable correctly spotless failure fix Java PreCommit failure fix correcting the if checks cleaning up and adding readme spotless fixed readme fixed and compileJava fix compileJava fix compileJava fix now spotless fix now Java PreCommi fix Java PreCommit fix # This is a combination of 3 commits. # This is the 1st commit message: Java PreCommit failure fix spotless failure fix Java PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit assign nullable correctly Java_Examples_Dataflow PreCommit refix Java_Examples_Dataflow PreCommit fix build failure corrected Spotless check Spotless check reorganizing pipeline delete the unused folder Revert "Delete build.gradle" This reverts commit c39a4e44 Delete build.gradle don't need this file adding comments and java docs, and removing unneeded dependencies. Linting the project and making some stuff private Reorganized and redefined to logic as per standard beam IO structure. Lint the files. Added changes for making the implementation more streamlined and understandable Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: # This is a combination of 15 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #3: # This is a combination of 16 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #16: Java PreCommit assign nullable correctly Java PreCommit assign nullable correctly spotless failure fix Java PreCommit failure fix correcting the if checks cleaning up and adding readme spotless fixed readme fixed and compileJava fix compileJava fix compileJava fix now spotless fix now Java PreCommi fix Java PreCommit fix # This is a combination of 16 commits. # This is the 1st commit message: Added a connector that streams data from twitter using a Standard Twitter app. # This is the commit message #2: Added changes for making the implementation more streamlined and understandable # This is the commit message #3: Lint the files. # This is the commit message #4: Reorganized and redefined to logic as per standard beam IO structure. # This is the commit message #5: Linting the project and making some stuff private # This is the commit message #6: adding comments and java docs, and removing unneeded dependencies. # This is the commit message #7: delete the unused folder # This is the commit message #8: reorganizing pipeline # This is the commit message #9: Spotless check # This is the commit message #10: Spotless check # This is the commit message #11: build failure corrected # This is the commit message #12: Java_Examples_Dataflow PreCommit fix # This is the commit message #13: Java_Examples_Dataflow PreCommit refix # This is the commit message #14: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #15: Java_Examples_Dataflow PreCommit assign nullable correctly # This is the commit message #16: Java PreCommit assign nullable correctly Java PreCommit assign nullable correctly spotless failure fix Java PreCommit failure fix correcting the if checks cleaning up and adding readme spotless fixed readme fixed and compileJava fix compileJava fix compileJava fix now spotless fix now Java PreCommi fix Java PreCommit fix Final Commit with all changes Added unit test adding examples for usage usage for TwitterIO added and Java PreCommit failure fix Spotless PreCommit failure fix --- examples/java/build.gradle | 1 - .../ReadFromTwitterDoFn.java | 142 ----------------- .../TwitterStreamGenerator/TwitterConfig.java | 94 ----------- .../TwitterConnection.java | 96 ----------- .../TwitterStreamGenerator/TwitterIO.java | 150 ------------------ .../TwitterStreamGenerator/TwitterStream.java | 75 --------- .../complete/twitterstreamgenerator/README.md | 41 +++++ .../ReadFromTwitterDoFn.java | 130 +++++++++++---- .../twitterstreamgenerator/TwitterConfig.java | 102 +++++++++--- .../TwitterConnection.java | 18 ++- .../twitterstreamgenerator/TwitterIO.java | 119 +++----------- .../twitterstreamgenerator/TwitterStream.java | 43 ++++- .../ReadFromTwitterDoFnTest.java | 90 +++++++++++ 13 files changed, 389 insertions(+), 712 deletions(-) delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/ReadFromTwitterDoFn.java delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConfig.java delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConnection.java delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java delete mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterStream.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md create mode 100644 examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 1e99d25a8421a..d03c5aa2f797f 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -52,7 +52,6 @@ configurations.sparkRunnerPreCommit { } dependencies { - implementation group: 'org.twitter4j', name: 'twitter4j-stream', version: '4.0.7' compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom) compile library.java.vendored_guava_26_0_jre compile library.java.kafka_clients diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/ReadFromTwitterDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/ReadFromTwitterDoFn.java deleted file mode 100644 index b925e2433505d..0000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/ReadFromTwitterDoFn.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.complete.TwitterStreamGenerator; - -import java.io.IOException; -import java.io.Serializable; -import java.util.concurrent.BlockingQueue; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; -import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import twitter4j.Status; - -/** Splittable dofn that read live data off twitter* */ -@DoFn.UnboundedPerElement -final class ReadFromTwitterDoFn extends DoFn { - static Long maxTweetsCount = Long.MAX_VALUE; - - ReadFromTwitterDoFn(Long maxTweetsCount) { - ReadFromTwitterDoFn.maxTweetsCount = maxTweetsCount; - } - - ReadFromTwitterDoFn() {} - - /* Logger for class.*/ - private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class); - - static class OffsetTracker extends RestrictionTracker implements Serializable { - private OffsetRange restriction; - - OffsetTracker(OffsetRange holder) { - this.restriction = holder; - } - - @Override - public boolean tryClaim(Long position) { - LOG.info("-------------- Claiming " + position + " used to have: " + restriction); - long fetchedRecords = this.restriction == null ? 0 : this.restriction.getTo() + 1; - if (fetchedRecords > maxTweetsCount) { - return false; - } - this.restriction = new OffsetRange(0, fetchedRecords); - return true; - } - - @Override - public OffsetRange currentRestriction() { - return restriction; - } - - @Override - public SplitResult trySplit(double fractionOfRemainder) { - LOG.info("-------------- Trying to split: fractionOfRemainder=" + fractionOfRemainder); - return SplitResult.of(new OffsetRange(0, 0), restriction); - } - - @Override - public void checkDone() throws IllegalStateException {} - - @Override - public IsBounded isBounded() { - return IsBounded.UNBOUNDED; - } - } - - @GetInitialWatermarkEstimatorState - public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) { - return currentElementTimestamp; - } - - private static Instant ensureTimestampWithinBounds(Instant timestamp) { - if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE; - } - return timestamp; - } - - @NewWatermarkEstimator - public WatermarkEstimators.Manual newWatermarkEstimator( - @WatermarkEstimatorState Instant watermarkEstimatorState) { - return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState)); - } - - @DoFn.GetInitialRestriction - public OffsetRange getInitialRestriction() throws IOException { - return new OffsetRange(0, 0); - } - - @DoFn.NewTracker - public RestrictionTracker newTracker( - @DoFn.Restriction OffsetRange restriction) { - return new OffsetTracker(restriction); - } - - @DoFn.ProcessElement - public DoFn.ProcessContinuation processElement( - @Element TwitterConfig twitterConfig, - DoFn.OutputReceiver out, - RestrictionTracker tracker, - ManualWatermarkEstimator watermarkEstimator) { - LOG.info("In Read From Twitter Do Fn"); - TwitterConnection twitterConnection = TwitterConnection.getInstance(twitterConfig); - BlockingQueue queue = twitterConnection.getQueue(); - while (!queue.isEmpty()) { - Status status = queue.poll(); - if (status != null) { - if (!tracker.tryClaim(status.getId())) { - twitterConnection.closeStream(); - return DoFn.ProcessContinuation.stop(); - } - Instant currentInstant = Instant.ofEpochMilli(status.getCreatedAt().getTime()); - watermarkEstimator.setWatermark(currentInstant); - out.outputWithTimestamp(status, currentInstant); - } - } - return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); - } -} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConfig.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConfig.java deleted file mode 100644 index b30486753b211..0000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConfig.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.complete.TwitterStreamGenerator; - -import java.io.Serializable; -import java.util.List; -import java.util.Objects; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** {@link Serializable} object to store twitter configurations for a connection * */ -@DefaultCoder(SerializableCoder.class) -public class TwitterConfig implements Serializable { - private final String key; - private final String secret; - private final String token; - private final String tokenSecret; - private final List filters; - private final String language; - - @Override - public boolean equals(@Initialized @Nullable Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - TwitterConfig that = (TwitterConfig) o; - return Objects.equals(key, that.key) - && Objects.equals(secret, that.secret) - && Objects.equals(token, that.token) - && Objects.equals(tokenSecret, that.tokenSecret) - && Objects.equals(filters, that.filters) - && Objects.equals(language, that.language); - } - - @Override - public int hashCode() { - return Objects.hash(key, secret, token, tokenSecret, filters, language); - } - - public TwitterConfig( - String key, - String secret, - String token, - String tokenSecret, - List filters, - String language) { - this.key = key; - this.secret = secret; - this.token = token; - this.tokenSecret = tokenSecret; - this.filters = filters; - this.language = language; - } - - public String getKey() { - return key; - } - - public String getSecret() { - return secret; - } - - public String getToken() { - return token; - } - - public String getTokenSecret() { - return tokenSecret; - } - - public List getFilters() { - return filters; - } - - public String getLanguage() { - return language; - } -} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConnection.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConnection.java deleted file mode 100644 index 6c566d10fc644..0000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterConnection.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.complete.TwitterStreamGenerator; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import org.checkerframework.checker.nullness.qual.Nullable; -import twitter4j.*; -import twitter4j.TwitterStream; -import twitter4j.conf.ConfigurationBuilder; - -/** Singleton class for twitter connection* */ -class TwitterConnection { - private final BlockingQueue queue; - private final TwitterStream twitterStream; - private static @Nullable TwitterConnection single_instance = null; - - /** - * Creates a new Twitter connection - * - * @param twitterConfig - */ - private TwitterConnection(TwitterConfig twitterConfig) { - this.queue = new LinkedBlockingQueue<>(); - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthConsumerKey(twitterConfig.getKey()) - .setOAuthConsumerSecret(twitterConfig.getSecret()) - .setOAuthAccessToken(twitterConfig.getToken()) - .setOAuthAccessTokenSecret(twitterConfig.getTokenSecret()); - - this.twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); - StatusListener listener = - new StatusListener() { - @Override - public void onException(Exception e) { - e.printStackTrace(); - } - - @Override - public void onDeletionNotice(StatusDeletionNotice arg) {} - - @Override - public void onScrubGeo(long userId, long upToStatusId) {} - - @Override - public void onStallWarning(StallWarning warning) {} - - @Override - public void onStatus(Status status) { - queue.offer(status); - } - - @Override - public void onTrackLimitationNotice(int numberOfLimitedStatuses) {} - }; - FilterQuery tweetFilterQuery = new FilterQuery(); - for (String filter : twitterConfig.getFilters()) { - tweetFilterQuery.track(filter); - } - tweetFilterQuery.language(twitterConfig.getLanguage()); - this.twitterStream.addListener(listener); - this.twitterStream.filter(tweetFilterQuery); - } - - public static TwitterConnection getInstance(TwitterConfig twitterConfig) { - if (single_instance != null) { - return single_instance; - } - single_instance = new TwitterConnection(twitterConfig); - return single_instance; - } - - public BlockingQueue getQueue() { - return this.queue; - } - - public void closeStream() { - this.twitterStream.shutdown(); - } -} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java deleted file mode 100644 index 433cdde082610..0000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.complete.TwitterStreamGenerator; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import twitter4j.Status; - -/** - * An unbounded source for twitter - * stream. - */ -public class TwitterIO { - - /** - * Initializes the stream by converting input to a Twitter connection configuration - * - * @param key - * @param secret - * @param token - * @param tokenSecret - * @param filters - * @param language - * @return - */ - public static PTransform> readStandardStream( - String key, - String secret, - String token, - String tokenSecret, - List filters, - String language, - Long maxTweetsCount) { - return new TwitterIO.Read.Builder() - .setKey(key) - .setSecret(secret) - .setToken(token) - .setTokenSecret(tokenSecret) - .setFilters(filters) - .setLanguage(language) - .setMaxTweetsCount(maxTweetsCount) - .build(); - } - /** A {@link PTransform} to read from Twitter stream. usage and configuration. */ - private static class Read extends PTransform> { - private final String key; - private final String secret; - private final String token; - private final String tokenSecret; - private final List filters; - private final String language; - private final Long maxTweetsCount; - - private Read(Builder builder) { - this.key = builder.key; - this.secret = builder.secret; - this.token = builder.token; - this.tokenSecret = builder.tokenSecret; - this.filters = builder.filters; - this.language = builder.language; - this.maxTweetsCount = builder.maxTweetsCount; - } - - @Override - public PCollection expand(PBegin input) throws IllegalArgumentException { - if (key == null - || secret == null - || token == null - || tokenSecret == null - || filters == null - || language == null) { - throw new IllegalArgumentException("Please provide key, secret, token and token secret"); - } - - return input - .apply(Create.of(new TwitterConfig(key, secret, token, tokenSecret, filters, language))) - .apply(ParDo.of(new ReadFromTwitterDoFn(maxTweetsCount))); - } - - private static class Builder { - private String key = ""; - private String secret = ""; - private String token = ""; - private String tokenSecret = ""; - private List filters = new ArrayList<>(); - private String language = "en"; - private Long maxTweetsCount = Long.MAX_VALUE; - - TwitterIO.Read.Builder setKey(final String key) { - this.key = key; - return this; - } - - TwitterIO.Read.Builder setSecret(final String secret) { - this.secret = secret; - return this; - } - - TwitterIO.Read.Builder setToken(final String token) { - this.token = token; - return this; - } - - TwitterIO.Read.Builder setTokenSecret(final String tokenSecret) { - this.tokenSecret = tokenSecret; - return this; - } - - TwitterIO.Read.Builder setFilters(final List filters) { - this.filters = filters; - return this; - } - - TwitterIO.Read.Builder setLanguage(final String language) { - this.language = language; - return this; - } - - TwitterIO.Read.Builder setMaxTweetsCount(final Long maxTweetsCount) { - this.maxTweetsCount = maxTweetsCount; - return this; - } - - TwitterIO.Read build() { - return new TwitterIO.Read(this); - } - } - } -} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterStream.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterStream.java deleted file mode 100644 index 290a6f5b6f9b6..0000000000000 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterStream.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.complete.TwitterStreamGenerator; - -import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.*; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import twitter4j.Status; - -/** - * The {@link TwitterStream} pipeline is a streaming pipeline which ingests data in JSON format from - * Twitter, and outputs the resulting records to console. Stream configurations are specified by the - * user as template parameters.
- */ -public class TwitterStream { - - /* Logger for class.*/ - private static final Logger LOG = LoggerFactory.getLogger(TwitterStream.class); - - /** - * Main entry point for pipeline execution. - * - * @param args Command line arguments to the pipeline. - */ - public static void main(String[] args) { - Pipeline pipeline = Pipeline.create(); - Window.configure() - .triggering( - Repeatedly.forever( - AfterFirst.of( - AfterPane.elementCountAtLeast(10), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(2))))); - PCollection tweetStream = - pipeline - .apply( - "Create Twitter Connection Configuration", - TwitterIO.readStandardStream( - "", "", "", "", Arrays.asList("", ""), "", Long.MAX_VALUE)) - .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); - tweetStream.apply( - "Output Tweets to console", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(@Element Status element, OutputReceiver receiver) { - LOG.info("Output tweets: " + element.getText()); - receiver.output(element); - } - })); - - pipeline.run(); - } -} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md new file mode 100644 index 0000000000000..41e64fa74389a --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/README.md @@ -0,0 +1,41 @@ + + +# Twitter Connector + +This directory contains an example pipelines for how to perform continues stream of data from twitter streaming api ( or any other 3rd party API ). This include: + +
    +
  • Splitable Dofn + — A simple example of implementation of splittable dofn on an unbounded source with a simple incrementing watermarking logic.
  • +
  • Connection Management + — The streaming pipeline example makes sure that only one Twitter connection is active at a time for a configuration. +
  • +
  • Terminating pipeline by time or elements + — The streaming pipeline keeps track of time and data collecting so far and terminated when the limit specified in passed. +
  • +
+ +## Requirements + +- Java 8 +- Twitter developer app account and streaming credentials. + +This section describes what is needed to get the example up and running. + +- Gradle preparation +- Local execution \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java index 91567dc4509da..d85bcf746fcdc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFn.java @@ -19,7 +19,10 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Objects; import java.util.concurrent.BlockingQueue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -27,6 +30,8 @@ import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -35,45 +40,88 @@ /** Splittable dofn that read live data off twitter. * */ @DoFn.UnboundedPerElement -final class ReadFromTwitterDoFn extends DoFn { - Long maxTweetsCount; +final class ReadFromTwitterDoFn extends DoFn { - ReadFromTwitterDoFn(Long maxTweetsCount) { - this.maxTweetsCount = maxTweetsCount; - } + private final DateTime startTime; + ReadFromTwitterDoFn() { + this.startTime = new DateTime(); + } /* Logger for class.*/ private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class); - static class OffsetTracker extends RestrictionTracker implements Serializable { - private OffsetRange restriction; - private final Long maxTweetsCount; + static class OffsetHolder implements Serializable { + public final @Nullable TwitterConfig twitterConfig; + public final @Nullable Long fetchedRecords; + + OffsetHolder(@Nullable TwitterConfig twitterConfig, @Nullable Long fetchedRecords) { + this.twitterConfig = twitterConfig; + this.fetchedRecords = fetchedRecords; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OffsetHolder that = (OffsetHolder) o; + return Objects.equals(twitterConfig, that.twitterConfig) + && Objects.equals(fetchedRecords, that.fetchedRecords); + } + + @Override + public int hashCode() { + return Objects.hash(twitterConfig, fetchedRecords); + } + } + + static class OffsetTracker extends RestrictionTracker + implements Serializable { + private OffsetHolder restriction; + private final DateTime startTime; - OffsetTracker(OffsetRange holder, Long maxTweetsCount) { + OffsetTracker(OffsetHolder holder, DateTime startTime) { this.restriction = holder; - this.maxTweetsCount = maxTweetsCount; + this.startTime = startTime; } @Override - public boolean tryClaim(Long position) { - LOG.info("-------------- Claiming " + position + " used to have: " + restriction); - long fetchedRecords = this.restriction == null ? 0 : this.restriction.getTo() + 1; - if (fetchedRecords > maxTweetsCount) { + public boolean tryClaim(TwitterConfig twitterConfig) { + LOG.info( + "-------------- Claiming " + + twitterConfig.hashCode() + + " used to have: " + + restriction.fetchedRecords); + long fetchedRecords = + this.restriction == null || this.restriction.fetchedRecords == null + ? 0 + : this.restriction.fetchedRecords + 1; + long elapsedTime = System.currentTimeMillis() - startTime.getMillis(); + final long millis = 60 * 1000; + LOG.info( + "-------------- Time running: {} / {}", + elapsedTime, + (twitterConfig.getMinutesToRun() * millis)); + if (fetchedRecords > twitterConfig.getTweetsCount() + || elapsedTime > twitterConfig.getMinutesToRun() * millis) { return false; } - this.restriction = new OffsetRange(0, fetchedRecords); + this.restriction = new OffsetHolder(twitterConfig, fetchedRecords); return true; } @Override - public OffsetRange currentRestriction() { + public OffsetHolder currentRestriction() { return restriction; } @Override - public SplitResult trySplit(double fractionOfRemainder) { + public SplitResult trySplit(double fractionOfRemainder) { LOG.info("-------------- Trying to split: fractionOfRemainder=" + fractionOfRemainder); - return SplitResult.of(new OffsetRange(0, 0), restriction); + return SplitResult.of(new OffsetHolder(null, 0L), restriction); } @Override @@ -106,37 +154,59 @@ public WatermarkEstimators.Manual newWatermarkEstimator( } @DoFn.GetInitialRestriction - public OffsetRange getInitialRestriction() throws IOException { - return new OffsetRange(0, 0); + public OffsetHolder getInitialRestriction(@Element TwitterConfig twitterConfig) + throws IOException { + return new OffsetHolder(null, 0L); } @DoFn.NewTracker - public RestrictionTracker newTracker( - @DoFn.Restriction OffsetRange restriction) { - return new OffsetTracker(restriction, maxTweetsCount); + public RestrictionTracker newTracker( + @Element TwitterConfig twitterConfig, @DoFn.Restriction OffsetHolder restriction) { + return new OffsetTracker(restriction, startTime); + } + + @GetRestrictionCoder + public Coder getRestrictionCoder() { + return SerializableCoder.of(OffsetHolder.class); } @DoFn.ProcessElement public DoFn.ProcessContinuation processElement( @Element TwitterConfig twitterConfig, - DoFn.OutputReceiver out, - RestrictionTracker tracker, + DoFn.OutputReceiver out, + RestrictionTracker tracker, ManualWatermarkEstimator watermarkEstimator) { LOG.info("In Read From Twitter Do Fn"); TwitterConnection twitterConnection = TwitterConnection.getInstance(twitterConfig); BlockingQueue queue = twitterConnection.getQueue(); + if (queue.isEmpty()) { + if (checkIfDone(twitterConnection, twitterConfig, tracker)) { + return DoFn.ProcessContinuation.stop(); + } + } while (!queue.isEmpty()) { Status status = queue.poll(); + if (checkIfDone(twitterConnection, twitterConfig, tracker)) { + return DoFn.ProcessContinuation.stop(); + } if (status != null) { - if (!tracker.tryClaim(status.getId())) { - twitterConnection.closeStream(); - return DoFn.ProcessContinuation.stop(); - } Instant currentInstant = Instant.ofEpochMilli(status.getCreatedAt().getTime()); watermarkEstimator.setWatermark(currentInstant); - out.outputWithTimestamp(status, currentInstant); + out.outputWithTimestamp(status.getText(), currentInstant); } } return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); } + + boolean checkIfDone( + TwitterConnection twitterConnection, + TwitterConfig twitterConfig, + RestrictionTracker tracker) { + if (!tracker.tryClaim(twitterConfig)) { + twitterConnection.closeStream(); + return true; + } else { + return false; + } + } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java index 5b368e2c657f5..60346893fceb0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConfig.java @@ -18,11 +18,11 @@ package org.apache.beam.examples.complete.twitterstreamgenerator; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.SerializableCoder; -import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; /** {@link Serializable} object to store twitter configurations for a connection. * */ @@ -34,9 +34,22 @@ public class TwitterConfig implements Serializable { private final String tokenSecret; private final List filters; private final String language; + private final Long tweetsCount; + private final Integer minutesToRun; + + private TwitterConfig(TwitterConfig.Builder builder) { + this.key = builder.key; + this.secret = builder.secret; + this.token = builder.token; + this.tokenSecret = builder.tokenSecret; + this.filters = builder.filters; + this.language = builder.language; + this.tweetsCount = builder.tweetsCount; + this.minutesToRun = builder.minutesToRun; + } @Override - public boolean equals(@Initialized @Nullable Object o) { + public boolean equals(@Nullable Object o) { if (this == o) { return true; } @@ -49,27 +62,15 @@ public boolean equals(@Initialized @Nullable Object o) { && Objects.equals(token, that.token) && Objects.equals(tokenSecret, that.tokenSecret) && Objects.equals(filters, that.filters) - && Objects.equals(language, that.language); + && Objects.equals(language, that.language) + && Objects.equals(tweetsCount, that.tweetsCount) + && Objects.equals(minutesToRun, that.minutesToRun); } @Override public int hashCode() { - return Objects.hash(key, secret, token, tokenSecret, filters, language); - } - - public TwitterConfig( - String key, - String secret, - String token, - String tokenSecret, - List filters, - String language) { - this.key = key; - this.secret = secret; - this.token = token; - this.tokenSecret = tokenSecret; - this.filters = filters; - this.language = language; + return Objects.hash( + key, secret, token, tokenSecret, filters, language, tweetsCount, minutesToRun); } public String getKey() { @@ -95,4 +96,67 @@ public List getFilters() { public String getLanguage() { return language; } + + public Long getTweetsCount() { + return tweetsCount; + } + + public Integer getMinutesToRun() { + return minutesToRun; + } + + public static class Builder { + private String key = ""; + private String secret = ""; + private String token = ""; + private String tokenSecret = ""; + private List filters = new ArrayList<>(); + private String language = "en"; + private Long tweetsCount = Long.MAX_VALUE; + private Integer minutesToRun = Integer.MAX_VALUE; + + TwitterConfig.Builder setKey(final String key) { + this.key = key; + return this; + } + + TwitterConfig.Builder setSecret(final String secret) { + this.secret = secret; + return this; + } + + TwitterConfig.Builder setToken(final String token) { + this.token = token; + return this; + } + + TwitterConfig.Builder setTokenSecret(final String tokenSecret) { + this.tokenSecret = tokenSecret; + return this; + } + + TwitterConfig.Builder setFilters(final List filters) { + this.filters = filters; + return this; + } + + TwitterConfig.Builder setLanguage(final String language) { + this.language = language; + return this; + } + + TwitterConfig.Builder setTweetsCount(final Long tweetsCount) { + this.tweetsCount = tweetsCount; + return this; + } + + TwitterConfig.Builder setMinutesToRun(final Integer minutesToRun) { + this.minutesToRun = minutesToRun; + return this; + } + + TwitterConfig build() { + return new TwitterConfig(this); + } + } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java index d5c6814239372..e6cb223f26867 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterConnection.java @@ -18,8 +18,8 @@ package org.apache.beam.examples.complete.twitterstreamgenerator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import org.checkerframework.checker.nullness.qual.Nullable; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; @@ -34,14 +34,15 @@ class TwitterConnection { private final BlockingQueue queue; private final TwitterStream twitterStream; private static final Object lock = new Object(); - private static @Nullable TwitterConnection singleInstance = null; + static final ConcurrentHashMap INSTANCE_MAP = + new ConcurrentHashMap<>(); /** * Creates a new Twitter connection. * - * @param twitterConfig + * @param twitterConfig configuration for twitter connection */ - private TwitterConnection(TwitterConfig twitterConfig) { + TwitterConnection(TwitterConfig twitterConfig) { this.queue = new LinkedBlockingQueue<>(); ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) @@ -89,12 +90,13 @@ public void onTrackLimitationNotice(int numberOfLimitedStatuses) {} public static TwitterConnection getInstance(TwitterConfig twitterConfig) { synchronized (lock) { - if (singleInstance != null) { - return singleInstance; + if (INSTANCE_MAP.containsKey(twitterConfig)) { + return INSTANCE_MAP.get(twitterConfig); } - singleInstance = new TwitterConnection(twitterConfig); + TwitterConnection singleInstance = new TwitterConnection(twitterConfig); + INSTANCE_MAP.put(twitterConfig, singleInstance); + return singleInstance; } - return singleInstance; } public BlockingQueue getQueue() { diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java index d30156367339b..4e2711baf4211 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterIO.java @@ -24,121 +24,54 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import twitter4j.Status; /** * An unbounded source for twitter - * stream. + * stream. PTransforms for streaming live tweets from twitter. Reading from Twitter is supported by + * read() + * + *

Standard Twitter API can be read using a list of Twitter Config + * readStandardStream(List) + * + *

We allow multiple Twitter configurations to demonstrate how multiple twitter streams can be + * combined in a single pipeline. + * + *

TwitterIO.readStandardStream( Arrays.asList( new TwitterConfig .Builder() .setKey("") + * .setSecret("") .setToken("") .setTokenSecret("") .setFilters(Arrays.asList("", "")) + * .setLanguage("en") .setTweetsCount(10L) .setMinutesToRun(1) .build())) */ public class TwitterIO { /** * Initializes the stream by converting input to a Twitter connection configuration. * - * @param key - * @param secret - * @param token - * @param tokenSecret - * @param filters - * @param language - * @return + * @param twitterConfigs list of twitter config + * @return PTransform of statuses */ - public static PTransform> readStandardStream( - String key, - String secret, - String token, - String tokenSecret, - List filters, - String language, - Long maxTweetsCount) { - return new TwitterIO.Read.Builder() - .setKey(key) - .setSecret(secret) - .setToken(token) - .setTokenSecret(tokenSecret) - .setFilters(filters) - .setLanguage(language) - .setMaxTweetsCount(maxTweetsCount) - .build(); + public static PTransform> readStandardStream( + List twitterConfigs) { + return new TwitterIO.Read.Builder().setTwitterConfig(twitterConfigs).build(); } + /** A {@link PTransform} to read from Twitter stream. usage and configuration. */ - private static class Read extends PTransform> { - private final String key; - private final String secret; - private final String token; - private final String tokenSecret; - private final List filters; - private final String language; - private final Long maxTweetsCount; + private static class Read extends PTransform> { + private final List twitterConfigs; private Read(Builder builder) { - this.key = builder.key; - this.secret = builder.secret; - this.token = builder.token; - this.tokenSecret = builder.tokenSecret; - this.filters = builder.filters; - this.language = builder.language; - this.maxTweetsCount = builder.maxTweetsCount; + this.twitterConfigs = builder.twitterConfigs; } @Override - public PCollection expand(PBegin input) throws IllegalArgumentException { - if (key == null - || secret == null - || token == null - || tokenSecret == null - || filters == null - || language == null) { - throw new IllegalArgumentException("Please provide key, secret, token and token secret"); - } - - return input - .apply(Create.of(new TwitterConfig(key, secret, token, tokenSecret, filters, language))) - .apply(ParDo.of(new ReadFromTwitterDoFn(maxTweetsCount))); + public PCollection expand(PBegin input) throws IllegalArgumentException { + return input.apply(Create.of(this.twitterConfigs)).apply(ParDo.of(new ReadFromTwitterDoFn())); } private static class Builder { - private String key = ""; - private String secret = ""; - private String token = ""; - private String tokenSecret = ""; - private List filters = new ArrayList<>(); - private String language = "en"; - private Long maxTweetsCount = Long.MAX_VALUE; - - TwitterIO.Read.Builder setKey(final String key) { - this.key = key; - return this; - } - - TwitterIO.Read.Builder setSecret(final String secret) { - this.secret = secret; - return this; - } - - TwitterIO.Read.Builder setToken(final String token) { - this.token = token; - return this; - } - - TwitterIO.Read.Builder setTokenSecret(final String tokenSecret) { - this.tokenSecret = tokenSecret; - return this; - } - - TwitterIO.Read.Builder setFilters(final List filters) { - this.filters = filters; - return this; - } - - TwitterIO.Read.Builder setLanguage(final String language) { - this.language = language; - return this; - } + List twitterConfigs = new ArrayList<>(); - TwitterIO.Read.Builder setMaxTweetsCount(final Long maxTweetsCount) { - this.maxTweetsCount = maxTweetsCount; + TwitterIO.Read.Builder setTwitterConfig(final List twitterConfigs) { + this.twitterConfigs = twitterConfigs; return this; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java index 355a3a74eb5bd..df79fac7dd4bd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/twitterstreamgenerator/TwitterStream.java @@ -37,6 +37,31 @@ * The {@link TwitterStream} pipeline is a streaming pipeline which ingests data in JSON format from * Twitter, and outputs the resulting records to console. Stream configurations are specified by the * user as template parameters.
+ * + *

Concepts: API connectors and streaming; splittable Dofn and watermarking ; logging + * + *

To execute this pipeline locally, specify key, secret, token, token-secret and filters to + * filter stream with, for your twitter streaming app.You can also set number of tweets ( use set + * TweetsCount - default Long.MAX_VALUE ) you wish to stream and/or the number of minutes to run the + * pipeline ( use set MinutesToRun: default Integer.MAX_VALUE ) : + * + *

{@code
+ * new TwitterConfig
+ *        .Builder()
+ *        .setKey("")
+ *        .setSecret("")
+ *        .setToken("")
+ *        .setTokenSecret("")
+ *        .setFilters(Arrays.asList("", "")).build()
+ * }
+ * + *

To change the runner( does not works on Dataflow ), specify: + * + *

{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ * + * See examples/java/README.md for instructions about how to configure different runners. */ public class TwitterStream { @@ -57,20 +82,30 @@ public static void main(String[] args) { AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(2))))); - PCollection tweetStream = + PCollection tweetStream = pipeline .apply( "Create Twitter Connection Configuration", TwitterIO.readStandardStream( - "", "", "", "", Arrays.asList("", ""), "", Long.MAX_VALUE)) + Arrays.asList( + new TwitterConfig.Builder() + .setKey("") + .setSecret("") + .setToken("") + .setTokenSecret("") + .setFilters(Arrays.asList("", "")) + .setLanguage("en") + .setTweetsCount(10L) + .setMinutesToRun(1) + .build()))) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); tweetStream.apply( "Output Tweets to console", ParDo.of( - new DoFn() { + new DoFn() { @ProcessElement public void processElement(@Element Status element, OutputReceiver receiver) { - LOG.info("Output tweets: " + element.getText()); + LOG.info("Output tweets: " + element); receiver.output(element); } })); diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java new file mode 100644 index 0000000000000..6ba34a3579c62 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/twitterstreamgenerator/ReadFromTwitterDoFnTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.twitterstreamgenerator; + +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.IntStream; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import twitter4j.Status; + +@RunWith(JUnit4.class) +public class ReadFromTwitterDoFnTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Mock TwitterConnection twitterConnection; + @Mock Status status1; + @Mock Status status2; + @Mock Status status3; + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + + @Before + public void setUp() throws JsonProcessingException { + MockitoAnnotations.initMocks(this); + when(status1.getText()).thenReturn("Breaking News1"); + when(status1.getCreatedAt()).thenReturn(new Date()); + when(status2.getText()).thenReturn("Breaking News2"); + when(status2.getCreatedAt()).thenReturn(new Date()); + when(status3.getText()).thenReturn("Breaking News3"); + when(status3.getCreatedAt()).thenReturn(new Date()); + queue.offer(status1); + queue.offer(status2); + queue.offer(status3); + } + + @Test + public void testTwitterRead() throws JsonProcessingException { + TwitterConfig twitterConfig = new TwitterConfig.Builder().setTweetsCount(3L).build(); + TwitterConnection.INSTANCE_MAP.put(twitterConfig, twitterConnection); + when(twitterConnection.getQueue()).thenReturn(queue); + PCollection result = + pipeline + .apply("Create Twitter Connection Configuration", Create.of(twitterConfig)) + .apply(ParDo.of(new ReadFromTwitterDoFn())); + PAssert.that(result) + .satisfies( + pcollection -> { + List output = new ArrayList<>(); + pcollection.forEach(output::add); + String[] expected = {"Breaking News1", "Breaking News2", "Breaking News3"}; + String[] actual = new String[output.size()]; + IntStream.range(0, output.size()).forEach((i) -> actual[i] = output.get(i)); + assertArrayEquals("Mismatch found in output", actual, expected); + return null; + }); + pipeline.run(); + } +}