Skip to content

Commit

Permalink
[infra] Use correct ML API to query blocking tasks (elastic#167779)
Browse files Browse the repository at this point in the history
While working on elastic#47477, I found
that attempting to re-create a ML job faces a 404 because it uses an
endpoint that has been removed / changed.

This PR updates to use the newer endpoint to find which tasks are
blocking in the ML system (like job deletion) and changes the types to
match the new API.
  • Loading branch information
miltonhultgren authored and dej611 committed Oct 17, 2023
1 parent 7ccbce2 commit 6fc72e9
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const callDeleteJobs = async <JobType extends string>(
};

export const callGetJobDeletionTasks = async (fetch: HttpHandler) => {
const jobDeletionTasksResponse = await fetch('/internal/ml/jobs/deleting_jobs_tasks', {
const jobDeletionTasksResponse = await fetch('/internal/ml/jobs/blocking_jobs_tasks', {
version: '1',
});

Expand Down Expand Up @@ -87,7 +87,7 @@ export const deleteJobsResponsePayloadRT = rt.record(
export type DeleteJobsResponsePayload = rt.TypeOf<typeof deleteJobsResponsePayloadRT>;

export const getJobDeletionTasksResponsePayloadRT = rt.type({
jobIds: rt.array(rt.string),
jobs: rt.array(rt.record(rt.string, rt.string)),
});

export const stopDatafeedsRequestPayloadRT = rt.type({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ const waitUntilJobsAreDeleted = async <JobType extends string>(
) => {
const moduleJobIds = jobTypes.map((jobType) => getJobId(spaceId, logViewId, jobType));
while (true) {
const { jobIds: jobIdsBeingDeleted } = await callGetJobDeletionTasks(fetch);
const needToWait = jobIdsBeingDeleted.some((jobId) => moduleJobIds.includes(jobId));
const { jobs } = await callGetJobDeletionTasks(fetch);
const needToWait = jobs
.flatMap((job) => Object.keys(job))
.some((jobId) => moduleJobIds.includes(jobId));

if (needToWait) {
await timeout(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ export const useLogAnalysisModule = <JobType extends string>({
createPromise: async () => {
return await moduleDescriptor.cleanUpModule(spaceId, logViewId, services.http.fetch);
},
onReject: (e) => {
throw new Error(`Failed to clean up previous ML job: ${e}`);
},
},
[spaceId, logViewId]
);
Expand All @@ -144,8 +147,8 @@ export const useLogAnalysisModule = <JobType extends string>({
.then(() => {
setUpModule(selectedIndices, start, end, datasetFilter);
})
.catch(() => {
dispatchModuleStatus({ type: 'failedSetup' });
.catch((e) => {
dispatchModuleStatus({ type: 'failedSetup', reason: e.toString() });
});
},
[cleanUpModule, dispatchModuleStatus, setUpModule]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type StatusReducerAction =
jobSummaries: FetchJobStatusResponsePayload;
datafeedSetupResults: SetupMlModuleResponsePayload['datafeeds'];
}
| { type: 'failedSetup' }
| { type: 'failedSetup'; reason?: string }
| { type: 'fetchingJobStatuses' }
| {
type: 'fetchedJobStatuses';
Expand Down Expand Up @@ -131,7 +131,7 @@ const createStatusReducer =
}),
{} as Record<JobType, JobStatus>
),
setupStatus: { type: 'failed', reasons: ['unknown'] },
setupStatus: { type: 'failed', reasons: action.reason ? [action.reason] : ['unknown'] },
};
}
case 'fetchingJobStatuses': {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/infra/public/containers/ml/api/ml_cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const callDeleteJobs = async <JobType extends string>(
};

export const callGetJobDeletionTasks = async (fetch: HttpHandler) => {
const jobDeletionTasksResponse = await fetch('/internal/ml/jobs/deleting_jobs_tasks', {
const jobDeletionTasksResponse = await fetch('/internal/ml/jobs/blocking_jobs_tasks', {
version: '1',
});

Expand Down Expand Up @@ -86,7 +86,7 @@ export const deleteJobsResponsePayloadRT = rt.record(
export type DeleteJobsResponsePayload = rt.TypeOf<typeof deleteJobsResponsePayloadRT>;

export const getJobDeletionTasksResponsePayloadRT = rt.type({
jobIds: rt.array(rt.string),
jobs: rt.array(rt.record(rt.string, rt.string)),
});

export const stopDatafeedsRequestPayloadRT = rt.type({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ const waitUntilJobsAreDeleted = async <JobType extends string>(
) => {
const moduleJobIds = jobTypes.map((jobType) => getJobId(spaceId, sourceId, jobType));
while (true) {
const { jobIds: jobIdsBeingDeleted } = await callGetJobDeletionTasks(fetch);
const needToWait = jobIdsBeingDeleted.some((jobId) => moduleJobIds.includes(jobId));
const { jobs } = await callGetJobDeletionTasks(fetch);
const needToWait = jobs
.flatMap((job) => Object.keys(job))
.some((jobId) => moduleJobIds.includes(jobId));

if (needToWait) {
await timeout(1000);
Expand Down
7 changes: 5 additions & 2 deletions x-pack/plugins/infra/public/containers/ml/infra_ml_module.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ export const useInfraMLModule = <JobType extends string>({
createPromise: async () => {
return await moduleDescriptor.cleanUpModule(spaceId, sourceId, services.http.fetch);
},
onReject: (e) => {
throw new Error(`Failed to clean up previous ML job: ${e}`);
},
},
[spaceId, sourceId]
);
Expand All @@ -121,8 +124,8 @@ export const useInfraMLModule = <JobType extends string>({
.then(() => {
setUpModule(selectedIndices, start, end, filter, partitionField);
})
.catch(() => {
dispatchModuleStatus({ type: 'failedSetup' });
.catch((e) => {
dispatchModuleStatus({ type: 'failedSetup', reason: e.toString() });
});
},
[cleanUpModule, dispatchModuleStatus, setUpModule]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type StatusReducerAction =
jobSummaries: FetchJobStatusResponsePayload;
datafeedSetupResults: SetupMlModuleResponsePayload['datafeeds'];
}
| { type: 'failedSetup' }
| { type: 'failedSetup'; reason?: string }
| { type: 'fetchingJobStatuses' }
| {
type: 'fetchedJobStatuses';
Expand Down Expand Up @@ -131,7 +131,7 @@ const createStatusReducer =
}),
{} as Record<JobType, JobStatus>
),
setupStatus: { type: 'failed', reasons: ['unknown'] },
setupStatus: { type: 'failed', reasons: action.reason ? [action.reason] : ['unknown'] },
};
}
case 'fetchingJobStatuses': {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { EuiComboBox } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { EuiLoadingSpinner } from '@elastic/eui';
import { useUiTracker } from '@kbn/observability-shared-plugin/public';
import { EuiCallOut } from '@elastic/eui';
import { EuiCode } from '@elastic/eui';
import { useSourceContext } from '../../../../../../containers/metrics_source';
import { useMetricK8sModuleContext } from '../../../../../../containers/ml/modules/metrics_k8s/module';
import { useMetricHostsModuleContext } from '../../../../../../containers/ml/modules/metrics_hosts/module';
Expand Down Expand Up @@ -191,6 +193,12 @@ export const JobSetupScreen = (props: Props) => {
defaultMessage="Something went wrong creating the necessary ML jobs."
/>
<EuiSpacer />
{setupStatus.reasons.map((errorMessage, i) => (
<EuiCallOut key={i} color="danger" iconType="warning" title={errorCalloutTitle}>
<EuiCode transparentBackground>{errorMessage}</EuiCode>
</EuiCallOut>
))}
<EuiSpacer />
<EuiButton data-test-subj="infraJobSetupScreenTryAgainButton" fill onClick={createJobs}>
<FormattedMessage
id="xpack.infra.ml.steps.setupProcess.tryAgainButton"
Expand Down Expand Up @@ -358,3 +366,7 @@ export const JobSetupScreen = (props: Props) => {
</>
);
};

const errorCalloutTitle = i18n.translate('xpack.infra.ml.steps.setupProcess.errorCalloutTitle', {
defaultMessage: 'An error occurred',
});

0 comments on commit 6fc72e9

Please sign in to comment.