Skip to content

Commit

Permalink
feat: ucan stream consumer upload remove (#157)
Browse files Browse the repository at this point in the history
Part of metrics work #117

Adds system total metrics for `upload/remove`. With these, we can track
number of uploads unlinked from space

Needs:
- [x] #151

---------

Co-authored-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
vasco-santos and Alan Shaw authored Mar 14, 2023
1 parent 57842a1 commit 7d1ff48
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 0 deletions.
20 changes: 20 additions & 0 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-upload-remove-total.consumer',
deadLetterQueue: metricsUploadRemoveTotalDLQ.cdk.queue,
})


// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
cdk: {
Expand Down Expand Up @@ -132,6 +144,14 @@ export function UcanInvocationStack({ stack, app }) {
}
}
},
metricsUploadRemoveTotalConsumer: {
function: metricsUploadRemoveTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
spaceMetricsUploadAddTotalConsumer: {
function: spaceMetricsUploadAddTotalConsumer,
// TODO: Set kinesis filters when supported by SST
Expand Down
3 changes: 3 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
export const STORE_ADD = 'store/add'
export const STORE_REMOVE = 'store/remove'
export const UPLOAD_ADD = 'upload/add'
export const UPLOAD_REMOVE = 'upload/remove'

// Admin Metrics
export const METRICS_NAMES = {
UPLOAD_ADD_TOTAL: `${UPLOAD_ADD}-total`,
UPLOAD_REMOVE_TOTAL: `${UPLOAD_REMOVE}-total`,
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
Expand All @@ -14,6 +16,7 @@ export const METRICS_NAMES = {
// Spade Metrics
export const SPACE_METRICS_NAMES = {
UPLOAD_ADD_TOTAL: `${UPLOAD_ADD}-total`,
UPLOAD_REMOVE_TOTAL: `${UPLOAD_REMOVE}-total`,
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
Expand Down
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-upload-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateUploadRemoveTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateUploadRemoveTotal (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementUploadRemoveTotal(invocationsWithUploadRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
22 changes: 22 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total count from upload/remove operations.
*
* @param {Capabilities} operationsInv
*/
incrementUploadRemoveTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.UPLOAD_REMOVE_TOTAL
})
})

await dynamoDb.send(updateCmd)
}
}
Expand Down
166 changes: 166 additions & 0 deletions ucan-invocation/test/functions/metrics-upload-remove-total.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { testConsumer as test } from '../helpers/context.js'

import * as Signer from '@ucanto/principal/ed25519'
import * as UploadCapabilities from '@web3-storage/capabilities/upload'

import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'

import { updateUploadRemoveTotal } from '../../functions/metrics-upload-remove-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
import { METRICS_NAMES } from '../../constants.js'

const REGION = 'us-west-2'

test.before(async t => {
// Dynamo DB
const {
client: dynamo,
endpoint: dbEndpoint
} = await createDynamodDb({ port: 8000 })

t.context.dbEndpoint = dbEndpoint
t.context.dynamoClient = dynamo
})

test('handles a batch of single invocation with upload/remove', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
UploadCapabilities.remove.create({
with: spaceDid,
nb: {
root: car.cid
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
ts: Date.now()
}]

// @ts-expect-error
await updateUploadRemoveTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.UPLOAD_REMOVE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.UPLOAD_REMOVE_TOTAL)
t.is(item?.value, 1)
})

test('handles batch of single invocations with multiple upload/remove attributes', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)

const cars = await Promise.all(
Array.from({ length: 10 }).map(() => randomCAR(128))
)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: cars[0].cid.toString(),
value: {
att: cars.map((car) => UploadCapabilities.remove.create({
with: spaceDid,
nb: {
root: car.cid
}
})),
aud: uploadService.did(),
iss: alice.did()
},
ts: Date.now()
}]

// @ts-expect-error
await updateUploadRemoveTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.UPLOAD_REMOVE_TOTAL
})

t.truthy(item)
t.is(item?.name, METRICS_NAMES.UPLOAD_REMOVE_TOTAL)
t.is(item?.value, cars.length)
})

test('handles a batch of single invocation without upload/remove', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
UploadCapabilities.add.create({
with: spaceDid,
nb: {
root: car.cid,
shards: [car.cid]
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
ts: Date.now()
}]

// @ts-expect-error
await updateUploadRemoveTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.UPLOAD_REMOVE_TOTAL
})

t.truthy(item)
t.is(item?.name, METRICS_NAMES.UPLOAD_REMOVE_TOTAL)
t.is(item?.value, 0)
})

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamoAdminMetricsTable(dynamoClient),
])

return {
tableName
}
}
1 change: 1 addition & 0 deletions ucan-invocation/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface MetricsTable {
incrementStoreAddTotal: (incrementSizeTotal: Capability<Ability, `${string}:${string}`, unknown>[]) => Promise<void>
incrementStoreAddSizeTotal: (incrementSizeTotal: Capability<Ability, `${string}:${string}`, unknown>[]) => Promise<void>
incrementStoreRemoveTotal: (incrementSizeTotal: Capability<Ability, `${string}:${string}`, unknown>[]) => Promise<void>
incrementUploadRemoveTotal: (incrementSizeTotal: Capability<Ability, `${string}:${string}`, unknown>[]) => Promise<void>
}

export interface TotalSizeCtx {
Expand Down

0 comments on commit 7d1ff48

Please sign in to comment.