Skip to content

Commit

Permalink
chore: convert view to TypeScript (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter authored May 2, 2022
1 parent a4cfa7c commit 3c472a4
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 176 deletions.
71 changes: 37 additions & 34 deletions sdk/src/view-support.js → sdk/src/view-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,41 @@
* limitations under the License.
*/

const path = require('path');
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
import * as path from 'path';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import AnySupport from './protobuf-any';
import { Metadata } from './metadata';
import { ServiceMap } from './kalix';
import View from './view';
import * as proto from '../proto/protobuf-bundle';

const debug = require('debug')('kalix-view');
// Bind to stdout
debug.log = console.log.bind(console);
const AnySupport = require('./protobuf-any');
const { Metadata } = require('./metadata');

module.exports = class ViewServices {
namespace protocol {
export type StreamIn = proto.kalix.component.view.IViewStreamIn;
export type StreamOut = proto.kalix.component.view.IViewStreamOut;
export type Call = grpc.ServerDuplexStream<StreamIn, StreamOut>;
}

class ViewServices {
private services: { [serviceName: string]: View };

constructor() {
this.services = {};
}

addService(component, allComponents) {
addService(component: View, _allComponents: ServiceMap): void {
this.services[component.serviceName] = component;
}

componentType() {
componentType(): string {
return 'kalix.component.view.Views';
}

register(server) {
register(server: grpc.Server): void {
// Nothing to register
const includeDirs = [
path.join(__dirname, '..', 'proto'),
Expand All @@ -53,15 +64,16 @@ module.exports = class ViewServices {
);
const grpcDescriptor = grpc.loadPackageDefinition(packageDefinition);

const viewService = grpcDescriptor.kalix.component.view.Views.service;
const viewService = (grpcDescriptor as any).kalix.component.view.Views
.service;

server.addService(viewService, {
handle: this.handle.bind(this),
});
}

handle(call) {
const failAndEndCall = function (description) {
handle(call: protocol.Call): void {
const failAndEndCall = function (_description: string): void {
// FIXME no failure reporting in protocol and this does not reach the proxy as a failure
/*
call.write({
Expand All @@ -73,46 +85,37 @@ module.exports = class ViewServices {
call.end();
};

call.on('data', (viewStreamIn) => {
call.on('data', (viewStreamIn: protocol.StreamIn) => {
// FIXME: It is currently only implemented to support one request (ReceiveEvent) with one response (Upsert).
// see https://github.com/lightbend/kalix-proxy/issues/186
// and https://github.com/lightbend/kalix-proxy/issues/187
if (viewStreamIn.receive) {
if (viewStreamIn.receive?.serviceName) {
const receiveEvent = viewStreamIn.receive,
service = this.services[receiveEvent.serviceName];
if (service) {
service = this.services[viewStreamIn.receive.serviceName];
if (service && service.updateHandlers && receiveEvent.commandName) {
const updateHandler =
service.updateHandlers[receiveEvent.commandName];
if (updateHandler) {
try {
const anySupport = new AnySupport(service.root),
metadata = new Metadata(
receiveEvent.metadata ? receiveEvent.metadata.entries : [],
),
metadata = Metadata.fromProtocol(receiveEvent.metadata),
payload = anySupport.deserialize(receiveEvent.payload),
existingState = receiveEvent.bySubjectLookupResult
? anySupport.deserialize(
receiveEvent.bySubjectLookupResult.value,
)
: undefined,
grpcMethod = service.service.methods[receiveEvent.commandName],
/**
* Context for a view update event.
*
* @interface module:kalix.View.UpdateHandlerContext
* @property {module:kalix.Metadata} metadata for the event
* @property {string} commandName
*/
context = {
context: View.UpdateHandlerContext = {
viewId: service.options.viewId,
eventSubject: receiveEvent.metadata['ce-subject'],
eventSubject: metadata.cloudevent.subject,
metadata: metadata,
commandName: receiveEvent.commandName,
};
const result = updateHandler(payload, existingState, context);
if (result) {
const resultProto =
grpcMethod.resolvedResponseType.create(result),
grpcMethod.resolvedResponseType!.create(result),
resultPayload = AnySupport.serialize(
resultProto,
false,
Expand All @@ -121,8 +124,6 @@ module.exports = class ViewServices {
call.write({
upsert: {
row: {
index: receiveEvent.initialTable,
key: receiveEvent.key,
value: resultPayload,
},
},
Expand Down Expand Up @@ -159,10 +160,10 @@ module.exports = class ViewServices {
} else {
console.error(
"Received event for unknown service: '%s'",
viewStreamIn.init.serviceName,
viewStreamIn.receive.serviceName,
);
failAndEndCall(
"Service '" + viewStreamIn.init.serviceName + "' unknown.",
"Service '" + viewStreamIn.receive.serviceName + "' unknown.",
);
}
} else {
Expand All @@ -175,4 +176,6 @@ module.exports = class ViewServices {
call.end();
});
}
};
}

export = ViewServices;
142 changes: 0 additions & 142 deletions sdk/src/view.js

This file was deleted.

100 changes: 100 additions & 0 deletions sdk/src/view.jsdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Context for a view update event.
*
* @interface module:kalix.View.UpdateHandlerContext
* @property {module:kalix.Metadata} metadata for the event
* @property {string} commandName
*/

/**
* Options for a view.
*
* @typedef module:kalix.View~options
* @property {string} [viewId=serviceName] The id for the view, used for persisting the view.
* @property {array<string>} [includeDirs=["."]] The directories to include when looking up imported protobuf files.
*/

/**
* View handlers
* The names of the properties must match the names of all the view methods specified in the gRPC
* descriptor.
*
* @typedef module:kalix.View~handlers
* @type {Object<string, module:kalix.View~handler>}
*/

/**
* A handler for transforming an incoming event and the previous view state into a new state
*
* @callback module:kalix.View~handler
* @param {Object} event The event, this will be of the type of the gRPC event handler input type.
* @param {undefined|module:kalix.Serializable} state The previous view state or 'undefined' if no previous state was stored.
* @param {module:kalix.View.UpdateHandlerContext} context The view handler context.
* @returns {undefined|module:kalix.Serializable} The state to store in the view or undefined to not update/store state for the event
*/

/**
* @classdesc Create a new view.
*
* @class module:kalix.View
* @implements module:kalix.Component
* @param {string|string[]} desc A descriptor or list of descriptors to parse, containing the service to serve.
* @param {string} serviceName The fully qualified name of the service that provides this interface.
* @param {module:kalix.View~options=} options The options for this view
*/

/**
* @name module:kalix.View#options
* @type {module:kalix.View~options}
*/


/**
* @name module:kalix.View#serviceName
* @type {string}
*/

/**
* @name module:kalix.View#service
* @type {protobuf.Service}
*/

/**
* @function module:kalix.View#componentType
* @return {string} view component type.
*/

/**
* Lookup a protobuf message type.
*
* This is provided as a convenience to lookup protobuf message types.
*
* @function module:kalix.View#lookupType
* @param {string} messageType The fully qualified name of the type to lookup.
* @return {protobuf.Type} The protobuf message type.
*/

/**
* Set the update handlers of the view. Only used for updates where event transformation is enabled through
* "transform_updates: true" in the grpc descriptor.
*
* @function module:kalix.View#setUpdateHandlers
* @param {module:kalix.View~handlers} handlers The handler callbacks.
* @return {module:kalix.View} This view.
*/
Loading

0 comments on commit 3c472a4

Please sign in to comment.