Skip to content

Commit

Permalink
feat: ucan stream consumer w3 accum store/add size (#146)
Browse files Browse the repository at this point in the history
Adds a consumer to UCAN stream kinesis where total size of store/add
operations is tracked in the system. To achieve this tracking, we simply
consume UCAN stream, filter `store/add` operations and update a DynamoDB
table row incrementing previous value in batches.

This PR adds only one metric consumer. Follow up PRs for remaining
systsem metrics will follow same pattern as what we decide here.

It includes:
- DynamoDB Table w3metrics, with one row per each system metric
- Default config for consumers - more details about those in
#118
- Dead letter queue for failures
- Integration tests to check metrics are updated running the UCAN Stream
consumer

Note that:
> [Numbers can be positive, negative, or zero. Numbers can have up to 38
digits of
precision.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes)

See DynamoDB Tables content after integration tests ran:
- store table:
https://us-east-2.console.aws.amazon.com/dynamodbv2/home?region=us-east-2#item-explorer?table=pr146-w3infra-store
- w3 metrics table:
https://us-east-2.console.aws.amazon.com/dynamodbv2/home?region=us-east-2#item-explorer?table=pr146-w3infra-w3-metrics
  • Loading branch information
vasco-santos authored Mar 6, 2023
1 parent a632872 commit 9c64395
Show file tree
Hide file tree
Showing 21 changed files with 647 additions and 19 deletions.
8 changes: 7 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 31 additions & 1 deletion stacks/config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { RemovalPolicy } from 'aws-cdk-lib'
import { Duration, RemovalPolicy } from 'aws-cdk-lib'
import { createRequire } from 'module'
import { StartingPosition } from 'aws-cdk-lib/aws-lambda'
import git from 'git-rev-sync'

/**
Expand Down Expand Up @@ -57,6 +58,35 @@ export function getCustomDomain (stage, hostedZone) {
return { domainName, hostedZone }
}

/**
* @param {import('@serverless-stack/resources').Stack} stack
*/
export function getKinesisEventSourceConfig (stack) {
if (stack.stage !== 'production') {
return {
batchSize: 10,
// The maximum amount of time to gather records before invoking the function.
maxBatchingWindow: Duration.seconds(5),
// If the function returns an error, split the batch in two and retry.
bisectBatchOnError: true,
// Where to begin consuming the stream.
startingPosition: StartingPosition.LATEST
}
}

return {
// Dynamo Transactions allow up to 100 writes per transactions. If we allow 10 capabilities executed per request, we can have up to 100.
// TODO: we use bisectBatchOnError, so maybe we can attempt bigger batch sizes to be optimistic?
batchSize: 10,
// The maximum amount of time to gather records before invoking the function.
maxBatchingWindow: Duration.minutes(2),
// If the function returns an error, split the batch in two and retry.
bisectBatchOnError: true,
// Where to begin consuming the stream.
startingPosition: StartingPosition.TRIM_HORIZON
}
}

export function getApiPackageJson () {
// @ts-expect-error ts thinks this is unused becuase of the ignore
const require = createRequire(import.meta.url)
Expand Down
2 changes: 1 addition & 1 deletion stacks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ export default function (app) {
},
})
app.stack(BusStack)
app.stack(UploadDbStack)
app.stack(UcanInvocationStack)
app.stack(CarparkStack)
app.stack(UploadDbStack)
app.stack(SatnavStack)
app.stack(UploadApiStack)
app.stack(ReplicatorStack)
Expand Down
29 changes: 28 additions & 1 deletion stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ import {
Bucket,
Function,
KinesisStream,
Queue,
use
} from '@serverless-stack/resources'
import { Duration } from 'aws-cdk-lib'

import { BusStack } from './bus-stack.js'
import { getBucketConfig, setupSentry } from './config.js'
import { UploadDbStack } from './upload-db-stack.js'
import {
getBucketConfig,
getKinesisEventSourceConfig,
setupSentry
} from './config.js'

