Skip to content

Commit

Permalink
feat: support multiple targets (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
MickVanDuijn authored Jan 22, 2023
1 parent 470a1c8 commit fb055a8
Show file tree
Hide file tree
Showing 5 changed files with 429 additions and 47 deletions.
4 changes: 0 additions & 4 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
"sourceType": "module"
},
"rules": {
"indent": [
"error",
2
],
"linebreak-style": [
"error",
"unix"
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ Identifier that gets passed to all concurrent Lambda invocations. This can be us
### delay *(number)*
Minimum amount of time (in milliseconds) for concurrent functions to run. Concurrent functions are invoked asynchronously. Setting a delay enforces Lambda to create multiple invocations. Defaults to `75` to attempt sub 100ms invocation times.

### target *(string)*
Name of the target function to be warmed. Defaults to `funcName` (the name of the function itself).

Example passing a configuration:

```javascript
Expand Down Expand Up @@ -176,6 +179,30 @@ myFunction:
concurrency: 1
```

## Setting multiple targets
In addition to passing a single-target input (either the function itself or the configured target), Lambda Warmer also accepts an array of events, each allowing a separate config (concurrency, target, etc.). This allows the re-use of a single CloudWatch rule for multiple targets, beyond the limit of CloudWatch itself, which is 5. It also simplifies sharing the rule in Serverless.

```yaml
myFunction:
name: myFunction
handler: myFunction.handler
events:
- schedule:
name: warmer-schedule-name
rate: rate(5 minutes)
enabled: true
input:
- warmer: true
concurrency: 1
target: myOtherFunction
- warmer: true
concurrency: 2
target: myOtherFunction2
- warmer: true
concurrency: 2
target: myOtherFunction3
```

## Logs

Logs are automatically generated unless the `log` configuration option is set to `false`. Logs contain useful information beyond just invocation data. The `warm` field indicates whether or not the Lambda function was already warm when invoked. The `lastAccessed` field is the timestamp (in milliseconds) of the last time the function was accessed by a non-warming event. Similarly, the `lastAccessedSeconds` gives you a counter (in seconds) of how long it's been since it has been accessed. These can be used to determine if your concurrency can be lowered.
Expand Down
117 changes: 74 additions & 43 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
* @license MIT
*/

const id = Date.now().toString() + '-' + ('0000' + Math.floor(Math.random()*1000).toString()).substr(-4)
const id =
Date.now().toString() +
'-' +
('0000' + Math.floor(Math.random() * 1000).toString()).substr(-4)

let warm = false
let lastAccess = null
Expand All @@ -16,35 +19,32 @@ const funcVersion = process.env.AWS_LAMBDA_FUNCTION_VERSION

const delay = ms => new Promise(res => setTimeout(res, ms))

module.exports = (event,cfg = {}) => {

let config = Object.assign({}, {
flag: 'warmer', // default test flag
concurrency: 'concurrency', // default concurrency field
test: 'test', // default test flag
log: true, // default logging to true
correlationId: id, // default the correlationId
delay: 75 // default the delay to 75ms
},cfg)

const handleEvent = (event, config) => {
// If the event is a warmer ping
if (event && event[config.flag]) {

let concurrency = event[config.concurrency]
&& !isNaN(event[config.concurrency])
&& event[config.concurrency] > 1
? event[config.concurrency] : 1

let invokeCount = event['__WARMER_INVOCATION__']
&& !isNaN(event['__WARMER_INVOCATION__'])
? event['__WARMER_INVOCATION__'] : 1

let invokeTotal = event['__WARMER_CONCURRENCY__']
&& !isNaN(event['__WARMER_CONCURRENCY__'])
? event['__WARMER_CONCURRENCY__'] : concurrency
let concurrency =
event[config.concurrency] &&
!isNaN(event[config.concurrency]) &&
event[config.concurrency] > 1
? event[config.concurrency]
: 1

// Default target to funcName
let target = event[config.target] || funcName

let invokeCount =
event['__WARMER_INVOCATION__'] && !isNaN(event['__WARMER_INVOCATION__'])
? event['__WARMER_INVOCATION__']
: 1

let invokeTotal =
event['__WARMER_CONCURRENCY__'] && !isNaN(event['__WARMER_CONCURRENCY__'])
? event['__WARMER_CONCURRENCY__']
: concurrency

let correlationId = event['__WARMER_CORRELATIONID__']
? event['__WARMER_CORRELATIONID__'] : config.correlationId
? event['__WARMER_CORRELATIONID__']
: config.correlationId

// Create log record
let log = {
Expand All @@ -56,7 +56,10 @@ module.exports = (event,cfg = {}) => {
concurrency: invokeTotal,
warm,
lastAccessed: lastAccess,
lastAccessedSeconds: lastAccess === null ? null : ((Date.now()-lastAccess)/1000).toFixed(1)
lastAccessedSeconds:
lastAccess === null
? null
: ((Date.now() - lastAccess) / 1000).toFixed(1)
}

// Log it
Expand All @@ -66,40 +69,40 @@ module.exports = (event,cfg = {}) => {
warm = true
lastAccess = Date.now()

// Fan out if concurrency is set higher than 1
if (concurrency > 1 && !event[config.test]) {
// Check wether this lambda is invoking a different lambda
let isDifferentTarget = target !== funcName

// Fan out if concurrency is set higher than 1
if ((concurrency > 1 || isDifferentTarget) && !event[config.test]) {
// init Lambda service
let lambda = require('./lib/lambda-service')

// init promise array
let invocations = []

// loop through concurrency count
for (let i=2; i <= concurrency; i++) {

for (let i = isDifferentTarget ? 1 : 2; i <= concurrency; i++) {
// Set the params and wait for the final function to finish
let params = {
FunctionName: funcName + ':' + funcVersion,
FunctionName: target,
InvocationType: i === concurrency ? 'RequestResponse' : 'Event',
LogType: 'None',
Payload: Buffer.from(JSON.stringify({
[config.flag]: true, // send warmer flag
'__WARMER_INVOCATION__': i, // send invocation number
'__WARMER_CONCURRENCY__': concurrency, // send total concurrency
'__WARMER_CORRELATIONID__': correlationId // send correlation id
}))
Payload: new Buffer(
JSON.stringify({
[config.flag]: true, // send warmer flag
__WARMER_INVOCATION__: i, // send invocation number
__WARMER_CONCURRENCY__: concurrency, // send total concurrency
__WARMER_CORRELATIONID__: correlationId // send correlation id
})
)
}

// Add promise to invocations array
invocations.push(lambda.invoke(params).promise())

} // end for

// Invoke concurrent functions
return Promise.all(invocations)
.then(() => true)

return Promise.all(invocations).then(() => true)
} else if (invokeCount > 1) {
return delay(config.delay).then(() => true)
}
Expand All @@ -110,5 +113,33 @@ module.exports = (event,cfg = {}) => {
lastAccess = Date.now()
return Promise.resolve(false)
}

}

module.exports = (event, cfg = {}) => {
let config = Object.assign(
{},
{
flag: 'warmer', // default test flag
concurrency: 'concurrency', // default concurrency field
target: 'target', // default target field
test: 'test', // default test flag
log: true, // default logging to true
correlationId: id, // default the correlationId
delay: 75 // default the delay to 75ms
},
cfg
)

if (Array.isArray(event)) {
let i = 0
const handleNext = () => {
if (i < event.length) {
return handleEvent(event[i++], config).then(handleNext)
}
return Promise.resolve(true)
}
return handleNext()
} else {
return handleEvent(event, config)
}
} // end module
Loading

0 comments on commit fb055a8

Please sign in to comment.