Skip to content

Commit

Permalink
Improve upsert update validation
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Jul 13, 2024
1 parent cf84581 commit 5af91d1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 21 deletions.
52 changes: 50 additions & 2 deletions runner/src/dml-handler/dml-handler-fixture.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('DML Handler Fixture Tests', () => {
expect(await dmlHandler.select(TABLE_DEFINITION_NAMES, { account_id: 'unknown_near' })).toEqual([]);
});

test('insert two rows with serial ID column', async () => {
test('insert two rows with serial column', async () => {
const inputObj = [{
account_id: 'TEST_NEAR',
block_height: 1,
Expand Down Expand Up @@ -217,7 +217,55 @@ describe('DML Handler Fixture Tests', () => {
accounts_liked: [],
}];

await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id'], ['content', 'block_timestamp'])).rejects.toThrow('Conflict update criteria cannot match multiple rows');
await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, upsertObj, ['account_id'], ['content', 'block_timestamp'])).rejects.toThrow('Conflict update criteria cannot affect row twice');
});

test('reject upsert due to duplicate row', async () => {
const inputObj = [{
id: 0,
account_id: 'TEST_NEAR',
block_height: 1,
content: "CONTENT",
accounts_liked: [],
},
{
id: 0,
account_id: 'TEST_NEAR',
block_height: 1,
content: "CONTENT",
accounts_liked: [],
}];

await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Conflict update criteria cannot affect row twice');
});

test('reject upsert after specifying serial column value', async () => {
const inputObjWithSerial = [{
id: 0, // Specifying a serial value does not change the next produced serial value (Which would be 0 in this case)
account_id: 'TEST_NEAR',
block_height: 1,
content: "CONTENT",
accounts_liked: [],
}];
const inputObj = [{
account_id: 'TEST_NEAR',
block_height: 1,
content: "CONTENT",
accounts_liked: [],
}];

await dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObjWithSerial, ['id', 'account_id'], ['content']);
await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Cannot insert row twice into the same table');
});

test('reject insert after not specifying primary key value', async () => {
const inputObj = [{
block_height: 1,
content: "CONTENT",
accounts_liked: [],
}];

await expect(dmlHandler.upsert(TABLE_DEFINITION_NAMES, inputObj, ['id', 'account_id'], ['content'])).rejects.toThrow('Inserted row must specify value for primary key columns');
});

test('delete rows', async () => {
Expand Down
40 changes: 21 additions & 19 deletions runner/src/dml-handler/dml-handler-fixture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ class TableData {
removeEntitiesByCriteria(criteria: WhereClauseMulti): PostgresRowEntity[] {
const remainingRows: PostgresRowEntity[] = [];
const matchedRows: PostgresRowEntity[] = [];
this.data.map(row => {
if (row.isEqualCriteria(criteria)) {
matchedRows.push(row)
this.data.map(entity => {
if (entity.isEqualCriteria(criteria)) {
matchedRows.push(entity)
} else {
remainingRows.push(row);
remainingRows.push(entity);
}
});
this.data = remainingRows;
Expand Down Expand Up @@ -220,10 +220,7 @@ class IndexerData {

selectColumnsFromRow(row: PostgresRow, columnsToSelect: string[]): PostgresRow {
return columnsToSelect.reduce((newRow, columnName) => {
if (columnName in row) {
newRow[columnName] = row[columnName];
return newRow;
}
newRow[columnName] = columnName in row ? row[columnName] : undefined;
return newRow;
}, {} as PostgresRow);
}
Expand All @@ -236,6 +233,7 @@ class IndexerData {

return tableData;
}

public select(tableName: string, criteria: WhereClauseMulti, limit: number | null): PostgresRow[] {
const tableData = this.getTableData(tableName);
return tableData.getEntitiesByCriteria(criteria, limit).map(entity => entity.data);
Expand All @@ -246,54 +244,58 @@ class IndexerData {
// TODO: Verify columns are correctly named, and have any required values
// TODO: Verify inserts are unique before actual insertion
const tableData = this.getTableData(tableName);
const insertedRows: PostgresRow[] = [];
const insertedRows: PostgresRowEntity[] = [];

for (const row of rowsToInsert) {
if (!tableData.rowIsUnique(row)) {
throw new Error('Cannot insert row twice into the same table');
}
insertedRows.push(tableData.insertRow(row).data);
insertedRows.push(tableData.insertRow(row));
}

return insertedRows;
return insertedRows.map(entity => entity.data);
}

public update(tableName: string, criteria: WhereClauseSingle, updateObject: PostgresRow): PostgresRow[] {
// TODO: Validate criteria passed in has valid column names
const tableData = this.getTableData(tableName);
const updatedRows: PostgresRow[] = [];
const updatedRows: PostgresRowEntity[] = [];

const matchedRows = tableData.removeEntitiesByCriteria(criteria);
for (const rowEntity of matchedRows) {
rowEntity.update(updateObject);
updatedRows.push(tableData.insertEntity(rowEntity).data);
updatedRows.push(tableData.insertEntity(rowEntity));
}

return updatedRows;
return updatedRows.map(entity => entity.data);
}

public upsert(tableName: string, rowsToUpsert: PostgresRow[], conflictColumns: string[], updateColumns: string[]): PostgresRow[] {
// TODO: Verify conflictColumns is a superset of primary key set (For uniqueness constraint)
const tableData = this.getTableData(tableName);
const upsertedRows: PostgresRow[] = [];
const upsertedRows: PostgresRowEntity[] = [];

for (const row of rowsToUpsert) {
const updateCriteriaObject = this.selectColumnsFromRow(row, conflictColumns);
const rowsMatchingUpdate = tableData.removeEntitiesByCriteria(updateCriteriaObject);

if (rowsMatchingUpdate.length > 1) {
throw new Error('Conflict update criteria cannot match multiple rows');
throw new Error('Conflict update criteria cannot affect row twice');
} else if (rowsMatchingUpdate.length == 1) {
const matchedEntity = rowsMatchingUpdate[0];
if (upsertedRows.some(upsertedEntity => upsertedEntity.isEqualEntity(matchedEntity))) {
throw new Error('Conflict update criteria cannot affect row twice');
}

const updateObject = this.selectColumnsFromRow(row, updateColumns);
matchedEntity.update(updateObject);
upsertedRows.push(tableData.insertEntity(matchedEntity).data);
upsertedRows.push(tableData.insertEntity(matchedEntity));
} else {
upsertedRows.push(tableData.insertRow(row).data);
upsertedRows.push(tableData.insertRow(row));
}
}

return upsertedRows;
return upsertedRows.map(entity => entity.data);
}

public delete(tableName: string, deleteCriteria: WhereClauseMulti): PostgresRow[] {
Expand Down

0 comments on commit 5af91d1

Please sign in to comment.