Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example doesn't work with futures 0.2.0 #84

Closed
Gurpartap opened this issue Apr 17, 2018 · 4 comments
Closed

Example doesn't work with futures 0.2.0 #84

Gurpartap opened this issue Apr 17, 2018 · 4 comments

Comments

@Gurpartap
Copy link

Gurpartap commented Apr 17, 2018

The async processing example works on futures 0.1.20 but fails on the latest futures 0.2.0 release with the following errors:

// Create the outer pipeline on the message stream.
let processed_stream = consumer.start()
.filter_map(|result| { // Filter out errors
match result {
Ok(msg) => Some(msg),
Err(kafka_error) => {
warn!("Error while receiving from Kafka: {:?}", kafka_error);
None
}
}
}).for_each(|msg| { // Process each message
info!("Enqueuing message for computation");
let producer = producer.clone();
let topic_name = output_topic.to_owned();
let owned_message = msg.detach();
// Create the inner pipeline, that represents the processing of a single event.
let process_message = cpu_pool.spawn_fn(move || {
// Take ownership of the message, and runs an expensive computation on it,
// using one of the threads of the `cpu_pool`.
Ok(expensive_computation(&owned_message))
}).and_then(move |computation_result| {
// Send the result of the computation to Kafka, asynchronously.
info!("Sending result");

error[E0599]: no method named `filter_map` found for type `rdkafka::consumer::MessageStream<'_, rdkafka::consumer::DefaultConsumerContext>` in the current scope
  --> src/main.rs:81:10
   |
81 |         .filter_map(|result| {  // Filter out errors
   |          ^^^^^^^^^^
   |
   = note: the method `filter_map` exists but the following trait bounds were not satisfied:
           `&mut rdkafka::consumer::MessageStream<'_, rdkafka::consumer::DefaultConsumerContext> : std::iter::Iterator`
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope, perhaps add a `use` for it:
   |
10 | use futures::stream::Stream;
   |

error[E0599]: no method named `and_then` found for type `futures_cpupool::CpuFuture<std::string::String, _>` in the current scope
  --> src/main.rs:99:12
   |
99 |         }).and_then(move |computation_result| {
   |            ^^^^^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope, perhaps add a `use` for it:
   |
10 | use futures::future::Future;
   |
[dependencies]
log = "0.4.1"
futures = "0.2.0"
# futures = "0.1.20" # works
futures-cpupool = "0.1.8"
rand = "0.5.0-pre.0"
tokio-core = "0.1.17"
rdkafka = "0.15.0"
@kaiba696
Copy link

kaiba696 commented May 2, 2018

+1

@jwilm
Copy link

jwilm commented May 17, 2018

The suggestions from Rust core team are to skip futures 0.2 and wait for futures 0.3.

@nbigaouette-eai
Copy link

@benesch
Copy link
Collaborator

benesch commented Oct 22, 2019

Futures 0.2 was postponed and replaced by std::future::Future and async/await. I'm going to close this out for now. Once Rust 1.39 is released, we'll upgrade to be compatible with futures 0.3 and async/await (#143). For now, though, we'll stick with futures 0.1.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants