Skip to content

Commit

Permalink
[BEAM-7730] Introduce Flink 1.9 Runner.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Moravek committed Oct 8, 2019
1 parent 85c0e83 commit c89b232
Show file tree
Hide file tree
Showing 27 changed files with 967 additions and 37 deletions.
2 changes: 1 addition & 1 deletion examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ dependencies {
// https://issues.apache.org/jira/browse/BEAM-3583
// apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.8")
flinkRunnerPreCommit project(":runners:flink:1.9")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
Expand Down
2 changes: 1 addition & 1 deletion examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ dependencies {
// https://issues.apache.org/jira/browse/BEAM-3583
// apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.8")
flinkRunnerPreCommit project(":runners:flink:1.9")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import org.apache.flink.api.common.functions.StoppableFunction;

/**
* Custom StoppableFunction for backward compatibility.
*
* @see <a
* href="https://github.com/apache/flink/commit/e95b347dda5233f22fb03e408f2aa521ff924996">Flink
* interface removal commit.</a>
*/
public interface BeamStoppableFunction extends StoppableFunction {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;

/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */
public class StreamSources {

public static <OutT, SrcT extends SourceFunction<OutT>> void run(
StreamSource<OutT, SrcT> streamSource,
Object lockingObject,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OutT>> collector)
throws Exception {
streamSource.run(lockingObject, streamStatusMaintainer, collector);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import org.apache.flink.api.common.functions.StoppableFunction;

/**
* Custom StoppableFunction for backward compatibility.
*
* @see <a
* href="https://github.com/apache/flink/commit/e95b347dda5233f22fb03e408f2aa521ff924996">Flink
* interface removal commit.</a>
*/
public interface BeamStoppableFunction extends StoppableFunction {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;

/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */
public class StreamSources {

public static <OutT, SrcT extends SourceFunction<OutT>> void run(
StreamSource<OutT, SrcT> streamSource,
Object lockingObject,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OutT>> collector)
throws Exception {
streamSource.run(lockingObject, streamStatusMaintainer, collector);
}
}
34 changes: 34 additions & 0 deletions runners/flink/1.9/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.9.0'
// Main source directory and Flink version specific code.
main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.9'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.9/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.9/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.9-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Loading

0 comments on commit c89b232

Please sign in to comment.