Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: convert view to TypeScript #311

Merged
merged 1 commit into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 37 additions & 34 deletions sdk/src/view-support.js → sdk/src/view-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,41 @@
* limitations under the License.
*/

const path = require('path');
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
import * as path from 'path';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import AnySupport from './protobuf-any';
import { Metadata } from './metadata';
import { ServiceMap } from './kalix';
import View from './view';
import * as proto from '../proto/protobuf-bundle';

const debug = require('debug')('kalix-view');
// Bind to stdout
debug.log = console.log.bind(console);
const AnySupport = require('./protobuf-any');
const { Metadata } = require('./metadata');

module.exports = class ViewServices {
namespace protocol {
export type StreamIn = proto.kalix.component.view.IViewStreamIn;
export type StreamOut = proto.kalix.component.view.IViewStreamOut;
export type Call = grpc.ServerDuplexStream<StreamIn, StreamOut>;
}

class ViewServices {
private services: { [serviceName: string]: View };

constructor() {
this.services = {};
}

addService(component, allComponents) {
addService(component: View, _allComponents: ServiceMap): void {
this.services[component.serviceName] = component;
}

componentType() {
componentType(): string {
return 'kalix.component.view.Views';
}

register(server) {
register(server: grpc.Server): void {
// Nothing to register
const includeDirs = [
path.join(__dirname, '..', 'proto'),
Expand All @@ -53,15 +64,16 @@ module.exports = class ViewServices {
);
const grpcDescriptor = grpc.loadPackageDefinition(packageDefinition);

const viewService = grpcDescriptor.kalix.component.view.Views.service;
const viewService = (grpcDescriptor as any).kalix.component.view.Views
.service;

server.addService(viewService, {
handle: this.handle.bind(this),
});
}

handle(call) {
const failAndEndCall = function (description) {
handle(call: protocol.Call): void {
const failAndEndCall = function (_description: string): void {
// FIXME no failure reporting in protocol and this does not reach the proxy as a failure
/*
call.write({
Expand All @@ -73,46 +85,37 @@ module.exports = class ViewServices {
call.end();
};

call.on('data', (viewStreamIn) => {
call.on('data', (viewStreamIn: protocol.StreamIn) => {
// FIXME: It is currently only implemented to support one request (ReceiveEvent) with one response (Upsert).
// see https://github.com/lightbend/kalix-proxy/issues/186
// and https://github.com/lightbend/kalix-proxy/issues/187
if (viewStreamIn.receive) {
if (viewStreamIn.receive?.serviceName) {
const receiveEvent = viewStreamIn.receive,
service = this.services[receiveEvent.serviceName];
if (service) {
service = this.services[viewStreamIn.receive.serviceName];
if (service && service.updateHandlers && receiveEvent.commandName) {
const updateHandler =
service.updateHandlers[receiveEvent.commandName];
if (updateHandler) {
try {
const anySupport = new AnySupport(service.root),
metadata = new Metadata(
receiveEvent.metadata ? receiveEvent.metadata.entries : [],
),
metadata = Metadata.fromProtocol(receiveEvent.metadata),
payload = anySupport.deserialize(receiveEvent.payload),
existingState = receiveEvent.bySubjectLookupResult
? anySupport.deserialize(
receiveEvent.bySubjectLookupResult.value,
)
: undefined,
grpcMethod = service.service.methods[receiveEvent.commandName],
/**
* Context for a view update event.
*
* @interface module:kalix.View.UpdateHandlerContext
* @property {module:kalix.Metadata} metadata for the event
* @property {string} commandName
*/
context = {
context: View.UpdateHandlerContext = {
viewId: service.options.viewId,
eventSubject: receiveEvent.metadata['ce-subject'],
eventSubject: metadata.cloudevent.subject,
metadata: metadata,
commandName: receiveEvent.commandName,
};
const result = updateHandler(payload, existingState, context);
if (result) {
const resultProto =
grpcMethod.resolvedResponseType.create(result),
grpcMethod.resolvedResponseType!.create(result),
resultPayload = AnySupport.serialize(
resultProto,
false,
Expand All @@ -121,8 +124,6 @@ module.exports = class ViewServices {
call.write({
upsert: {
row: {
index: receiveEvent.initialTable,
key: receiveEvent.key,
value: resultPayload,
},
},
Expand Down Expand Up @@ -159,10 +160,10 @@ module.exports = class ViewServices {
} else {
console.error(
"Received event for unknown service: '%s'",
viewStreamIn.init.serviceName,
viewStreamIn.receive.serviceName,
);
failAndEndCall(
"Service '" + viewStreamIn.init.serviceName + "' unknown.",
"Service '" + viewStreamIn.receive.serviceName + "' unknown.",
);
}
} else {
Expand All @@ -175,4 +176,6 @@ module.exports = class ViewServices {
call.end();
});
}
};
}

export = ViewServices;
142 changes: 0 additions & 142 deletions sdk/src/view.js

This file was deleted.

100 changes: 100 additions & 0 deletions sdk/src/view.jsdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Context for a view update event.
*
* @interface module:kalix.View.UpdateHandlerContext
* @property {module:kalix.Metadata} metadata for the event
* @property {string} commandName
*/

/**
* Options for a view.
*
* @typedef module:kalix.View~options
* @property {string} [viewId=serviceName] The id for the view, used for persisting the view.
* @property {array<string>} [includeDirs=["."]] The directories to include when looking up imported protobuf files.
*/

/**
* View handlers
* The names of the properties must match the names of all the view methods specified in the gRPC
* descriptor.
*
* @typedef module:kalix.View~handlers
* @type {Object<string, module:kalix.View~handler>}
*/

/**
* A handler for transforming an incoming event and the previous view state into a new state
*
* @callback module:kalix.View~handler
* @param {Object} event The event, this will be of the type of the gRPC event handler input type.
* @param {undefined|module:kalix.Serializable} state The previous view state or 'undefined' if no previous state was stored.
* @param {module:kalix.View.UpdateHandlerContext} context The view handler context.
* @returns {undefined|module:kalix.Serializable} The state to store in the view or undefined to not update/store state for the event
*/

/**
* @classdesc Create a new view.
*
* @class module:kalix.View
* @implements module:kalix.Component
* @param {string|string[]} desc A descriptor or list of descriptors to parse, containing the service to serve.
* @param {string} serviceName The fully qualified name of the service that provides this interface.
* @param {module:kalix.View~options=} options The options for this view
*/

/**
* @name module:kalix.View#options
* @type {module:kalix.View~options}
*/


/**
* @name module:kalix.View#serviceName
* @type {string}
*/

/**
* @name module:kalix.View#service
* @type {protobuf.Service}
*/

/**
* @function module:kalix.View#componentType
* @return {string} view component type.
*/

/**
* Lookup a protobuf message type.
*
* This is provided as a convenience to lookup protobuf message types.
*
* @function module:kalix.View#lookupType
* @param {string} messageType The fully qualified name of the type to lookup.
* @return {protobuf.Type} The protobuf message type.
*/

/**
* Set the update handlers of the view. Only used for updates where event transformation is enabled through
* "transform_updates: true" in the grpc descriptor.
*
* @function module:kalix.View#setUpdateHandlers
* @param {module:kalix.View~handlers} handlers The handler callbacks.
* @return {module:kalix.View} This view.
*/
Loading