Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Adds support for including custom executor jobs in /jobs (#756)
Browse files Browse the repository at this point in the history
* Adds support for including custom executor jobs in /list

* Fixes broken unit tests

* Addresses feedback from code review

* Adds support for listing jobs with /jobs

* Feedback from code review

* Removes uncommitted jobs from results

* Fixes the filtering for non-custom-executor jobs

* Marks test_kill_task_terminate_with_sigterm as xfail

* Adds missing import
  • Loading branch information
dposada authored and shamsimam committed Mar 21, 2018
1 parent 7ccf781 commit d236ec3
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 114 deletions.
5 changes: 5 additions & 0 deletions executor/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import unittest

import collections
import pytest

import cook.subprocess as cs
import tests.utils as tu
Expand Down Expand Up @@ -35,6 +36,10 @@ def find_process_ids_in_group(group_id):


class SubprocessTest(unittest.TestCase):
# FIXME - remove the xfail mark once the issue with this test failing is resolved:
# https://github.com/twosigma/Cook/issues/737
@pytest.mark.xfail
@unittest.skip('This test fails occasionally')
def test_kill_task_terminate_with_sigterm(self):
task_id = tu.get_random_task_id()

Expand Down
235 changes: 148 additions & 87 deletions scheduler/src/cook/mesos/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
(java.util Date UUID)
javax.servlet.ServletResponse
org.apache.curator.test.TestingServer
org.joda.time.Minutes
(org.joda.time DateTime Minutes)
schema.core.OptionalKey))


Expand Down Expand Up @@ -329,11 +329,19 @@
(-> JobOrInstanceIds
(assoc (s/optional-key :partial) s/Bool)))

;; The set of state names that may be supplied when listing jobs: the union of
;; util/job-states and util/instance-states.
(def allowed-list-states (set/union util/job-states util/instance-states))

(def QueryJobsParams
"Schema for querying for jobs by job uuid, allowing optionally for
'partial' results, meaning that some uuids can be valid and others not"
{:uuid [s/Uuid]
(s/optional-key :partial) s/Bool})
{(s/optional-key :uuid) [s/Uuid]
(s/optional-key :partial) s/Bool
(s/optional-key :user) UserName
(s/optional-key :state) [(apply s/enum allowed-list-states)]
(s/optional-key :start) s/Str
(s/optional-key :end) s/Str
(s/optional-key :limit) NonNegInt
(s/optional-key :name) JobNameListFilter})

(def QueryInstancesParams
"Schema for querying for instances by instance uuid, allowing optionally for
Expand Down Expand Up @@ -1145,13 +1153,138 @@
:exists? (partial retrieve-jobs conn)
:handle-ok (fn [ctx] (render-jobs-for-response-deprecated conn framework-id ctx))}))

(defn valid-name-filter?
  "A name filter is valid when it is absent (nil) or when checking it against
  the JobNameListFilter schema reports no validation error."
  [name]
  (if (nil? name)
    true
    (nil? (s/check JobNameListFilter name))))

(defn normalize-list-states
  "Collapses redundant terminal states in the set `states`.

  - When \"completed\" is present, every member of util/instance-states is
    removed, since completed already covers them.
  - Otherwise, when all of util/instance-states are present, they are replaced
    by the single state \"completed\".
  - Otherwise `states` is returned unchanged."
  [states]
  ;; The difference is computed lazily, per branch, so the fall-through branch
  ;; returns `states` untouched.
  (letfn [(without-instance-states []
            (set/difference states util/instance-states))]
    (cond
      (contains? states "completed")
      (without-instance-states)

      (set/superset? states util/instance-states)
      (conj (without-instance-states) "completed")

      :else
      states)))

