Skip to content

Commit

Permalink
Added GeneralObjectCoder and using it as coder for most transforms (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloem authored Jan 6, 2022
1 parent 290d661 commit 31d5ebb
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 34 deletions.
108 changes: 108 additions & 0 deletions sdks/node-ts/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sdks/node-ts/package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"devDependencies": {
"bson": "^4.6.0",
"fast-deep-equal": "^3.1.3",
"@types/mocha": "^9.0.0",
"js-yaml": "^4.1.0",
Expand Down
17 changes: 14 additions & 3 deletions sdks/node-ts/src/apache_beam/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Coder } from './coders/coders'
import { BytesCoder, IterableCoder, KVCoder } from './coders/standard_coders';
import * as util from 'util';
import * as translations from './internal/translations'
import { GeneralObjectCoder } from './coders/js_coders';


// TODO(pabloem): Use something better, hah.
Expand Down Expand Up @@ -211,13 +212,23 @@ export class ParDo extends PTransform {
}

const pcollName = pcollectionName();
// TODO(paboem): How do we infer the proper coder for this transform?. For now we use the same as input.
const inputCoderProto = input.pipeline.proto.components?.coders[input.proto.coderId]!;

// For the ParDo output coder, we use a GeneralObjectCoder, which is a Javascript-specific
// coder to encode the various types that exist in JS.
const outputCoder = new GeneralObjectCoder();
const outputCoderProto = runnerApi.Coder.create({
'spec': runnerApi.FunctionSpec.create({
'urn': GeneralObjectCoder.URN,
'payload': new Uint8Array() // TODO(pabloem): Serialize the GeneralObjectCoder properly.
}),
componentCoderIds: []
})
const outputCoderId = translations.registerPipelineCoder(
inputCoderProto,
outputCoderProto,
input.pipeline.proto.components!
);
input.pipeline.coders[outputCoderId] = new BytesCoder();
input.pipeline.coders[outputCoderId] = outputCoder;

const outputProto = runnerApi.PCollection.create({
'uniqueName': pcollName,
Expand Down
56 changes: 56 additions & 0 deletions sdks/node-ts/src/apache_beam/coders/js_coders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import * as BSON from 'bson';
import { Writer, Reader } from 'protobufjs';
import {Coder, CODER_REGISTRY, Context} from './coders'
import {BoolCoder, DoubleCoder, StrUtf8Coder, VarIntCoder} from './standard_coders'


export class BsonObjectCoder<T> implements Coder<T> {
static URN = "beam:coder:bsonjs:v1";
encode(element: T, writer: Writer, context: Context) {
const buff = BSON.serialize(element);
writer.bytes(buff);
}
decode(reader: Reader, context: Context): T {
const encoded = reader.bytes();
return BSON.deserialize(encoded) as T;
}
}
CODER_REGISTRY.register(BsonObjectCoder.URN, BsonObjectCoder)

export class GeneralObjectCoder<T> implements Coder<T> {
static URN = "beam:coder:genericobjectjs:v1"
componentCoders = {
'string': new StrUtf8Coder(),
'number': new DoubleCoder(), // TODO(pabloem): What about integers? Do we represent always as doubles?
'object': new BsonObjectCoder(),
'boolean': new BoolCoder()
};

// This is a map of type names to type markers. It maps a type name to its
// marker within a stream.
typeMarkers = {
'string': 'S',
'number': 'N',
'object': 'O',
'boolean': 'B'
}
// This is a map of type markers to type names. It maps a type marker to its
// type name.
markerToTypes = {
'S': 'string',
'N': 'number',
'O': 'object',
'B': 'boolean'
};
encode(element: T, writer: Writer, context: Context) {
const type = typeof element;
writer.string(this.typeMarkers[type])
this.componentCoders[type].encode(element, writer, context);
}
decode(reader: Reader, context: Context): T {
const typeMarker = reader.string()
const type = this.markerToTypes[typeMarker];
return this.componentCoders[type].decode(reader, context);
}
}
CODER_REGISTRY.register(GeneralObjectCoder.URN, GeneralObjectCoder)
61 changes: 60 additions & 1 deletion sdks/node-ts/src/apache_beam/coders/standard_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export class KVCoder<K, V> extends FakeCoder<KV<K, V>> {
this.valueCoder = valueCoder;
}
}
CODER_REGISTRY.register(KVCoder.URN, KVCoder);

export class IterableCoder<T> extends FakeCoder<Iterable<T>> {
static URN: string = "beam:coder:iterable:v1";
Expand All @@ -80,4 +81,62 @@ export class IterableCoder<T> extends FakeCoder<Iterable<T>> {
super();
this.elementCoder = elementCoder;
}
}
}
CODER_REGISTRY.register(IterableCoder.URN, IterableCoder);

export class StrUtf8Coder extends FakeCoder<String> {
static URN: string = "beam:coder:string_utf8:v1";
type: string = 'stringutf8coder';
encoder = new TextEncoder();
decoder = new TextDecoder();

constructor() {
super();
}

encode(element: String, writer: Writer, context: Context) {
writer.bytes(this.encoder.encode(element as string));
}

decode(reader: Reader, context: Context): String {
return this.decoder.decode(reader.bytes());
}
}
CODER_REGISTRY.register(StrUtf8Coder.URN, StrUtf8Coder);

export class VarIntCoder extends FakeCoder<Long | Number | BigInt> {
static URN: string = "beam:coder:varint:v1";
encode(element: Number | Long | BigInt, writer: Writer, context: Context) {
writer.uint64(element as number);
}

decode(reader: Reader, context: Context): Long | Number | BigInt {
// TODO(pabloem): How do we deal with large integers?
return reader.uint64().low;
}
}
CODER_REGISTRY.register(VarIntCoder.URN, VarIntCoder);

export class DoubleCoder extends FakeCoder<Number> {
static URN: string = "beam:coder:double:v1";
encode(element: Number, writer: Writer, context: Context) {
writer.double(element as number);
}

decode(reader: Reader, context: Context): Number {
return reader.double();
}
}
CODER_REGISTRY.register(DoubleCoder.URN, DoubleCoder);

export class BoolCoder extends FakeCoder<Boolean> {
static URN: string = "beam:coder:bool:v1";
encode(element: Boolean, writer: Writer, context: Context) {
writer.bool(element as boolean);
}

decode(reader: Reader, context: Context): Boolean {
return reader.bool();
}
}
CODER_REGISTRY.register(BoolCoder.URN, BoolCoder);
14 changes: 11 additions & 3 deletions sdks/node-ts/src/apache_beam/transforms/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as runnerApi from '../proto/beam_runner_api';
import { BytesCoder, KVCoder } from "../coders/standard_coders";

import { GroupByKey } from '../base'
import { GeneralObjectCoder } from "../coders/js_coders";

export class GroupBy extends PTransform {
keyFn: (element: any) => any;
Expand All @@ -21,11 +22,18 @@ export class GroupBy extends PTransform {

const inputCoderId = (input as PCollection).proto.coderId;

// TODO(pabloem): Find the appropriate coder for the key
const keyCoder = new GeneralObjectCoder();
const keyCoderProto = runnerApi.Coder.create({
'spec': runnerApi.FunctionSpec.create({
'urn': GeneralObjectCoder.URN,
'payload': new Uint8Array() // TODO(pabloem): Serialize the GeneralObjectCoder properly.
}),
componentCoderIds: []
})
const keyCoderId = translations.registerPipelineCoder(
runnerApi.Coder.create({ 'spec': runnerApi.FunctionSpec.create({ 'urn': BytesCoder.URN }), }),
keyCoderProto,
input.pipeline.proto.components!);
input.pipeline.coders[keyCoderId] = new BytesCoder();
input.pipeline.coders[keyCoderId] = keyCoder;

const kvCoderProto = runnerApi.Coder.create({
'spec': runnerApi.FunctionSpec.create({ 'urn': KVCoder.URN }),
Expand Down
24 changes: 0 additions & 24 deletions sdks/node-ts/test/core_test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +0,0 @@
import * as beam from '../src/apache_beam';
import * as assert from 'assert';
import { BytesCoder } from '../src/apache_beam/coders/standard_coders';
// TODO(pabloem): Fix installation.

describe("core module", function() {
describe("runs a basic impulse expansion", function() {
it("runs a basic Impulse expansion", function() {
var p = new beam.Pipeline();
var res = p.apply(new beam.Impulse());

assert.equal(res.type, "pcollection");
assert.deepEqual(p.coders[res.proto.coderId], new BytesCoder());
});
it("runs a ParDo expansion", function() {
var p = new beam.Pipeline();
var res = p.apply(new beam.Impulse())
.apply(new beam.ParDo(function(v) {return v*2;}));
});
it("runs a GroupBy expansion", function() {

});
});
});
Loading

0 comments on commit 31d5ebb

Please sign in to comment.