Skip to content

Commit

Permalink
Scheduled events processing
Browse files Browse the repository at this point in the history
  • Loading branch information
pilosus committed Jul 29, 2023
1 parent 399eb92 commit 427db94
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 68 deletions.
25 changes: 10 additions & 15 deletions src/dienstplan/commands.clj
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,6 @@ Caveats:
(into (hash-map) (remove (fn [[_ v]] (nil? v)) context))]
result))

(defn get-now-ts
[]
(new java.sql.Timestamp (System/currentTimeMillis)))

;; Parse app mention response

(s/fdef parse-command
Expand Down Expand Up @@ -527,7 +523,7 @@ Caveats:
text))

(defmethod command-exec! :create [command-map]
(let [now (get-now-ts)
(let [now (helpers/now-ts-sql)
{:keys [channel rotation]} (get-channel-rotation command-map)
channel-formatted (slack-mention-channel channel)
users (get-in command-map [:args :users])
Expand Down Expand Up @@ -563,7 +559,7 @@ Caveats:
result))

(defmethod command-exec! :update [command-map]
(let [now (get-now-ts)
(let [now (helpers/now-ts-sql)
{:keys [channel rotation]} (get-channel-rotation command-map)
channel-formatted (slack-mention-channel channel)
users (get-in command-map [:args :users])
Expand Down Expand Up @@ -606,10 +602,9 @@ Caveats:
(let [{:keys [channel rotation]} (get-channel-rotation command-map)
channel-formatted (slack-mention-channel channel)
{:keys [users-count users-updated prev-duty current-duty]}
(db/rotate-duty! channel rotation (get-now-ts))
_ (log/info
(format "Updated %s/%s for rotation %s of channel %s"
users-updated users-count rotation channel))
(db/rotate-duty! channel rotation (helpers/now-ts-sql))
_ (log/infof "Updated %s/%s for rotation %s of channel %s"
users-updated users-count rotation channel)
duties (map slack/get-user-name [prev-duty current-duty])
text
(cond
Expand All @@ -629,7 +624,7 @@ Caveats:
(let [{:keys [channel rotation]} (get-channel-rotation command-map)
channel-formatted (slack-mention-channel channel)
name (get-in command-map [:args :user])
assigned (db/assign! channel rotation name (get-now-ts))
assigned (db/assign! channel rotation name (helpers/now-ts-sql))
text
(if (= assigned :user-not-found)
(format "User %s is not found in rotation `%s` of channel %s"
Expand Down Expand Up @@ -742,10 +737,10 @@ Caveats:

(defn send-command-response!
[request]
(log/info "Request to Slack API")
(let [body-map (get-command-response request)
body (json/generate-string body-map)
{:keys [status data]}
(slack/slack-api-request {:method :chat.postMessage :body body})
_ (log/info
(format "Post message to Slack: status %s body %s" status data))]
{:keys [status data]} (slack/slack-api-request
{:method :chat.postMessage :body body})]
(log/infof "Response from Slack API: status %s body %s" status data)
body-map))
4 changes: 3 additions & 1 deletion src/dienstplan/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
[dienstplan.endpoints :as endpoints]
[dienstplan.logging :as logging]
[dienstplan.middlewares :as middlewares]
[dienstplan.schedule :as schedule]
[mount.core :as mount :refer [defstate]]
[ring.adapter.jetty :refer [run-jetty]]
[ring.middleware.cookies :refer [wrap-cookies]]
Expand Down Expand Up @@ -133,4 +134,5 @@
(case mode
:server (mount/start)
:migrate (db/migrate nil)
:rollback (db/rollback nil)))))
:rollback (db/rollback nil)
:schedule (schedule/run)))))

Check warning on line 138 in src/dienstplan/core.clj

View check run for this annotation

Codecov / codecov/patch

src/dienstplan/core.clj#L137-L138

Added lines #L137 - L138 were not covered by tests
15 changes: 9 additions & 6 deletions src/dienstplan/db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@
(update-users conn assigned ts))
assigned)))

(defn executable-schedules
(defn schedules-get
"Get scheduled events that are ready to be executed"
[conn now]
(jdbc/execute!
Expand All @@ -365,16 +365,19 @@

(defn schedule-update!
[conn params]
(sql/update!
conn
:schedule
{:run_at (:schedule/run_at params)}
["id = ?" (:schedule/id params)]))
(let [updated (:next.jdbc/update-count
(sql/update!
conn
:schedule
{:run_at (:schedule/run_at params)}
["id = ?" (:schedule/id params)]))]
(log/debugf "Schedule row updated: %s with params: %s" updated params)))

(defn schedule-insert!
[params]
(jdbc/with-transaction [conn db]
(try (let [inserted (sql/insert! conn :schedule params)]
(log/debugf "Schedule inserted: %s" inserted)
{:result (when (-> inserted :schedule/id int?)
(format "Executable `%s` successfully scheduled with `%s`"
(:executable params) (:crontab params)))})
Expand Down
25 changes: 25 additions & 0 deletions src/dienstplan/helpers.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
;; Copyright (c) Vitaly Samigullin and contributors. All rights reserved.
;;
;; This program and the accompanying materials are made available under the
;; terms of the Eclipse Public License 2.0 which is available at
;; http://www.eclipse.org/legal/epl-2.0.
;;
;; This Source Code may also be made available under the following Secondary
;; Licenses when the conditions for such availability set forth in the Eclipse
;; Public License, v. 2.0 are satisfied: GNU General Public License as published by
;; the Free Software Foundation, either version 2 of the License, or (at your
;; option) any later version, with the GNU Classpath Exception which is available
;; at https://www.gnu.org/software/classpath/license.html.
;;
;; SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0

(ns dienstplan.helpers
"Common helper functions shared across namespaces"
(:require [clojure.string :as string]))

(defn now-ts-sql
"Get current timestamp in JDBC API compatible format"
[]
(new java.sql.Timestamp (System/currentTimeMillis)))

(defn now-ts-seconds
"Get current Unix timestamp in seconds"
[]
(quot (System/currentTimeMillis) 1000))

(defn nilify
[s]
(if (string/blank? s) nil s))
Expand Down
2 changes: 1 addition & 1 deletion src/dienstplan/middlewares.clj
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
(let [sentry-context (get-sentry-context request)
sentry-error {:message "Something has gone wrong!" :throwable e}
sentry-event (merge sentry-error sentry-context)]
(log/error e (format "Request failed: %s with error: %s" request e))
(log/errorf e "Request failed: %s with error: %s" request e)
(sentry/send-event sentry-event)
;; For some reason cannot move the response to finally,
;; the block executed, but never returned from the endpoint
Expand Down
126 changes: 91 additions & 35 deletions src/dienstplan/schedule.clj
Original file line number Diff line number Diff line change
@@ -1,40 +1,96 @@
;; Copyright (c) Vitaly Samigullin and contributors. All rights reserved.
;;
;; This program and the accompanying materials are made available under the
;; terms of the Eclipse Public License 2.0 which is available at
;; http://www.eclipse.org/legal/epl-2.0.
;;
;; This Source Code may also be made available under the following Secondary
;; Licenses when the conditions for such availability set forth in the Eclipse
;; Public License, v. 2.0 are satisfied: GNU General Public License as published by
;; the Free Software Foundation, either version 2 of the License, or (at your
;; option) any later version, with the GNU Classpath Exception which is available
;; at https://www.gnu.org/software/classpath/license.html.
;;
;; SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0

(ns dienstplan.schedule
(:require
[dienstplan.db :as db]
[clojure.tools.logging :as log]
[org.pilosus.kairos :as kairos]
[next.jdbc :as jdbc]))

;; TODO
;; fn-get-schedules - db/get-schedules
;; fn-update-schedule - db/update-schedule
;; fn-process-command - ???
(defn process-scheduled-commands
[dienstplan.db :as db]
[dienstplan.helpers :as helpers]
[dienstplan.commands :as commands]
[next.jdbc :as jdbc]
[org.pilosus.kairos :as kairos]))

(defn- next-run-ts
"Given crontab line, return the next timestamp in JDBC compatible format"
[schedule-row]
(-> schedule-row
:schedule/crontab
(kairos/get-dt-seq)
first
.toInstant
java.sql.Timestamp/from))

(defn- schedule-update-map
[schedule-row]
{:schedule/id (:schedule/id schedule-row)
:schedule/run_at (next-run-ts schedule-row)})

(defn- exec-command-map
"Get a request hashmap to run the bot command with"
[schedule-row]
{:params
{:event
;; we need a placeholder in place of a bot mention to match the regex
{:text (format "<@PLACEHOLDER> %s" (:schedule/executable schedule-row)),
:ts (-> (helpers/now-ts-seconds)
float
str)
:channel (:schedule/channel schedule-row)}}})

(defn process-rows
"Iterate over rows from `schedule` table, process them, return number of processed rows"
[rows fn-process-command fn-update-schedule]
(loop [events (seq rows)
processed 0]
(if events
(let [event (first events)
command-map (exec-command-map event)
query-map (schedule-update-map event)]
(log/debug "Start processing event" event)

;; If outter transaction rolls back, it doesn't affect the nested one
(jdbc/with-transaction [nested-trx db/db]
(try
(log/debug "Process scheduled command" command-map)
(fn-process-command command-map)

(log/debug "Update processed row in schedule table" query-map)
(fn-update-schedule nested-trx query-map)

(catch Exception e
(log/errorf e "Couldn't process row %s: %s" event (.getMessage e))
(.rollback nested-trx))))

Check warning on line 74 in src/dienstplan/schedule.clj

View check run for this annotation

Codecov / codecov/patch

src/dienstplan/schedule.clj#L74

Added line #L74 was not covered by tests

(log/debug "Event processed")
(recur (next events) (inc processed)))
processed)))

(defn process-events
"Process scheduled events, return number of processed events"
[fn-get-schedules fn-process-command fn-update-schedule]
(jdbc/with-transaction [conn db/db]
(let [now (new java.sql.Timestamp (System/currentTimeMillis))
rows (fn-get-schedules conn now)]
(loop [events (seq rows)]
(let [event (first events)
next-run-at (-> event
:schedule/crontab
(kairos/get-dt-seq)
first
.toInstant
java.sql.Timestamp/from)]
;; If outter transaction rolls back, it doesn't affect nested one
(jdbc/with-transaction [nested-trx db/db]
(try
(fn-process-command (:schedule/channel event)
(:schedule/command event))
(fn-update-schedule
nested-trx
{:schedule/id (:schedule/command event)
:schedule/run_at next-run-at})
(catch Exception e
(log/error "Couldn't process command for event"
event
(.getMessage e))
(.rollback nested-trx))))
(when events
(recur (next events))))))))
(log/info "Start processing scheduled events")
(let [rows (fn-get-schedules conn (helpers/now-ts-sql))
processed (process-rows rows fn-process-command fn-update-schedule)]
(if (> processed 0)
(do (log/infof "Processed %s event(s)" processed) processed)
(do (log/info "No scheduled events found") 0)))))

(defn run
"Schedule processing entrypoint"
[]
(process-events db/schedules-get
commands/send-command-response!
db/schedule-update!))
6 changes: 2 additions & 4 deletions src/dienstplan/verify.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
[buddy.core.mac :as mac]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[dienstplan.helpers :as helpers]
[dienstplan.spec :as spec]))

(def VERSION "v0")
(def REPLAY_ATTACK_THRESHOLD_SECONDS (* 60 5))

(defn get-current-ts []
(quot (System/currentTimeMillis) 1000))

(defn calculate-signature
[sig-str sig-key]
(let [hmac (-> (mac/hash sig-str {:key sig-key :alg :hmac+sha256})
Expand All @@ -47,7 +45,7 @@
(let [body (:raw-body request)
headers (get request :headers)
ts (Integer/parseInt (get headers :x-slack-request-timestamp))
now (get-current-ts)
now (helpers/now-ts-seconds)
replay-attack? (> (- now ts) REPLAY_ATTACK_THRESHOLD_SECONDS)
recieved-sig (get headers :x-slack-signature)
sig-str (format "%s:%s:%s" VERSION ts body)
Expand Down
Loading

0 comments on commit 427db94

Please sign in to comment.