forked from grafadruid/go-druid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.go
102 lines (92 loc) · 3.34 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package druid
import (
"net/http"
"strings"
)
const (
tasksEndpoint = "druid/indexer/v1/tasks"
taskStatusEndpoint = "druid/indexer/v1/task/:taskId/status"
tasksCompleteEndpoint = "druid/indexer/v1/completeTasks"
tasksRunningEndpoint = "druid/indexer/v1/runningTasks"
tasksWaitingEndpoint = "druid/indexer/v1/waitingTasks"
tasksPendingEndpoint = "druid/indexer/v1/pendingTasks"
taskPayloadEndpoint = "druid/indexer/v1/task/:taskId"
taskSegmentsEndpoint = "druid/indexer/v1/task/:taskId/segments"
taskReportEndpoint = "druid/indexer/v1/task/:taskId/reports"
taskSubmitEndpoint = "druid/indexer/v1/task"
taskShutdownEndpoint = "druid/indexer/v1/task/:taskId/shutdown"
tasksShutdownAllEndpoint = "druid/indexer/v1/datasources/:datasource/shutdownAllTasks"
tasksStatusesEndpoint = "druid/indexer/v1/taskStatus"
taskDeletePendingSegments = "druid/indexer/v1/pendingSegments/:datasource"
)
// TasksService is a service that runs requests to druid tasks API.
type TasksService struct {
client *Client
}
// SubmitTaskResponse is a response object of Druid Task API Submit task method.
type SubmitTaskResponse struct {
Task string `json:"task"`
}
// ShutdownTaskResponse is a response object of Druid SupervisorService's Terminate method.
type ShutdownTaskResponse struct {
Task string `json:"task"`
}
// SubmitTask submits an ingestion specification to druid tasks API with a pre-configured druid client.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#submit-a-task
func (s *TasksService) SubmitTask(spec *TaskIngestionSpec) (string, error) {
r, err := s.client.NewRequest("POST", taskSubmitEndpoint, spec)
if err != nil {
return "", err
}
var result SubmitTaskResponse
_, err = s.client.Do(r, &result)
if err != nil {
return "", err
}
return result.Task, nil
}
// GetStatus calls druid tasks service's Get status API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#get-task-status
func (s *TasksService) GetStatus(taskId string) (TaskStatusResponse, error) {
r, err := s.client.NewRequest("GET", applyTaskId(taskStatusEndpoint, taskId), nil)
var result TaskStatusResponse
if err != nil {
return result, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}
// Shutdown calls druid task service's shutdown task API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#shut-down-a-task
func (s *TasksService) Shutdown(taskId string) (string, error) {
r, err := s.client.NewRequest("POST", applyTaskId(taskShutdownEndpoint, taskId), "")
var result ShutdownTaskResponse
if err != nil {
return "", err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result.Task, err
}
return result.Task, nil
}
func applyTaskId(input string, taskId string) string {
return strings.Replace(input, ":taskId", taskId, 1)
}
// GetRunningTasks calls druid task service's running tasks API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api#get-an-array-of-running-tasks
func (s *TasksService) GetRunningTasks(options RunningTasksOptions) ([]*RunningTask, error) {
r, err := s.client.NewRequest(http.MethodGet, tasksRunningEndpoint, options)
var result []*RunningTask
if err != nil {
return nil, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}