Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support of running all cells of a data doc #1102

Merged
merged 4 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.14.4",
"version": "3.15.0",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
31 changes: 29 additions & 2 deletions querybook/server/datasources/datadoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
verify_data_cells_permission,
)
from app.datasource import register, api_assert, with_impression
from app.flask_app import socketio
from app.flask_app import socketio, celery
from app.db import DBSession, with_session
from const.impression import ImpressionItemType
from const.query_execution import QueryExecutionType
from env import QuerybookSettings

from lib.celery.cron import validate_cron
from lib.logger import get_logger
from lib.notify.utils import get_user_preferred_notifier, notify_user
from lib.scheduled_datadoc.validator import validate_datadoc_schedule_config
from lib.scheduled_datadoc.legacy import convert_if_legacy_datadoc_schedule

Expand All @@ -30,7 +32,6 @@
update_datadoc_schedule_owner,
)
from models.environment import Environment
from lib.notify.utils import notify_user

LOG = get_logger(__file__)

Expand Down Expand Up @@ -379,6 +380,32 @@ def run_data_doc(id):
run_and_log_scheduled_task(schedule.id, session=session)


@register("/datadoc/<int:id>/run/", methods=["POST"])
def adhoc_run_data_doc(id):
    """Queue an ad-hoc run of a data doc for the current user.

    Once the write-access and permission checks have passed, a
    ``tasks.run_datadoc.run_datadoc`` celery task is sent with the doc id,
    the current user's id, the ADHOC execution type, and one notification
    entry addressed to the current user via their preferred notifier.

    Args:
        id: Id of the data doc, taken from the URL.

    Returns:
        Whatever ``celery.send_task`` returns for the queued task.
    """
    assert_can_write(id)
    verify_data_doc_permission(id)

    requester_id = current_user.id
    notification = {
        "config": {"to_user": [requester_id]},
        "on": 0,
        "with": get_user_preferred_notifier(requester_id),
    }
    task_kwargs = {
        "doc_id": id,
        "user_id": requester_id,
        "execution_type": QueryExecutionType.ADHOC.value,
        "notifications": [notification],
    }
    return celery.send_task(
        "tasks.run_datadoc.run_datadoc", args=[], kwargs=task_kwargs
    )


@register("/datadoc/<int:doc_id>/editor/", methods=["GET"])
def get_datadoc_editors(doc_id):
    """Return the editors of the data doc identified by ``doc_id``."""
    editors = logic.get_data_doc_editors_by_doc_id(doc_id)
    return editors
Expand Down
21 changes: 13 additions & 8 deletions querybook/server/lib/notify/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@
from app.db import with_session


@with_session
def get_user_preferred_notifier(user_id, session=None):
    """Return the name of the notifier a user prefers.

    Reads the user's ``notification_preference`` setting and returns its
    value. When the user has no such setting, ``DEFAULT_NOTIFIER`` is
    returned instead.

    Args:
        user_id: Id of the user whose preference is looked up.
        session: DB session forwarded to the settings lookup.
    """
    preference = user_logic.get_user_settings(
        user_id, "notification_preference", session=session
    )
    if preference is None:
        return DEFAULT_NOTIFIER
    return preference.value


def notify_recipients(recipients, template_name, template_params, notifier_name):
notifier = get_notifier_class(notifier_name)
markdown_message = render_message(template_name, template_params)
Expand All @@ -13,14 +25,7 @@ def notify_recipients(recipients, template_name, template_params, notifier_name)
@with_session
def notify_user(user, template_name, template_params, notifier_name=None, session=None):
if notifier_name is None:
notification_preference = user_logic.get_user_settings(
user.id, "notification_preference", session=session
)
notifier_name = (
notification_preference.value
if notification_preference is not None
else DEFAULT_NOTIFIER
)
notifier_name = get_user_preferred_notifier(user.id, session=session)
if notifier_name is None:
return
notifier = get_notifier_class(notifier_name)
Expand Down
44 changes: 32 additions & 12 deletions querybook/server/tasks/run_datadoc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from celery import chain

from app.db import DBSession
from app.flask_app import celery
from app.flask_app import celery, socketio

