Skip to content

Commit

Permalink
Set an electric.syncing config flag in Postgres during a sync trans…
Browse files Browse the repository at this point in the history
…action to enable user defined triggers to chose how to run during a sync
  • Loading branch information
samwillis committed Oct 14, 2024
1 parent bd1b3b9 commit 17920aa
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/clean-squids-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite-sync': patch
---

Set an `electric.syncing` config flag in Postgres during a sync transaction to enable user defined triggers to chose how to run during a sync.
7 changes: 7 additions & 0 deletions packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ async function createPlugin(
case 'up-to-date':
await pg.transaction(async (tx) => {
if (debug) console.log('up-to-date, committing all messages')

// Set the syncing flag to true during this transaction so that
// user defined triggers on the table are able to chose how to run
// during a sync
tx.exec(`SET LOCAL ${metadataSchema}.syncing = true;`)

if (truncateNeeded) {
truncateNeeded = false
// TODO: sync into shadow table and reference count
Expand Down Expand Up @@ -407,6 +413,7 @@ async function migrateShapeMetadataTables({
}: MigrateShapeMetadataTablesOptions) {
await pg.exec(
`
SET ${metadataSchema}.syncing = false;
CREATE SCHEMA IF NOT EXISTS "${metadataSchema}";
CREATE TABLE IF NOT EXISTS ${subscriptionMetadataTableName(metadataSchema)} (
shape_key TEXT PRIMARY KEY,
Expand Down
63 changes: 63 additions & 0 deletions packages/pglite-sync/test/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,67 @@ describe('pglite-sync', () => {

await shape.unsubscribe()
})

it('sets the syncing flag to true when syncing begins', async () => {
let feedMessage: (message: Message) => Promise<void> = async (_) => {}
MockShapeStream.mockImplementation(() => ({
subscribe: vi.fn((cb: (messages: Message[]) => Promise<void>) => {
feedMessage = (message) => cb([message, upToDateMsg])
}),
unsubscribeAll: vi.fn(),
}))

await pg.exec(`
CREATE TABLE test_syncing (
id TEXT PRIMARY KEY,
value TEXT,
is_syncing BOOLEAN
);
CREATE OR REPLACE FUNCTION check_syncing()
RETURNS TRIGGER AS $$
DECLARE
is_syncing BOOLEAN;
BEGIN
is_syncing := COALESCE(current_setting('electric.syncing', true)::boolean, false);
IF is_syncing THEN
NEW.is_syncing := TRUE;
ELSE
NEW.is_syncing := FALSE;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER test_syncing_trigger
BEFORE INSERT ON test_syncing
FOR EACH ROW EXECUTE FUNCTION check_syncing();
`)

const shape = await pg.electric.syncShapeToTable({
shape: { url: 'http://localhost:3000/v1/shape/test_syncing' },
table: 'test_syncing',
primaryKey: ['id'],
})

await feedMessage({
headers: { operation: 'insert' },
offset: '-1',
key: 'id1',
value: {
id: 'id1',
value: 'test value',
},
})

const result = await pg.sql`SELECT * FROM test_syncing WHERE id = 'id1'`
expect(result.rows).toHaveLength(1)
expect(result.rows[0]).toEqual({
id: 'id1',
value: 'test value',
is_syncing: true,
})

await shape.unsubscribe()
})
})

0 comments on commit 17920aa

Please sign in to comment.