Skip to content

Commit

Permalink
Implement change index lookup by accountId
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Feb 10, 2024
1 parent 6639ac9 commit a14fe08
Showing 1 changed file with 73 additions and 22 deletions.
95 changes: 73 additions & 22 deletions storage/lake/changes-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,24 @@ async function writeChangesFile(outPath, changesByAccount) {
await flushPage();
}

function readPage(buffer) {
let offset = 0;
class BufferReader {
constructor(buffer) {
this.buffer = buffer;
this.offset = 0;
}

function readVarint() {
const result = buffer.readUInt8(offset);
offset++;
readVarint() {
const result = this.buffer.readUInt8(this.offset);
this.offset++;
if (result < 0x80) {
return result;
}

let value = result & 0x7F;
let shift = 7;
while (true) {
const byte = buffer.readUInt8(offset);
offset++;
const byte = this.buffer.readUInt8(this.offset);
this.offset++;
value |= (byte & 0x7F) << shift;
if (byte < 0x80) {
return value;
Expand All @@ -116,35 +119,39 @@ function readPage(buffer) {
}
}

function readBuffer() {
readBuffer() {
// TODO: Is +2 correct here for varint length?
if (offset + 2 >= PAGE_SIZE) {
if (this.offset + 2 >= PAGE_SIZE) {
return null;
}
}

const length = readVarint();
const length = this.readVarint();
if (length === 0) {
return null;
}

const result = buffer.slice(offset, offset + length);
offset += length;
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}

function readString() {
return readBuffer()?.toString('utf-8');
readString() {
return this.readBuffer()?.toString('utf-8');
}
}

function readPage(buffer) {
const reader = new BufferReader(buffer);

const result = [];
let accountId;
while (accountId = readString()) {
while (accountId = reader.readString()) {
let key;
while (key = readBuffer()) {
const count = readVarint();
while (key = reader.readBuffer()) {
const count = reader.readVarint();
const changes = new Array(count);
for (let i = 0; i < count; i++) {
changes[i] = readVarint();
changes[i] = reader.readVarint();
if (i > 0) {
changes[i] += changes[i - 1];
}
Expand All @@ -156,18 +163,62 @@ function readPage(buffer) {
return result;
}

async function *readChangesFile(inPath) {
const MAX_ACCOUNT_ID_LENGTH = 64 + 2;

async function *readChangesFile(inPath, { accountId, keyPrefix }) {
const file = await open(inPath, 'r');

const buffer = Buffer.alloc(PAGE_SIZE);

let position = 0;

if (accountId) {
// Binary search for the account page
const { size } = await file.stat();
let left = 0;
let right = Math.floor(size / PAGE_SIZE)
while (left < right) {
const mid = left + Math.floor((right - left) / 2);
await file.read({ buffer, length: MAX_ACCOUNT_ID_LENGTH, position: mid * PAGE_SIZE });

const reader = new BufferReader(buffer);
const midAccountId = reader.readString();
if (midAccountId > accountId) {
right = mid;
} else {
left = mid + 1;
}
console.log('left', left, 'right', right, 'midAccountId', midAccountId, accountId);
}

position = left * PAGE_SIZE;
}

let needToSkip = !!accountId;
let bytesRead;
do {
({ bytesRead } = await file.read({ buffer, length: PAGE_SIZE, position }));
buffer.fill(0, bytesRead);
buffer.fill(0, bytesRead);

if (!accountId) {
yield *readPage(buffer);
} else {
const items = readPage(buffer);
for (let item of items) {
if (item.accountId === accountId) {
needToSkip = false;
yield item;
}

if (needToSkip) {
continue;
}

yield *readPage(buffer);
if (item.accountId !== accountId) {
return;
}
}
}

position += PAGE_SIZE;
} while (bytesRead === PAGE_SIZE);
Expand Down

0 comments on commit a14fe08

Please sign in to comment.