sourceStream = planCreator.getPipeline().apply(stageName,
+ sourceTable.buildIOReader());
+
+ planCreator.setLatestStream(sourceStream);
+
+ return planCreator.getPipeline();
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
new file mode 100644
index 0000000000000..50fe8e013af1f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Convention for Beam SQL.
+ *
+ */
+public enum BeamLogicalConvention implements Convention {
+ INSTANCE;
+
+ @Override
+ public Class getInterface() {
+ return BeamRelNode.class;
+ }
+
+ @Override
+ public String getName() {
+ return "BEAM_LOGICAL";
+ }
+
+ @Override
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ return this == trait;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return false;
+ }
+
+ @Override
+ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return false;
+ }
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
new file mode 100644
index 0000000000000..f4fc2d866afdd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.transform.BeamSQLProjectFn;
+
+/**
+ * BeamRelNode to replace a {@code Project} node.
+ *
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+ /**
+ * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
+ *
+ * @param cluster
+ * @param traits
+ * @param input
+ * @param projects
+ * @param rowType
+ */
+ public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ List extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List projects,
+ RelDataType rowType) {
+ return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+ RelNode input = getInput();
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection upstream = planCreator.getLatestStream();
+
+ BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+
+ PCollection projectStream = upstream.apply(stageName, ParDo
+ .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
+
+ planCreator.setLatestStream(projectStream);
+
+ return planCreator.getPipeline();
+
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000000000..4b53943d891ff
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.RelNode;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+
+/**
+ * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} is added, it's
+ * called by {@link BeamPipelineCreator}.
+ *
+ */
+public interface BeamRelNode extends RelNode {
+
+ /**
+ * A {@link BeamRelNode} is a recursive structure, the
+ * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search)
+ * algorithm.
+ *
+ * @param planCreator
+ * @throws Exception
+ */
+ Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
new file mode 100644
index 0000000000000..13dc96285942e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
+ *
+ */
+package org.beam.dsls.sql.rel;
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000000000..2ad7c074dbdca
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ *
+ */
+public class BeamFilterRule extends ConverterRule {
+ public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+ private BeamFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ return new BeamFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ }
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000000000..a44c002f05c00
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.beam.dsls.sql.rel.BeamIOSinkRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ */
+public class BeamIOSinkRule extends ConverterRule {
+ public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+ private BeamIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List updateColumnList = tableModify.getUpdateColumnList();
+ final List sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new BeamIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000000000..9e4778b5ff045
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.beam.dsls.sql.rel.BeamIOSourceRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ *
+ */
+public class BeamIOSourceRule extends ConverterRule {
+ public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+ private BeamIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new BeamIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000000000..117a056fa0ff7
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ *
+ */
+public class BeamProjectRule extends ConverterRule {
+ public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+ private BeamProjectRule() {
+ super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new BeamProjectRel(project.getCluster(),
+ project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000000000..634f6a8b37ecb
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule} to generate
+ * {@link org.beam.dsls.sql.rel.BeamRelNode}.
+ */
+package org.beam.dsls.sql.rule;
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000000000..8d31c6def18a6
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.beam.dsls.sql.planner.BeamQueryPlanner;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements ScannableTable, Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1262988061830914193L;
+ private RelDataType relDataType;
+
+ protected BeamSQLRecordType beamSqlRecordType;
+
+ public BaseBeamTable(RelProtoDataType protoRowType) {
+ this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
+ this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+ }
+
+ /**
+ * In Beam SQL, there's no difference between a batch query and a streaming
+ * query. {@link BeamIOType} is used to validate the sources.
+ */
+ public abstract BeamIOType getSourceType();
+
+ /**
+ * create a {@code IO.read()} instance to read from source.
+ *
+ * @return
+ */
+ public abstract PTransform super PBegin, PCollection> buildIOReader();
+
+ /**
+ * create a {@code IO.write()} instance to write to target.
+ *
+ * @return
+ */
+ public abstract PTransform super PCollection, PDone> buildIOWriter();
+
+ @Override
+ public Enumerable scan(DataContext root) {
+ // not used as Beam SQL uses its own execution engine
+ return null;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return relDataType;
+ }
+
+ /**
+ * Not used {@link Statistic} to optimize the plan.
+ */
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ /**
+ * all sources are treated as TABLE in Beam SQL.
+ */
+ @Override
+ public TableType getJdbcTableType() {
+ return TableType.TABLE;
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
new file mode 100644
index 0000000000000..5e55b0fc6e482
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type of a source IO; determines whether it is a streaming (unbounded) or a
+ * batch (bounded) process.
+ */
public enum BeamIOType implements Serializable {
  /** A finite source, processed as a batch. */
  BOUNDED,
  /** An infinite source, processed as a stream. */
  UNBOUNDED;
}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
new file mode 100644
index 0000000000000..dc8e38103cc69
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Field type information in {@link BeamSQLRow}.
+ *
+ */
+//@DefaultCoder(BeamSQLRecordTypeCoder.class)
+public class BeamSQLRecordType implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -5318734648766104712L;
+ private List fieldsName = new ArrayList<>();
+ private List fieldsType = new ArrayList<>();
+
+ public static BeamSQLRecordType from(RelDataType tableInfo) {
+ BeamSQLRecordType record = new BeamSQLRecordType();
+ for (RelDataTypeField f : tableInfo.getFieldList()) {
+ record.fieldsName.add(f.getName());
+ record.fieldsType.add(f.getType().getSqlTypeName());
+ }
+ return record;
+ }
+
+ public int size() {
+ return fieldsName.size();
+ }
+
+ public List getFieldsName() {
+ return fieldsName;
+ }
+
+ public void setFieldsName(List fieldsName) {
+ this.fieldsName = fieldsName;
+ }
+
+ public List getFieldsType() {
+ return fieldsType;
+ }
+
+ public void setFieldsType(List fieldsType) {
+ this.fieldsType = fieldsType;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
new file mode 100644
index 0000000000000..c708c4e50b1ce
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
@@ -0,0 +1,71 @@
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * A {@link Coder} for {@link BeamSQLRecordType}.
+ *
+ */
+public class BeamSQLRecordTypeCoder extends StandardCoder {
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final VarIntCoder intCoder = VarIntCoder.of();
+
+ private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
+ private BeamSQLRecordTypeCoder(){}
+
+ public static BeamSQLRecordTypeCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BeamSQLRecordType value, OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ Context nested = context.nested();
+ intCoder.encode(value.size(), outStream, nested);
+ for(String fieldName : value.getFieldsName()){
+ stringCoder.encode(fieldName, outStream, nested);
+ }
+ for(SqlTypeName fieldType : value.getFieldsType()){
+ stringCoder.encode(fieldType.name(), outStream, nested);
+ }
+ outStream.flush();
+ }
+
+ @Override
+ public BeamSQLRecordType decode(InputStream inStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ BeamSQLRecordType typeRecord = new BeamSQLRecordType();
+ Context nested = context.nested();
+ int size = intCoder.decode(inStream, nested);
+ for(int idx=0; idx> getCoderArguments() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
new file mode 100644
index 0000000000000..3ec170e85a713
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents a generic ROW record in Beam SQL.
+ *
+ */
+//@DefaultCoder(BeamSqlRowCoder.class)
+public class BeamSQLRow implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4569220242480160895L;
+
+ private List nullFields = new ArrayList<>();
+ private List dataValues;
+ private BeamSQLRecordType dataType;
+
+ public BeamSQLRow(BeamSQLRecordType dataType) {
+ this.dataType = dataType;
+ this.dataValues = new ArrayList<>();
+ for(int idx=0; idx dataValues) {
+ this.dataValues = dataValues;
+ this.dataType = dataType;
+ }
+
+ public void addField(String fieldName, Object fieldValue) {
+ addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+ }
+
+ public void addField(int index, Object fieldValue) {
+ if(fieldValue == null){
+ dataValues.set(index, fieldValue);
+ if(!nullFields.contains(index)){nullFields.add(index);}
+ return;
+ }
+
+ SqlTypeName fieldType = dataType.getFieldsType().get(index);
+ switch (fieldType) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ if(!(fieldValue instanceof Integer)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case DOUBLE:
+ if(!(fieldValue instanceof Double)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case BIGINT:
+ if(!(fieldValue instanceof Long)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case FLOAT:
+ if(!(fieldValue instanceof Float)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case VARCHAR:
+ if(!(fieldValue instanceof String)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TIME:
+ case TIMESTAMP:
+ if(!(fieldValue instanceof Date)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
+ }
+ dataValues.set(index, fieldValue);
+ }
+
+
+ public int getInteger(int idx) {
+ return (Integer) getFieldValue(idx);
+ }
+
+ public double getDouble(int idx) {
+ return (Double) getFieldValue(idx);
+ }
+
+ public long getLong(int idx) {
+ return (Long) getFieldValue(idx);
+ }
+
+ public String getString(int idx) {
+ return (String) getFieldValue(idx);
+ }
+
+ public Date getDate(int idx) {
+ return (Date) getFieldValue(idx);
+ }
+
+ public Object getFieldValue(String fieldName) {
+ return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ }
+
+ public Object getFieldValue(int fieldIdx) {
+ if(nullFields.contains(fieldIdx)){
+ return null;
+ }
+
+ Object fieldValue = dataValues.get(fieldIdx);
+ SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
+
+ switch (fieldType) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ if(!(fieldValue instanceof Integer)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Integer.valueOf(fieldValue.toString());
+ }
+ case DOUBLE:
+ if(!(fieldValue instanceof Double)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Double.valueOf(fieldValue.toString());
+ }
+ case BIGINT:
+ if(!(fieldValue instanceof Long)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Long.valueOf(fieldValue.toString());
+ }
+ case FLOAT:
+ if(!(fieldValue instanceof Float)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Float.valueOf(fieldValue.toString());
+ }
+ case VARCHAR:
+ if(!(fieldValue instanceof String)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return fieldValue.toString();
+ }
+ case TIME:
+ case TIMESTAMP:
+ if(!(fieldValue instanceof Date)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return fieldValue;
+ }
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
+ }
+ }
+
+ public int size() {
+ return dataValues.size();
+ }
+
+ public List getDataValues() {
+ return dataValues;
+ }
+
+ public void setDataValues(List dataValues) {
+ this.dataValues = dataValues;
+ }
+
+ public BeamSQLRecordType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(BeamSQLRecordType dataType) {
+ this.dataType = dataType;
+ }
+
+ public void setNullFields(List nullFields) {
+ this.nullFields = nullFields;
+ }
+
+ public List getNullFields() {
+ return nullFields;
+ }
+
+ @Override
+ public String toString() {
+ return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
+ }
+
+ /**
+ * Return data fields as key=value.
+ * @return
+ */
+ public String valueInString() {
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < size(); ++idx) {
+ sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+ }
+ return sb.substring(1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BeamSQLRow other = (BeamSQLRow) obj;
+ return toString().equals(other.toString());
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000000000..de80dd5a16446
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,132 @@
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} that encodes {@link BeamSQLRow}.
+ *
+ */
+public class BeamSqlRowCoder extends StandardCoder{
+ private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
+
+ private static final ListCoder listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final DoubleCoder doubleCoder = DoubleCoder.of();
+
+ private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
+ private BeamSqlRowCoder(){}
+
+ public static BeamSqlRowCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BeamSQLRow value, OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ recordTypeCoder.encode(value.getDataType(), outStream, context);
+ listCoder.encode(value.getNullFields(), outStream, context);
+
+ Context nested = context.nested();
+
+ for (int idx = 0; idx < value.size(); ++idx) {
+ if(value.getNullFields().contains(idx)){
+ continue;
+ }
+
+ switch (value.getDataType().getFieldsType().get(idx)) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ intCoder.encode(value.getInteger(idx), outStream, nested);
+ break;
+ case DOUBLE:
+ case FLOAT:
+ doubleCoder.encode(value.getDouble(idx), outStream, nested);
+ break;
+ case BIGINT:
+ longCoder.encode(value.getLong(idx), outStream, nested);
+ break;
+ case VARCHAR:
+ stringCoder.encode(value.getString(idx), outStream, nested);
+ break;
+ case TIME:
+ case TIMESTAMP:
+ longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
+ break;
+
+ default:
+ throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
+ }
+ }
+ }
+
+ @Override
+ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
+ List nullFields = listCoder.decode(inStream, context);
+
+ BeamSQLRow record = new BeamSQLRow(type);
+ record.setNullFields(nullFields);
+
+ for (int idx = 0; idx < type.size(); ++idx) {
+ if(nullFields.contains(idx)){
+ continue;
+ }
+
+ switch (type.getFieldsType().get(idx)) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ record.addField(idx, intCoder.decode(inStream, context));
+ break;
+ case DOUBLE:
+ case FLOAT:
+ record.addField(idx, doubleCoder.decode(inStream, context));
+ break;
+ case BIGINT:
+ record.addField(idx, longCoder.decode(inStream, context));
+ break;
+ case VARCHAR:
+ record.addField(idx, stringCoder.decode(inStream, context));
+ break;
+ case TIME:
+ case TIMESTAMP:
+ record.addField(idx, new Date(longCoder.decode(inStream, context)));
+ break;
+
+ default:
+ throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+ }
+ }
+
+ return record;
+ }
+
+ @Override
+ public List extends Coder>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
new file mode 100644
index 0000000000000..3ab86c52e1e13
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
@@ -0,0 +1,13 @@
+package org.beam.dsls.sql.schema;
+
/**
 * An unchecked exception thrown when a field value does not match the SQL type
 * declared for its position in the record.
 */
public class InvalidFieldException extends RuntimeException {
  // Explicit id, consistent with the other Serializable classes in this module.
  private static final long serialVersionUID = 1L;

  public InvalidFieldException() {
    super();
  }

  public InvalidFieldException(String message) {
    super(message);
  }

  /** Preserves the underlying cause for diagnosis. */
  public InvalidFieldException(String message, Throwable cause) {
    super(message, cause);
  }

}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
new file mode 100644
index 0000000000000..7f7afb23991ea
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
@@ -0,0 +1,11 @@
+package org.beam.dsls.sql.schema;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class UnsupportedDataTypeException extends RuntimeException {
+
+ public UnsupportedDataTypeException(SqlTypeName unsupportedType){
+ super(String.format("Not support data type [%s]", unsupportedType));
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000000000..2570763c3e7b3
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4754022536543333984L;
+
+ public static final String DELIMITER = ",";
+ private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);
+
+ public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
+ List topics) {
+ super(protoRowType, bootstrapServers, topics);
+ }
+
+ @Override
+ public PTransform>, PCollection>
+ getPTransformForInput() {
+ return new CsvRecorderDecoder(beamSqlRecordType);
+ }
+
+ @Override
+ public PTransform, PCollection>>
+ getPTransformForOutput() {
+ return new CsvRecorderEncoder(beamSqlRecordType);
+ }
+
+ /**
+ * A PTransform to convert {@code KV} to {@link BeamSQLRow}.
+ *
+ */
+ public static class CsvRecorderDecoder
+ extends PTransform>, PCollection> {
+ private BeamSQLRecordType recordType;
+
+ public CsvRecorderDecoder(BeamSQLRecordType recordType) {
+ this.recordType = recordType;
+ }
+
+ @Override
+ public PCollection expand(PCollection> input) {
+ return input.apply("decodeRecord", ParDo.of(new DoFn, BeamSQLRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String rowInString = new String(c.element().getValue());
+ String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
+ if (parts.length != recordType.size()) {
+ LOG.error(String.format("invalid record: ", rowInString));
+ } else {
+ BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
+ for (int idx = 0; idx < parts.length; ++idx) {
+ sourceRecord.addField(idx, parts[idx]);
+ }
+ c.output(sourceRecord);
+ }
+ }
+ }));
+ }
+ }
+
+ /**
+ * A PTransform to convert {@link BeamSQLRow} to {@code KV}.
+ *
+ */
+ public static class CsvRecorderEncoder
+ extends PTransform, PCollection>> {
+ private BeamSQLRecordType recordType;
+
+ public CsvRecorderEncoder(BeamSQLRecordType recordType) {
+ this.recordType = recordType;
+ }
+
+ @Override
+ public PCollection> expand(PCollection input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSQLRow in = c.element();
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < in.size(); ++idx) {
+ sb.append(DELIMITER);
+ sb.append(in.getFieldValue(idx).toString());
+ }
+ c.output(KV.of(new byte[] {}, sb.substring(1).getBytes()));
+ }
+ }));
+
+ }
+
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000000000..29f3f927b4297
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Subclasses
+ * convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -634715473399906527L;
+
+ private String bootstrapServers;
+ private List topics;
+ private Map configUpdates;
+
+ protected BeamKafkaTable(RelProtoDataType protoRowType) {
+ super(protoRowType);
+ }
+
+ public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
+ List topics) {
+ super(protoRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topics = topics;
+ }
+
+ public BeamKafkaTable updateConsumerProperties(Map configUpdates) {
+ this.configUpdates = configUpdates;
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public abstract PTransform>, PCollection>
+ getPTransformForInput();
+
+ /**
+ *
+ * @return
+ */
+ public abstract PTransform, PCollection>>
+ getPTransformForOutput();
+
+ @Override
+ public PTransform super PBegin, PCollection> buildIOReader() {
+ return new PTransform>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 9167792271351182771L;
+
+ @Override
+ public PCollection expand(PBegin input) {
+ return input.apply("read",
+ KafkaIO.read().withBootstrapServers(bootstrapServers).withTopics(topics)
+ .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
+ .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
+ .apply("in_format", getPTransformForInput());
+
+ }
+ };
+ }
+
+ @Override
+ public PTransform super PCollection, PDone> buildIOWriter() {
+ checkArgument(topics != null && topics.size() == 1,
+ "Only one topic can be acceptable as output.");
+
+ return new PTransform, PDone>() {
+ @Override
+ public PDone expand(PCollection input) {
+ return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+ KafkaIO.write().withBootstrapServers(bootstrapServers)
+ .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
+ .withValueCoder(ByteArrayCoder.of()));
+ }
+ };
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000000000..822fce703da10
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * table schema for KafkaIO.
+ */
+package org.beam.dsls.sql.schema.kafka;
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
new file mode 100644
index 0000000000000..ef9cc7d38b6a0
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * define table schema, to map with Beam IO components.
+ *
+ */
+package org.beam.dsls.sql.schema;
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
new file mode 100644
index 0000000000000..06db2802c393c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ */
+public class BeamSQLFilterFn extends DoFn {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1256111753670606705L;
+
+ private String stepName;
+ private BeamSQLExpressionExecutor executor;
+
+ public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
+ super();
+ this.stepName = stepName;
+ this.executor = executor;
+ }
+
+ @Setup
+ public void setup() {
+ executor.prepare();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSQLRow in = c.element();
+
+ List result = executor.execute(in);
+
+ if ((Boolean) result.get(0)) {
+ c.output(in);
+ }
+ }
+
+ @Teardown
+ public void close() {
+ executor.close();
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
new file mode 100644
index 0000000000000..1014c0d3f7b14
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+public class BeamSQLOutputToConsoleFn extends DoFn {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1256111753670606705L;
+
+ private String stepName;
+
+ public BeamSQLOutputToConsoleFn(String stepName) {
+ super();
+ this.stepName = stepName;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ System.out.println("Output: " + c.element().getDataValues());
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
new file mode 100644
index 0000000000000..12061d2f094c2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ *
+ * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+public class BeamSQLProjectFn extends DoFn {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1046605249999014608L;
+ private String stepName;
+ private BeamSQLExpressionExecutor executor;
+ private BeamSQLRecordType outputRecordType;
+
+ public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
+ BeamSQLRecordType outputRecordType) {
+ super();
+ this.stepName = stepName;
+ this.executor = executor;
+ this.outputRecordType = outputRecordType;
+ }
+
+ @Setup
+ public void setup() {
+ executor.prepare();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ List results = executor.execute(c.element());
+
+ BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
+ for (int idx = 0; idx < results.size(); ++idx) {
+ outRow.addField(idx, results.get(idx));
+ }
+
+ c.output(outRow);
+ }
+
+ @Teardown
+ public void close() {
+ executor.close();
+ }
+
+}
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
new file mode 100644
index 0000000000000..91b5639c62254
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link PTransform} used in a BeamSQL pipeline.
+ */
+package org.beam.dsls.sql.transform;
diff --git a/dsls/sql/src/main/resources/log4j.properties b/dsls/sql/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000..709484b4951bb
--- /dev/null
+++ b/dsls/sql/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
new file mode 100644
index 0000000000000..56e45c4732d21
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.junit.BeforeClass;
+
+/**
+ * prepare {@code BeamSqlRunner} for test.
+ *
+ */
+public class BasePlanner {
+ public static BeamSqlRunner runner = new BeamSqlRunner();
+
+ @BeforeClass
+ public static void prepare() {
+ runner.addTable("ORDER_DETAILS", getTable());
+ runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+ runner.addTable("SUB_ORDER_RAM", getTable());
+ }
+
+ private static BaseBeamTable getTable() {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ return new MockedBeamSQLTable(protoRowType);
+ }
+
+ public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ Map<String, Object> consumerPara = new HashMap<String, Object>();
+ consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+ return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+ .updateConsumerProperties(consumerPara);
+ }
+}
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
new file mode 100644
index 0000000000000..a77878fc150ea
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to explain queries.
+ *
+ */
+public class BeamPlannerExplainTest extends BasePlanner {
+
+ @Test
+ public void selectAll() throws Exception {
+ String sql = "SELECT * FROM ORDER_DETAILS";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan =
+ "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+ @Test
+ public void selectWithFilter() throws Exception {
+ String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 and price > 20";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+ @Test
+ public void insertSelectFilter() throws Exception {
+ String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+ + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 and price > 20";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan =
+ "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
+ + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+ + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+}
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
new file mode 100644
index 0000000000000..eb097a9ed293e
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to execute a query.
+ *
+ */
+public class BeamPlannerSubmitTest extends BasePlanner {
+ @Test
+ public void insertSelectFilter() throws Exception {
+ String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ runner.getPlanner().planner.close();
+
+ pipeline.run().waitUntilFinish();
+
+ Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+ Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", MockedBeamSQLTable.CONTENT.get(0));
+ }
+
+}
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
new file mode 100644
index 0000000000000..31f55780c3956
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A mock table used to check input/output.
+ *
+ */
+public class MockedBeamSQLTable extends BaseBeamTable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1373168368414036932L;
+
+ public static final List<String> CONTENT = new ArrayList<>();
+
+ public MockedBeamSQLTable(RelProtoDataType protoRowType) {
+ super(protoRowType);
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ @Override
+ public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+ BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
+ row1.addField(0, 12345L);
+ row1.addField(1, 0);
+ row1.addField(2, 10.5);
+ row1.addField(3, new Date());
+
+ BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
+ row2.addField(0, 12345L);
+ row2.addField(1, 1);
+ row2.addField(2, 20.5);
+ row2.addField(3, new Date());
+
+ BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
+ row3.addField(0, 12345L);
+ row3.addField(1, 0);
+ row3.addField(2, 20.5);
+ row3.addField(3, new Date());
+
+ BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
+ row4.addField(0, null);
+ row4.addField(1, null);
+ row4.addField(2, 20.5);
+ row4.addField(3, new Date());
+
+ return Create.of(row1, row2, row3);
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+ return new OutputStore();
+ }
+
+ /**
+ * Keep output in {@code CONTENT} for validation.
+ *
+ */
+ public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> {
+
+ @Override
+ public PDone expand(PCollection<BeamSQLRow> input) {
+ input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
+
+ @Setup
+ public void setup() {
+ CONTENT.clear();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ CONTENT.add(c.element().valueInString());
+ }
+
+ @Teardown
+ public void close() {
+
+ }
+
+ }));
+ return PDone.in(input.getPipeline());
+ }
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index ef312c19373f7..5749df1fc9bac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -447,12 +447,12 @@
${project.version}
-
+
org.apache.beam
beam-sdks-java-io-hadoop-input-format
- ${project.version}
+ ${project.version}
-
+
org.apache.beam
beam-runners-core-construction-java