Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Fix running an output plugin with :onyx.core/params longer than 0 #894

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

sundbry
Copy link
Contributor

@sundbry sundbry commented May 4, 2019

When :onyx.core/params is set, the (identity) function in
operations.cljc throws because it can't handle more than one argument.
Instead of usinging identity, we return the last argument (the segment).

This is sort of an unusual thing to do, to attach params to a lifecycle affecting an output task, but it is for a 'generic' set of lifecycle calls in my use case in the mongodb plugin I am developing. We provide the :mongo connection as a parameter so you can do queries in normal :function tasks, and use the same lifecycles for the :output tasks to open/close the connection.

#error {
 :cause "Segment threw exception"
 :data {:exception #error {
 :cause "Wrong number of args (2) passed to: clojure.core/identity"
 :via
 [{:type clojure.lang.ArityException
   :message "Wrong number of args (2) passed to: clojure.core/identity"
   :at [clojure.lang.AFn throwArity "AFn.java" 429]}]
 :trace
 [[clojure.lang.AFn throwArity "AFn.java" 429]
  [clojure.lang.AFn invoke "AFn.java" 36]
  [clojure.core$partial$fn__5824 invoke "core.clj" 2624]
  [onyx.peer.transform$collect_next_segments$fn__38085 invoke "transform.clj" 8]
  [onyx.peer.transform$collect_next_segments invokeStatic "transform.clj" 8]
  [onyx.peer.transform$collect_next_segments invoke "transform.clj" 7]
  [onyx.peer.transform$apply_fn_single$fn__38090 invoke "transform.clj" 17]
  [clojure.core$map$fn__5851 invoke "core.clj" 2753]
  [clojure.lang.LazySeq sval "LazySeq.java" 42]
  [clojure.lang.LazySeq seq "LazySeq.java" 51]
  [clojure.lang.RT seq "RT.java" 531]
  [clojure.core$seq__5387 invokeStatic "core.clj" 137]
  [clojure.core$dorun invokeStatic "core.clj" 3133]
  [clojure.core$doall invokeStatic "core.clj" 3148]
  [clojure.core$doall invoke "core.clj" 3148]
  [onyx.peer.transform$apply_fn_single invokeStatic "transform.clj" 17]
  [onyx.peer.transform$apply_fn_single invoke "transform.clj" 14]
  [onyx.peer.transform$apply_fn invokeStatic "transform.clj" 54]
  [onyx.peer.transform$apply_fn invoke "transform.clj" 48]
  [onyx.peer.task_lifecycle$build_apply_fn$fn__64006 invoke "task_lifecycle.clj" 620]
  [onyx.peer.task_lifecycle$wrap_lifecycle_metrics$fn__64159 invoke "task_lifecycle.clj" 1098]
  [onyx.peer.task_lifecycle.TaskStateMachine exec "task_lifecycle.clj" 1071]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invokeStatic "task_lifecycle.clj" 550]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invoke "task_lifecycle.clj" 540]
  [onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__64180 invoke "task_lifecycle.clj" 1156]
  [clojure.core.async$thread_call$fn__11217 invoke "async.clj" 442]
  [clojure.lang.AFn run "AFn.java" 22]
  [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1149]
  [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 624]
  [java.lang.Thread run "Thread.java" 748]]

When :onyx.core/params is set, the (identity) function in
operations.cljc throws because it can't handle more than one argument.
Instead of usinging identity, we return the last argument (the segment).
@solatis
Copy link
Member

solatis commented May 4, 2019

So the use case you're trying to accomodate is that you can pass the actual connection as an argument to the output task, rather than the details how it can establish the connection?

Is there any reason why you're not using the "regular" pattern of opening a connection upon construction of the output plugin, e.g. in your situation here https://github.com/arctype-co/onyx-mongo/blob/master/src/onyx/plugin/mongo.clj#L129 ?

@sundbry
Copy link
Contributor Author

sundbry commented May 4, 2019

Hello Leon @solatis,

I first developed the plugin using that pattern, which I will probably go back to for the output instead of requiring the lifecycle hooks because it is simpler to program for the end user that way. The lifecycle hook adding the function params is for supporting reads/queries in :onyx/type :function tasks, so the same lifecycle hooks can be used in either context.

Regardless however of my design choices there, I still think this is a bug, because if I for example added a lifecycle hook with some :onyx.core/params targeting :all tasks, which would be a more valid use case, it would also raise this exception.

@solatis
Copy link
Member

solatis commented May 5, 2019

Ok, I think accepting this patch would be fine, but I wouldn't mind the input of @lbradstreet on this -- perhaps there was a better reason why these params only accept a single arg function.

@lbradstreet
Copy link
Member

I'm not sure I completely understand the use case, but the way I would think you would make this work is you would check whether it's an output task in https://github.com/arctype-co/onyx-mongo/blob/master/src/onyx/plugin/mongo.clj#L90 (via inspecting the task map), and you would just not inject into :onyx.core/params if an :onyx/fn is not supplied via the task-map.

Ignoring the params as you've done also works, but I think you're more likely to code something which drops params silently. It's probably fine though.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants