Skip to content

Commit

Permalink
# This is a combination of 2 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

Java PreCommit failure fix

spotless failure fix

 Java PreCommit assign nullable correctly

Java_Examples_Dataflow PreCommit assign nullable correctly

Java_Examples_Dataflow PreCommit assign nullable correctly

Java_Examples_Dataflow PreCommit refix

Java_Examples_Dataflow PreCommit fix

build failure corrected

Spotless check

Spotless check

reorganizing pipeline

delete the unused folder

Revert "Delete build.gradle"

This reverts commit c39a4e4

Delete build.gradle

don't need this file

adding comments and java docs, and removing unneeded dependencies.

Linting the project and making some stuff private

Reorganized and redefined the logic as per the standard Beam IO structure.

Lint the files.

Added changes for making the implementation more streamlined and understandable

Added a connector that streams data from twitter using a Standard Twitter app.

# This is the commit message apache#2:

# This is a combination of 15 commits.
# This is the 1st commit message:

Added a connector that streams data from twitter using a Standard Twitter app.

# This is the commit message apache#2:

Added changes for making the implementation more streamlined and understandable

# This is the commit message apache#3:

Lint the files.

# This is the commit message apache#4:

Reorganized and redefined the logic as per the standard Beam IO structure.

# This is the commit message apache#5:

Linting the project and making some stuff private

# This is the commit message apache#6:

adding comments and java docs, and removing unneeded dependencies.

# This is the commit message apache#7:

delete the unused folder

# This is the commit message apache#8:

reorganizing pipeline

# This is the commit message apache#9:

Spotless check

# This is the commit message apache#10:

Spotless check

# This is the commit message apache#11:

build failure corrected

# This is the commit message apache#12:

Java_Examples_Dataflow PreCommit fix

# This is the commit message apache#13:

Java_Examples_Dataflow PreCommit refix

# This is the commit message apache#14:

Java_Examples_Dataflow PreCommit assign nullable correctly

# This is the commit message apache#15:

Java_Examples_Dataflow PreCommit assign nullable correctly
  • Loading branch information
usingh83 committed May 7, 2021
1 parent 6aac541 commit 7a3864a
Show file tree
Hide file tree
Showing 12 changed files with 1,157 additions and 0 deletions.
3 changes: 3 additions & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ configurations.sparkRunnerPreCommit {
}

