diff --git a/storage/lake/changes-index.js b/storage/lake/changes-index.js index c9661e5..7bed594 100644 --- a/storage/lake/changes-index.js +++ b/storage/lake/changes-index.js @@ -93,12 +93,15 @@ async function writeChangesFile(outPath, changesByAccount) { await flushPage(); } -function readPage(buffer) { - let offset = 0; +class BufferReader { + constructor(buffer) { + this.buffer = buffer; + this.offset = 0; + } - function readVarint() { - const result = buffer.readUInt8(offset); - offset++; + readVarint() { + const result = this.buffer.readUInt8(this.offset); + this.offset++; if (result < 0x80) { return result; } @@ -106,8 +109,8 @@ function readPage(buffer) { let value = result & 0x7F; let shift = 7; while (true) { - const byte = buffer.readUInt8(offset); - offset++; + const byte = this.buffer.readUInt8(this.offset); + this.offset++; value |= (byte & 0x7F) << shift; if (byte < 0x80) { return value; @@ -116,35 +119,39 @@ function readPage(buffer) { } } - function readBuffer() { + readBuffer() { // TODO: Is +2 correct here for varint length? - if (offset + 2 >= PAGE_SIZE) { + if (this.offset + 2 >= PAGE_SIZE) { return null; - } + } - const length = readVarint(); + const length = this.readVarint(); if (length === 0) { return null; } - const result = buffer.slice(offset, offset + length); - offset += length; + const result = this.buffer.slice(this.offset, this.offset + length); + this.offset += length; return result; } - function readString() { - return readBuffer()?.toString('utf-8'); + readString() { + return this.readBuffer()?.toString('utf-8'); } +} + +function readPage(buffer) { + const reader = new BufferReader(buffer); const result = []; let accountId; - while (accountId = readString()) { + while (accountId = reader.readString()) { let key; - while (key = readBuffer()) { - const count = readVarint(); + while (key = reader.readBuffer()) { + const count = reader.readVarint(); const changes = new Array(count); for (let i = 0; i < count; i++) { - changes[i] = readVarint(); + changes[i] = reader.readVarint(); if (i > 0) { changes[i] += changes[i - 1]; } @@ -156,18 +163,62 @@ function readPage(buffer) { return result; } -async function *readChangesFile(inPath) { +const MAX_ACCOUNT_ID_LENGTH = 64 + 2; + +async function *readChangesFile(inPath, { accountId, keyPrefix }) { const file = await open(inPath, 'r'); const buffer = Buffer.alloc(PAGE_SIZE); let position = 0; + + if (accountId) { + // Binary search for the account page + const { size } = await file.stat(); + let left = 0; + let right = Math.floor(size / PAGE_SIZE) + while (left < right) { + const mid = left + Math.floor((right - left) / 2); + await file.read({ buffer, length: MAX_ACCOUNT_ID_LENGTH, position: mid * PAGE_SIZE }); + + const reader = new BufferReader(buffer); + const midAccountId = reader.readString(); + if (midAccountId > accountId) { + right = mid; + } else { + left = mid + 1; + } + console.log('left', left, 'right', right, 'midAccountId', midAccountId, accountId); + } + + position = left * PAGE_SIZE; + } + + let needToSkip = !!accountId; let bytesRead; do { ({ bytesRead } = await file.read({ buffer, length: PAGE_SIZE, position })); - buffer.fill(0, bytesRead); + buffer.fill(0, bytesRead); + + if (!accountId) { + yield *readPage(buffer); + } else { + const items = readPage(buffer); + for (let item of items) { + if (item.accountId === accountId) { + needToSkip = false; + yield item; + } + + if (needToSkip) { + continue; + } - yield *readPage(buffer); + if (item.accountId !== accountId) { + return; + } + } + } position += PAGE_SIZE; } while (bytesRead === PAGE_SIZE);