add raw OpenLineage get event API #2070

Merged
merged 3 commits into from
Sep 15, 2022

Conversation

mobuchowski
Contributor

Adds an API that returns raw OpenLineage events sorted by time, and optionally filtered by namespace.
Filtering by namespace takes into account both job and dataset namespaces.
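As an illustration of the matching rule described above (an event matches if the namespace is the job's namespace or the namespace of any input or output dataset), here is a minimal in-memory sketch. The `Event` and `Dataset` records are hypothetical, simplified stand-ins and are not Marquez or OpenLineage classes.

```java
import java.util.List;

public class NamespaceFilterSketch {
  // Hypothetical, simplified stand-ins for an OpenLineage event; not Marquez classes.
  record Dataset(String namespace, String name) {}

  record Event(String jobNamespace, String jobName, List<Dataset> inputs, List<Dataset> outputs) {}

  // True if the namespace is the job namespace or the namespace of any input/output dataset.
  static boolean matchesNamespace(Event event, String namespace) {
    return event.jobNamespace().equals(namespace)
        || event.inputs().stream().anyMatch(d -> d.namespace().equals(namespace))
        || event.outputs().stream().anyMatch(d -> d.namespace().equals(namespace));
  }

  // Keeps only the events that match the namespace, preserving the input order.
  static List<Event> filterByNamespace(List<Event> events, String namespace) {
    return events.stream().filter(e -> matchesNamespace(e, namespace)).toList();
  }

  public static void main(String[] args) {
    Dataset orders = new Dataset("postgres://db", "public.orders");
    Event load = new Event("etl", "load_orders", List.of(orders), List.of());
    Event report = new Event("reporting", "daily_report", List.of(), List.of());
    System.out.println(filterByNamespace(List.of(load, report), "postgres://db"));
  }
}
```

The sketch only shows the semantics; it says nothing about how to run such a filter efficiently in the database.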

Closes: #1927

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

@mobuchowski mobuchowski force-pushed the events/raw-event-api branch from cf37442 to 5b237b3 Compare August 12, 2022 14:15
@mobuchowski mobuchowski force-pushed the events/raw-event-api branch from 5b237b3 to 97d55ea Compare August 29, 2022 16:21
Member

@julienledem julienledem left a comment

This looks good to me. I would avoid the unnecessary parsing and unparsing of JSON on the server side into the RawLineageEvent structure and just use Jackson's JSONObject there.

@mobuchowski mobuchowski force-pushed the events/raw-event-api branch from 97d55ea to f1868f8 Compare August 31, 2022 13:26
@boring-cyborg boring-cyborg bot added api API layer changes client/java labels Aug 31, 2022
@mobuchowski mobuchowski force-pushed the events/raw-event-api branch from f1868f8 to c774003 Compare August 31, 2022 13:28
codecov bot commented Aug 31, 2022

Codecov Report

Merging #2070 (48606ff) into main (effe09d) will increase coverage by 0.13%.
The diff coverage is 92.10%.

@@             Coverage Diff              @@
##               main    #2070      +/-   ##
============================================
+ Coverage     75.14%   75.28%   +0.13%     
- Complexity     1023     1034      +11     
============================================
  Files           202      203       +1     
  Lines          4836     4871      +35     
  Branches        392      396       +4     
============================================
+ Hits           3634     3667      +33     
  Misses          762      762              
- Partials        440      442       +2     
| Impacted Files | Coverage Δ |
|---|---|
| api/src/main/java/marquez/db/Columns.java | 81.81% <ø> (ø) |
| api/src/main/java/marquez/db/OpenLineageDao.java | 95.11% <ø> (ø) |
| ...src/main/java/marquez/api/OpenLineageResource.java | 84.61% <75.00%> (+0.40%) ⬆️ |
| .../java/src/main/java/marquez/client/MarquezUrl.java | 60.31% <90.00%> (+5.60%) ⬆️ |
| api/src/main/java/marquez/MarquezApp.java | 66.23% <100.00%> (+0.90%) ⬆️ |
| api/src/main/java/marquez/MarquezContext.java | 84.93% <100.00%> (ø) |
| ...rc/main/java/marquez/api/models/SortDirection.java | 100.00% <100.00%> (ø) |
| ...va/src/main/java/marquez/client/MarquezClient.java | 59.90% <100.00%> (+2.46%) ⬆️ |
| ...va/src/main/java/marquez/client/MarquezPathV1.java | 62.29% <100.00%> (+0.62%) ⬆️ |


@mobuchowski mobuchowski force-pushed the events/raw-event-api branch 5 times, most recently from 855ff8c to f91ba35 Compare August 31, 2022 14:54
@mobuchowski mobuchowski marked this pull request as ready for review August 31, 2022 14:58
@mobuchowski
Contributor Author

@julienledem fixed - deserializing to JsonNode. Might also just return String...

Added tests.

@ResponseMetered
@ExceptionMetered
@GET
@Path("/events/{namespace}")
Member

To get events for a given namespace, I would use:

GET /namespaces/{namespace}/events

We could also define a namespace query param, but having the namespace in the path aligns more with how endpoints are defined for Marquez.

Contributor Author

Removed this endpoint for now.

import marquez.service.ServiceFactory;

@Path("/api/v1")
public class EventResource extends BaseResource {
Member

Can we define these endpoints in OpenLineageResource?

@Path("/events")
@Produces(APPLICATION_JSON)
public Response get(
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit,
Member

I would also allow callers to specify:

  • before: Return events created before a particular date (as YYYY-MM-DD)
  • after: Return events created after a particular date (as YYYY-MM-DD)
  • sort: Sort events in asc or desc order by event_time. Default is desc.
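To make the suggested parameters concrete, here is a rough sketch of how `before`, `after`, and `sort` could be parsed and validated. The class, the record, the UTC start-of-day interpretation of the dates, and the fallback bounds are all assumptions for illustration; this is not the resource code from this PR.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

public class EventQueryParamsSketch {
  // Hypothetical enum; the suggestion above names asc/desc with desc as the default.
  enum SortDirection { ASC, DESC }

  // Hypothetical holder for the parsed parameters.
  record EventQuery(ZonedDateTime after, ZonedDateTime before, SortDirection sort) {}

  // Missing or blank sort falls back to DESC; unknown values throw IllegalArgumentException.
  static SortDirection parseSort(String raw) {
    if (raw == null || raw.isBlank()) {
      return SortDirection.DESC;
    }
    return SortDirection.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }

  // Parses YYYY-MM-DD as the start of that day in UTC (an assumption of this sketch).
  static ZonedDateTime parseDate(String raw, ZonedDateTime fallback) {
    if (raw == null || raw.isBlank()) {
      return fallback;
    }
    return LocalDate.parse(raw).atStartOfDay(ZoneOffset.UTC);
  }

  // Assumed defaults: `after` falls back to the epoch, `before` falls back to `now`.
  static EventQuery parse(String after, String before, String sort, ZonedDateTime now) {
    ZonedDateTime lower = parseDate(after, LocalDate.EPOCH.atStartOfDay(ZoneOffset.UTC));
    ZonedDateTime upper = parseDate(before, now);
    if (!lower.isBefore(upper)) {
      throw new IllegalArgumentException("'after' must be earlier than 'before'");
    }
    return new EventQuery(lower, upper, parseSort(sort));
  }

  public static void main(String[] args) {
    ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
    System.out.println(parse("2022-09-01", "2022-09-15", null, now));
  }
}
```

Passing `now` in explicitly keeps the parsing deterministic and easy to test.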

Contributor Author

Done

@Value
static class Events {
@NonNull
@JsonProperty("events")
Member

❤️

api/src/main/java/marquez/db/EventDao.java Outdated Show resolved Hide resolved
Member

@wslulciuc wslulciuc left a comment

I would also update our CHANGELOG.md with this sweet sweet endpoint!

import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(RawLineageEventMapper.class)
Member

Have you looked into using @Json instead of defining a custom mapper?

Our PostgresPlugin provides qualified factories that will bind/map the @EncodedJson String to/from json or jsonb-typed columns.

Contributor Author

Replaced with LineageEventMapper that does what I want it to.

Member

@julienledem julienledem left a comment

I added some comments. I think this needs to be perf tested before merging.

api/src/main/java/marquez/api/EventResource.java Outdated Show resolved Hide resolved
Comment on lines 1 to 5
CREATE INDEX lineage_events_event_time
on lineage_events(event_time DESC);

CREATE INDEX lineage_events_namespace_event_time
on lineage_events(job_namespace, event_time DESC);
Member

Maybe do some perf test on a big db to measure the impact of this on POST /api/v1/lineage

Contributor Author

Main:

➜  marquez git:(main) ✗ k6 run --vus 25 --duration 30s load.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: load.js
     output: -

  scenarios: (100.00%) 1 scenario, 25 max VUs, 1m0s max duration (incl. graceful stop):
           * default: 25 looping VUs for 30s (gracefulStop: 30s)


running (0m30.0s), 00/25 VUs, 18051 complete and 0 interrupted iterations
default ✓ [======================================] 25 VUs  30s

     ✓ status is 201

     checks.........................: 100.00% ✓ 18051      ✗ 0    
     data_received..................: 1.4 MB  48 kB/s
     data_sent......................: 22 MB   738 kB/s
     http_req_blocked...............: avg=4.65µs  min=0s     med=2µs     max=1.87ms   p(90)=3µs     p(95)=4µs    
     http_req_connecting............: avg=1.29µs  min=0s     med=0s      max=1.24ms   p(90)=0s      p(95)=0s     
     http_req_duration..............: avg=41.3ms  min=7.44ms med=35.04ms max=348.88ms p(90)=76.09ms p(95)=92.24ms
       { expected_response:true }...: avg=41.3ms  min=7.44ms med=35.04ms max=348.88ms p(90)=76.09ms p(95)=92.24ms
     http_req_failed................: 0.00%   ✓ 0          ✗ 18051
     http_req_receiving.............: avg=35.3µs  min=5µs    med=26µs    max=5.43ms   p(90)=51µs    p(95)=63µs   
     http_req_sending...............: avg=17.38µs min=3µs    med=13µs    max=10.75ms  p(90)=20µs    p(95)=24µs   
     http_req_tls_handshaking.......: avg=0s      min=0s     med=0s      max=0s       p(90)=0s      p(95)=0s     
     http_req_waiting...............: avg=41.24ms min=7.41ms med=34.98ms max=348.84ms p(90)=76.05ms p(95)=92.2ms 
     http_reqs......................: 18051   600.748714/s
     iteration_duration.............: avg=41.56ms min=7.66ms med=35.33ms max=349.04ms p(90)=76.32ms p(95)=92.47ms
     iterations.....................: 18051   600.748714/s
     vus............................: 25      min=25       max=25 
     vus_max........................: 25      min=25       max=25 

This branch:

➜  marquez git:(events/raw-event-api) ✗ k6 run --vus 25 --duration 30s load.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: load.js
     output: -

  scenarios: (100.00%) 1 scenario, 25 max VUs, 1m0s max duration (incl. graceful stop):
           * default: 25 looping VUs for 30s (gracefulStop: 30s)


running (0m30.0s), 00/25 VUs, 18781 complete and 0 interrupted iterations
default ✓ [======================================] 25 VUs  30s

     ✓ status is 201

     checks.........................: 100.00% ✓ 18781      ✗ 0    
     data_received..................: 1.5 MB  50 kB/s
     data_sent......................: 23 MB   769 kB/s
     http_req_blocked...............: avg=4.46µs  min=0s     med=2µs     max=2.85ms   p(90)=4µs     p(95)=4µs    
     http_req_connecting............: avg=674ns   min=0s     med=0s      max=877µs    p(90)=0s      p(95)=0s     
     http_req_duration..............: avg=39.67ms min=7.48ms med=34.11ms max=240.58ms p(90)=72.82ms p(95)=89.13ms
       { expected_response:true }...: avg=39.67ms min=7.48ms med=34.11ms max=240.58ms p(90)=72.82ms p(95)=89.13ms
     http_req_failed................: 0.00%   ✓ 0          ✗ 18781
     http_req_receiving.............: avg=37.35µs min=4µs    med=25µs    max=6.77ms   p(90)=53µs    p(95)=66µs   
     http_req_sending...............: avg=16.82µs min=3µs    med=12µs    max=5.5ms    p(90)=19µs    p(95)=24µs   
     http_req_tls_handshaking.......: avg=0s      min=0s     med=0s      max=0s       p(90)=0s      p(95)=0s     
     http_req_waiting...............: avg=39.62ms min=7.41ms med=34.05ms max=240.54ms p(90)=72.78ms p(95)=89.1ms 
     http_reqs......................: 18781   625.266153/s
     iteration_duration.............: avg=39.94ms min=7.81ms med=34.35ms max=240.94ms p(90)=73.02ms p(95)=89.49ms
     iterations.....................: 18781   625.266153/s
     vus............................: 25      min=25       max=25 
     vus_max........................: 25      min=25       max=25 

Multiple runs have confirmed similar results - no significant difference on a small DB running small tests.

Load testing on something larger than a small Marquez instance is a separate task, and I think it should not block this.

Contributor Author

@wslulciuc @julienledem btw, we probably should do CREATE INDEX CONCURRENTLY

spec/openapi.yml Outdated
parameters:
- $ref: '#/components/parameters/limit'
- $ref: '#/components/parameters/offset'
summary: List all received OpenLineage events in particular namespace.
Member

Can you clarify the ordering? Offset might be expensive from a query perspective.

Contributor Author

Right now I followed what's happening on other Marquez endpoints. If you want to use this API to get a large number of events, then I agree that an OFFSET-based strategy will be very inefficient, and it will be better to use an event_time-based one.

Comment on lines 26 to 37
@SqlQuery(
"""
SELECT le.event
FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE le.job_namespace = :namespace
OR ds ->> 'namespace' = :namespace
ORDER BY event_time DESC
LIMIT :limit
OFFSET :offset""")
List<JsonNode> getByNamespace(
@Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset);
}
Member

This looks like it would be expensive on a big table. I would check the perf difference of filtering on the columns you added indices for vs this.

Contributor Author

With @wslulciuc's changes described in #2076, the JSON-based filter would be replaced with a fast join.

It would be possible to create a jsonb index; however, I think it would be overkill.

Comment on lines 18 to 19
LIMIT :limit
OFFSET :offset""")
Member

Queries using offset will be slow for large values of offset even with small values of limit. This is because the db needs to count the rows leading to the offset you want. If you index by timestamp it might be better to use that. "timestamp > t limit 100" then to get the next 100, the client can provide the timestamp of the last event they retrieved.
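The cursor protocol described above can be sketched with an in-memory list standing in for the table. This is only an illustration of the client loop: the `Event` record and method names are hypothetical, and the in-memory filter says nothing about database performance, which depends on the index.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class KeysetPaginationSketch {
  // Hypothetical, simplified event; only the timestamp matters for paging.
  record Event(String id, Instant eventTime) {}

  // Stand-in for "WHERE event_time > :after ORDER BY event_time LIMIT :limit".
  static List<Event> page(List<Event> table, Instant after, int limit) {
    return table.stream()
        .filter(e -> e.eventTime().isAfter(after))
        .sorted(Comparator.comparing(Event::eventTime))
        .limit(limit)
        .toList();
  }

  // Client loop: the timestamp of the last event retrieved becomes the next cursor.
  // Caveat: events sharing the exact timestamp of a page's last event can be skipped;
  // a real implementation would need a tie-breaker column in the cursor.
  static List<Event> fetchAll(List<Event> table, Instant start, int limit) {
    List<Event> result = new ArrayList<>();
    Instant cursor = start;
    while (true) {
      List<Event> batch = page(table, cursor, limit);
      if (batch.isEmpty()) {
        break;
      }
      result.addAll(batch);
      cursor = batch.get(batch.size() - 1).eventTime();
    }
    return result;
  }

  public static void main(String[] args) {
    List<Event> table = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      table.add(new Event("event-" + i, Instant.ofEpochSecond(i)));
    }
    System.out.println(fetchAll(table, Instant.EPOCH, 2).size());
  }
}
```

Unlike OFFSET, each request here starts from a known timestamp, so the database can seek into an index on the time column instead of counting past skipped rows.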

Contributor Author

I've added before and after params. They should, combined with an index, provide efficient pagination.

@mobuchowski mobuchowski self-assigned this Sep 12, 2022
@mobuchowski mobuchowski force-pushed the events/raw-event-api branch 2 times, most recently from ff9529d to d0bf768 Compare September 12, 2022 12:25
@boring-cyborg boring-cyborg bot added the docs label Sep 12, 2022
@mobuchowski mobuchowski force-pushed the events/raw-event-api branch 3 times, most recently from a770eee to 4edbbc9 Compare September 13, 2022 20:27
api/src/main/java/marquez/api/OpenLineageResource.java Outdated Show resolved Hide resolved
@SqlQuery(
"""
SELECT le.event
FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds
Collaborator

Is the jsonb required? It kills the idea of the namespace index and is kind of a corner-case scenario.

Contributor Author

I think getting events from some dataset namespace is not a corner-case scenario but a very common one. For example, getting all events that relate to a particular Postgres database.

Contributor Author

Also, this would be fast in the future, after @wslulciuc's split of facets. Then we could just filter by that, AFAIK.

Collaborator

Ok. If so, the index lineage_events(job_namespace, event_time DESC); will not help and can be removed, can't it?

Contributor Author

@mobuchowski mobuchowski Sep 14, 2022

FYI @julienledem @wslulciuc - the query that returns all events connected with a namespace, either as the job namespace or as the namespace of one of the input or output datasets, won't work: it requires a full lineage_events table scan on every call, together with unnesting the namespace from the jsonb column. That is extremely expensive.

I've talked with @pawel-big-lebowski and we tried to find a faster query. The best we've come up with is this:

WITH job_events AS (
    -- event_time is selected in both CTEs so the UNION ALL columns line up
    -- and the outer ORDER BY can reference it
    SELECT le.event, le.event_time
    FROM lineage_events le
    WHERE le.job_namespace = :namespace
    AND (le.event_time < :before
    AND le.event_time >= :after)
    ORDER BY le.event_time DESC
), dataset_events AS (
    SELECT le.event, le.event_time
    FROM lineage_events le
    JOIN dataset_versions dv on le.run_uuid = dv.run_uuid
    JOIN datasets_view ds on dv.dataset_uuid = ds.uuid
    JOIN namespaces n on ds.namespace_uuid = n.uuid
    WHERE n.name = :namespace
    AND (le.event_time < :before
    AND le.event_time >= :after)
    ORDER BY le.event_time DESC
)
SELECT le.event
FROM (
    SELECT * FROM dataset_events
    UNION ALL
    SELECT * FROM job_events
) le
ORDER BY le.event_time
LIMIT :limit
It's relatively fast. The previous version needed a full sequential scan, whereas this one only needs to fetch the events whose dataset_versions join to the namespace, which is a fraction of the full table. It would be possible to make it faster if we could "predicate push down" the filters on time. created_at is the Marquez processing-time field, so we can't use that.

However, it's relatively pointless. The above query won't take into account events where one of the input datasets has the given namespace, and we don't see a fast way to get them with the current database schema.

We could either

  1. get rid of the namespace filtering altogether, leaving only a "get all events" endpoint
  2. leave a very inefficient endpoint, which will need to sequentially scan the whole lineage_events table
  3. use the above query, which will be much faster but won't include input datasets.

Thoughts?

Contributor Author

I decided to remove this feature and restore it, if needed, in a future PR.

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
@mobuchowski mobuchowski force-pushed the events/raw-event-api branch 5 times, most recently from 48454a6 to c9271b8 Compare September 15, 2022 12:18
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
@mobuchowski mobuchowski merged commit 98b7067 into main Sep 15, 2022
@mobuchowski mobuchowski deleted the events/raw-event-api branch September 15, 2022 13:58
Development

Successfully merging this pull request may close these issues.

Option to dump OpenLineage events that correspond to dataset/namespace from web
4 participants