Skip to content

Commit

Permalink
feat: add py_udf operator (hydro-project#792)
Browse files Browse the repository at this point in the history
feature-gates `py_udf` behind `python` feature, does not run in wasm tests
  • Loading branch information
MingweiSamuel committed Jun 29, 2023
1 parent 016abee commit bd2e3b0
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ jobs:
CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER: wasm-bindgen-test-runner
with:
command: test
args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast
args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast --no-default-features --features __default_wasm

test-cli:
name: Test CLI
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ documentation = "https://docs.rs/hydroflow/"
description = "Hydro's low-level dataflow runtime and IR"

[features]
default = [ "async", "macros" , "nightly" ]
default = [ "macros" , "nightly", "python" ]
__default_wasm = [ "macros", "nightly" ]

nightly = [ "hydroflow_macro", "hydroflow_macro/diagnostics" ]
async = [ "dep:futures" ]
macros = [ "hydroflow_macro", "hydroflow_datalog" ]
hydroflow_macro = [ "dep:hydroflow_macro" ]
hydroflow_datalog = [ "dep:hydroflow_datalog" ]
cli_integration = [ "dep:hydroflow_cli_integration" ]
python = [ "dep:pyo3" ]

[[example]]
name = "kvs_bench"
Expand All @@ -24,14 +26,15 @@ required-features = [ "nightly" ]
bincode = "1.3"
byteorder = "1.4.3"
bytes = "1.1.0"
futures = { version = "0.3", optional = true }
futures = "0.3"
hydroflow_cli_integration = { optional = true, path = "../hydroflow_cli_integration", version = "^0.2.0" }
hydroflow_datalog = { optional = true, path = "../hydroflow_datalog", version = "^0.2.0" }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.2.0" }
hydroflow_macro = { optional = true, path = "../hydroflow_macro", version = "^0.2.0" }
itertools = "0.10"
lattices = { path = "../lattices", version = "^0.2.0", features = [ "serde" ] }
pusherator = { path = "../pusherator", version = "^0.0.1" }
pyo3 = { optional = true, version = "0.18" }
ref-cast = "1.0"
regex = "1.8.4"
rustc-hash = "1.1.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_dot()
---
digraph {
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1 [label="(n1v1) source_iter(0..10)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n2v1 [label="(n2v1) py_udf(\l r#\"\ldef fib(n):\lif n < 2:\l return n\lelse:\l return fib(n - 2) + fib(n - 1)\l \"#,\l \"fib\",\l)\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {\l usize::extract(x.unwrap().as_ref(py)).unwrap()\l}))\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n4v1 [label="(n4v1) assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n1v1 -> n2v1
n2v1 -> n3v1
n3v1 -> n4v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_mermaid()
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) <code>source_iter(0..10)</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>py_udf(<br> r&num;&quot;<br>def fib(n):<br>if n &lt; 2:<br> return n<br>else:<br> return fib(n - 2) + fib(n - 1)<br> &quot;&num;,<br> &quot;fib&quot;,<br>)</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>map(|x: PyResult&lt;Py&lt;PyAny&gt;&gt;| Python::with_gil(|py| {<br> usize::extract(x.unwrap().as_ref(py)).unwrap()<br>}))</code>"/]:::pullClass
4v1[/"(4v1) <code>assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])</code>"\]:::pushClass
1v1--->2v1
2v1--->3v1
3v1--->4v1
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_dot()
---
digraph {
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1 [label="(n1v1) source_iter([5])", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n2v1 [label="(n2v1) py_udf(r#\"\ldef add(a, b):\lreturn a + b\l \"#, \"add\")\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) map(PyResult::<Py<PyAny>>::unwrap_err)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n4v1 [label="(n4v1) map(|py_err| py_err.to_string())", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n5v1 [label="(n5v1) assert([\"TypeError: add() missing 1 required positional argument: 'b'\"])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n1v1 -> n2v1
n2v1 -> n3v1
n3v1 -> n4v1
n4v1 -> n5v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_mermaid()
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) <code>source_iter([5])</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>py_udf(r&num;&quot;<br>def add(a, b):<br>return a + b<br> &quot;&num;, &quot;add&quot;)</code>"/]:::pullClass
3v1[\"(3v1) <code>map(PyResult::&lt;Py&lt;PyAny&gt;&gt;::unwrap_err)</code>"/]:::pullClass
4v1[\"(4v1) <code>map(|py_err| py_err.to_string())</code>"/]:::pullClass
5v1[/"(5v1) <code>assert([&quot;TypeError: add() missing 1 required positional argument: 'b'&quot;])</code>"\]:::pushClass
1v1--->2v1
2v1--->3v1
3v1--->4v1
4v1--->5v1
end

43 changes: 43 additions & 0 deletions hydroflow/tests/surface_python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#![cfg(feature = "python")]

use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;
use pyo3::prelude::*;

#[multiplatform_test(test)]
pub fn test_python_basic() {
let mut hf = hydroflow_syntax! {
source_iter(0..10)
-> py_udf(r#"
def fib(n):
if n < 2:
return n
else:
return fib(n - 2) + fib(n - 1)
"#, "fib")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
};
assert_graphvis_snapshots!(hf);

hf.run_available();
}

#[multiplatform_test(test)]
pub fn test_python_too_many_args() {
let mut hf = hydroflow_syntax! {
source_iter([5])
-> py_udf(r#"
def add(a, b):
return a + b
"#, "add")
-> map(PyResult::<Py<PyAny>>::unwrap_err)
-> map(|py_err| py_err.to_string())
-> assert(["TypeError: add() missing 1 required positional argument: 'b'"]);
};
assert_graphvis_snapshots!(hf);

hf.run_available();
}
4 changes: 2 additions & 2 deletions hydroflow_lang/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ fn generate_op_docs() -> Result<()> {

const DOCTEST_HYDROFLOW_PREFIX: &str = "\
```rust
# #[allow(unused_imports)] use hydroflow::{var_args, var_expr};
# #[allow(unused_imports)] use hydroflow::pusherator::Pusherator;
# #[allow(unused_imports)] use hydroflow::{var_args, var_expr, pusherator::Pusherator};
# #[cfg(feature = \"python\")] #[allow(unused_imports)] use pyo3::prelude::*;
# let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
# __rt.block_on(async { hydroflow::tokio::task::LocalSet::new().run_until(async {
# let mut __hf = hydroflow::hydroflow_syntax! {";
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ declare_ops![
persist::PERSIST,
persist_mut::PERSIST_MUT,
persist_mut_keyed::PERSIST_MUT_KEYED,
py_udf::PY_UDF,
reduce::REDUCE,
spin::SPIN,
sort::SORT,
Expand Down
138 changes: 138 additions & 0 deletions hydroflow_lang/src/graph/ops/py_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use proc_macro2::Literal;
use quote::quote_spanned;

use super::{
FlowProperties, FlowPropertyVal, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
};

/// > Arguments: First, the source code for a python module, second, the name of a unary function
/// > defined within the module source code.
///
/// **Requires the "python" feature to be enabled.**
///
/// An operator which allows you to run a python udf. Input arguments must be a stream of items
/// which implement [`IntoPy`](https://docs.rs/pyo3/latest/pyo3/conversion/trait.IntoPy.html).
/// See the [relevant pyo3 docs here](https://pyo3.rs/latest/conversions/tables#mapping-of-rust-types-to-python-types).
///
/// Output items are of type `PyResult<Py<PyAny>>`. Rust native types can be extracted using
/// `.extract()`, see the [relevant pyo3 docs here](https://pyo3.rs/latest/conversions/traits#extract-and-the-frompyobject-trait)
/// or the examples below.
///
/// ```hydroflow
/// source_iter(0..10)
/// -> py_udf(r#"
/// def fib(n):
/// if n < 2:
/// return n
/// else:
/// return fib(n - 2) + fib(n - 1)
/// "#, "fib")
/// -> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
/// usize::extract(x.unwrap().as_ref(py)).unwrap()
/// }))
/// -> assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
/// ```
///
/// ```hydroflow
/// source_iter([5])
/// -> py_udf(r#"
/// def add(a, b):
/// return a + b
/// "#, "add")
/// -> map(PyResult::<Py<PyAny>>::unwrap_err)
/// -> map(|py_err| py_err.to_string())
/// -> assert(["TypeError: add() missing 1 required positional argument: 'b'"]);
/// ```
pub const PY_UDF: OperatorConstraints = OperatorConstraints {
name: "py_udf",
categories: &[OperatorCategory::Map],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 2,
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
ports_inn: None,
ports_out: None,
properties: FlowProperties {
deterministic: FlowPropertyVal::DependsOnArgs,
monotonic: FlowPropertyVal::DependsOnArgs,
inconsistency_tainted: false,
},
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
op_span,
context,
hydroflow,
ident,
inputs,
outputs,
is_pull,
op_name,
op_inst: OperatorInstance { arguments, .. },
..
},
_| {
let py_src = &arguments[0];
let py_func_name = &arguments[1];

let py_func_ident = wc.make_ident("py_func");

let err_lit = Literal::string(&*format!(
"Hydroflow 'python' feature must be enabled to use `{}`",
op_name
));

let write_prologue = quote_spanned! {op_span=>
#[cfg(feature = "python")]
let #py_func_ident = {
::pyo3::prepare_freethreaded_python();
let func = ::pyo3::Python::with_gil::<_, ::pyo3::PyResult<::pyo3::Py<::pyo3::PyAny>>>(|py| {
Ok(::pyo3::types::PyModule::from_code(
py,
#py_src,
"_filename",
"_modulename",
)?
.getattr(#py_func_name)?
.into())
}).expect("Failed to compile python.");
#hydroflow.add_state(func)
};
#[cfg(not(feature = "python"))]
::std::compiler_error!(#err_lit);
};
let closure = quote_spanned! {op_span=>
|x| {
#[cfg(feature = "python")]
{
// TODO(mingwei): maybe this can be outside the closure?
let py_func = #context.state_ref(#py_func_ident);
::pyo3::Python::with_gil(|py| py_func.call1(py, (x,)))
}
#[cfg(not(feature = "python"))]
panic!()
}
};
let write_iterator = if is_pull {
let input = &inputs[0];
quote_spanned! {op_span=>
let #ident = #input.map(#closure);
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::pusherator::map::Map::new(#closure, #output);
}
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
..Default::default()
})
},
};

0 comments on commit bd2e3b0

Please sign in to comment.