Skip to content

Commit

Permalink
Set default zero for power balance snapshot on no return from balance…
Browse files Browse the repository at this point in the history
… aggregator (#1732)

Ref #1655
  • Loading branch information
aminlatifi authored Jan 18, 2025
1 parent 2825385 commit e4f6b26
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import { convertTimeStampToSeconds } from '../../utils/utils';
export class GivPowerBalanceAggregatorAdapterMock
implements IGivPowerBalanceAggregator
{
private excludedAddresses: Set<string> = new Set();

addExcludedAddresses(addresses: string[]): void {
addresses.forEach(address => {
this.excludedAddresses.add(address);
});
}

clearExcludedAddresses(): void {
this.excludedAddresses.clear();
}

async getAddressesBalance(
params: BalancesAtTimestampInputParams,
): Promise<BalanceResponse[]> {
Expand All @@ -21,14 +33,16 @@ export class GivPowerBalanceAggregatorAdapterMock
'addresses length can not be greater than NUMBER_OF_BALANCE_AGGREGATOR_BATCH that is defined in .env',
);
}
return _.uniq(params.addresses).map(address => {
return {
address,
balance: 13, // Just an example balance
updatedAt: new Date('2023-08-10T16:18:02.655Z'),
networks: [100],
};
});
return _.uniq(params.addresses)
.filter(address => !this.excludedAddresses.has(address))
.map(address => {
return {
address,
balance: 13, // Just an example balance
updatedAt: new Date('2023-08-10T16:18:02.655Z'),
networks: [100],
};
});
}

async getLatestBalances(
Expand Down
101 changes: 99 additions & 2 deletions src/services/cronJobs/fillSnapshotBalances.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { assert } from 'chai';
import sinon from 'sinon';
import { In } from 'typeorm';
import {
addFillPowerSnapshotBalanceJobsToQueue,
processFillPowerSnapshotJobs,
Expand All @@ -16,7 +17,10 @@ import { PowerSnapshot } from '../../entities/powerSnapshot';
import { PowerBoostingSnapshot } from '../../entities/powerBoostingSnapshot';
import { AppDataSource } from '../../orm';
import { PowerBalanceSnapshot } from '../../entities/powerBalanceSnapshot';
import { getPowerBalanceAggregatorAdapter } from '../../adapters/adaptersFactory';
import {
getPowerBalanceAggregatorAdapter,
mockPowerBalanceAggregator,
} from '../../adapters/adaptersFactory';
import { convertTimeStampToSeconds } from '../../utils/utils';

describe(
Expand All @@ -33,6 +37,7 @@ async function processFillPowerSnapshotJobsTestCases() {
);
await PowerBalanceSnapshot.clear();
await PowerBoostingSnapshot.clear();
mockPowerBalanceAggregator.clearExcludedAddresses();
});

afterEach(() => {
Expand Down Expand Up @@ -295,6 +300,98 @@ async function processFillPowerSnapshotJobsTestCases() {
await sleep(4_000);

assert.equal((await getPowerBoostingSnapshotWithoutBalance()).length, 0);
// assert.isEmpty(await getPowerBoostingSnapshotWithoutBalance());
});

it('should fill zero snapshot balance for users with no balance on balance aggreagator', async () => {
const powerSnapshotTime = new Date().getTime() - 10 * 3600 * 1000; // 10 hour earlier
const excludedAddresses: string[] = [];
const excludedUserIds: number[] = [];

const ITERATIONS = 10;
for (let i = 0; i < ITERATIONS; i++) {
const user = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
const user2 = await saveUserDirectlyToDb(generateRandomEtheriumAddress());

excludedAddresses.push(user2.walletAddress as string);
excludedUserIds.push(user2.id);

const project = await saveProjectDirectlyToDb(createProjectData());
const powerSnapshots = PowerSnapshot.create([
{
time: new Date(powerSnapshotTime + (i + 1) * 1000),
},
{
time: new Date(powerSnapshotTime + 500 + (i + 1) * 1000),
},
]);
await PowerSnapshot.save(powerSnapshots);

const powerBoostingSnapshots = PowerBoostingSnapshot.create([
{
userId: user.id,
projectId: project.id,
percentage: 10,
powerSnapshot: powerSnapshots[0],
},
{
userId: user2.id,
projectId: project.id,
percentage: 20,
powerSnapshot: powerSnapshots[0],
},
{
userId: user.id,
projectId: project.id,
percentage: 30,
powerSnapshot: powerSnapshots[1],
},
{
userId: user2.id,
projectId: project.id,
percentage: 40,
powerSnapshot: powerSnapshots[1],
},
]);
await PowerBoostingSnapshot.save(powerBoostingSnapshots);

const powerBalances = PowerBalanceSnapshot.create([
{
userId: user.id,
powerSnapshot: powerSnapshots[0],
},
{
userId: user2.id,
powerSnapshot: powerSnapshots[0],
},
{
userId: user.id,
powerSnapshot: powerSnapshots[1],
},
{
userId: user2.id,
powerSnapshot: powerSnapshots[1],
},
]);
await PowerBalanceSnapshot.save(powerBalances);
}

mockPowerBalanceAggregator.addExcludedAddresses(excludedAddresses);

assert.isNotEmpty(await getPowerBoostingSnapshotWithoutBalance());

await addFillPowerSnapshotBalanceJobsToQueue();

await sleep(4_000);

assert.equal((await getPowerBoostingSnapshotWithoutBalance()).length, 0);

const excludedUsersPowerBalances = await PowerBalanceSnapshot.find({
where: {
userId: In(excludedUserIds),
},
});

assert.lengthOf(excludedUsersPowerBalances, ITERATIONS * 2);
assert.isTrue(excludedUsersPowerBalances.every(pb => pb.balance === 0));
});
}
37 changes: 27 additions & 10 deletions src/services/cronJobs/fillSnapshotBalances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,44 @@ export function processFillPowerSnapshotJobs() {
// Process in batches
for (let i = 0; i < Math.ceil(data.length / batchNumber); i++) {
const batch = data.slice(i * batchNumber, (i + 1) * batchNumber);
const addresses = batch.map(item => item.walletAddress);
const addressesToFetch = new Set<string>(
batch.map(item => item.walletAddress),
);
const balances =
await getPowerBalanceAggregatorAdapter().getAddressesBalance({
timestamp,
addresses,
addresses: Array.from(addressesToFetch),
});

const groupByWalletAddress = _.groupBy(batch, item =>
item.walletAddress.toLowerCase(),
);

const snapshotBalances = balances
.map(balance =>
groupByWalletAddress[balance.address.toLowerCase()].map(item => ({
balance: balance.balance,
const snapshotBalances = balances.map(balance => {
const address = balance.address.toLowerCase();

// Remove the address from the set
addressesToFetch.delete(address);

return groupByWalletAddress[address].map(item => ({
balance: balance.balance,
powerSnapshotId: item.powerSnapshotId,
userId: item!.userId,
}));
});

// Fill zero for the missing balances
for (const missedAddress of addressesToFetch) {
snapshotBalances.push(
groupByWalletAddress[missedAddress].map(item => ({
balance: 0,
powerSnapshotId: item.powerSnapshotId,
userId: item!.userId,
userId: item.userId,
})),
)
.flat();
await addOrUpdatePowerSnapshotBalances(snapshotBalances);
);
}

await addOrUpdatePowerSnapshotBalances(snapshotBalances.flat());
}
} catch (e) {
logger.error('processFillPowerSnapshotJobs >> error', e);
Expand Down

0 comments on commit e4f6b26

Please sign in to comment.