Skip to content

Commit

Permalink
Added enum for table's type field (#1453)
Browse files Browse the repository at this point in the history
* added enum for table's type field

Signed-off-by: Nick Nalivaika <ikolina@google.com>

* update version

Signed-off-by: Nick Nalivaika <ikolina@google.com>

* simplify code by filling enumType field for tables when loading compiled graphs

* small fix for biquery adapter

* make linter happy

* fix integration tests

* do not call setOrValidateTableEnumType when check for table validity, just check enumType value

---------

Signed-off-by: Nick Nalivaika <ikolina@google.com>
  • Loading branch information
kolina authored Feb 15, 2023
1 parent 09447c4 commit d5dfab1
Show file tree
Hide file tree
Showing 26 changed files with 188 additions and 69 deletions.
3 changes: 2 additions & 1 deletion api/commands/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class Builder {
prunedGraph.projectConfig,
prunedGraph.dataformCoreVersion || "1.0.0"
);
prunedGraph.tables.forEach(utils.setOrValidateTableEnumType);
}

public build(): dataform.ExecutionGraph {
Expand Down Expand Up @@ -94,7 +95,7 @@ export class Builder {
return {
...this.toPartialExecutionAction(table),
type: "table",
tableType: table.type,
tableType: utils.tableTypeEnumToString(table.enumType),
tasks: table.disabled
? []
: this.adapter.publishTasks(table, runConfig, tableMetadata).build(),
Expand Down
9 changes: 7 additions & 2 deletions api/commands/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import deepmerge from "deepmerge";
import { validWarehouses } from "df/api/dbadapters";
import { coerceAsError, ErrorWithCause } from "df/common/errors/errors";
import { decode64 } from "df/common/protos";
import { setOrValidateTableEnumType } from "df/core/utils";
import { dataform } from "df/protos/ts";

// Project config properties that are required.
Expand Down Expand Up @@ -55,12 +56,16 @@ export async function compile(

const result = await CompileChildProcess.forkProcess().compile(compileConfig);

let compileResult: dataform.CompiledGraph;
if (compileConfig.useMain) {
const decodedResult = decode64(dataform.CoreExecutionResponse, result);
return dataform.CompiledGraph.create(decodedResult.compile.compiledGraph);
compileResult = dataform.CompiledGraph.create(decodedResult.compile.compiledGraph);
} else {
compileResult = decode64(dataform.CompiledGraph, result);
}

return decode64(dataform.CompiledGraph, result);
compileResult.tables.forEach(setOrValidateTableEnumType);
return compileResult;
}

export class CompileChildProcess {
Expand Down
3 changes: 2 additions & 1 deletion api/commands/prune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export function prune(
compiledGraph: dataform.ICompiledGraph,
runConfig: dataform.IRunConfig
): dataform.ICompiledGraph {
compiledGraph.tables.forEach(utils.setOrValidateTableEnumType);
const includedActionNames = computeIncludedActionNames(compiledGraph, runConfig);
return {
...compiledGraph,
Expand All @@ -28,7 +29,7 @@ function computeIncludedActionNames(
runConfig: dataform.IRunConfig
): Set<string> {
// Remove inline tables.
const filteredTables = compiledGraph.tables.filter(t => t.type !== "inline");
const filteredTables = compiledGraph.tables.filter(t => t.enumType !== dataform.TableType.INLINE);

// Union all tables, operations, assertions.
const allActions: CompileAction[] = [].concat(
Expand Down
4 changes: 3 additions & 1 deletion cli/console.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IInitResult } from "df/api/commands/init";
import { prettyJsonStringify } from "df/api/utils";
import { setOrValidateTableEnumType, tableTypeEnumToString } from "df/core/utils";
import { dataform } from "df/protos/ts";
import * as readlineSync from "readline-sync";

Expand Down Expand Up @@ -112,10 +113,11 @@ export function printCompiledGraph(graph: dataform.ICompiledGraph, verbose: bool
(graph.operations ? graph.operations.length : 0);
writeStdOut(successOutput(`Compiled ${actionCount} action(s).`));
if (graph.tables && graph.tables.length) {
graph.tables.forEach(setOrValidateTableEnumType);
writeStdOut(`${graph.tables.length} dataset(s):`);
graph.tables.forEach(compiledTable => {
writeStdOut(
`${datasetString(compiledTable.target, compiledTable.type, compiledTable.disabled)}`,
`${datasetString(compiledTable.target, tableTypeEnumToString(compiledTable.enumType), compiledTable.disabled)}`,
1
);
});
Expand Down
17 changes: 9 additions & 8 deletions core/adapters/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as semver from "semver";

import { IAdapter } from "df/core/adapters";
import { Task, Tasks } from "df/core/tasks";
import { tableTypeEnumToString } from "df/core/utils"
import { dataform } from "df/protos/ts";

export abstract class Adapter implements IAdapter {
Expand Down Expand Up @@ -34,15 +35,15 @@ export abstract class Adapter implements IAdapter {
}`;
}

public baseTableType(type: string) {
switch (type) {
case "table":
case "incremental":
public baseTableType(enumType: dataform.TableType) {
switch (enumType) {
case dataform.TableType.TABLE:
case dataform.TableType.INCREMENTAL:
return dataform.TableMetadata.Type.TABLE;
case "view":
case dataform.TableType.VIEW:
return dataform.TableMetadata.Type.VIEW;
default:
throw new Error(`Unexpected table type: ${type}`);
throw new Error(`Unexpected table type: ${tableTypeEnumToString(enumType)}`);
}
}

Expand Down Expand Up @@ -133,7 +134,7 @@ from (${query}) as insertions`;
let preOps = table.preOps;
if (
semver.gt(this.dataformCoreVersion, "1.4.8") &&
table.type === "incremental" &&
table.enumType === dataform.TableType.INCREMENTAL &&
this.shouldWriteIncrementally(runConfig, tableMetadata)
) {
preOps = table.incrementalPreOps;
Expand All @@ -149,7 +150,7 @@ from (${query}) as insertions`;
let postOps = table.postOps;
if (
semver.gt(this.dataformCoreVersion, "1.4.8") &&
table.type === "incremental" &&
table.enumType === dataform.TableType.INCREMENTAL &&
this.shouldWriteIncrementally(runConfig, tableMetadata)
) {
postOps = table.incrementalPostOps;
Expand Down
6 changes: 3 additions & 3 deletions core/adapters/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ export class BigQueryAdapter extends Adapter implements IAdapter {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

const baseTableType = this.baseTableType(table.type);
const baseTableType = this.baseTableType(table.enumType);
if (tableMetadata && tableMetadata.type !== baseTableType) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
);
}

if (table.type === "incremental") {
if (table.enumType === dataform.TableType.INCREMENTAL) {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
Expand Down Expand Up @@ -94,7 +94,7 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
? "materialized "
: ""
}${this.tableTypeAsSql(
this.baseTableType(table.type)
this.baseTableType(table.enumType)
)} ${this.resolveTarget(table.target)} ${
table.bigquery && table.bigquery.partitionBy
? `partition by ${table.bigquery.partitionBy} `
Expand Down
4 changes: 2 additions & 2 deletions core/adapters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface IAdapter {
assertTasks(assertion: dataform.IAssertion, projectConfig: dataform.IProjectConfig): Tasks;

dropIfExists(target: dataform.ITarget, type: dataform.TableMetadata.Type): string;
baseTableType(type: string): dataform.TableMetadata.Type;
baseTableType(enumType: dataform.TableType): dataform.TableMetadata.Type;

indexAssertion(dataset: string, indexCols: string[]): string;
rowConditionsAssertion(dataset: string, rowConditions: string[]): string;
Expand Down Expand Up @@ -135,7 +135,7 @@ export function collectEvaluationQueries(
} else {
try {
if (queryOrAction instanceof dataform.Table) {
if (queryOrAction.type === "incremental") {
if (queryOrAction.enumType === dataform.TableType.INCREMENTAL) {
const incrementalTableQueries = queryOrAction.incrementalPreOps.concat(
queryOrAction.incrementalQuery,
queryOrAction.incrementalPostOps
Expand Down
4 changes: 2 additions & 2 deletions core/adapters/presto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ export class PrestoAdapter extends Adapter implements IAdapter {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

const baseTableType = this.baseTableType(table.type);
const baseTableType = this.baseTableType(table.enumType);
if (tableMetadata && tableMetadata.type !== baseTableType) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
);
}

if (table.type === "incremental") {
if (table.enumType === dataform.TableType.INCREMENTAL) {
throw new Error("Incremental table types are not currently supported for Presto.");
} else {
tasks.add(Task.statement(this.createOrReplace(table)));
Expand Down
10 changes: 5 additions & 5 deletions core/adapters/redshift.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ export class RedshiftAdapter extends Adapter implements IAdapter {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

const baseTableType = this.baseTableType(table.type);
const baseTableType = this.baseTableType(table.enumType);
if (tableMetadata && tableMetadata.type !== baseTableType) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
);
}

if (table.type === "incremental") {
if (table.enumType === dataform.TableType.INCREMENTAL) {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.addAll(this.createOrReplace(table));
} else {
Expand Down Expand Up @@ -90,15 +90,15 @@ export class RedshiftAdapter extends Adapter implements IAdapter {
}

private createOrReplace(table: dataform.ITable) {
if (table.type === "view") {
if (table.enumType === dataform.TableType.VIEW) {
const isBindDefined = table.redshift && table.redshift.hasOwnProperty("bind");
const bindDefaultValue = semver.gte(this.dataformCoreVersion, "1.4.1") ? false : true;
const bind =
(isBindDefined ? table.redshift.bind : bindDefaultValue) && this.isBindSupported();
return (
Tasks.create()
// Drop the view in case we are changing the number of column(s) (or their types).
.add(Task.statement(this.dropIfExists(table.target, this.baseTableType(table.type))))
.add(Task.statement(this.dropIfExists(table.target, this.baseTableType(table.enumType))))
.add(Task.statement(this.createOrReplaceView(table.target, table.query, bind)))
);
}
Expand All @@ -108,7 +108,7 @@ export class RedshiftAdapter extends Adapter implements IAdapter {
});

return Tasks.create()
.add(Task.statement(this.dropIfExists(tempTableTarget, this.baseTableType(table.type))))
.add(Task.statement(this.dropIfExists(tempTableTarget, this.baseTableType(table.enumType))))
.add(Task.statement(this.createTable(table, tempTableTarget)))
.add(Task.statement(this.dropIfExists(table.target, dataform.TableMetadata.Type.TABLE)))
.add(
Expand Down
6 changes: 3 additions & 3 deletions core/adapters/snowflake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

const baseTableType = this.baseTableType(table.type);
const baseTableType = this.baseTableType(table.enumType);
if (tableMetadata && tableMetadata.type !== baseTableType) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
);
}

if (table.type === "incremental") {
if (table.enumType === dataform.TableType.INCREMENTAL) {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
Expand Down Expand Up @@ -80,7 +80,7 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {
}

private createOrReplace(table: dataform.ITable) {
if (table.type === "view") {
if (table.enumType === dataform.TableType.VIEW) {
return this.createOrReplaceView(table.target, table.query, table.snowflake?.secure, table.materialized);
}
return `create or replace ${
Expand Down
8 changes: 4 additions & 4 deletions core/adapters/sqldatawarehouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ export class SQLDataWarehouseAdapter extends Adapter implements IAdapter {

this.preOps(table, runConfig, tableMetadata).forEach(statement => tasks.add(statement));

const baseTableType = this.baseTableType(table.type);
const baseTableType = this.baseTableType(table.enumType);
if (tableMetadata && tableMetadata.type !== baseTableType) {
tasks.add(
Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
);
}

if (table.type === "incremental") {
if (table.enumType === dataform.TableType.INCREMENTAL) {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.addAll(this.createOrReplace(table, !!tableMetadata));
} else {
Expand Down Expand Up @@ -90,7 +90,7 @@ from (${query}
}

private createOrReplace(table: dataform.ITable, alreadyExists: boolean) {
if (table.type === "view") {
if (table.enumType === dataform.TableType.VIEW) {
return Tasks.create().add(
Task.statement(
`${alreadyExists ? "alter" : "create"} view ${this.resolveTarget(table.target)} as ${
Expand All @@ -105,7 +105,7 @@ from (${query}
});

return Tasks.create()
.add(Task.statement(this.dropIfExists(tempTableTarget, this.baseTableType(table.type))))
.add(Task.statement(this.dropIfExists(tempTableTarget, this.baseTableType(table.enumType))))
.add(Task.statement(this.createTable(table, tempTableTarget)))
.add(Task.statement(this.dropIfExists(table.target, dataform.TableMetadata.Type.TABLE)))
.add(
Expand Down
Loading

0 comments on commit d5dfab1

Please sign in to comment.