This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite.js
82 lines (71 loc) · 2.61 KB
/
write.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import { DynamoDBClient, BatchWriteItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall } from '@aws-sdk/util-dynamodb'
import { pipeline } from 'node:stream/promises'
import * as ndjson from 'it-ndjson'
import batch from 'it-batch'
import fs from 'node:fs'
import ora from 'ora'
/**
 * Maximum number of requests sent to DynamoDB in one BatchWriteItem call.
 * Records are grouped into batches of this size before being written.
 *
 * @see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.BatchOperations
 */
export const BATCH_WRITE_LIMIT = 25
/**
 * Write indexes to the `blocks-cars-positions` table.
 *
 * How it works:
 * - Parses newline-delimited JSON records from `srcStream` and groups them into
 *   batches of up to `BATCH_WRITE_LIMIT`.
 * - Within each batch, keeps only the last record for each
 *   `blockmultihash#carpath` pair.
 * - Sends each batch to the `dst` table as a single BatchWriteItem request.
 *
 * Requests that DynamoDB reports in `UnprocessedItems` are written as
 * newline-delimited JSON `WriteRequest` objects (items still in marshalled
 * attribute-value form) to `./write-unprocessed-<totalSegments>-<segment>.ndjson`.
 *
 * @typedef {{blockmultihash: string, carpath: string, length: number, offset: number}} BlocksCarsPosition
 *
 * @param {AsyncIterable<string>} srcStream ndjson source of `BlocksCarsPosition` records
 * @param {string} dst table name
 * @param {number} segment dynamodb scan segment number e.g 0 (used here only to name the unprocessed output file)
 * @param {number} totalSegments total number of dynamodb scan segments e.g 10 (used here only to name the unprocessed output file)
 * @param {DynamoDBClient} [client]
 * @param {import('ora').Ora} [spinner] progress is reported through `spinner.suffixText`; `succeed()` is called when the pipeline completes
 * @returns {Promise<void>}
 */
export async function write (srcStream, dst, segment, totalSegments, client = new DynamoDBClient(), spinner = ora({ isSilent: true })) {
  let srcCount = 0
  let dstCount = 0
  const updateProgress = () => {
    spinner.suffixText = `src: ${srcCount} dst: ${dstCount}`
  }
  await pipeline(
    srcStream,
    ndjson.parse,
    (items) => batch(items, BATCH_WRITE_LIMIT),
    /** @param {AsyncIterable<Array<BlocksCarsPosition>>} batches */
    async function * (batches) {
      for await (const chunk of batches) {
        srcCount += chunk.length
        updateProgress()
        // remove duplicates: keep the last record seen for each
        // blockmultihash#carpath pair within this batch
        const itemMap = new Map()
        for (const item of chunk) {
          itemMap.set(`${item.blockmultihash}#${item.carpath}`, item)
        }
        /** @type {Array<import('@aws-sdk/client-dynamodb').WriteRequest>} */
        const puts = Array.from(itemMap.values(), (item) => ({
          PutRequest: { Item: marshall(item) }
        }))
        const res = await client.send(new BatchWriteItemCommand({
          RequestItems: { [dst]: puts }
        }))
        // UnprocessedItems is optional on the response; a missing map or a
        // missing entry for `dst` means nothing was left unprocessed.
        /** @type {Array<import('@aws-sdk/client-dynamodb').WriteRequest>} */
        const unprocessed = res.UnprocessedItems?.[dst] ?? []
        // Check the array's length: comparing the array itself to a number
        // coerces it to a string and is never true.
        if (unprocessed.length > 0) {
          console.log('UnprocessedItems', unprocessed.length)
        }
        dstCount += puts.length - unprocessed.length
        updateProgress()
        // Forward unprocessed requests to the ndjson output file.
        yield * unprocessed
      }
    },
    ndjson.stringify,
    fs.createWriteStream(`./write-unprocessed-${totalSegments}-${segment}.ndjson`)
  )
  spinner.succeed()
}