Skip to content

Commit

Permalink
Merge pull request #14 from bcgov-nr/fix/kinesisLogin
Browse files Browse the repository at this point in the history
fix: kinesis login timeout
  • Loading branch information
mbystedt authored Sep 8, 2022
2 parents 2c63d30 + dc9182a commit 71b16e6
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 50 deletions.
2 changes: 1 addition & 1 deletion helm/broker-app/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ version: 1.0.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: "1.0.1"
appVersion: "1.0.2"
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nr-broker",
"version": "1.0.1",
"version": "1.0.2",
"description": "",
"author": "",
"private": true,
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ export const AWS_REGION = 'ca-central-1';
export const AWS_KINESIS_BUFFER_TIME = 100;
export const AWS_KINESIS_MAX_RECORDS = 10;

export const TOKEN_RENEW_RATIO = 0.75;

export const HEADER_VAULT_ROLE_ID = 'x-vault-role-id';

export const TOKEN_SERVICE_WRAP_TTL = 60;
Expand Down
95 changes: 87 additions & 8 deletions src/kinesis/aws-kinesis.service.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,60 @@
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Kinesis, PutRecordsCommand } from '@aws-sdk/client-kinesis';
import { STSClient, AssumeRoleCommand } from '@aws-sdk/client-sts';
import { Injectable, Logger } from '@nestjs/common';
import { createHash } from 'crypto';
import { Subject, bufferTime, filter } from 'rxjs';
import {
Subject,
bufferTime,
filter,
take,
Observable,
switchMap,
shareReplay,
asyncScheduler,
lastValueFrom,
} from 'rxjs';
import { KinesisService } from './kinesis.service';
import { AWS_KINESIS_BUFFER_TIME, AWS_KINESIS_MAX_RECORDS } from '../constants';
import {
AWS_KINESIS_BUFFER_TIME,
AWS_KINESIS_MAX_RECORDS,
AWS_REGION,
TOKEN_RENEW_RATIO,
} from '../constants';