dependencies {
implementation group: 'org.twitter4j', name: 'twitter4j-stream', version: '4.0.7'
compile enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
compile library.java.vendored_guava_26_0_jre
compile library.java.kafka_clients
Expand Down Expand Up @@ -91,6 +92,8 @@ dependencies {
compile "org.apache.commons:commons-lang3:3.9"
compile "org.apache.httpcomponents:httpclient:4.5.13"
compile "org.apache.httpcomponents:httpcore:4.4.13"
compile ("org.twitter4j:twitter4j-stream:4.0.7")
compile ("org.twitter4j:twitter4j-core:4.0.7")
testCompile project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(":sdks:java:io:google-cloud-platform")
testCompile project(":sdks:java:extensions:ml")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.TwitterStreamGenerator;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;

/**
 * Splittable {@link DoFn} that reads live tweets from Twitter.
 *
 * <p>For each {@link TwitterConfig} element it obtains the shared {@link TwitterConnection},
 * drains whatever tweets are currently buffered in its queue, emits each one stamped with the
 * tweet's creation time, and then asks to be resumed one second later. Reading stops once more
 * than {@code maxTweetsCount} claims have been attempted.
 *
 * <p>The tweet limit is held in an instance field (not a static one) so that it is serialized
 * together with the DoFn and so that two instances with different limits do not overwrite each
 * other's setting.
 */
@DoFn.UnboundedPerElement
final class ReadFromTwitterDoFn extends DoFn<TwitterConfig, Status> {
  /* Logger for class.*/
  private static final Logger LOG = LoggerFactory.getLogger(ReadFromTwitterDoFn.class);

  /** Upper bound on the number of tweets to claim before the read stops. */
  private final long maxTweetsCount;

  /**
   * Creates a reader that stops after {@code maxTweetsCount} tweets.
   *
   * @param maxTweetsCount maximum number of tweets to read; must not be {@code null} (unboxing a
   *     {@code null} fails here, at construction time, rather than later while claiming)
   */
  ReadFromTwitterDoFn(Long maxTweetsCount) {
    this.maxTweetsCount = maxTweetsCount;
  }

  /** Creates a reader with an effectively unlimited tweet count ({@link Long#MAX_VALUE}). */
  ReadFromTwitterDoFn() {
    this(Long.MAX_VALUE);
  }

  /**
   * Restriction tracker that counts claimed tweets.
   *
   * <p>The restriction is always of the form {@code [0, n)} where {@code n} is the running count
   * maintained by {@link #tryClaim}; the claimed position itself (the tweet id) is only logged.
   */
  static class OffsetTracker extends RestrictionTracker<OffsetRange, Long> implements Serializable {
    private final long maxTweetsCount;
    private OffsetRange restriction;

    /** Creates a tracker with no effective limit on the number of claims. */
    OffsetTracker(OffsetRange holder) {
      this(holder, Long.MAX_VALUE);
    }

    /**
     * Creates a tracker that rejects claims once the running count exceeds {@code
     * maxTweetsCount}.
     *
     * @param holder the restriction to start from
     * @param maxTweetsCount maximum value the running count may reach
     */
    OffsetTracker(OffsetRange holder, long maxTweetsCount) {
      this.restriction = holder;
      this.maxTweetsCount = maxTweetsCount;
    }

    /**
     * Advances the running count by one and records it as the new upper bound of the restriction.
     *
     * @param position the tweet id being claimed; used for logging only
     * @return {@code false} if the new count would exceed the configured maximum, {@code true}
     *     otherwise
     */
    @Override
    public boolean tryClaim(Long position) {
      LOG.debug("-------------- Claiming {} used to have: {}", position, restriction);
      long fetchedRecords = this.restriction == null ? 0 : this.restriction.getTo() + 1;
      if (fetchedRecords > maxTweetsCount) {
        return false;
      }
      this.restriction = new OffsetRange(0, fetchedRecords);
      return true;
    }

    @Override
    public OffsetRange currentRestriction() {
      return restriction;
    }

    /**
     * Returns an empty primary {@code [0, 0)} and the current restriction as the residual,
     * regardless of {@code fractionOfRemainder}.
     */
    @Override
    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
      LOG.debug("-------------- Trying to split: fractionOfRemainder={}", fractionOfRemainder);
      // NOTE(review): the tracker's own restriction is not narrowed to the returned primary;
      // confirm this is the intended way to carry the running count into the residual.
      return SplitResult.of(new OffsetRange(0, 0), restriction);
    }

    /** No-op: this tracker never reports unfinished work. */
    @Override
    public void checkDone() throws IllegalStateException {}

    @Override
    public IsBounded isBounded() {
      return IsBounded.UNBOUNDED;
    }
  }

  /** Seeds the watermark estimator state with the timestamp of the input element. */
  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
    return currentElementTimestamp;
  }

  /**
   * Clamps {@code timestamp} into the range {@code [BoundedWindow.TIMESTAMP_MIN_VALUE,
   * BoundedWindow.TIMESTAMP_MAX_VALUE]}.
   */
  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    return timestamp;
  }

  /** Creates a manual watermark estimator starting at the (clamped) stored state. */
  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant watermarkEstimatorState) {
    return new WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
  }

  /** Starts every element with the empty restriction {@code [0, 0)}, i.e. zero tweets read. */
  @DoFn.GetInitialRestriction
  public OffsetRange getInitialRestriction() throws IOException {
    return new OffsetRange(0, 0);
  }

  /** Creates a tracker for {@code restriction} that enforces this DoFn's tweet limit. */
  @DoFn.NewTracker
  public RestrictionTracker<OffsetRange, Long> newTracker(
      @DoFn.Restriction OffsetRange restriction) {
    return new OffsetTracker(restriction, maxTweetsCount);
  }

  /**
   * Drains the tweets currently buffered by the shared connection and emits them.
   *
   * @param twitterConfig credentials and filters used to obtain the connection
   * @param out receiver for the tweets
   * @param tracker tracker used to count tweets against the limit
   * @param watermarkEstimator estimator advanced to each emitted tweet's creation time
   * @return {@code stop()} once the tracker rejects a claim (the stream is closed first);
   *     otherwise {@code resume()} with a one second delay
   */
  @DoFn.ProcessElement
  public DoFn.ProcessContinuation processElement(
      @Element TwitterConfig twitterConfig,
      DoFn.OutputReceiver<Status> out,
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator) {
    LOG.debug("In Read From Twitter Do Fn");
    TwitterConnection twitterConnection = TwitterConnection.getInstance(twitterConfig);
    BlockingQueue<Status> queue = twitterConnection.getQueue();
    while (!queue.isEmpty()) {
      Status status = queue.poll();
      if (status != null) {
        if (!tracker.tryClaim(status.getId())) {
          // Limit reached: shut the stream down and do not ask to be resumed.
          twitterConnection.closeStream();
          return DoFn.ProcessContinuation.stop();
        }
        Instant currentInstant = Instant.ofEpochMilli(status.getCreatedAt().getTime());
        // NOTE(review): this presumes tweets are dequeued in non-decreasing creation-time
        // order; confirm the manual estimator tolerates an out-of-order tweet.
        watermarkEstimator.setWatermark(currentInstant);
        out.outputWithTimestamp(status, currentInstant);
      }
    }
    // Queue drained for now; poll again shortly.
    return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.TwitterStreamGenerator;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Immutable, {@link Serializable} holder for the settings of one Twitter connection: the four
 * OAuth credentials, the list of filter terms, and the language.
 *
 * <p>Two configurations are equal when all six values are equal.
 */
@DefaultCoder(SerializableCoder.class)
public class TwitterConfig implements Serializable {
  private final String key;
  private final String secret;
  private final String token;
  private final String tokenSecret;
  private final List<String> filters;
  private final String language;

  /**
   * Creates a configuration from the given values; each argument is stored as passed.
   *
   * @param key value returned by {@link #getKey()}
   * @param secret value returned by {@link #getSecret()}
   * @param token value returned by {@link #getToken()}
   * @param tokenSecret value returned by {@link #getTokenSecret()}
   * @param filters value returned by {@link #getFilters()}
   * @param language value returned by {@link #getLanguage()}
   */
  public TwitterConfig(
      String key,
      String secret,
      String token,
      String tokenSecret,
      List<String> filters,
      String language) {
    this.key = key;
    this.secret = secret;
    this.token = token;
    this.tokenSecret = tokenSecret;
    this.filters = filters;
    this.language = language;
  }

  /** Returns the key this configuration was created with. */
  public String getKey() {
    return key;
  }

  /** Returns the secret this configuration was created with. */
  public String getSecret() {
    return secret;
  }

  /** Returns the token this configuration was created with. */
  public String getToken() {
    return token;
  }

  /** Returns the token secret this configuration was created with. */
  public String getTokenSecret() {
    return tokenSecret;
  }

  /** Returns the filter list this configuration was created with (the same list instance). */
  public List<String> getFilters() {
    return filters;
  }

  /** Returns the language this configuration was created with. */
  public String getLanguage() {
    return language;
  }

  /** Field-by-field equality; only another {@code TwitterConfig} of the same class can match. */
  @Override
  public boolean equals(@Initialized @Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TwitterConfig other = (TwitterConfig) o;
    return Objects.equals(key, other.key)
        && Objects.equals(secret, other.secret)
        && Objects.equals(token, other.token)
        && Objects.equals(tokenSecret, other.tokenSecret)
        && Objects.equals(filters, other.filters)
        && Objects.equals(language, other.language);
  }

  /** Hash over the same six fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(key, secret, token, tokenSecret, filters, language);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.TwitterStreamGenerator;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.checkerframework.checker.nullness.qual.Nullable;
import twitter4j.*;
import twitter4j.TwitterStream;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Singleton wrapper around a twitter4j {@link TwitterStream}.
 *
 * <p>Creating the instance opens a filtered stream; every received {@link Status} is appended to
 * an in-memory queue that callers drain via {@link #getQueue()}.
 *
 * <p>Access to the cached instance is synchronized on the class, so concurrent callers of {@link
 * #getInstance} cannot open more than one stream at a time.
 */
class TwitterConnection {
  private static @Nullable TwitterConnection singleInstance = null;

  private final BlockingQueue<Status> queue;
  private final TwitterStream twitterStream;

  /**
   * Creates a new Twitter connection and immediately starts streaming into the queue.
   *
   * @param twitterConfig OAuth credentials, filter terms and language for the stream
   */
  private TwitterConnection(TwitterConfig twitterConfig) {
    this.queue = new LinkedBlockingQueue<>();
    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConfig.getKey())
        .setOAuthConsumerSecret(twitterConfig.getSecret())
        .setOAuthAccessToken(twitterConfig.getToken())
        .setOAuthAccessTokenSecret(twitterConfig.getTokenSecret());

    this.twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
    StatusListener listener =
        new StatusListener() {
          @Override
          public void onException(Exception e) {
            // TODO(review): route this through the project's logger instead of stderr.
            e.printStackTrace();
          }

          @Override
          public void onDeletionNotice(StatusDeletionNotice arg) {}

          @Override
          public void onScrubGeo(long userId, long upToStatusId) {}

          @Override
          public void onStallWarning(StallWarning warning) {}

          @Override
          public void onStatus(Status status) {
            queue.offer(status);
          }

          @Override
          public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
        };
    FilterQuery tweetFilterQuery = new FilterQuery();
    // FilterQuery.track(String...) replaces the tracked keywords on every call, so all filters
    // must be handed over in a single call; calling it once per filter keeps only the last one.
    tweetFilterQuery.track(twitterConfig.getFilters().toArray(new String[0]));
    tweetFilterQuery.language(twitterConfig.getLanguage());
    this.twitterStream.addListener(listener);
    this.twitterStream.filter(tweetFilterQuery);
  }

  /**
   * Returns the cached connection, creating it from {@code twitterConfig} if none exists.
   *
   * <p>If a connection is already cached it is returned as is; {@code twitterConfig} is not
   * compared against the configuration that connection was created with.
   *
   * @param twitterConfig configuration used only when a new connection has to be created
   * @return the shared connection
   */
  public static synchronized TwitterConnection getInstance(TwitterConfig twitterConfig) {
    TwitterConnection instance = singleInstance;
    if (instance == null) {
      instance = new TwitterConnection(twitterConfig);
      singleInstance = instance;
    }
    return instance;
  }

  /** Returns the queue that received tweets are appended to. */
  public BlockingQueue<Status> getQueue() {
    return this.queue;
  }

  /**
   * Shuts the underlying stream down and drops this instance from the singleton cache, so that a
   * later {@link #getInstance} call opens a fresh stream instead of returning a closed one.
   */
  public void closeStream() {
    synchronized (TwitterConnection.class) {
      if (singleInstance == this) {
        singleInstance = null;
      }
    }
    this.twitterStream.shutdown();
  }
}
Loading

0 comments on commit 7a3864a

Please sign in to comment.