Skip to content

Commit

Permalink
BytesCoder with some failures
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Jan 6, 2022
1 parent c11a651 commit 290d661
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 176 deletions.
3 changes: 2 additions & 1 deletion sdks/node-ts/src/apache_beam/base.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as runnerApi from './proto/beam_runner_api';
import { BytesCoder, Coder, IterableCoder, KVCoder } from './coders/standard_coders';
import { Coder } from './coders/coders'
import { BytesCoder, IterableCoder, KVCoder } from './coders/standard_coders';
import * as util from 'util';
import * as translations from './internal/translations'

Expand Down
32 changes: 32 additions & 0 deletions sdks/node-ts/src/apache_beam/coders/coders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Writer, Reader } from 'protobufjs';

interface Class<T> {
new(...args: any[]): T;
}

class CoderRegistry {
internal_registry = {};
get(urn: string): Coder<any> {
const constructor: Class<Coder<any>> = this.internal_registry[urn];
if (constructor === undefined) {
return null!;
}
return new constructor();
}

register(urn: string, coderClass: Class<Coder<any>>) {
this.internal_registry[urn] = coderClass;
}
}
export const CODER_REGISTRY = new CoderRegistry();

export enum Context {
wholeStream = "wholeStream",
needsDelimiters = "needsDelimiters"
}

export interface Coder<T> {
encode(element: T, writer: Writer, context: Context);

decode(reader: Reader, context: Context): T;
}
73 changes: 38 additions & 35 deletions sdks/node-ts/src/apache_beam/coders/standard_coders.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,8 @@
import { Writer, Reader } from 'protobufjs';

interface Class<T> {
new(...args: any[]): T;
}

class CoderRegistry {
internal_registry = {};
get(urn: string): Coder<any> {
const constructor: Class<Coder<any>> = this.internal_registry[urn];
if (constructor === undefined) {
return null!;
}
return new constructor();
}

register(urn: string, coderClass: Class<Coder<any>>) {
this.internal_registry[urn] = coderClass;
}
}
export const CODER_REGISTRY = new CoderRegistry();

export enum Context {
selfDelimiting,
needsDelimiters
}

export interface Coder<T> {
encode(element: T, writer: Writer, context: Context);

decode(reader: Reader, context: Context): T;
}
import { Coder, Context, CODER_REGISTRY } from "./coders";

class FakeCoder<T> implements Coder<T> {
encode(element: T, writer: Writer) {
encode(value: T, writer: Writer, context: Context) {
throw new Error('Not implemented!');
}

Expand All @@ -41,10 +11,43 @@ class FakeCoder<T> implements Coder<T> {
}
}

export class BytesCoder extends FakeCoder<ArrayBuffer> {
export class BytesCoder implements Coder<Uint8Array> {
static URN: string = "beam:coder:bytes:v1";
constructor() {
super();

encode(value: Uint8Array, writer: Writer, context: Context) {
var writeBytes =
function writeBytes_for(val, buf, pos) {
for (var i = 0; i < val.length; ++i)
buf[pos + i] = val[i];
};

var len = value.length;
var hackedWriter = <any> writer;
switch (context) {
case Context.wholeStream:
hackedWriter._push(writeBytes, len, value);
break;
case Context.needsDelimiters:
writer.int32(len)
hackedWriter._push(writeBytes, len, value);
break;
default:
throw new Error("Unknown type of encoding context");
}
}

decode(reader: Reader, context: Context): Uint8Array {
switch (context) {
case Context.wholeStream:
return reader.buf;
break;
case Context.needsDelimiters:
var length = reader.int32();
var value = reader.buf.slice(reader.pos, reader.pos + length)
return value;
default:
throw new Error("Unknown type of decoding context");
}
}
}
CODER_REGISTRY.register(BytesCoder.URN, BytesCoder);
Expand Down
Loading

0 comments on commit 290d661

Please sign in to comment.