Skip to content

Commit

Permalink
Support filtering missing terms (#91)
Browse files Browse the repository at this point in the history
* Support filtering missing terms

* Fix wf_start_time

* Fix wf_start_time

* rename function
  • Loading branch information
kodless authored Nov 4, 2024
1 parent 160e827 commit ea9aebd
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 38 deletions.
2 changes: 1 addition & 1 deletion app/leek/api/db/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def get_workflow_info(
):
connection = es.connection
root_task = connection.get(index=index_alias, id=root_id)["_source"]
wf_start_time = root_task["queued_at"]
wf_start_time = root_task.get("queued_at", root_task.get("received_at", root_task.get("started_at", root_task.get("timestamp"))))
wf_duration = get_workflow_duration(index_alias, app_env, root_id, wf_start_time)
if wf_duration is None:
return None
Expand Down
15 changes: 12 additions & 3 deletions app/web/src/api/metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class MetricsService implements Metrics {
let query = [getTimeFilterQuery(filters)];
return this.aggregate(app_name, app_env, query, {
seen_tasks: {
terms: { field: "name", size: 1000 },
terms: { field: "name", size: 1000, missing: "N/A", min_doc_count: 0 },
},
});
}
Expand All @@ -83,7 +83,7 @@ export class MetricsService implements Metrics {
let query = [getTimeFilterQuery(filters)];
return this.aggregate(app_name, app_env, query, {
seen_queues: {
terms: { field: "queue", size: 100 },
terms: { field: "queue", size: 100, missing: "N/A", min_doc_count: 0 },
},
});
}
Expand All @@ -92,7 +92,16 @@ export class MetricsService implements Metrics {
let query = [getTimeFilterQuery(filters)];
return this.aggregate(app_name, app_env, query, {
seen_routing_keys: {
terms: { field: "routing_key", size: 100 },
terms: { field: "routing_key", size: 100, missing: "N/A", min_doc_count: 0 },
},
});
}

getSeenExchanges(app_name, app_env, filters: TimeFilters) {
let query = [getTimeFilterQuery(filters)];
return this.aggregate(app_name, app_env, query, {
seen_exchanges: {
terms: { field: "exchange", size: 100, missing: "N/A", min_doc_count: 0 },
},
});
}
Expand Down
64 changes: 58 additions & 6 deletions app/web/src/api/task.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { getTimeFilterQuery, search } from "./search";
import {request, buildQueryString} from "./request";

function remove_missing_value(terms, missing_term) {
if (terms && terms.length) {
let index = terms.indexOf(missing_term);

if (index !== -1) {
let new_terms = [...terms];
new_terms.splice(index, 1);
return new_terms;
}
}
return terms;
}

export function getFilterQuery(
app_env: string | undefined,
filters: TaskFilters
Expand All @@ -19,22 +32,30 @@ export function getFilterQuery(
else if (filters.rejection_outcome === "ignored")
rejection_filter = { match: { requeue: { query: false } } };
}
let queues = remove_missing_value(filters.exchange, "N/A")
let exchanges = remove_missing_value(filters.exchange, "N/A")
let routing_keys = remove_missing_value(filters.routing_key, "N/A")
let names = remove_missing_value(filters.name, "N/A")
let f = [
{ match: { kind: "task" } },
app_env && { match: { app_env: app_env } },
filters.name && filters.name.length && { terms: { name: filters.name } },
names && names.length && { terms: { name: names } },
filters.uuid && { match: { uuid: filters.uuid } },
filters.state &&
filters.state.length && { terms: { state: filters.state } },
filters.worker &&
filters.worker.length && { terms: { worker: filters.worker } },
filters.client && { term: { client: filters.client } },
filters.routing_key &&
filters.routing_key.length && {
terms: { routing_key: filters.routing_key },
routing_keys &&
routing_keys.length && {
terms: { routing_key: routing_keys },
},
filters.queue &&
filters.queue.length && { terms: { queue: filters.queue } },
exchanges &&
exchanges.length && {
terms: { exchange: exchanges },
},
queues &&
queues.length && { terms: { queue: queues } },
filters.parent && { match: { parent: filters.parent } },
filters.runtime && {
range: { runtime: { [filters.runtime_op || "gte"]: filters.runtime } },
Expand Down Expand Up @@ -66,13 +87,43 @@ export function getFilterQuery(
return f.filter(Boolean);
}

export function getMissingTermsFilterQuery(
app_env: string | undefined,
filters: TaskFilters
) {
let f = [
filters.exchange &&
filters.exchange.length &&
filters.exchange.indexOf("N/A") !== -1 && {
exists: { field: "exchange" },
},
filters.routing_key &&
filters.routing_key.length &&
filters.routing_key.indexOf("N/A") !== -1 && {
exists: { field: "routing_key" },
},
filters.queue &&
filters.queue.length &&
filters.queue.indexOf("N/A") !== -1 && {
exists: { field: "queue" },
},
filters.name &&
filters.name.length &&
filters.name.indexOf("N/A") !== -1 && {
exists: { field: "name" },
},
];
return f.filter(Boolean);
}

export interface TaskFilters {
name: string[] | null;
uuid: string | null;
state: string[] | null;
worker: string[] | null;
client: string | null;
routing_key: string[] | null;
exchange: string[] | null;
queue: string[] | null;
parent: string | null;
runtime: number | null;
Expand Down Expand Up @@ -129,6 +180,7 @@ export class TaskService implements Task {
query: {
bool: {
must: getFilterQuery(app_env, filters),
must_not: getMissingTermsFilterQuery(app_env, filters),
},
},
sort: [{ timestamp: { order: order } }],
Expand Down
32 changes: 32 additions & 0 deletions app/web/src/components/filters/TaskAttributesFilter.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ const TaskAttributesFilter: React.FC<TasksFilterContextData> = (

const [seenTasks, setSeenTasks] = useState([]);
const [seenRoutingKeys, setSeenRoutingKeys] = useState([]);
const [seenExchanges, setSeenExchanges] = useState([]);
const [seenQueues, setSeenQueues] = useState([]);
const [seenWorkers, setSeenWorkers] = useState([]);

// Fetch progress
const [seenTasksFetching, setSeenTasksFetching] = useState<boolean>();
const [seenRoutingKeysFetching, setSeenRoutingKeysFetching] =
useState<boolean>();
const [seenExchangesFetching, setSeenExchangesFetching] =
useState<boolean>();
const [seenQueuesFetching, setSeenQueuesFetching] = useState<boolean>();
const [seenWorkersFetching, setSeenWorkersFetching] = useState<boolean>();

Expand Down Expand Up @@ -90,6 +93,19 @@ const TaskAttributesFilter: React.FC<TasksFilterContextData> = (
.finally(() => setSeenRoutingKeysFetching(false));
}

function getSeenExchanges(open) {
if (!currentApp || !open) return;
setSeenExchangesFetching(true);
metricsService
.getSeenExchanges(currentApp, currentEnv, props.filters)
.then(handleAPIResponse)
.then((result: any) => {
setSeenExchanges(result.aggregations.seen_exchanges.buckets);
}, handleAPIError)
.catch(handleAPIError)
.finally(() => setSeenExchangesFetching(false));
}

function getSeenQueues(open) {
if (!currentApp || !open) return;
setSeenQueuesFetching(true);
Expand Down Expand Up @@ -222,6 +238,22 @@ const TaskAttributesFilter: React.FC<TasksFilterContextData> = (
</Select>
</FormItem>
</Row>
<Row>
<FormItem name="exchange" style={{ width: "100%" }}>
<Select
placeholder="Exchange"
mode="multiple"
style={{ width: "100%" }}
notFoundContent={
seenExchangesFetching ? loadingIndicator : null
}
onDropdownVisibleChange={getSeenExchanges}
allowClear
>
{seenExchanges.map((exchange, key) => badgedOption(exchange, "", "default exchange"))}
</Select>
</FormItem>
</Row>
<Row>
<FormItem name="queue" style={{ width: "100%" }}>
<Select
Expand Down
4 changes: 2 additions & 2 deletions app/web/src/components/tags/BadgedOption.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ const badgeStyle = {
fontWeight: 600,
};

export const badgedOption = (item) => {
export const badgedOption = (item, represent_value=null, with_value=null) => {
return (
<Option key={item.key} value={item.key}>
<Row style={{ width: "100%" }} justify="space-between">
<Col>{item.key}</Col>
<Col>{item.key === represent_value ? with_value : item.key}</Col>
<Col>
<Badge
count={item.doc_count}
Expand Down
2 changes: 1 addition & 1 deletion demo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ USER appuser

COPY ./src /opt/amqp

CMD ["celery", "worker", "-E", "-A", "leek_demo.app", "-l", "critical", "-n", "test-worker2@%h"]
CMD ["celery", "-A", "leek_demo.app", "worker", "-E", "-l", "critical", "-n", "test-worker2@%h"]
2 changes: 1 addition & 1 deletion demo/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
celery==4.4.2
celery==5.4.0
redis==3.5.3
71 changes: 47 additions & 24 deletions demo/src/leek_demo/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,51 @@

app = Celery('tasks', broker=os.environ['BROKER_URL'])

app.conf.update(
{
"CELERY_SEND_TASK_SENT_EVENT": True,
# Just for demo
"CELERY_IMPORTS": (
"leek_demo.tasks.low",
"leek_demo.tasks.medium",
"leek_demo.tasks.high",
),
"CELERY_QUEUES": (
Queue("low", routing_key="low"),
Queue("medium", routing_key="medium"),
Queue("high", routing_key="high"),
),
"CELERY_DEFAULT_QUEUE": "low",
"CELERY_DEFAULT_EXCHANGE": "tasks",
"CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
"CELERY_DEFAULT_ROUTING_KEY": "low",
"CELERY_ROUTES": (
{"leek_demo.tasks.low.*": {"queue": "low"}},
{"leek_demo.tasks.medium.*": {"queue": "medium"}},
{"leek_demo.tasks.high.*": {"queue": "high"}},
),
}
app.conf.task_send_sent_event = True
app.conf.imports = (
"leek_demo.tasks.low",
"leek_demo.tasks.medium",
"leek_demo.tasks.high",
)
app.conf.task_queues = (
Queue("low", routing_key="low"),
Queue("medium", routing_key="medium"),
Queue("high", routing_key="high"),
)
app.conf.default_queue = "low"
app.conf.default_exchange = "tasks"
app.conf.default_exchange_type = "direct"
app.conf.default_routing_key = "low"

app.conf.task_routes = {
"leek_demo.tasks.low.*": {"queue": "low"},
"leek_demo.tasks.medium.*": {"queue": "medium"},
"leek_demo.tasks.high.*": {"queue": "high"},
}

# For celery versions < 4
# app.conf.update(
# {
# "CELERY_SEND_TASK_SENT_EVENT": True,
# # Just for demo
# "CELERY_IMPORTS": (
# "leek_demo.tasks.low",
# "leek_demo.tasks.medium",
# "leek_demo.tasks.high",
# ),
# "CELERY_QUEUES": (
# Queue("low", routing_key="low"),
# Queue("medium", routing_key="medium"),
# Queue("high", routing_key="high"),
# ),
# "CELERY_DEFAULT_QUEUE": "low",
# "CELERY_DEFAULT_EXCHANGE": "tasks",
# "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
# "CELERY_DEFAULT_ROUTING_KEY": "low",
# "CELERY_ROUTES": (
# {"leek_demo.tasks.low.*": {"queue": "low"}},
# {"leek_demo.tasks.medium.*": {"queue": "medium"}},
# {"leek_demo.tasks.high.*": {"queue": "high"}},
# ),
# }
# )

0 comments on commit ea9aebd

Please sign in to comment.