Skip to content

Commit

Permalink
modify event type column
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 6, 2023
1 parent de98803 commit b7e40f8
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 22 deletions.
6 changes: 2 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.42.0...HEAD)

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Save into Marquez model datasets sent via `DatasetEvent` event type
* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Save into Marquez model jobs and datasets sent via `JobEvent` event type.

Expand Down Expand Up @@ -30,10 +32,6 @@
* Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub)
*Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.*

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Save into Marquez model datasets sent via `DatasetEvent` event type

## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
### Added
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)
Expand Down
12 changes: 6 additions & 6 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ enum SpecEventType {
+ "job_namespace, "
+ "event, "
+ "producer, "
+ "spec_event_type) "
+ "_event_type) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, 'RUN_EVENT')")
void createLineageEvent(
String eventType,
Expand All @@ -111,7 +111,7 @@ void createLineageEvent(
+ "event_time, "
+ "event, "
+ "producer, "
+ "spec_event_type) "
+ "_event_type) "
+ "VALUES (?, ?, ?, 'DATASET_EVENT')")
void createDatasetEvent(Instant eventTime, PGobject event, String producer);

Expand All @@ -122,13 +122,13 @@ void createLineageEvent(
+ "job_namespace, "
+ "event, "
+ "producer, "
+ "spec_event_type) "
+ "_event_type) "
+ "VALUES (?, ?, ?, ?, ?, 'JOB_EVENT')")
void createJobEvent(
Instant eventTime, String jobName, String jobNamespace, PGobject event, String producer);

@SqlQuery(
"SELECT event FROM lineage_events WHERE run_uuid = :runUuid AND spec_event_type='RUN_EVENT'")
"SELECT event FROM lineage_events WHERE run_uuid = :runUuid AND _event_type='RUN_EVENT'")
List<LineageEvent> findLineageEventsByRunUuid(UUID runUuid);

@SqlQuery(
Expand All @@ -137,7 +137,7 @@ void createJobEvent(
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
AND le.spec_event_type='RUN_EVENT'
AND le._event_type='RUN_EVENT'
ORDER BY le.event_time DESC
LIMIT :limit OFFSET :offset""")
List<LineageEvent> getAllLineageEventsDesc(
Expand All @@ -149,7 +149,7 @@ List<LineageEvent> getAllLineageEventsDesc(
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
AND le.spec_event_type='RUN_EVENT'
AND le._event_type='RUN_EVENT'
ORDER BY le.event_time ASC
LIMIT :limit OFFSET :offset""")
List<LineageEvent> getAllLineageEventsAsc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.jdbi.v3.core.Jdbi;

@Slf4j
public class V66_2_JobFacetsBackfillJobVersion implements JavaMigration {
public class V66_3_JobFacetsBackfillJobVersion implements JavaMigration {

public static final String UPDATE_QUERY =
"""
Expand All @@ -24,7 +24,7 @@ public class V66_2_JobFacetsBackfillJobVersion implements JavaMigration {

@Override
public MigrationVersion getVersion() {
return MigrationVersion.fromVersion("66.2");
return MigrationVersion.fromVersion("66.3");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,4 @@ ALTER TABLE job_facets ALTER COLUMN lineage_event_type DROP NOT NULL;
ALTER TABLE job_facets DROP CONSTRAINT job_facets_run_uuid_fkey;
ALTER TABLE job_facets ADD COLUMN job_version_uuid uuid REFERENCES job_versions (uuid);

CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid);

ALTER TABLE lineage_events ADD COLUMN spec_event_type VARCHAR(64);
UPDATE lineage_events SET spec_event_type = 'RunEvent';
CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TYPE EVENT_TYPE AS ENUM ('RUN_EVENT', 'DATASET_EVENT', 'JOB_EVENT');

ALTER TABLE lineage_events ADD COLUMN _event_type EVENT_TYPE;
ALTER TABLE lineage_events ALTER COLUMN _event_type SET DEFAULT 'RUN_EVENT';
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@

/** Test to validate if a job_version_column is filled properly within job_facets table */
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
@FlywayTarget("66.1")
@FlywayTarget("66.3")
@FlywaySkipRepeatable()
@Slf4j
public class V66_2_JobFacetsBackfillJobVersionTest {
public class V66_3_JobFacetsBackfillJobVersionTest {

private static V66_2_JobFacetsBackfillJobVersion migration =
new V66_2_JobFacetsBackfillJobVersion();
private static V66_3_JobFacetsBackfillJobVersion migration =
new V66_3_JobFacetsBackfillJobVersion();
static Jdbi jdbi;
private static OpenLineageDao openLineageDao;

@BeforeAll
public static void setUpOnce(Jdbi jdbi) {
V66_2_JobFacetsBackfillJobVersionTest.jdbi = jdbi;
V66_3_JobFacetsBackfillJobVersionTest.jdbi = jdbi;
openLineageDao = jdbi.onDemand(OpenLineageDao.class);
}

Expand Down Expand Up @@ -91,7 +91,7 @@ public Connection getConnection() {
}
};
// apply migrations in order
new V66_2_JobFacetsBackfillJobVersion().migrate(context);
new V66_3_JobFacetsBackfillJobVersion().migrate(context);
} catch (Exception e) {
throw new AssertionError("Unable to execute migration", e);
}
Expand Down

0 comments on commit b7e40f8

Please sign in to comment.