Skip to content

Commit

Permalink
feat: ucan stream consumer store/add and store/remove count
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 7, 2023
1 parent 9c64395 commit 2d5315e
Show file tree
Hide file tree
Showing 13 changed files with 624 additions and 51 deletions.
49 changes: 46 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 39 additions & 1 deletion stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ export function UcanInvocationStack({ stack, app }) {
}
})

// metrics store/add count
const metricsStoreAddTotalDLQ = new Queue(stack, 'metrics-store-add-total-dlq')
const metricsStoreAddTotalConsumer = new Function(stack, 'metrics-store-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-add-total.consumer',
deadLetterQueue: metricsStoreAddTotalDLQ.cdk.queue,
})

// metrics store/remove count
const metricsStoreRemoveTotalDLQ = new Queue(stack, 'metrics-store-remove-total-dlq')
const metricsStoreRemoveTotalConsumer = new Function(stack, 'metrics-store-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-remove-total.consumer',
deadLetterQueue: metricsStoreRemoveTotalDLQ.cdk.queue,
})

// metrics store/add size total
const metricsStoreAddSizeTotalDLQ = new Queue(stack, 'metrics-store-add-size-total-dlq')
const metricsStoreAddSizeTotalConsumer = new Function(stack, 'metrics-store-add-size-total-consumer', {
environment: {
Expand All @@ -70,7 +93,22 @@ export function UcanInvocationStack({ stack, app }) {
}
},
consumers: {
// consumer1: 'functions/consumer1.handler'
metricsStoreAddTotalConsumer: {
function: metricsStoreAddTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreRemoveTotalConsumer: {
function: metricsStoreRemoveTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreAddSizeTotalConsumer: {
function: metricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
Expand Down
3 changes: 3 additions & 0 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ export function UploadApiStack({ stack, app }) {
'GET /error': 'functions/get.error',
'GET /version': 'functions/get.version'
},
accessLog: {
format:'{"requestTime":"$context.requestTime","requestId":"$context.requestId","httpMethod":"$context.httpMethod","path":"$context.path","routeKey":"$context.routeKey","status":$context.status,"responseLatency":$context.responseLatency,"integrationRequestId":"$context.integration.requestId","integrationStatus":"$context.integration.status","integrationLatency":"$context.integration.latency","integrationServiceStatus":"$context.integration.integrationStatus","ip":"$context.identity.sourceIp","userAgent":"$context.identity.userAgent"}'
}
})

stack.addOutputs({
Expand Down
12 changes: 10 additions & 2 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ test('w3infra integration flow', async t => {

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
const beforeStoreAddTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const beforeStoreAddSizeTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

const s3Client = getAwsBucketClient()
Expand Down Expand Up @@ -159,13 +160,20 @@ test('w3infra integration flow', async t => {
beforeStoreAddSizeTotal && await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const afterStoreAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize
return (
afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize &&
afterStoreAddTotal?.value >= beforeStoreAddTotal?.value + carSize
)
}

return afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize
return (
afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize &&
afterStoreAddTotal?.value === beforeStoreAddTotal?.value + 1
)
})
})

Expand Down
7 changes: 5 additions & 2 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// UCAN protocol
export const STORE_ADD = 'store/add'
export const STORE_REMOVE = 'store/remove'

// Metrics
export const METRICS_NAMES = {
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`
}
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
}
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-add-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreAddTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreAddTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddTotal(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreRemoveTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreRemoveTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreRemoveTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
44 changes: 44 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})

return {
/**
* Increment total count from store/add operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreAddTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_ADD_TOTAL
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total value from new given operations.
*
Expand All @@ -52,6 +74,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total count from store/remove operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreRemoveTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_REMOVE_TOTAL
})
})

await dynamoDb.send(updateCmd)
}
}
Expand Down
Loading

0 comments on commit 2d5315e

Please sign in to comment.