@Injectable()
export class AwsKinesisService extends KinesisService {
private readonly logger = new Logger(AwsKinesisService.name);
@Inject('AWS_KINESIS_CLIENT')
private client: KinesisClient;
private readonly enc = new TextEncoder();
private recordSubject = new Subject();
private recordSubject = new Subject<Kinesis>();
private reload$ = new Subject<void>();
private cache$: Observable<Kinesis>;

private initialEnv: {
AWS_ACCESS_KEY_ID: string;
AWS_SECRET_ACCESS_KEY: string;
AWS_SESSION_TOKEN: string;
} = {
AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN: process.env.AWS_SESSION_TOKEN,
};

getClient() {
if (!this.cache$) {
this.cache$ = this.reload$.pipe(
switchMap(() => this.connectKinesis()),
shareReplay(1),
);
// empty subscribe to kick it off
this.cache$.subscribe();
this.reload$.next(null);
}

return this.cache$;
}
constructor() {
super();
this.getClient();
this.recordSubject
.pipe(
bufferTime(AWS_KINESIS_BUFFER_TIME, undefined, AWS_KINESIS_MAX_RECORDS),
Expand All @@ -33,7 +73,8 @@ export class AwsKinesisService extends KinesisService {
}),
});
try {
const response = await this.client.send(command);
const client = await lastValueFrom(this.getClient().pipe(take(1)));
const response = await client.send(command);
// If throughput exceeded... try again (and again)
if (response.FailedRecordCount && response.FailedRecordCount > 0) {
response.Records.filter(
Expand All @@ -55,4 +96,42 @@ export class AwsKinesisService extends KinesisService {
// this.logger.debug(`putRecord: ${JSON.stringify(data)}`);
this.recordSubject.next(data);
}

private async connectKinesis() {
// Reset env
process.env.AWS_ACCESS_KEY_ID = this.initialEnv.AWS_ACCESS_KEY_ID;
process.env.AWS_SECRET_ACCESS_KEY = this.initialEnv.AWS_SECRET_ACCESS_KEY;
delete process.env.AWS_SESSION_TOKEN;
const stsClient1 = new STSClient({
region: process.env.AWS_DEFAULT_REGION || AWS_REGION,
});
const stsAssumeRoleCommand = new AssumeRoleCommand({
RoleArn: process.env.AWS_ROLE_ARN,
RoleSessionName: 'broker',
});
// Send command
const stsAssumedRole = await stsClient1.send(stsAssumeRoleCommand);
if (stsAssumedRole && stsAssumedRole.Credentials) {
// Overwrite the environment variables so later requests use assumed identity
process.env.AWS_ACCESS_KEY_ID = stsAssumedRole.Credentials.AccessKeyId;
process.env.AWS_SECRET_ACCESS_KEY =
stsAssumedRole.Credentials.SecretAccessKey;
process.env.AWS_SESSION_TOKEN = stsAssumedRole.Credentials.SessionToken;
const renewAt = Math.round(
(new Date(stsAssumedRole.Credentials.Expiration).getTime() -
new Date().getTime()) *
TOKEN_RENEW_RATIO,
);
this.logger.log(
`Identity assumed (valid till: ${stsAssumedRole.Credentials.Expiration}, renew in: ${renewAt})`,
);
// Schedule renewal
asyncScheduler.schedule(() => {
this.reload$.next();
}, renewAt);
return new Kinesis({
region: process.env.AWS_DEFAULT_REGION || AWS_REGION,
});
}
}
}
40 changes: 2 additions & 38 deletions src/kinesis/kinesis.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { Logger, Module } from '@nestjs/common';
import { STSClient, AssumeRoleCommand } from '@aws-sdk/client-sts';
import { Kinesis } from '@aws-sdk/client-kinesis';
import { Module } from '@nestjs/common';
import { AwsKinesisService } from './aws-kinesis.service';
import { FakeKinesisService } from './fake-kinesis.service';
import { KinesisService } from './kinesis.service';
import { AWS_REGION } from '../constants';

function isUsingFakeKinesis() {
return !process.env.APP_ENVIRONMENT;
Expand All @@ -15,40 +12,7 @@ const kinesisServiceProvider = {
useClass: isUsingFakeKinesis() ? FakeKinesisService : AwsKinesisService,
};
@Module({
providers: [
kinesisServiceProvider,
{
provide: 'AWS_KINESIS_CLIENT',
useFactory: async () => {
if (isUsingFakeKinesis()) {
return undefined;
}
const stsClient1 = new STSClient({
region: process.env.AWS_DEFAULT_REGION || AWS_REGION,
});
const stsAssumeRoleCommand = new AssumeRoleCommand({
RoleArn: process.env.AWS_ROLE_ARN,
RoleSessionName: 'broker',
});
const stsAssumedRole = await stsClient1.send(stsAssumeRoleCommand);
if (stsAssumedRole && stsAssumedRole.Credentials) {
// Overwrite the environment variables so later requests use assumed identity
process.env.AWS_ACCESS_KEY_ID =
stsAssumedRole.Credentials.AccessKeyId;
process.env.AWS_SECRET_ACCESS_KEY =
stsAssumedRole.Credentials.SecretAccessKey;
process.env.AWS_SESSION_TOKEN =
stsAssumedRole.Credentials.SessionToken;
logger.log('Identity assumed');
return new Kinesis({
region: process.env.AWS_DEFAULT_REGION || AWS_REGION,
});
}
},
},
],
providers: [kinesisServiceProvider],
exports: [KinesisService],
})
export class KinesisModule {}

const logger = new Logger(KinesisModule.name);
11 changes: 9 additions & 2 deletions src/token/token.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { AxiosRequestConfig, AxiosResponse } from 'axios';
import { map, Observable, switchMap } from 'rxjs';
import { SHORT_ENV_CONVERSION, TOKEN_SERVICE_WRAP_TTL } from '../constants';
import {
SHORT_ENV_CONVERSION,
TOKEN_RENEW_RATIO,
TOKEN_SERVICE_WRAP_TTL,
} from '../constants';

interface VaultTokenLookupDto {
data: {
Expand Down Expand Up @@ -146,7 +150,10 @@ export class TokenService {
? this.tokenLookup.data.last_renewal_time
: this.tokenLookup.data.creation_time;
this.renewAt =
(baseTime + Math.round(this.tokenLookup.data.creation_ttl * 0.75)) *
(baseTime +
Math.round(
this.tokenLookup.data.creation_ttl * TOKEN_RENEW_RATIO,
)) *
1000;
},
});
Expand Down

0 comments on commit 71b16e6

Please sign in to comment.