from const.query_execution import QueryExecutionStatus, QueryExecutionType
from const.schedule import TaskRunStatus
Expand Down Expand Up @@ -55,8 +55,9 @@ def run_datadoc_with_config(
runner_id = user_id if user_id is not None else data_doc.owner_uid
query_cells = data_doc.get_query_cells()

# Create db entry record
record_id = create_task_run_record_for_celery_task(self, session=session)
# Create db entry record only for scheduled run
if execution_type == QueryExecutionType.SCHEDULED.value:
jczhong84 marked this conversation as resolved.
Show resolved Hide resolved
record_id = create_task_run_record_for_celery_task(self, session=session)

completion_params = {
"doc_id": doc_id,
Expand Down Expand Up @@ -89,6 +90,7 @@ def run_datadoc_with_config(
"engine_id": engine_id,
"uid": runner_id,
},
"data_doc_id": doc_id,
}
tasks_to_run.append(
_start_query_execution_task.si(
Expand All @@ -115,18 +117,34 @@ def _start_query_execution_task(
previous_query_status,
cell_id,
query_execution_params,
data_doc_id,
):
if previous_query_status != QueryExecutionStatus.DONE.value:
raise Exception(GENERIC_QUERY_FAILURE_MSG)

with DBSession() as session:
query_execution_id = qe_logic.create_query_execution(
query_execution = qe_logic.create_query_execution(
**query_execution_params, session=session
).id
)
datadoc_logic.append_query_executions_to_data_cell(
cell_id, [query_execution_id], session=session
cell_id,
[query_execution.id],
session=session,
)

socketio.emit(
"data_doc_query_execution",
(
None,
query_execution.to_dict(),
cell_id,
True, # True here indicates the message is from data doc run (fromDataDocRun).
),
namespace="/datadoc",
room=data_doc_id,
broadcast=True,
)
return query_execution_id
return query_execution.id


@celery.task
Expand Down Expand Up @@ -189,10 +207,12 @@ def on_datadoc_completion(
error_msg = str(e)
LOG.error(e, exc_info=True)
finally:
update_task_run_record(
id=record_id,
status=TaskRunStatus.SUCCESS if is_success else TaskRunStatus.FAILURE,
error_message=error_msg,
)
# When record_id is None, the run was triggered by an adhoc datadoc run, so there is no record to update.
if record_id:
jczhong84 marked this conversation as resolved.
Show resolved Hide resolved
update_task_run_record(
id=record_id,
status=TaskRunStatus.SUCCESS if is_success else TaskRunStatus.FAILURE,
error_message=error_msg,
)

return is_success
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import React, { useMemo } from 'react';

import { DataDocDAGExporterContext } from 'context/DataDocDAGExporter';
import {
useExporterDAG,
useQueryCells,
useUnusedQueryCells,
} from 'hooks/dag/useExporterDAG';
import { useExporterDAG, useUnusedQueryCells } from 'hooks/dag/useExporterDAG';
import { useCurrentExporter } from 'hooks/dag/useExporterSettings';
import { useSavedDAG } from 'hooks/dag/useSavedDAG';
import { useQueryCells } from 'hooks/dataDoc/useQueryCells';
import { DataDocResource } from 'resource/dataDoc';
import { IconButton } from 'ui/Button/IconButton';
import { Markdown } from 'ui/Markdown/Markdown';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { fetchDAGExporters } from 'redux/dataDoc/action';
import { IStoreState } from 'redux/store/types';
import { IconButton } from 'ui/Button/IconButton';

import { DataDocRunAllButton } from './DataDocRunAllButton';
import { DataDocScheduleButton } from './DataDocScheduleButton';
import { DeleteDataDocButton } from './DeleteDataDocButton';

Expand Down Expand Up @@ -76,6 +77,10 @@ export const DataDocRightSidebar: React.FunctionComponent<IProps> = ({
<DataDocScheduleButton isEditable={isEditable} docId={dataDoc.id} />
);

const runAllButtonDOM = isEditable && (
<DataDocRunAllButton docId={dataDoc.id} />
);

const buttonSection = (
<div className="DataDocRightSidebar-button-section vertical-space-between">
<div className="DataDocRightSidebar-button-section-top flex-column">
Expand Down Expand Up @@ -111,6 +116,7 @@ export const DataDocRightSidebar: React.FunctionComponent<IProps> = ({
/>
</div>
<div className="DataDocRightSidebar-button-section-bottom flex-column mb8">
{runAllButtonDOM}
{isEditable && exporterExists && (
<DataDocDAGExporterButton docId={dataDoc.id} />
)}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import React, { useCallback } from 'react';
import toast from 'react-hot-toast';

import { useQueryCells } from 'hooks/dataDoc/useQueryCells';
import { useMakeSelector } from 'hooks/redux/useMakeSelector';
import { sendConfirm } from 'lib/querybookUI';
import { makeLatestQueryExecutionsSelector } from 'redux/queryExecutions/selector';
import { DataDocResource } from 'resource/dataDoc';
import { IconButton } from 'ui/Button/IconButton';
import { Message } from 'ui/Message/Message';

/** Props accepted by DataDocRunAllButton. */
interface IProps {
// Id of the data doc whose query cells are run.
docId: number;
}

/**
 * Icon button that asks for confirmation and, once confirmed, starts a
 * run of the data doc through `DataDocResource.run`, showing a success
 * toast when that request resolves.
 */
export const DataDocRunAllButton: React.FunctionComponent<IProps> = ({
    docId,
}) => {
    const queryCells = useQueryCells(docId);
    const queryCellIds = queryCells.map((cell) => cell.id) ?? [];
    const latestQueryExecutions = useMakeSelector(
        makeLatestQueryExecutionsSelector,
        queryCellIds
    );
    // NOTE(review): presumably statuses below 3 are the not-yet-finished
    // states; confirm against the query execution status enum.
    const hasQueryRunning = latestQueryExecutions.some(
        (execution) => execution.status < 3
    );

    // Builds the confirmation body; the warning only shows while some
    // cell's latest execution is still in progress.
    const renderConfirmMessage = useCallback(() => {
        const runningWarning = hasQueryRunning && (
            <Message type="warning" className="mt8">
                There are some query cells still running. Do you want to
                run anyway?
            </Message>
        );

        return (
            <div>
                <div>
                    {`You will be executing ${queryCells.length} query cells sequentially. If any of them
fails, the sequence of execution will be stopped.`}
                </div>
                {runningWarning}
            </div>
        );
    }, [queryCells.length, hasQueryRunning]);

    const onRunAll = useCallback(() => {
        const startRun = () => {
            DataDocResource.run(docId).then(() => {
                toast.success('DataDoc execution started!');
            });
        };

        sendConfirm({
            header: 'Run All Cells',
            message: renderConfirmMessage(),
            onConfirm: startRun,
            confirmText: 'Run',
        });
    }, [renderConfirmMessage, docId]);

    return (
        <IconButton
            icon="PlayCircle"
            onClick={onRunAll}
            tooltip="Run all cells in the doc"
            tooltipPos="left"
            title="Run All"
        />
    );
};
19 changes: 0 additions & 19 deletions querybook/webapp/hooks/dag/useExporterDAG.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,18 @@ import {
useState,
} from 'react';
import { useDrop } from 'react-dnd';
import { useSelector } from 'react-redux';
import { Edge, Node, Position, ReactFlowInstance } from 'reactflow';

import { queryCellDraggableType } from 'components/DataDocDAGExporter/DataDocDAGExporter';
import { IDataQueryCell } from 'const/datadoc';
import { usePrevious } from 'hooks/usePrevious';
import { hashString } from 'lib/data-doc/data-doc-utils';
import * as dataDocSelectors from 'redux/dataDoc/selector';
import { IStoreState } from 'redux/store/types';
import { IDragItem } from 'ui/DraggableList/types';
import { QueryCellNode } from 'ui/FlowGraph/QueryCellNode';

export const queryCellNode = 'queryCellNode';
export const QueryDAGNodeTypes = { queryCellNode: QueryCellNode };

export function useQueryCells(docId: number) {
const { dataDocCells } = useSelector((state: IStoreState) =>
dataDocSelectors.dataDocSelector(state, docId)
);

const queryCells: IDataQueryCell[] = useMemo(
() =>
dataDocCells.filter(
(cells) => cells.cell_type === 'query'
) as IDataQueryCell[],
[dataDocCells]
);

return queryCells;
}

export const initialNodePosition = { x: 0, y: 0 };
export const edgeStyle = { stroke: 'var(--bg-dark)' };

Expand Down
22 changes: 22 additions & 0 deletions querybook/webapp/hooks/dataDoc/useQueryCells.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { useMemo } from 'react';
import { useSelector } from 'react-redux';

import { IDataQueryCell } from 'const/datadoc';
import * as dataDocSelectors from 'redux/dataDoc/selector';
import { IStoreState } from 'redux/store/types';

/**
 * Hook returning the query cells of a data doc.
 *
 * Reads the doc's cells from the store via `dataDocSelector` and keeps
 * only those whose `cell_type` is `'query'`. The filtered list is
 * memoized on the cell array.
 */
export function useQueryCells(docId: number) {
    const dataDoc = useSelector((state: IStoreState) =>
        dataDocSelectors.dataDocSelector(state, docId)
    );
    const { dataDocCells } = dataDoc;

    return useMemo(
        () =>
            dataDocCells.filter(
                (cell) => cell.cell_type === 'query'
            ) as IDataQueryCell[],
        [dataDocCells]
    );
}
6 changes: 4 additions & 2 deletions querybook/webapp/lib/data-doc/datadoc-socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export interface IDataDocSocketEvent {
(
queryExecution: IQueryExecution,
dataCellId: number,
fromDataDocRun: boolean,
jczhong84 marked this conversation as resolved.
Show resolved Hide resolved
isSameOrigin: boolean
) => any
>;
Expand Down Expand Up @@ -412,12 +413,13 @@ export class DataDocSocket {

this.socket.on(
'data_doc_query_execution',
(originator, rawQueryExecution, dataCellId) =>
(originator, rawQueryExecution, dataCellId, fromDataDocRun) =>
this.resolvePromiseAndEvent(
'receiveQueryExecution',
originator,
rawQueryExecution,
dataCellId
dataCellId,
fromDataDocRun
)
);

Expand Down
9 changes: 7 additions & 2 deletions querybook/webapp/redux/dataDocWebsocket/dataDocWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,13 @@ export function openDataDoc(docId: number): ThunkResult<Promise<any>> {
},

receiveQueryExecution: {
resolve: (queryExecution, dataCellId, isSameOrigin) => {
if (!isSameOrigin) {
resolve: (
queryExecution,
dataCellId,
fromDataDocRun,
isSameOrigin
) => {
if (!isSameOrigin || fromDataDocRun) {
(dispatch as QueryExecutionDispatch)(
receiveQueryExecution(queryExecution, dataCellId)
);
Expand Down
Loading