Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow specifying a custom status when failing #253

Merged
merged 3 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/src/modules/javascript/examples/action/src/action.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ function processStreamed(context) {
function respondWith(response, context) {
// need to accumulate effects, before replying, forwarding, or failing
response.effects.forEach(effect => context.effect(two.service.methods.Call, { id: effect.id }, effect.synchronous));
if (response.fail) context.fail(response.fail);
if (response.fail) context.fail(
response.fail,
9, // optional parameter, sets the gRPC status code to 9 - FAILED_PRECONDITION
);
else if (response.forward) context.forward(two.service.methods.Call, { id: response.forward });
else if (response.reply) context.write(Response.create({ message: response.reply }));
else context.write(); // empty message
Expand Down
5 changes: 4 additions & 1 deletion docs/src/modules/javascript/examples/action/src/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ function createReplyForGroup(group: ProcessGroup): replies.Reply {
step.effect.synchronous || false
);
} else if (step.fail) {
reply = replies.failure(step.fail.message || "");
reply = replies.failure(
step.fail.message || "",
replies.GrpcStatus.FailedPrecondition, // Optional parameter, sets the gRPC code
);
}
});
return reply;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ function getCart(request, cart) {
// tag::add-item[]
function addItem(addItem, cart, ctx) {
if (addItem.quantity < 1) {
ctx.fail("Quantity for item " + addItem.productId + " must be greater than zero.");
ctx.fail(
"Quantity for item " + addItem.productId + " must be greater than zero.",
3, // optional parameter, sets the gRPC status code to 3 - INVALID_ARGUMENT
);
} else {
const itemAdded = ItemAdded.create({
item: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ function addItem(
): replies.Reply {
if (addItem.quantity < 1) {
return replies.failure(
"Quantity for item " + addItem.productId + " must be greater than zero."
"Quantity for item " + addItem.productId + " must be greater than zero.",
replies.GrpcStatus.InvalidArgument, // optional parameter, customise gRPC code
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ function addItem(addItem, cart, ctx) {
// Validation:
// Make sure that it is not possible to add negative quantities
if (addItem.quantity < 1) {
ctx.fail("Quantity for item " + addItem.productId + " must be greater than zero.");
ctx.fail(
"Quantity for item " + addItem.productId + " must be greater than zero.",
3, // optional parameter, sets the gRPC status code to 3 - INVALID_ARGUMENT
);
} else {
// If there is an existing item with that product id, we need to increment its quantity.
const existing = cart.items.find(item => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ function addItem(
// Make sure that it is not possible to add negative quantities
if (addItem.quantity < 1) {
return replies.failure(
"Quantity for item " + addItem.productId + " must be greater than zero."
"Quantity for item " + addItem.productId + " must be greater than zero.",
replies.GrpcStatus.InvalidArgument, // optional parameter, customise gRPC code
);
} else {
// If there is an existing item with that product id, we need to increment its quantity.
Expand Down
48 changes: 48 additions & 0 deletions sdk/integration-test/integration-testkit-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ action.commandHandlers = {
ctx.write({ field: 'Received ' + input.field });
ctx.end();
},
Fail: (input, ctx) => {
ctx.fail("some-error", 6);
},
};

const value_entity = new akkaserverless.ValueEntity(
Expand All @@ -54,6 +57,9 @@ value_entity.commandHandlers = {
);
});
},
Fail: (input, state, ctx) => {
ctx.fail("some-error", 6);
},
};

const entity = new akkaserverless.EventSourcedEntity(
Expand Down Expand Up @@ -83,6 +89,9 @@ entity.setBehavior((e) => {
);
});
},
Fail: (input, state, ctx) => {
ctx.fail("some-error", 6);
},
},
eventHandlers: {},
};
Expand Down Expand Up @@ -115,6 +124,20 @@ describe('The AkkaServerless IntegrationTestkit', function () {
);
});

it('should allow actions to fail with custom code', (done) => {
testkit.clients.ExampleService.Fail(
{ field: 'hello' },
(err, msg) => {
err.should.not.be.undefined;
// Unfortunately, this appears to be the only way the JS library offers to read the error description,
// by reading this unspecified message string.
err.message.should.be.eq('6 ALREADY_EXISTS: some-error');
err.code.should.be.eq(6);
done();
},
);
});

it('should handle value entities sync handlers', (done) => {
testkit.clients.ExampleServiceTwo.DoSomethingOne(
{ field: 'hello' },
Expand All @@ -135,6 +158,18 @@ describe('The AkkaServerless IntegrationTestkit', function () {
);
});

it('should allow value entities to fail with custom code', (done) => {
testkit.clients.ExampleServiceTwo.Fail(
{ field: 'hello' },
(err, msg) => {
err.should.not.be.undefined;
err.message.should.be.eq('6 ALREADY_EXISTS: some-error');
err.code.should.be.eq(6);
done();
},
);
});

it('should handle event sourced entities sync handlers', (done) => {
testkit.clients.ExampleServiceThree.DoSomethingOne(
{ field: 'hello' },
Expand All @@ -154,4 +189,17 @@ describe('The AkkaServerless IntegrationTestkit', function () {
},
);
});

it('should allow event sourced entities to fail with custom code', (done) => {
testkit.clients.ExampleServiceThree.Fail(
{ field: 'hello' },
(err, msg) => {
err.should.not.be.undefined;
err.message.should.be.eq('6 ALREADY_EXISTS: some-error');
err.code.should.be.eq(6);
done();
},
);
});

});
34 changes: 23 additions & 11 deletions sdk/src/action-support.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class ActionHandler {
});
}
if (reply.failure) {
ctx.fail(reply.failure);
ctx.fail(reply.failure.description, reply.failure.status);
} else if (reply.message) {
ctx.write(reply.message, reply.metadata);
} else if (reply.forward) {
Expand Down Expand Up @@ -375,14 +375,12 @@ class ActionHandler {
);
};

this.ctx.fail = (error) => {
this.ctx.fail = (description, status) => {
this.ensureNotCancelled();
this.streamDebug('Failing with %s', error);
this.streamDebug('Failing with %s', description);
this.ctx.alreadyReplied = true;
this.grpcCallback(null, {
failure: {
description: error,
},
failure: this.createFailure(description, status),
sideEffects: effects,
});
};
Expand Down Expand Up @@ -508,13 +506,11 @@ class ActionHandler {
);
};

this.ctx.fail = (error) => {
this.ctx.fail = (description, status) => {
this.ensureNotCancelled();
this.streamDebug('Failing with %s', error);
this.streamDebug('Failing with %s', description);
this.call.write({
failure: {
description: error,
},
failure: this.createFailure(description, status),
sideEffects: effects,
});
effects = []; // clear effects after each streamed write
Expand Down Expand Up @@ -605,6 +601,22 @@ class ActionHandler {
}
}
}

createFailure(description, grpcStatus) {
const failure = {
description: description,
};
if (grpcStatus !== undefined) {
if (grpcStatus === 0) {
throw new Error("gRPC failure status code must not be OK")
}
if (grpcStatus < 0 || grpcStatus > 16) {
throw new Error("Invalid gRPC status code: " + grpcStatus)
}
failure.grpcStatusCode = grpcStatus;
}
return failure;
}
}

module.exports = class ActionServices {
Expand Down
34 changes: 20 additions & 14 deletions sdk/src/command-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,19 @@ class CommandHelper {

const ctx = this.createContext(command.id, metadata);

const errorReply = (msg) => {
const errorReply = (msg, status) => {
return {
failure: {
commandId: command.id,
description: msg,
grpcStatusCode: status,
},
};
};

if (!this.service.methods.hasOwnProperty(command.name)) {
ctx.commandDebug("Command '%s' unknown", command.name);
return errorReply('Unknown command named ' + command.name);
return errorReply('Unknown command named ' + command.name, 12 /* unimplemented */);
} else {
try {
const grpcMethod = this.service.methods[command.name];
Expand Down Expand Up @@ -117,14 +118,14 @@ class CommandHelper {
const msg =
"No handler registered for command '" + command.name + "'";
ctx.commandDebug(msg);
return errorReply(msg);
return errorReply(msg, 12 /* unimplemented */);
}
} catch (err) {
const error = "Error handling command '" + command.name + "'";
ctx.commandDebug(error);
console.error(err);

throw errorReply(error + ': ' + err);
throw errorReply(error + ': ' + err, 2 /* unknown */);
}
}
}
Expand All @@ -149,16 +150,20 @@ class CommandHelper {
return userReply;
}

errorReply(msg, ctx, desc) {
errorReply(msg, status, ctx, desc) {
ctx.commandDebug("%s failed with message '%s'", desc, msg);
const failure = {
commandId: ctx.commandId,
description: msg,
}
if (status !== undefined) {
failure.grpcStatusCode = status
}
return {
reply: {
commandId: ctx.commandId,
clientAction: {
failure: {
commandId: ctx.commandId,
description: msg,
},
failure: failure,
},
},
};
Expand All @@ -168,11 +173,11 @@ class CommandHelper {
const userReply = await this.invoke(handler, ctx);

if (ctx.error !== null) {
return this.errorReply(ctx.error.message, ctx, desc);
return this.errorReply(ctx.error.message, ctx.error.grpcStatus, ctx, desc);
} else if (userReply instanceof Reply) {
if (userReply.failure) {
// handle failure with a separate write to make sure we don't write back events etc
return this.errorReply(userReply.failure, ctx, desc);
return this.errorReply(userReply.failure.description, userReply.failure.status, ctx, desc);
} else {
// effects need to go first to end up in reply
// note that we amend the ctx.reply to get events etc passed along from the entities
Expand Down Expand Up @@ -436,16 +441,17 @@ class CommandHelper {
*
* @function module:akkaserverless.EffectContext#fail
* @param {string} msg The failure message.
* @param {number} [grpcStatus] The grpcStatus.
* @throws An error that captures the failure message. Note that even if you catch the error thrown by this
* method, the command will still be failed with the given message.
*/
fail: (msg) => {
fail: (msg, grpcStatus) => {
accessor.ensureActive();
// We set it here to ensure that even if the user catches the error, for
// whatever reason, we will still fail as instructed.
accessor.error = new ContextFailure(msg);
accessor.error = new ContextFailure(msg, grpcStatus);
// Then we throw, to end processing of the command.
throw error;
throw accessor.error;
},
};
return accessor;
Expand Down
12 changes: 11 additions & 1 deletion sdk/src/context-failure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@
export class ContextFailure extends Error {
readonly name: string = 'ContextFailure';
readonly msg: string;
readonly grpcStatus?: number;

constructor(msg: string) {
constructor(msg: string, grpcStatus?: number) {
super(msg);
this.msg = msg;
if (Error.captureStackTrace) {
Error.captureStackTrace(this, ContextFailure);
}
if (grpcStatus !== undefined) {
if (grpcStatus === 0) {
throw new Error("gRPC failure status code must not be OK")
}
if (grpcStatus < 0 || grpcStatus > 16) {
throw new Error("Invalid gRPC status code: " + grpcStatus)
}
this.grpcStatus = grpcStatus;
}
}
}
Loading