-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Copy job attempt state to configs database #7219
Conversation
tuliren
commented
Oct 20, 2021
•
edited
Loading
edited
- Resolves Move state from jobs db to configs db #5504
- Resolves Update config and job persistence unit tests to run migrations #5857
...eduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
Outdated
Show resolved
Hide resolved
@@ -22,6 +22,12 @@ create table "public"."airbyte_configs_migrations"( | |||
constraint "airbyte_configs_migrations_pk" | |||
primary key ("installed_rank") | |||
); | |||
create table "public"."sync_state"( | |||
"sync_id" uuid not null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you probably also want to keep the connection_id there. Not even sure you want the sync id actually
I can see the job database being purged but you still want to know what is the latest state for a connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sync_id
is the connection_id
. The reason I name it sync_id
is that the connectionId
is the primary key for a StandardSync
. I imagine that in the future when we implement the project to normalize the tables, the primary key of the Sync
table (removing the Standard-
prefix) will just be id
, not connectionId
. So sync_id
will be consistent with that future schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't a StandardSync
eventually become a Connection
and then this would become ConnectionState
with a connection_id
?
"Sync" / "StandardSync" feel out of place with the UI/how we usually talk about this. I would think that a "Sync" is more of a "Job" today than the connection itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with jared :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I synced with Charles on this. For now we are just going to persist the state in the airbyte_configs
table as a json blob. To not cause confusion, the new type is STANDARD_SYNC_STATE
. When we normalize this table, we can change the name sync
to connection.
airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java
Outdated
Show resolved
Hide resolved
...main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java
Outdated
Show resolved
Hide resolved
...java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java
Outdated
Show resolved
Hide resolved
b2872dc
to
aa20d11
Compare
f781684
to
3198ed4
Compare
3198ed4
to
1f24699
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
// state schema: State.yaml, e.g. { "state": { "cursor": 1000 } } | ||
final Field<JSONB> attemptState = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB); | ||
|
||
return jobsDatabase.query(ctx -> ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! i like this query.
Cannot merge this yet. There is a fatal flaw resulting in lost state. Glad that we have an acceptance test that catches this bug. But also scary that this issue is only caught by one test. I think I have located the root cause, but need more tests. |
Alright, it should be more robust now with the new unit tests. |
* Add migration to create latest state table * Log migration name * Expose db variables to airbyte-db * Implement migration * Fix migration test * temp * Rebase on master * Save state in temporal (airbytehq#7253) * Copy state to airbyte_configs table * Add standard sync state * Move state methods to config repository * Add unit tests * Fix unit tests * Register standard sync state in migration * Add comment * Use config model instead of json node * Add comments * Remove unnecessary method * Fix migration query * Remove unused config database * Move persist statement and log the call * Update dev doc * Add unit tests for sync workflow Co-authored-by: Charles <giardina.charles@gmail.com>