(defn name-filter-str->name-filter-pattern
  "Compiles a user-provided name filter string into a regex pattern: every
  literal '.' is escaped, and every run of one or more '*' becomes the
  wildcard '.*'."
  [name]
  (let [dots-escaped (str/replace name "." "\\.")
        wildcards-expanded (str/replace dots-escaped #"\*+" ".*")]
    (re-pattern wildcards-expanded)))

(defn name-filter-str->name-filter-fn
  "Builds a one-argument predicate-like function from a user-provided name
  filter string. The function returns the result of re-matches against the
  compiled filter pattern. Returns nil when no name filter was provided."
  [name]
  (if-not name
    nil
    (let [name-pattern (name-filter-str->name-filter-pattern name)]
      (fn [job-name]
        (re-matches name-pattern job-name)))))

(defn job-list-request-malformed?
  "Validates the query parameters of a job-listing request.

  Returns a [true {::error ...}] pair if the request is malformed, and
  otherwise [false m], where m represents data that gets merged into the ctx:
  ::states, ::user, ::start-ms, ::end-ms, ::limit and ::name-filter-fn.

  The request is malformed when state, user, start or end is missing, when a
  supplied state is not in allowed-list-states, when the name filter does not
  satisfy valid-name-filter?, or when parsing a value throws a
  NumberFormatException."
  [ctx]
  (let [{:strs [state user start end limit name]} (get-in ctx [:request :query-params])
        ;; set/superset? and normalize-list-states both operate on sets, so
        ;; the raw parameter is normalized to one here.
        ;; NOTE(review): assumes a parameter supplied once arrives as a plain
        ;; string and a repeated parameter as a collection of strings (Ring
        ;; wrap-params behavior) - confirm against the middleware in use.
        states (cond
                 (nil? state) nil
                 (string? state) #{state}
                 :else (set state))]
    (cond
      (not (and (seq states) user start end))
      [true {::error "must supply the state, user name, start time, and end time"}]

      (not (set/superset? allowed-list-states states))
      [true {::error (str "unsupported state in " state ", must be one of: " allowed-list-states)}]

      (not (valid-name-filter? name))
      [true {::error
             (str "unsupported name filter " name
                  ", can only contain alphanumeric characters, '.', '-', '_', and '*' as a wildcard")}]

      :else
      (try
        [false {::states (normalize-list-states states)
                ::user user
                ;; The type hint is placed on the call result so that
                ;; .getMillis resolves without reflection.
                ::start-ms (.getMillis ^DateTime (util/parse-time start))
                ::end-ms (.getMillis ^DateTime (util/parse-time end))
                ::limit (util/parse-int-default limit 150)
                ::name-filter-fn (name-filter-str->name-filter-fn name)}]
        (catch NumberFormatException e
          [true {::error (.toString e)}])))))

(defn job-list-request-allowed?
  "Decides whether a job-listing request may proceed.

  is-authorized-fn is called with the requesting user, the :get verb, the
  impersonator, and a map describing the target ({:owner user :item :job}).

  Returns true when the request is allowed, and otherwise a
  [false {::error message}] pair. Checks run in this order: authorization,
  positive limit, and (when start-ms is present) start-ms not after end-ms."
  [is-authorized-fn ctx]
  (let [request (:request ctx)
        request-user (:authorization/user request)
        impersonator (:authorization/impersonator request)
        user (::user ctx)
        limit (::limit ctx)
        start-ms (::start-ms ctx)
        end-ms (::end-ms ctx)]
    (if-not (is-authorized-fn request-user :get impersonator {:owner user :item :job})
      [false {::error (str "You are not authorized to list jobs for " user)}]
      (if-not (pos? limit)
        [false {::error "limit must be positive"}]
        (if (and start-ms (> start-ms end-ms))
          [false {::error (str "start-ms (" start-ms ") must be before end-ms (" end-ms ")")}]
          true)))))

;; Metrics for the job-listing code path. Timers:
;;   fetch-jobs    - timer defined for the job-fetching query
;;   list-endpoint - timer defined for the overall list operation
;; Histograms: the requested time range in ms, the requested limit, and the
;; number of jobs in the response.
(timers/deftimer [cook-scheduler handler fetch-jobs])
(timers/deftimer [cook-scheduler handler list-endpoint])
(histograms/defhistogram [cook-mesos api list-request-param-time-range-ms])
(histograms/defhistogram [cook-mesos api list-request-param-limit])
(histograms/defhistogram [cook-mesos api list-response-job-count])

(defn list-jobs
  "Runs the job-listing query described by ctx against db and returns the
  uuids of the jobs found, ordered by :job/submit-time descending and
  truncated to the limit (when a limit is present).

  Reads ::states, ::user, ::start-ms, ::end-ms, ::since-hours-ago, ::limit and
  ::name-filter-fn from ctx. When ::start-ms is absent, the start of the
  range is ::since-hours-ago hours before ::end-ms. include-custom-executor?
  is forwarded to util/get-jobs-by-user-and-states.

  The whole operation is timed with list-endpoint, the fetch with fetch-jobs,
  and the request/response sizes are recorded in the list-* histograms."
  [db include-custom-executor? ctx]
  (timers/time!
    list-endpoint
    (let [states (::states ctx)
          user (::user ctx)
          limit (::limit ctx)
          name-filter-fn (::name-filter-fn ctx)
          end-ms (::end-ms ctx)
          ;; Fall back to the since-hours-ago window only when no explicit
          ;; start time was supplied.
          start-ms' (or (::start-ms ctx)
                        (- end-ms (-> (::since-hours-ago ctx) t/hours t/in-millis)))
          start (Date. ^long start-ms')
          end (Date. ^long end-ms)
          fetched-jobs (timers/time!
                         fetch-jobs
                         (util/get-jobs-by-user-and-states db user states start end limit
                                                           name-filter-fn include-custom-executor?))
          newest-first (reverse (sort-by :job/submit-time fetched-jobs))
          job-uuids (cond->> (map :job/uuid newest-first)
                      (some? limit) (take limit))
          time-range-ms (- end-ms start-ms')]
      (histograms/update! list-request-param-time-range-ms time-range-ms)
      (histograms/update! list-request-param-limit limit)
      (histograms/update! list-response-job-count (count job-uuids))
      job-uuids)))

(defn jobs-list-exist?
  "Lists jobs for the request in ctx against the current database value of
  conn, with custom-executor jobs included, and returns
  [true {::jobs job-uuids}] so the result is merged into the ctx."
  [conn ctx]
  (let [db (d/db conn)
        listed-jobs (list-jobs db true ctx)]
    [true {::jobs listed-jobs}]))

(defn read-jobs-handler
[conn is-authorized-fn resource-attrs]
(base-cook-handler
(merge {:allowed-methods [:get]
:malformed? job-request-malformed?
:allowed? (partial job-request-allowed? conn is-authorized-fn)
:exists? (partial jobs-exist? conn)}
:malformed? (fn [ctx]
(if (get-in ctx [:request :params :uuid])
(job-request-malformed? ctx)
(job-list-request-malformed? ctx)))
:allowed? (fn [ctx]
(if (get-in ctx [:request :params :uuid])
(job-request-allowed? conn is-authorized-fn ctx)
(job-list-request-allowed? is-authorized-fn ctx)))
:exists? (fn [ctx]
(if (get-in ctx [:request :params :uuid])
(jobs-exist? conn ctx)
(jobs-list-exist? conn ctx)))}
resource-attrs)))

(defn read-jobs-handler-multiple
Expand Down Expand Up @@ -1958,70 +2091,25 @@
{:allowed-methods [:get]
:handle-ok (fn [_] (stringify settings))}))

(defn- parse-int-default
  "Parses the string s with Integer/parseInt, or returns the default d when s
  is nil. Throws NumberFormatException when s is not a valid integer."
  [s d]
  (if (some? s)
    (Integer/parseInt s)
    d))

(defn- parse-long-default
  "Parses the string s with Long/parseLong, or returns the default d when s
  is nil. Throws NumberFormatException when s is not a valid long."
  [s d]
  (if (some? s)
    (Long/parseLong s)
    d))

;; Metrics for the job-listing code path. Timers:
;;   fetch-jobs    - timer defined for the job-fetching query
;;   list-endpoint - timer defined for the overall list operation
;; Histograms: the requested time range in ms, the requested limit, and the
;; number of jobs in the response.
(timers/deftimer [cook-scheduler handler fetch-jobs])
(timers/deftimer [cook-scheduler handler list-endpoint])
(histograms/defhistogram [cook-mesos api list-request-param-time-range-ms])
(histograms/defhistogram [cook-mesos api list-request-param-limit])
(histograms/defhistogram [cook-mesos api list-response-job-count])

(defn normalize-list-states
  "Collapses redundant terminal states in the set `states`.

  - When \"completed\" is present, every member of util/instance-states is
    removed, since completed already covers them.
  - Otherwise, when all of util/instance-states are present, they are replaced
    by the single state \"completed\".
  - Otherwise `states` is returned unchanged."
  [states]
  ;; The difference is computed lazily, per branch, so the fall-through branch
  ;; returns `states` untouched.
  (letfn [(without-instance-states []
            (set/difference states util/instance-states))]
    (cond
      (contains? states "completed")
      (without-instance-states)

      (set/superset? states util/instance-states)
      (conj (without-instance-states) "completed")

      :else
      states)))

(defn valid-name-filter?
  "A name filter is valid when it is absent (nil) or when checking it against
  the JobNameListFilter schema reports no validation error."
  [name]
  (if (nil? name)
    true
    (nil? (s/check JobNameListFilter name))))

(defn name-filter-str->name-filter-pattern
  "Compiles a user-provided name filter string into a regex pattern: every
  literal '.' is escaped, and every run of one or more '*' becomes the
  wildcard '.*'."
  [name]
  (let [dots-escaped (str/replace name "." "\\.")
        wildcards-expanded (str/replace dots-escaped #"\*+" ".*")]
    (re-pattern wildcards-expanded)))

(defn name-filter-str->name-filter-fn
  "Builds a one-argument predicate-like function from a user-provided name
  filter string. The function returns the result of re-matches against the
  compiled filter pattern. Returns nil when no name filter was provided."
  [name]
  (if-not name
    nil
    (let [name-pattern (name-filter-str->name-filter-pattern name)]
      (fn [job-name]
        (re-matches name-pattern job-name)))))

(defn list-resource
[db framework-id is-authorized-fn]
(liberator/resource
:available-media-types ["application/json"]
:allowed-methods [:get]
:as-response (fn [data ctx] {:body data})
:as-response (fn [data _] {:body data})
:malformed? (fn [ctx]
;; since-hours-ago is included for backwards compatibility but is deprecated
;; please use start-ms and end-ms instead
(let [{:keys [state user since-hours-ago start-ms end-ms limit name]
:as params}
(let [{:keys [state user since-hours-ago start-ms end-ms limit name]}
(keywordize-keys (or (get-in ctx [:request :query-params])
(get-in ctx [:request :body-params])))
states (when state (set (str/split state #"\+")))
allowed-list-states (set/union util/job-states util/instance-states)]
states (when state (set (str/split state #"\+")))]
(cond
(not (and state user))
[true {::error "must supply the state and the user name"}]
Expand All @@ -2038,9 +2126,9 @@
(try
[false {::states (normalize-list-states states)
::user user
::since-hours-ago (parse-int-default since-hours-ago 24)
::since-hours-ago (util/parse-int-default since-hours-ago 24)
::start-ms (parse-long-default start-ms nil)
::limit (parse-int-default limit Integer/MAX_VALUE)
::limit (util/parse-int-default limit Integer/MAX_VALUE)
::end-ms (parse-long-default end-ms (System/currentTimeMillis))
::name-filter-fn (name-filter-str->name-filter-fn name)}]
(catch NumberFormatException e
Expand Down Expand Up @@ -2070,32 +2158,8 @@
:handle-malformed ::error
:handle-forbidden ::error
:handle-ok (fn [ctx]
(timers/time!
list-endpoint
(let [{states ::states
user ::user
start-ms ::start-ms
end-ms ::end-ms
since-hours-ago ::since-hours-ago
limit ::limit
name-filter-fn ::name-filter-fn} ctx
start-ms' (or start-ms (- end-ms (-> since-hours-ago t/hours t/in-millis)))
start (Date. start-ms')
end (Date. end-ms)
job-uuids (->> (timers/time!
fetch-jobs
(util/get-jobs-by-user-and-states db user states start end limit name-filter-fn))
(sort-by :job/submit-time)
reverse
(map :job/uuid))
job-uuids (if (nil? limit)
job-uuids
(take limit job-uuids))
jobs (mapv (partial fetch-job-map db framework-id) job-uuids)]
(histograms/update! list-request-param-time-range-ms (- end-ms start-ms'))
(histograms/update! list-request-param-limit limit)
(histograms/update! list-response-job-count (count jobs))
jobs)))))
(let [job-uuids (list-jobs db false ctx)]
(mapv (partial fetch-job-map db framework-id) job-uuids)))))

;;
;; /unscheduled_jobs
Expand Down Expand Up @@ -2186,11 +2250,8 @@
(fn [ctx]
(let [{:keys [status start end name]} (-> ctx :request :params)
allowed-instance-statuses #{"unknown" "running" "success" "failed"}
parse-time (fn parse-time [s]
(or (tf/parse s)
(tc/from-long (Long/parseLong s))))
start-time (parse-time start)
end-time (parse-time end)]
start-time (util/parse-time start)
end-time (util/parse-time end)]
(cond
(not (allowed-instance-statuses status))
[true {::error (str "unsupported status " status ", must be one of: " allowed-instance-statuses)}]
Expand Down
Loading

0 comments on commit d236ec3

Please sign in to comment.