diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 677044fea8..259a500a53 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -117,14 +117,17 @@ public Response getLineageEvents( @QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before, @QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after, @QueryParam("sortDirection") @DefaultValue("desc") SortDirection sortDirection, - @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) { + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { List events = Collections.emptyList(); switch (sortDirection) { case DESC -> events = - openLineageDao.getAllLineageEventsDesc(before.get(), after.get(), limit); - case ASC -> events = openLineageDao.getAllLineageEventsAsc(before.get(), after.get(), limit); + openLineageDao.getAllLineageEventsDesc(before.get(), after.get(), limit, offset); + case ASC -> events = + openLineageDao.getAllLineageEventsAsc(before.get(), after.get(), limit, offset); } - return Response.ok(new Events(events)).build(); + int totalCount = openLineageDao.getAllLineageTotalCount(before.get(), after.get()); + return Response.ok(new Events(events, totalCount)).build(); } @Value @@ -132,5 +135,7 @@ static class Events { @NonNull @JsonProperty("events") List value; + + int totalCount; } } diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 72dba40d26..c6f7559cec 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -103,8 +103,9 @@ void createLineageEvent( WHERE (le.event_time < :before AND le.event_time >= :after) ORDER BY le.event_time DESC - LIMIT :limit""") - List getAllLineageEventsDesc(ZonedDateTime before, ZonedDateTime after, int limit); + LIMIT :limit OFFSET :offset""") + List getAllLineageEventsDesc( + ZonedDateTime before, ZonedDateTime after, int limit, int offset); @SqlQuery( """ @@ -113,8 +114,17 @@ void createLineageEvent( WHERE (le.event_time < :before AND le.event_time >= :after) ORDER BY le.event_time ASC - LIMIT :limit""") - List getAllLineageEventsAsc(ZonedDateTime before, ZonedDateTime after, int limit); + LIMIT :limit OFFSET :offset""") + List getAllLineageEventsAsc( + ZonedDateTime before, ZonedDateTime after, int limit, int offset); + + @SqlQuery( + """ + SELECT count(*) + FROM lineage_events le + WHERE (le.event_time < :before + AND le.event_time >= :after)""") + int getAllLineageTotalCount(ZonedDateTime before, ZonedDateTime after); default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) { UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper); diff --git a/spec/openapi.yml b/spec/openapi.yml index befc20b41e..38d0d5a766 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -91,6 +91,7 @@ paths: - $ref: '#/components/parameters/before' - $ref: '#/components/parameters/after' - $ref: '#/components/parameters/limit' + - $ref: '#/components/parameters/offset' summary: List all received OpenLineage events. description: Returns a list of OpenLineage events, sorted in direction of passed sort parameter. By default it is desc. tags: @@ -945,6 +946,9 @@ components: type: array items: $ref: '#/components/schemas/LineageEvent' + totalCount: + type: number + description: The total number of events returned matching our conditions. CreatedSource: type: object diff --git a/web/src/routes/events/Events.tsx b/web/src/routes/events/Events.tsx index 045b81aa80..d360f8e7ab 100644 --- a/web/src/routes/events/Events.tsx +++ b/web/src/routes/events/Events.tsx @@ -4,6 +4,7 @@ import * as Redux from 'redux' import { Button, + Chip, Container, Table, TableBody, @@ -24,19 +25,22 @@ import { fetchEvents, resetEvents } from '../../store/actionCreators' import { fileSize, formatUpdatedAt } from '../../helpers' import { formatDateAPIQuery, formatDatePicker } from '../../helpers/time' import { saveAs } from 'file-saver' +import { useSearchParams } from 'react-router-dom' import { useTheme } from '@emotion/react' import Box from '@mui/material/Box' +import CircularProgress from '@mui/material/CircularProgress/CircularProgress' import IconButton from '@mui/material/IconButton' import MqDatePicker from '../../components/core/date-picker/MqDatePicker' import MqEmpty from '../../components/core/empty/MqEmpty' import MqJsonView from '../../components/core/json-view/MqJsonView' import MqStatus from '../../components/core/status/MqStatus' import MqText from '../../components/core/text/MqText' -import React from 'react' +import React, { useEffect, useRef } from 'react' import moment from 'moment' interface StateProps { events: Event[] + totalCount: number isEventsLoading: boolean isEventsInit: boolean } @@ -47,7 +51,6 @@ interface EventsState { dateFrom: string dateTo: string page: number - pageIsLast: boolean } interface DispatchProps { @@ -59,29 +62,32 @@ type EventsProps = StateProps & DispatchProps const EVENTS_COLUMNS = ['ID', 'STATE', 'NAME', 'NAMESPACE', 'TIME'] +const PAGE_SIZE = 20 + const Events: React.FC = ({ events, + totalCount, isEventsLoading, isEventsInit, fetchEvents, resetEvents, }) => { - const pageSize = 20 + const [searchParams, setSearchParams] = useSearchParams() const [state, setState] = React.useState({ - page: 1, + page: 0, events: [], rowExpanded: null, - pageIsLast: false, - dateFrom: formatDateAPIQuery(moment().startOf('day').toString()), - dateTo: formatDateAPIQuery(moment().endOf('day').toString()), + dateFrom: + searchParams.get('dateFrom') || formatDateAPIQuery(moment().startOf('day').toString()), + dateTo: searchParams.get('dateTo') || formatDateAPIQuery(moment().endOf('day').toString()), }) - const mounted = React.useRef(false) + const mounted = useRef(false) - React.useEffect(() => { + useEffect(() => { if (!mounted.current) { // on mount - fetchEvents(state.dateFrom, state.dateTo, pageSize) + fetchEvents(state.dateFrom, state.dateTo, PAGE_SIZE, state.page * PAGE_SIZE) mounted.current = true } else { // on update @@ -89,31 +95,27 @@ const Events: React.FC = ({ setState({ ...state, events: events, - pageIsLast: events.length < state.page * pageSize, }) } } }) - React.useEffect(() => { + useEffect(() => { + if (!searchParams.get('dateFrom') && !searchParams.get('dateTo')) { + setSearchParams({ + dateFrom: formatDateAPIQuery(moment().startOf('day').toString()), + dateTo: formatDateAPIQuery(moment().endOf('day').toString()), + }) + } + }, []) + + useEffect(() => { return () => { // on unmount resetEvents() } }, []) - const getEvents = () => { - return state.events?.slice((state.page - 1) * pageSize, pageSize + (state.page - 1) * pageSize) - } - - const pageNavigation = () => { - const titlePos = - state.events?.length > 0 - ? `${pageSize * state.page - pageSize} - ${state.events.length}` - : null - return `${state.page} ${titlePos ? `(${titlePos})` : ''}` - } - const handleChangeDatepicker = (e: any, direction: 'from' | 'to') => { const isDirectionFrom = direction === 'from' const keyDate = isDirectionFrom ? 'dateFrom' : 'dateTo' @@ -121,10 +123,14 @@ const Events: React.FC = ({ fetchEvents( formatDateAPIQuery(isDirectionFrom ? e.toDate() : state.dateFrom), formatDateAPIQuery(isDirectionFrom ? state.dateTo : e.toDate()), - pageSize + PAGE_SIZE, + state.page * PAGE_SIZE ) - setState({ [keyDate]: formatDatePicker(e.toDate()), page: 1, rowExpanded: null } as any) + const params: { [key: string]: string } = {} + searchParams.forEach((value, key) => (params[key] = value)) + setSearchParams({ ...params, [keyDate]: formatDateAPIQuery(e.toDate()) }) + setState({ [keyDate]: formatDatePicker(e.toDate()), page: 0, rowExpanded: null } as any) } const handleClickPage = (direction: 'prev' | 'next') => { @@ -133,7 +139,8 @@ const Events: React.FC = ({ fetchEvents( formatDateAPIQuery(state.dateFrom), formatDateAPIQuery(state.dateTo), - pageSize * directionPage + PAGE_SIZE, + directionPage * PAGE_SIZE ) setState({ ...state, page: directionPage, rowExpanded: null }) } @@ -149,59 +156,45 @@ const Events: React.FC = ({ return ( - + <> - {i18next.t('events_route.title')} - Page: {pageNavigation()} - - {getEvents()?.length > 0 && ( - - - handleClickPage('prev')} - size='large' - > - - - - - handleClickPage('next')} - size='large' - > - - - + + {i18next.t('events_route.title')} + - )} + - handleChangeDatepicker(e, 'from')} - /> - handleChangeDatepicker(e, 'to')} - /> + + handleChangeDatepicker(e, 'from')} + /> + + handleChangeDatepicker(e, 'to')} + /> + + {isEventsLoading && } {state.events?.length === 0 ? ( @@ -219,8 +212,6 @@ const Events: React.FC = ({ > - {/* {EVENTS_COLUMNS.map(field => { - return ( */} {i18next.t('events_columns.id')} @@ -236,12 +227,10 @@ const Events: React.FC = ({ {i18next.t('events_columns.time')} - {/* ) - })} */} - {getEvents()?.map((event, key: number) => { + {events.map((event, key: number) => { return ( = ({ }} > - {event.run.runId} + {event.run.runId} = ({ })} - + + + <> + {PAGE_SIZE * state.page + 1} -{' '} + {Math.min(PAGE_SIZE * (state.page + 1), totalCount)} of {totalCount} + + - handleClickPage('prev')} - size='large' - > - - + + handleClickPage('prev')} + size='large' + > + + + - handleClickPage('next')} - size='large' - > - - + + handleClickPage('next')} + size='large' + disabled={state.page === Math.ceil(totalCount / PAGE_SIZE) - 1} + > + + + @@ -342,6 +341,7 @@ const Events: React.FC = ({ const mapStateToProps = (state: IState) => ({ events: state.events?.result, + totalCount: state.events.totalCount, isEventsLoading: state.events?.isLoading, isEventsInit: state.events?.init, }) diff --git a/web/src/store/actionCreators/index.ts b/web/src/store/actionCreators/index.ts index 0220a81703..09564ceb27 100644 --- a/web/src/store/actionCreators/index.ts +++ b/web/src/store/actionCreators/index.ts @@ -6,215 +6,216 @@ import * as actionTypes from './actionTypes' import { Dataset, DatasetVersion, - Event, + Events, Facets, Job, LineageGraph, Namespace, Run, - Search + Search, } from '../../types/api' import { JobOrDataset } from '../../components/lineage/types' -export const fetchEvents = (after: string, before: string, limit: number) => ({ +export const fetchEvents = (after: string, before: string, limit: number, offset: number) => ({ type: actionTypes.FETCH_EVENTS, payload: { before, after, - limit - } + limit, + offset, + }, }) -export const fetchEventsSuccess = (events: Event[]) => ({ +export const fetchEventsSuccess = (events: Events) => ({ type: actionTypes.FETCH_EVENTS_SUCCESS, payload: { - events - } + events, + }, }) export const resetEvents = () => ({ - type: actionTypes.RESET_EVENTS + type: actionTypes.RESET_EVENTS, }) export const fetchDatasets = (namespace: string, limit: number) => ({ type: actionTypes.FETCH_DATASETS, payload: { namespace, - limit - } + limit, + }, }) export const fetchDatasetsSuccess = (datasets: Dataset[]) => ({ type: actionTypes.FETCH_DATASETS_SUCCESS, payload: { - datasets - } + datasets, + }, }) export const fetchDataset = (namespace: string, name: string) => ({ type: actionTypes.FETCH_DATASET, payload: { namespace, - name - } + name, + }, }) export const fetchDatasetSuccess = (dataset: Dataset) => ({ type: actionTypes.FETCH_DATASET_SUCCESS, payload: { - dataset - } + dataset, + }, }) export const fetchDatasetVersions = (namespace: string, name: string) => ({ type: actionTypes.FETCH_DATASET_VERSIONS, payload: { namespace, - name - } + name, + }, }) export const fetchDatasetVersionsSuccess = (versions: DatasetVersion[]) => ({ type: actionTypes.FETCH_DATASET_VERSIONS_SUCCESS, payload: { - versions - } + versions, + }, }) export const resetDatasetVersions = () => ({ - type: actionTypes.RESET_DATASET_VERSIONS + type: actionTypes.RESET_DATASET_VERSIONS, }) export const resetDataset = () => ({ - type: actionTypes.RESET_DATASET + type: actionTypes.RESET_DATASET, }) export const deleteDataset = (datasetName: string, namespace: string) => ({ type: actionTypes.DELETE_DATASET, payload: { datasetName, - namespace - } + namespace, + }, }) export const deleteDatasetSuccess = (datasetName: string) => ({ type: actionTypes.DELETE_DATASET_SUCCESS, payload: { - datasetName - } + datasetName, + }, }) export const resetDatasets = () => ({ - type: actionTypes.RESET_DATASETS + type: actionTypes.RESET_DATASETS, }) export const fetchJobs = (namespace: string) => ({ type: actionTypes.FETCH_JOBS, payload: { - namespace - } + namespace, + }, }) export const fetchJobsSuccess = (jobs: Job[]) => ({ type: actionTypes.FETCH_JOBS_SUCCESS, payload: { - jobs - } + jobs, + }, }) export const resetJobs = () => ({ - type: actionTypes.RESET_JOBS + type: actionTypes.RESET_JOBS, }) export const deleteJob = (jobName: string, namespace: string) => ({ type: actionTypes.DELETE_JOB, payload: { jobName, - namespace - } + namespace, + }, }) export const deleteJobSuccess = (jobName: string) => ({ type: actionTypes.DELETE_JOB_SUCCESS, payload: { - jobName - } + jobName, + }, }) export const fetchRuns = (jobName: string, namespace: string) => ({ type: actionTypes.FETCH_RUNS, payload: { jobName, - namespace - } + namespace, + }, }) export const fetchRunsSuccess = (jobName: string, jobRuns: Run[]) => ({ type: actionTypes.FETCH_RUNS_SUCCESS, payload: { jobName, - runs: jobRuns - } + runs: jobRuns, + }, }) export const fetchRunFacets = (runId: string) => ({ type: actionTypes.FETCH_RUN_FACETS, payload: { - runId - } + runId, + }, }) export const fetchJobFacets = (runId: string) => ({ type: actionTypes.FETCH_JOB_FACETS, payload: { - runId - } + runId, + }, }) export const fetchFacetsSuccess = (facets: Facets) => ({ type: actionTypes.FETCH_FACETS_SUCCESS, payload: { - facets: facets.facets - } + facets: facets.facets, + }, }) export const resetFacets = () => ({ - type: actionTypes.RESET_FACETS + type: actionTypes.RESET_FACETS, }) export const resetRuns = () => ({ - type: actionTypes.RESET_RUNS + type: actionTypes.RESET_RUNS, }) export const fetchNamespacesSuccess = (namespaces: Namespace[]) => ({ type: actionTypes.FETCH_NAMESPACES_SUCCESS, payload: { - namespaces - } + namespaces, + }, }) export const applicationError = (message: string) => ({ type: actionTypes.APPLICATION_ERROR, payload: { - message - } + message, + }, }) export const dialogToggle = (field: string) => ({ type: actionTypes.DIALOG_TOGGLE, payload: { - field - } + field, + }, }) export const setSelectedNode = (node: string) => ({ type: actionTypes.SET_SELECTED_NODE, - payload: node + payload: node, }) export const setBottomBarHeight = (height: number) => ({ type: actionTypes.SET_BOTTOM_BAR_HEIGHT, - payload: height + payload: height, }) export const fetchLineage = ( @@ -228,32 +229,32 @@ export const fetchLineage = ( nodeType, namespace, name, - depth - } + depth, + }, }) export const fetchLineageSuccess = (lineage: LineageGraph) => ({ type: actionTypes.FETCH_LINEAGE_SUCCESS, - payload: lineage + payload: lineage, }) export const resetLineage = () => ({ - type: actionTypes.RESET_LINEAGE + type: actionTypes.RESET_LINEAGE, }) export const setLineageGraphDepth = (depth: number) => ({ type: actionTypes.SET_LINEAGE_GRAPH_DEPTH, - payload: depth + payload: depth, }) export const setShowFullGraph = (showFullGraph: boolean) => ({ type: actionTypes.SET_SHOW_FULL_GRAPH, - payload: showFullGraph + payload: showFullGraph, }) export const selectNamespace = (namespace: string) => ({ type: actionTypes.SELECT_NAMESPACE, - payload: namespace + payload: namespace, }) export const fetchSearch = (q: string, filter: string, sort: string) => ({ @@ -261,11 +262,11 @@ export const fetchSearch = (q: string, filter: string, sort: string) => ({ payload: { q, filter, - sort - } + sort, + }, }) export const fetchSearchSuccess = (search: Search) => ({ type: actionTypes.FETCH_SEARCH_SUCCESS, - payload: search + payload: search, }) diff --git a/web/src/store/reducers/events.ts b/web/src/store/reducers/events.ts index 817ede0758..abd09e8dbe 100644 --- a/web/src/store/reducers/events.ts +++ b/web/src/store/reducers/events.ts @@ -5,9 +5,19 @@ import { Event } from '../../types/api' import { FETCH_EVENTS, FETCH_EVENTS_SUCCESS, RESET_EVENTS } from '../actionCreators/actionTypes' import { fetchEventsSuccess } from '../actionCreators' -export type IEventsState = { isLoading: boolean; result: Event[]; init: boolean } +export type IEventsState = { + isLoading: boolean + result: Event[] + totalCount: number + init: boolean +} -export const initialState: IEventsState = { isLoading: false, init: false, result: [] } +export const initialState: IEventsState = { + isLoading: false, + init: false, + totalCount: 0, + result: [], +} type IEventsAction = ReturnType @@ -18,7 +28,13 @@ export default (state: IEventsState = initialState, action: IEventsAction): IEve case FETCH_EVENTS: return { ...state, isLoading: true } case FETCH_EVENTS_SUCCESS: - return { ...state, isLoading: false, init: true, result: payload.events } + return { + ...state, + isLoading: false, + init: true, + result: payload.events.events, + totalCount: payload.events.totalCount, + } case RESET_EVENTS: return initialState default: diff --git a/web/src/store/requests/events.ts b/web/src/store/requests/events.ts index 8e344fb2bd..24af0ba441 100644 --- a/web/src/store/requests/events.ts +++ b/web/src/store/requests/events.ts @@ -5,9 +5,15 @@ import { API_URL } from '../../globals' import { Events } from '../../types/api' import { genericFetchWrapper } from './index' -export const getEvents = async (after = '', before = '', limit = 100, sortDirection = 'desc') => { - const url = `${API_URL}/events/lineage?limit=${limit}&before=${before}&after=${after}&sortDirection=${sortDirection}` - return genericFetchWrapper(url, { method: 'GET' }, 'fetchEvents').then((r: Events) => { - return r.events.map(d => ({ ...d })) +export const getEvents = async ( + after = '', + before = '', + limit = 100, + offset = 0, + sortDirection = 'desc' +) => { + const url = `${API_URL}/events/lineage?limit=${limit}&before=${before}&after=${after}&offset=${offset}&sortDirection=${sortDirection}` + return genericFetchWrapper(url, { method: 'GET' }, 'fetchEvents').then((events: Events) => { + return events }) } diff --git a/web/src/store/sagas/index.ts b/web/src/store/sagas/index.ts index 3e765d42ed..d8a47559ca 100644 --- a/web/src/store/sagas/index.ts +++ b/web/src/store/sagas/index.ts @@ -14,9 +14,9 @@ import { FETCH_LINEAGE, FETCH_RUNS, FETCH_RUN_FACETS, - FETCH_SEARCH + FETCH_SEARCH, } from '../actionCreators/actionTypes' -import { Dataset, DatasetVersion, Event, Facets, LineageGraph, Namespaces } from '../../types/api' +import { Dataset, DatasetVersion, Events, Facets, LineageGraph, Namespaces } from '../../types/api' import { all, put, take } from 'redux-saga/effects' const call: any = Effects.call @@ -36,7 +36,7 @@ import { fetchLineageSuccess, fetchNamespacesSuccess, fetchRunsSuccess, - fetchSearchSuccess + fetchSearchSuccess, } from '../actionCreators' import { deleteDataset, @@ -49,7 +49,7 @@ import { getJobs, getNamespaces, getRunFacets, - getRuns + getRuns, } from '../requests' import { getLineage } from '../requests/lineage' import { getSearch } from '../requests/search' @@ -146,7 +146,14 @@ export function* fetchEventsSaga() { while (true) { try { const { payload } = yield take(FETCH_EVENTS) - const events: Event[] = yield call(getEvents, payload.after, payload.before, payload.limit) + const events: Events = yield call( + getEvents, + payload.after, + payload.before, + payload.limit, + payload.offset + ) + console.log(events) yield put(fetchEventsSuccess(events)) } catch (e) { yield put(applicationError('Something went wrong while fetching event runs')) @@ -182,7 +189,11 @@ export function* fetchDatasetVersionsSaga() { while (true) { try { const { payload } = yield take(FETCH_DATASET_VERSIONS) - const datasets: DatasetVersion[] = yield call(getDatasetVersions, payload.namespace, payload.name) + const datasets: DatasetVersion[] = yield call( + getDatasetVersions, + payload.namespace, + payload.name + ) yield put(fetchDatasetVersionsSuccess(datasets)) } catch (e) { yield put(applicationError('Something went wrong while fetching dataset runs')) @@ -228,7 +239,7 @@ export default function* rootSaga(): Generator { fetchLineage(), fetchSearch(), deleteJobSaga(), - deleteDatasetSaga() + deleteDatasetSaga(), ] yield all([...sagasThatAreKickedOffImmediately, ...sagasThatWatchForAction]) diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 9bdc92fcd0..96ff73b858 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -26,6 +26,7 @@ export interface Namespace { export interface Events { events: Event[] + totalCount: number } export type EventType = 'START' | 'RUNNING' | 'ABORT' | 'FAIL' | 'COMPLETE'