From faeeee592be824187c4e6b6df3358585fce70a07 Mon Sep 17 00:00:00 2001 From: Pablo Estrada Date: Wed, 5 Jan 2022 16:54:17 -0800 Subject: [PATCH] Starting the proto translation of Impulse, ParDo, GBK --- sdks/node-ts/package.json | 1 + sdks/node-ts/src/apache_beam/base.ts | 294 ++++++++++++++++++ .../src/apache_beam/coders/standard_coders.ts | 27 +- sdks/node-ts/src/apache_beam/core.ts | 173 ----------- sdks/node-ts/src/apache_beam/index.ts | 3 +- .../src/apache_beam/internal/translations.ts | 22 +- .../src/apache_beam/transforms/core.ts | 36 +++ sdks/node-ts/test/core_test.ts | 26 +- sdks/node-ts/test/primitives_test.ts | 34 ++ 9 files changed, 433 insertions(+), 183 deletions(-) create mode 100644 sdks/node-ts/src/apache_beam/base.ts delete mode 100644 sdks/node-ts/src/apache_beam/core.ts create mode 100644 sdks/node-ts/src/apache_beam/transforms/core.ts create mode 100644 sdks/node-ts/test/primitives_test.ts diff --git a/sdks/node-ts/package.json b/sdks/node-ts/package.json index c9e3fa0f3538d..41da6ff714a12 100644 --- a/sdks/node-ts/package.json +++ b/sdks/node-ts/package.json @@ -1,5 +1,6 @@ { "devDependencies": { + "fast-deep-equal": "^3.1.3", "@types/mocha": "^9.0.0", "js-yaml": "^4.1.0", "mocha": "^9.1.3", diff --git a/sdks/node-ts/src/apache_beam/base.ts b/sdks/node-ts/src/apache_beam/base.ts new file mode 100644 index 0000000000000..66165998686b3 --- /dev/null +++ b/sdks/node-ts/src/apache_beam/base.ts @@ -0,0 +1,294 @@ +import * as runnerApi from './proto/beam_runner_api'; +import { BytesCoder, Coder, IterableCoder, KVCoder } from './coders/standard_coders'; +import * as util from 'util'; +import * as translations from './internal/translations' + + +// TODO(pabloem): Use something better, hah. 
+var _pcollection_counter = -1; + +export function pcollectionName() { + _pcollection_counter += 1; + return 'ref_PCollection_' + _pcollection_counter; +} + +var _transform_counter = -1; +export function transformName() { + _transform_counter += 1; + return 'transform_' + _transform_counter; +} + +/** + * Represents an 'edge' in a graph. These may be PCollections, PCollection views, + * and Pipelines themselves. + */ +class PValue { + // TODO: Have a reference to its graph representation + type: string = "unknown"; + name: string; + pipeline: Pipeline; + + constructor(name: string) { + this.name = name; + } + + isPipeline(): boolean { + return this.type === "pipeline"; + } + + // TODO(pabloem): What about multiple outputs? (not strictly necessary ATM. Can do with Filters) + apply(transform: PTransform): PCollection { + const outPcoll = transform.expand(this); + if (outPcoll.type !== 'pcollection') { + throw new Error(util.format('Transform %s does not return a PCollection', transform)); + } + return outPcoll as PCollection; + } + + map(callable: DoFn | GenericCallable): PCollection { + return this.apply(new ParDo(callable)); + } + + // Top-level functions: + // - flatMap + // - filter? +} + +/** + * A Pipeline is the base object to start building a Beam DAG. It is the + * first object that a user creates, and then they may start applying + * transformations to it to build a DAG. 
+ */ +export class Pipeline extends PValue { + type: string = "pipeline"; + proto: runnerApi.Pipeline; + + // A map of coder ID to Coder object + coders: {[key: string]: Coder} = {} + + constructor() { + super("root"); + this.proto = runnerApi.Pipeline.create( + {'components': runnerApi.Components.create()} + ); + this.pipeline = this; + } +} + +export class PCollection extends PValue { + type: string = "pcollection"; + proto: runnerApi.PCollection; + + constructor(name: string, pcollectionProto: runnerApi.PCollection, pipeline: Pipeline) { + super(name) + this.proto = pcollectionProto; + this.pipeline = pipeline; + } +} + +export class PTransform { + expand(input: PValue): PValue { + throw new Error('Method expand has not been implemented.'); + } +} + +export class DoFn { + type: string = "dofn"; + process(element: any) { + throw new Error('Method process has not been implemented!'); + } + + startBundle(element: any) { + throw new Error('Method process has not been implemented!'); + } + + finishBundle(element: any) { + throw new Error('Method process has not been implemented!'); + } +} + +export interface GenericCallable { + (input: any): any +} + + + export class Impulse extends PTransform { + // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn; + // TODO: use above line, not below line. 
+ static urn: string = "beam:transform:impulse:v1"; + expand(input: Pipeline): PCollection { + if (!input.isPipeline()) { + throw new Error("User is attempting to apply Impulse transform to a non-pipeline object."); + } + const pipeline = input as Pipeline; + + const pcollName = pcollectionName(); + + const coderId = translations.registerPipelineCoder(runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': BytesCoder.URN})}), pipeline.proto.components!); + pipeline.coders[coderId] = new BytesCoder(); + + const outputProto = runnerApi.PCollection.create({ + 'uniqueName': pcollName, + 'coderId': coderId, + 'isBounded': runnerApi.IsBounded_Enum.BOUNDED + }); + + const impulseProto = runnerApi.PTransform.create({ + // TODO(pabloem): Get the name for the PTransform + 'uniqueName': 'todouniquename', + 'spec': runnerApi.FunctionSpec.create({ + 'urn': translations.DATA_INPUT_URN, + 'payload': translations.IMPULSE_BUFFER + }), + 'outputs': {'out': pcollName} + }); + + return new PCollection(impulseProto.outputs.out, outputProto, input.pipeline); + } + } + + /** + * @returns true if the input is a `DoFn`. + * + * Since type information is lost at runtime, we check the object's attributes + * to determine whether it's a DoFn or not. 
+ * + * @example + * Prints "true" for a new `DoFn` but "false" for a function: + * ```ts + * console.log(new DoFn()); + * console.log((x) => x * 2); + * ``` + * @param callableOrDoFn + * @returns + */ + function isDoFn(callableOrDoFn: DoFn | GenericCallable) { + const df = (callableOrDoFn as DoFn) + if (df.type !== undefined && df.type === "dofn") { + return true; + } else { + return false; + } + } + + export class ParDo extends PTransform { + static _CallableWrapperDoFn = class extends DoFn { + private fn; + constructor(fn: GenericCallable) { + super(); + this.fn = fn; + } + process(element: any) { + return this.fn(element); + } + } + + private doFn; + // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn; + // TODO: use above line, not below line. + static urn: string = "beam:transform:pardo:v1"; + constructor(callableOrDoFn: DoFn | GenericCallable) { + super() + if (isDoFn(callableOrDoFn)) { + this.doFn = callableOrDoFn; + } else { + this.doFn = new ParDo._CallableWrapperDoFn(callableOrDoFn as GenericCallable); + } + } + + expand(input: PCollection): PCollection { + + if (input.type !== 'pcollection') { + throw new Error('ParDo received the wrong input.'); + } + + const pcollName = pcollectionName(); + + const outputProto = runnerApi.PCollection.create({ + 'uniqueName': pcollName, + 'coderId': BytesCoder.URN, // TODO: Get coder URN + 'isBounded': runnerApi.IsBounded_Enum.BOUNDED + }); + + const inputPCollName = (input as PCollection).proto.uniqueName; + + // TODO(pabloem): Get the name for the PTransform + const pardoName = 'todouniquename'; + const inputId = pardoName + '1'; + + const pardoProto = runnerApi.PTransform.create({ + 'uniqueName': pardoName, + 'spec': runnerApi.FunctionSpec.create({ + 'urn': ParDo.urn, + 'payload': runnerApi.ParDoPayload.toBinary( + runnerApi.ParDoPayload.create({ + 'doFn': runnerApi.FunctionSpec.create({ + 'urn': translations.SERIALIZED_JS_DOFN_INFO, + 'payload': new Uint8Array() + }) + })) + }), + 'inputs': 
{inputId: inputPCollName}, + 'outputs': {'out': pcollName} + }); + + // TODO(pablom): Do this properly + return new PCollection(pardoProto.outputs.out, outputProto, input.pipeline); + } + } + + // TODO(pabloem): Consider not exporting the GBK + export class GroupByKey extends PTransform { + // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn; + // TODO: use above line, not below line. + static urn: string = "beam:transform:group_by_key:v1"; + + expand(input: PCollection): PCollection { + const inputPCollectionProto: runnerApi.PCollection = input.type == 'pcollection' ? (input as PCollection).proto : undefined!; + if (inputPCollectionProto === undefined) { + throw new Error('Input is not a PCollection object.'); + } + + const pipelineComponents: runnerApi.Components = input.pipeline.proto.components!; + + const keyCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[0]; + const valueCoderId = pipelineComponents.coders[inputPCollectionProto.coderId].componentCoderIds[1]; + + const iterableValueCoderProto = runnerApi.Coder.create({ + 'spec': {'urn': IterableCoder.URN,}, + 'componentCoderIds': [valueCoderId] + }); + const iterableValueCoderId = translations.registerPipelineCoder(iterableValueCoderProto, pipelineComponents)!; + const iterableValueCoder = new IterableCoder(input.pipeline.coders[valueCoderId]); + input.pipeline.coders[iterableValueCoderId] = iterableValueCoder; + + const outputCoderProto = runnerApi.Coder.create({ + 'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN}), + 'componentCoderIds': [keyCoderId, iterableValueCoderId] + }) + const outputPcollCoderId = translations.registerPipelineCoder(outputCoderProto, pipelineComponents)!; + + const outputPCollectionProto = runnerApi.PCollection.create({ + 'uniqueName': pcollectionName(), + 'isBounded': inputPCollectionProto.isBounded, + 'coderId': outputPcollCoderId + }); + pipelineComponents.pcollections[outputPCollectionProto.uniqueName] = 
outputPCollectionProto; + + const ptransformProto = runnerApi.PTransform.create({ + 'uniqueName': 'TODO NAME', + 'spec': runnerApi.FunctionSpec.create({ + 'urn': GroupByKey.urn, + 'payload': null! // TODO(GBK payload????) + }), + 'outputs': {'out': outputPCollectionProto.uniqueName} + }); + pipelineComponents.transforms[ptransformProto.uniqueName] = ptransformProto; + + input.pipeline.coders[outputPcollCoderId] = new KVCoder( + input.pipeline.coders[keyCoderId], + input.pipeline.coders[iterableValueCoderId]); + + return new PCollection(outputPCollectionProto.uniqueName, outputPCollectionProto, input.pipeline); + } + } \ No newline at end of file diff --git a/sdks/node-ts/src/apache_beam/coders/standard_coders.ts b/sdks/node-ts/src/apache_beam/coders/standard_coders.ts index e985a98e1d58b..851ee058055a3 100644 --- a/sdks/node-ts/src/apache_beam/coders/standard_coders.ts +++ b/sdks/node-ts/src/apache_beam/coders/standard_coders.ts @@ -44,4 +44,29 @@ export class BytesCoder extends Coder { return element; } } -CODER_REGISTRY.register(BytesCoder.URN, BytesCoder); \ No newline at end of file +CODER_REGISTRY.register(BytesCoder.URN, BytesCoder); + +export class KVCoder extends Coder { + static URN: string = "beam:coder:kvcoder:v1"; + type: string = 'kvcoder'; + + keyCoder: Coder; + valueCoder: Coder; + + constructor(keyCoder: Coder, valueCoder: Coder) { + super(); + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } +} + +export class IterableCoder extends Coder { + static URN: string = "beam:coder:iterable:v1"; + type: string = 'iterablecoder'; + + elementCoder: Coder; + constructor(elementCoder: Coder) { + super(); + this.elementCoder = elementCoder; + } +} \ No newline at end of file diff --git a/sdks/node-ts/src/apache_beam/core.ts b/sdks/node-ts/src/apache_beam/core.ts deleted file mode 100644 index 7b7576227e02f..0000000000000 --- a/sdks/node-ts/src/apache_beam/core.ts +++ /dev/null @@ -1,173 +0,0 @@ -const runnerApi = require('./proto/beam_runner_api'); 
-const translations = require('./internal/translations') - - -// TODO(pabloem): Use something better, hah. -var _pcollection_counter = 0; - -/** - * Represents an 'edge' in a graph. These may be PCollections, PCollection views, - * and Pipelines themselves. - */ -class PValue { - // TODO: Have a reference to its graph representation - type: string = "unknown"; - name: string; - - constructor(name: string) { - this.name = name; - } - - isPipeline(): boolean { - return this.type === "pipeline"; - } - - apply(transform: PTransform): PValue { - return transform.expand(this); - } - - map(callable: DoFn | GenericCallable): PValue { - return this.apply(new ParDo(callable)); - } - - // Top-level functions: - // - flatMap - // - filter? -} - -/** - * A Pipeline is the base object to start building a Beam DAG. It is the - * first object that a user creates, and then they may start applying - * transformations to it to build a DAG. - */ -export class Pipeline extends PValue { - type: string = "pipeline"; - portablePipeline: any; -} - -export class PCollection extends PValue { - type: string = "pcollection"; - portablePcollection: any; - - constructor(name: string, portablePcollection: any) { - super(name) - this.portablePcollection = portablePcollection; - } -} - -export class PTransform { - expand(input: PValue): PValue { - throw new Error('Method expand has not been implemented.'); - } -} - - -/** - * @returns true if the input is a `DoFn`. - * - * Since type information is lost at runtime, we check the object's attributes - * to determine whether it's a DoFn or not. 
- * - * @example - * Prints "true" for a new `DoFn` but "false" for a function: - * ```ts - * console.log(new DoFn()); - * console.log(in => in * 2)); - * ``` - * @param callableOrDoFn - * @returns - */ -function isDoFn(callableOrDoFn: DoFn | GenericCallable) { - const df = (callableOrDoFn as DoFn) - if (df.type !== undefined && df.type === "dofn") { - return true; - } else { - return false; - } -} - -export class ParDo extends PTransform { - private doFn; - constructor(callableOrDoFn: DoFn | GenericCallable) { - super() - if (isDoFn(callableOrDoFn)) { - this.doFn = callableOrDoFn; - } else { - this.doFn = new _CallableWrapperDoFn(callableOrDoFn as GenericCallable); - } - } - - expand(input: PValue): PValue { - console.log(runnerApi.StandardPTransforms_Primitives.PAR_DO.urn); - - const pardoProto = runnerApi.PTransform.create({ - // TODO(pabloem): Get the name for the PTransform - 'uniqueName': 'todouniquename', - 'spec': runnerApi.FunctionSpec.create({ - // TODO(pabloem): URNS ARE DISAPPEARING! 
- 'urn': runnerApi.StandardPTransforms_Primitives.PAR_DO.urn, - 'payload': runnerApi.ParDoPayload.create({ - 'doFn': runnerApi.FunctionSpec.create() - }) - }), - // TODO(pabloem): Add inputs - 'inputs': {}, - 'outputs': {'out': 'ref_PCollection_' + _pcollection_counter} - }); - _pcollection_counter += 1; - - // TODO(pablom): Do this properly - return new PCollection(pardoProto.outputs.out, pardoProto); - } -} - -interface GenericCallable { - (input: any): any -} - -export class DoFn { - type: string = "dofn"; - process(element: any) { - throw new Error('Method process has not been implemented!'); - } - - startBundle(element: any) { - throw new Error('Method process has not been implemented!'); - } - - finishBundle(element: any) { - throw new Error('Method process has not been implemented!'); - } -} - -class _CallableWrapperDoFn extends DoFn { - private fn; - constructor(fn: GenericCallable) { - super(); - this.fn = fn; - } - process(element: any) { - return this.fn(element); - } -} - -export class Impulse extends PTransform { - expand(input: PValue): PValue { - if (!input.isPipeline()) { - throw new Error("User is attempting to apply Impulse transform to a non-pipeline object."); - } - - const impulseProto = runnerApi.PTransform.create({ - // TODO(pabloem): Get the name for the PTransform - 'uniqueName': 'todouniquename', - 'spec': runnerApi.FunctionSpec.create({ - 'urn': translations.DATA_INPUT_URN, - 'payload': translations.IMPULSE_BUFFER - }), - 'outputs': {'out': 'ref_PCollection_' + _pcollection_counter} - }); - _pcollection_counter += 1; - - return new PCollection(impulseProto.outputs.out, impulseProto); - } -} \ No newline at end of file diff --git a/sdks/node-ts/src/apache_beam/index.ts b/sdks/node-ts/src/apache_beam/index.ts index 46d458ad7fd8f..ecc6e1c4d3354 100644 --- a/sdks/node-ts/src/apache_beam/index.ts +++ b/sdks/node-ts/src/apache_beam/index.ts @@ -1 +1,2 @@ -export * from './core' +export * from './base' +export * from './transforms/core' diff 
--git a/sdks/node-ts/src/apache_beam/internal/translations.ts b/sdks/node-ts/src/apache_beam/internal/translations.ts index 5925c1f8e78ba..e008b98bf2bd3 100644 --- a/sdks/node-ts/src/apache_beam/internal/translations.ts +++ b/sdks/node-ts/src/apache_beam/internal/translations.ts @@ -1,6 +1,26 @@ +import * as runnerApi from '../proto/beam_runner_api'; +import equal from 'fast-deep-equal' export const IMPULSE_BUFFER = new TextEncoder().encode("impulse"); export const DATA_INPUT_URN = 'beam:runner:source:v1'; export const DATA_OUTPUT_URN = 'beam:runner:sink:v1'; -export const IDENTITY_DOFN_URN = 'beam:dofn:identity:0.1'; \ No newline at end of file +export const IDENTITY_DOFN_URN = 'beam:dofn:identity:0.1'; + +export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1"; + +let _coder_counter = 0; +const _CODER_ID_PREFIX = 'coder_'; + +export function registerPipelineCoder(coderProto: runnerApi.Coder, pipelineComponents: runnerApi.Components) { + for (const coderId of Object.keys(pipelineComponents.coders)) { + const existingCoder = pipelineComponents.coders[coderId]; + if (equal(existingCoder, coderProto) && coderId != undefined) { + return coderId; + } + } + const newCoderId = _CODER_ID_PREFIX + _coder_counter; + _coder_counter += 1; + pipelineComponents.coders[newCoderId] = coderProto; + return newCoderId; +} \ No newline at end of file diff --git a/sdks/node-ts/src/apache_beam/transforms/core.ts b/sdks/node-ts/src/apache_beam/transforms/core.ts new file mode 100644 index 0000000000000..76e26d23bb8e3 --- /dev/null +++ b/sdks/node-ts/src/apache_beam/transforms/core.ts @@ -0,0 +1,36 @@ +import { PTransform, PCollection } from "../base"; +import * as translations from '../internal/translations'; +import * as runnerApi from '../proto/beam_runner_api'; +import { BytesCoder, KVCoder } from "../coders/standard_coders"; + +import {GroupByKey} from '../base' + +export class GroupBy extends PTransform { + keyFn: (element: any) => any; + constructor(key: 
string | ((element: any) => any)) { + super(); + if ((key as (element: any) => any).call !== undefined) { + this.keyFn = key as (element: any) => any; + } else { + this.keyFn = function(x) { return x[key as string]; }; + } + } + + expand(input: PCollection): PCollection { + let kvPcoll = input.map(function (x) { return {'key': this.keyFn(x), 'value': x};}); + + const inputCoderId = (input as PCollection).proto.coderId; + + // TODO(pabloem): Find the appropriate coder for the key + const keyCoderId = translations.registerPipelineCoder( + runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': BytesCoder.URN}),}), + input.pipeline.proto.components!); + + const kvCoderProto = runnerApi.Coder.create({'spec': runnerApi.FunctionSpec.create({'urn': KVCoder.URN})}) + const kvCoderId = translations.registerPipelineCoder(kvCoderProto, input.pipeline.proto.components!); + + kvPcoll.proto.coderId = kvCoderId; + + return kvPcoll.apply(new GroupByKey()); + } +} \ No newline at end of file diff --git a/sdks/node-ts/test/core_test.ts b/sdks/node-ts/test/core_test.ts index 10d42a8c36f18..e58dddc380183 100644 --- a/sdks/node-ts/test/core_test.ts +++ b/sdks/node-ts/test/core_test.ts @@ -1,12 +1,24 @@ import * as beam from '../src/apache_beam'; +import * as assert from 'assert'; +import { BytesCoder } from '../src/apache_beam/coders/standard_coders'; +// TODO(pabloem): Fix installation. 
describe("core module", function() { - describe("basic ptransform", function() { - it("runs a basic expansion", function() { - var p = new beam.Pipeline('pipeline-name'); - var res2 = p.apply(new beam.Impulse()) - .apply(new beam.ParDo(function(v) {return v*2;})); - console.log(res2); + describe("runs a basic impulse expansion", function() { + it("runs a basic Impulse expansion", function() { + var p = new beam.Pipeline(); + var res = p.apply(new beam.Impulse()); + + assert.equal(res.type, "pcollection"); + assert.deepEqual(p.coders[res.proto.coderId], new BytesCoder()); + }); + it("runs a ParDo expansion", function() { + var p = new beam.Pipeline(); + var res = p.apply(new beam.Impulse()) + .apply(new beam.ParDo(function(v) {return v*2;})); + }); + it("runs a GroupBy expansion", function() { + }); - }); + }); }); \ No newline at end of file diff --git a/sdks/node-ts/test/primitives_test.ts b/sdks/node-ts/test/primitives_test.ts new file mode 100644 index 0000000000000..bf9d6df8003de --- /dev/null +++ b/sdks/node-ts/test/primitives_test.ts @@ -0,0 +1,34 @@ +import * as beam from '../src/apache_beam'; +import * as assert from 'assert'; +import { BytesCoder, KVCoder } from '../src/apache_beam/coders/standard_coders'; +import {GroupBy} from '../src/apache_beam/transforms/core' +// TODO(pabloem): Fix installation. 
+ +describe("primitives module", function() { + describe("runs a basic impulse expansion", function() { + it("runs a basic Impulse expansion", function() { + var p = new beam.Pipeline(); + var res = p.apply(new beam.Impulse()); + + assert.equal(res.type, "pcollection"); + assert.deepEqual(p.coders[res.proto.coderId], new BytesCoder()); + }); + it("runs a ParDo expansion", function() { + var p = new beam.Pipeline(); + var res = p.apply(new beam.Impulse()) + .apply(new beam.ParDo(function(v) {return v*2;})); + + assert.equal(res.type, "pcollection"); + }); + // it("runs a GroupBy expansion", function() { + // var p = new beam.Pipeline(); + // var res = p.apply(new beam.Impulse()) + // .apply(new beam.ParDo(function(v) {return {"name": "pablo", "lastName": "wat"};})) + // .apply(new GroupBy("lastName")); + + // const coder = p.coders[res.proto.coderId]; + + // assert.deepEqual(coder, new KVCoder(new BytesCoder(), new BytesCoder())); + // }); + }); +}); \ No newline at end of file