Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS service endpoints #622

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/aws-config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ then separate sections will be specified within the application code.
aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY_SES1}
```

Most AWS services support endpoints. The endpoint may be included in the configuration as desired:
```yaml
sqs_for_shipping:
aws_region: ${AWS_REGION}
aws_endpoint: ${AWS_ENDPOINT_URL_SQS}
aws_access_key_id: 'notneededintest'
aws_secret_access_key: 'notneededintest'
```

## Interfaces

The subsection of `application` in the configuration file should provide:
```typescript
export interface AWSCfgFileItem
{
aws_region?: string,
aws_endpoint?: string,
aws_access_key_id?: string,
aws_secret_access_key?: string,
}
Expand All @@ -63,6 +73,7 @@ export interface AWSServiceConfig
{
name: string,
region: string,
endpoint?: string,
credentials: {
accessKeyId: string,
secretAccessKey: string,
Expand Down
10 changes: 9 additions & 1 deletion packages/aws-config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface AWSServiceConfig
{
name: string,
region: string,
endpoint?: string,
credentials: {
accessKeyId: string,
secretAccessKey: string,
Expand All @@ -50,6 +51,7 @@ export interface AWSServiceConfig
export interface AWSCfgFileItem
{
aws_region?: string,
aws_endpoint?: string,
aws_access_key_id?: string,
aws_secret_access_key?: string,
}
Expand Down Expand Up @@ -82,8 +84,14 @@ export function loadAWSConfigByName(ctx: ConfigProvider, cfgname: string): AWSSe
throw new DBOSError.DBOSConfigKeyTypeError(`${cfgname}.aws_secret_access_key`, 'string', typeof(cfgstrs.aws_secret_access_key));
}

if (cfgstrs.aws_endpoint && typeof(cfgstrs.aws_region) !== 'string') {
throw new DBOSError.DBOSConfigKeyTypeError(`${cfgname}.aws_endpoint`, 'string', typeof(cfgstrs.aws_endpoint));
}

return {
name: cfgname, region: cfgstrs.aws_region.toString(),
name: cfgname,
region: cfgstrs.aws_region.toString(),
endpoint: cfgstrs.aws_endpoint ? cfgstrs.aws_endpoint : undefined,
credentials: {
accessKeyId: cfgstrs.aws_access_key_id.toString(),
secretAccessKey: cfgstrs.aws_secret_access_key.toString()
Expand Down
1 change: 1 addition & 0 deletions packages/communicator-email-ses/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class SendEmailCommunicator extends ConfiguredInstance

static createSES(cfg: AWSServiceConfig) {
return new SESv2({
endpoint: cfg.endpoint,
region: cfg.region,
credentials: cfg.credentials,
maxAttempts: cfg.maxRetries,
Expand Down
1 change: 1 addition & 0 deletions packages/component-aws-s3/src/s3_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export class S3Ops extends ConfiguredInstance {

static createS3Client(cfg: AWSServiceConfig) {
return new S3Client({
endpoint: cfg.endpoint,
region: cfg.region,
credentials: cfg.credentials,
maxAttempts: cfg.maxRetries,
Expand Down
4 changes: 2 additions & 2 deletions packages/dbos-sqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ The `@SQSConfigure` decorator should be applied at the class level to identify t
interface SQSConfig {
awscfgname?: string;
awscfg?: AWSServiceConfig;
queueURL?: string;
queueUrl?: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old one is still available, this is the README not the code.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this? A present for DBOS biggest fanboy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently... ;)

getWFKey?: (m: Message) => string; // Calculate workflow OAOO key for each message
}

Expand All @@ -94,7 +94,7 @@ Then, within the class, one or more methods should be decorated to handle SQS me
```typescript
@SQSConfigure({awscfgname: 'sqs_receiver'})
class SQSEventProcessor {
@SQSMessageConsumer({queueURL: process.env['SQS_QUEUE_URL']})
@SQSMessageConsumer({queueUrl: process.env['SQS_QUEUE_URL']})
@Workflow()
static async recvMessage(ctx: WorkflowContext, msg: Message) {
// Workflow code goes here...
Expand Down
21 changes: 16 additions & 5 deletions packages/dbos-sqs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type MessageWithOptionalQueueUrl = MessageWithoutQueueUrl & { QueueUrl?: string
interface SQSConfig {
awscfgname?: string;
awscfg?: AWSServiceConfig;
queueURL?: string;
queueUrl?: string;
queueURL?: string; // disfavored vs. queueUrl
getWFKey?: (m: Message) => string;
}

Expand Down Expand Up @@ -74,7 +75,7 @@ class SQSCommunicator extends ConfiguredInstance
)
{
try {
const smsg = {...msg, QueueUrl: msg.QueueUrl || this.config.queueURL};
const smsg = {...msg, QueueUrl: msg.QueueUrl || this.config.queueUrl || this.config.queueURL};
maxdml marked this conversation as resolved.
Show resolved Hide resolved
return await this.client!.send(new SendMessageCommand(smsg));
}
catch (e) {
Expand All @@ -88,6 +89,7 @@ class SQSCommunicator extends ConfiguredInstance
region: cfg.region,
credentials: cfg.credentials,
maxAttempts: cfg.maxRetries,
endpoint: cfg.endpoint,
//logger: console,
});
}
Expand Down Expand Up @@ -133,7 +135,12 @@ class SQSReceiver implements DBOSEventReceiver
for (const registeredOperation of regops) {
const cro = registeredOperation.classConfig as SQSReceiverClassDefaults;
const mro = registeredOperation.methodConfig as SQSReceiverMethodSpecifics;
const url = cro.config?.queueURL ?? mro.config?.queueURL;
const url =
cro.config?.queueUrl
?? cro.config?.queueURL
?? mro.config?.queueUrl
?? mro.config?.queueURL;

if (url) {
const method = registeredOperation.methodReg;
const cname = method.className;
Expand Down Expand Up @@ -166,7 +173,7 @@ class SQSReceiver implements DBOSEventReceiver
catch (e) {
const err = e as Error;
executor.logger.error(err);
throw new DBOSError.DBOSError(`SQS Receiver for ${cname}.${mname} was unable to connect: ${err.message}`);
throw new DBOSError.DBOSError(`SQS Receiver for ${cname}.${mname} was unable to connect to ${url}: ${err.message}`);
}
this.listeners.push((async (client: SQSClient, url: string) =>
{
Expand Down Expand Up @@ -220,7 +227,11 @@ class SQSReceiver implements DBOSEventReceiver
regops.forEach((registeredOperation) => {
const co = registeredOperation.classConfig as SQSReceiverClassDefaults;
const mo = registeredOperation.methodConfig as SQSReceiverMethodSpecifics;
const url = co.config?.queueURL ?? mo.config?.queueURL;
const url =
co.config?.queueUrl
?? co.config?.queueURL
?? mo.config?.queueUrl
?? mo.config?.queueURL;
if (url) {
const cname = registeredOperation.methodReg.className;
const mname = registeredOperation.methodReg.name;
Expand Down
1 change: 1 addition & 0 deletions packages/dbos-sqs/sqs-test-dbos-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ application:
aws_sqs_configuration: 'aws_config'
sqs_queue: ${SQS_QUEUE_URL}
aws_config:
aws_endpoint: ${AWS_ENDPOINT_URL_SQS}
aws_region: ${AWS_REGION}
aws_access_key_id: ${AWS_ACCESS_KEY_ID}
aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
4 changes: 2 additions & 2 deletions packages/dbos-sqs/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class SQSReceiver
{
static msgRcvCount: number = 0;
static msgValueSum: number = 0;
@SQSMessageConsumer({queueURL: process.env['SQS_QUEUE_URL']})
@SQSMessageConsumer({queueUrl: process.env['SQS_QUEUE_URL']})
@Workflow()
static async recvMessage(_ctx: WorkflowContext, msg: Message) {
const ms = msg.Body!;
Expand All @@ -36,7 +36,7 @@ describe("sqs-tests", () => {
}
else {
// This would normally be a global or static or something
sqsCfg = configureInstance(SQSCommunicator, 'default', {awscfgname: 'aws_config', queueURL: process.env['SQS_QUEUE_URL']});
sqsCfg = configureInstance(SQSCommunicator, 'default', {awscfgname: 'aws_config', queueUrl: process.env['SQS_QUEUE_URL']});
}
});

Expand Down