/**
* @param {import('@serverless-stack/resources').StackContext} properties
Expand All @@ -22,6 +28,7 @@ export function UcanInvocationStack({ stack, app }) {

// Get eventBus reference
const { eventBus } = use(BusStack)
const { adminMetricsTable } = use(UploadDbStack)

const ucanBucket = new Bucket(stack, 'ucan-store', {
cors: true,
Expand All @@ -45,6 +52,16 @@ export function UcanInvocationStack({ stack, app }) {
}
})

const metricsStoreAddSizeTotalDLQ = new Queue(stack, 'metrics-store-add-size-total-dlq')
const metricsStoreAddSizeTotalConsumer = new Function(stack, 'metrics-store-add-size-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-add-size-total.consumer',
deadLetterQueue: metricsStoreAddSizeTotalDLQ.cdk.queue,
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
Expand All @@ -54,6 +71,16 @@ export function UcanInvocationStack({ stack, app }) {
},
consumers: {
// consumer1: 'functions/consumer1.handler'
metricsStoreAddSizeTotalConsumer: {
function: metricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
}
},
})

Expand Down
13 changes: 11 additions & 2 deletions stacks/upload-db-stack.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Table } from '@serverless-stack/resources'

import { storeTableProps, uploadTableProps } from '../upload-api/tables/index.js'
import {
adminMetricsTableProps
} from '../ucan-invocation/tables/index.js'
import { setupSentry } from './config.js'

/**
Expand All @@ -21,10 +24,16 @@ export function UploadDbStack({ stack, app }) {
* This table maps stored CAR files (shards) to an upload root cid.
* Used by the upload/* capabilities.
*/
const uploadTable = new Table(stack, 'upload', uploadTableProps)
const uploadTable = new Table(stack, 'upload', uploadTableProps)

/**
* This table tracks w3 wider metrics.
*/
const adminMetricsTable = new Table(stack, 'admin-metrics', adminMetricsTableProps)

return {
storeTable,
uploadTable
uploadTable,
adminMetricsTable
}
}
14 changes: 12 additions & 2 deletions test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ dotenv.config({
})

/**
* @typedef {import("ava").TestFn<Awaited<any>>} TestAnyFn
* @typedef {object} Dynamo
* @property {import('@aws-sdk/client-dynamodb').DynamoDBClient} client
* @property {string} endpoint
* @property {string} region
* @property {string} tableName
*
* @typedef {object} Context
* @property {string} apiEndpoint
* @property {Dynamo} metricsDynamo
*
* @typedef {import("ava").TestFn<Awaited<Context>>} TestContextFn
*/

// eslint-disable-next-line unicorn/prefer-export-from
export const test = /** @type {TestAnyFn} */ (anyTest)
export const test = /** @type {TestContextFn} */ (anyTest)
25 changes: 25 additions & 0 deletions test/helpers/deployment.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ import {
} from '@serverless-stack/core'
import { createRequire } from 'module'
import { S3Client } from '@aws-sdk/client-s3'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'

// Either seed.run deployment, or development deploy outputs-file
// https://seed.run/docs/adding-a-post-deploy-phase.html#post-deploy-phase-environment
export const stage = process.env.SEED_STAGE_NAME || State.getStage(process.cwd())

export const getStackName = () => {
const require = createRequire(import.meta.url)
const sst = require('../../sst.json')
return `${stage}-${sst.name}`
}

