Skip to content

Commit

Permalink
This closes #40
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Mar 21, 2016
2 parents fcc6f3c + ba7b7a0 commit c984f3a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 5 deletions.
12 changes: 7 additions & 5 deletions runners/flink/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.0-rc2</version>
</dependency>
<!-- Test scoped -->
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
Expand Down Expand Up @@ -121,6 +116,13 @@
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<!-- Optional Pipeline Registration -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.0-rc2</version>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.flink;

import com.google.auto.service.AutoService;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
import com.google.common.collect.ImmutableList;


/**
* AuteService registrar - will register FlinkRunner and FlinkOptions
* as possible pipeline runner services.
*
* It ends up in META-INF/services and gets picked up by Dataflow.
*
*/
public class FlinkRunnerRegistrar {
private FlinkRunnerRegistrar() { }

@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
}
}

@AutoService(PipelineOptionsRegistrar.class)
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.flink;

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Tests the proper registration of the Flink runner.
*/
public class FlinkRunnerRegistrarTest {

@Test
public void testFullName() {
String[] args =
new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
}

@Test
public void testClassName() {
String[] args =
new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getSimpleName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
}

}

0 comments on commit c984f3a

Please sign in to comment.