export const getCloudflareBucketClient = () => new S3Client({
region: 'auto',
endpoint: process.env.R2_ENDPOINT || '',
Expand Down Expand Up @@ -86,3 +93,21 @@ const getAwsRegion = () => {

return 'us-west-2'
}

/**
* @param {string} tableName
*/
export const getDynamoDb = (tableName) => {
const region = getAwsRegion()
const endpoint = `https://dynamodb.${region}.amazonaws.com`

return {
client: new DynamoDBClient({
region,
endpoint
}),
tableName: `${getStackName()}-${tableName}`,
region,
endpoint
}
}
33 changes: 33 additions & 0 deletions test/helpers/table.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { GetItemCommand, ScanCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
* @param {string} tableName
* @param {object} key
*/
export async function getTableItem (dynamo, tableName, key) {
const cmd = new GetItemCommand({
TableName: tableName,
Key: marshall(key)
})

const response = await dynamo.send(cmd)
return response.Item && unmarshall(response.Item)
}

/**
* @param {import("@aws-sdk/client-dynamodb").DynamoDBClient} dynamo
* @param {string} tableName
* @param {object} [options]
* @param {number} [options.limit]
*/
export async function getAllTableRows (dynamo, tableName, options = {}) {
const cmd = new ScanCommand({
TableName: tableName,
Limit: options.limit || 30
})

const response = await dynamo.send(cmd)
return response.Items?.map(i => unmarshall(i)) || []
}
54 changes: 48 additions & 6 deletions test/upload-api.test.js → test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,35 @@ import git from 'git-rev-sync'
import pWaitFor from 'p-wait-for'
import { HeadObjectCommand } from '@aws-sdk/client-s3'

import { METRICS_NAMES } from '../ucan-invocation/constants.js'
import { test } from './helpers/context.js'
import {
stage,
getApiEndpoint,
getAwsBucketClient,
getCloudflareBucketClient,
getSatnavBucketInfo,
getCarparkBucketInfo
getCarparkBucketInfo,
getDynamoDb
} from './helpers/deployment.js'
import { getClient } from './helpers/up-client.js'
import { randomFile } from './helpers/random.js'
import { getAllTableRows } from './helpers/table.js'

test('GET /', async t => {
test.before(t => {
t.context = {
apiEndpoint: getApiEndpoint(),
metricsDynamo: getDynamoDb('admin-metrics')
}
})

test('upload-api GET /', async t => {
const apiEndpoint = getApiEndpoint()
const response = await fetch(apiEndpoint)
t.is(response.status, 200)
})

test('GET /version', async t => {
test('upload-api /version', async t => {
const apiEndpoint = getApiEndpoint()

const response = await fetch(`${apiEndpoint}/version`)
Expand All @@ -32,9 +42,14 @@ test('GET /version', async t => {
t.is(body.commit, git.long('.'))
})

test('POST / client can upload a file and list it', async t => {
const apiEndpoint = getApiEndpoint()
const client = await getClient(apiEndpoint)
// Integration test for all flow from uploading a file to Kinesis events consumers and replicator
test('w3infra integration flow', async t => {
const client = await getClient(t.context.apiEndpoint)

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
const beforeStoreAddSizeTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

const s3Client = getAwsBucketClient()
const r2Client = getCloudflareBucketClient()

Expand All @@ -59,6 +74,8 @@ test('POST / client can upload a file and list it', async t => {
)
t.is(carparkRequest.$metadata.httpStatusCode, 200)

const carSize = carparkRequest.ContentLength

// Check dudewhere
const dudewhereRequest = await r2Client.send(
new HeadObjectCommand({
Expand Down Expand Up @@ -137,4 +154,29 @@ test('POST / client can upload a file and list it', async t => {
}, {
interval: 100,
})

// Check metrics were updated
beforeStoreAddSizeTotal && await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize
}

return afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize
})
})

/**
* @param {import("ava").ExecutionContext<import("./helpers/context.js").Context>} t
*/
async function getMetrics (t) {
const metrics = await getAllTableRows(
t.context.metricsDynamo.client,
t.context.metricsDynamo.tableName
)

return metrics
}
7 changes: 7 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// UCAN protocol
export const STORE_ADD = 'store/add'

// Metrics
export const METRICS_NAMES = {
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`
}
Loading

0 comments on commit 9c64395

Please sign in to comment.