Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
shnarazk committed Apr 9, 2023
1 parent d304897 commit b45d32e
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 20 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ hyper-tls = { version = "^0.5" }
reqwest = "0.11.16"
serde_json = { version = "^1.0" }
tokio = { version = "^1.26", features = ["full"] }
futures-lite = "1.13.0"
async-compat = "0.2.1"
bytes = "1.4.0"
serde = "1.0.159"
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod reqwest_plugin;
81 changes: 61 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
},
// chrono::DateTime,
clap::Parser,
nhk_now::reqwest_plugin::*,
// hyper::Client,
// hyper_tls::HttpsConnector,
serde_json::Value,
Expand Down Expand Up @@ -53,10 +54,20 @@ fn main() {
}),
..Default::default()
}))
.add_plugin(ReqwestPlugin)
// .insert_resource(WinitSettings::desktop_app())
.insert_resource(ReqestTicket(1))
.add_systems(Startup, set_config)
.add_systems(Startup, (spawn_layout, spawn_tasks))
.add_systems(Update, (button_system, button_system2, handle_tasks))
.add_systems(Startup, spawn_layout)
.add_systems(
Update,
(
button_system,
send_requests,
handle_responses,
button_system2,
),
)
.run()
}

Expand Down Expand Up @@ -353,26 +364,26 @@ fn button_system2(
}
}

#[derive(Component)]
struct ProgramJson(Task<Value>);
// #[derive(Component)]
// struct ProgramJson(Task<Value>);

// we need despawn 'task' after reading the content after updating screen.
fn spawn_tasks(_commands: Commands) {
let thread_pool = IoTaskPool::get();
let config = AppConfig::default();
let service = "g1".to_string();
let _task = thread_pool.spawn(async move {
let Ok(json) = fetch_json_reqwest(&config, &service).await else {
return serde_json::from_str("{status: \"no data\"}").unwrap();
};
json
});
// commands.spawn(ProgramJson(task));
}
// // we need despawn 'task' after reading the content after updating screen.
// fn spawn_tasks(_commands: Commands) {
// let thread_pool = IoTaskPool::get();
// let config = AppConfig::default();
// let service = "g1".to_string();
// let _task = thread_pool.spawn(async move {
// let Ok(json) = fetch_json_reqwest(&config, &service).await else {
// return serde_json::from_str("{status: \"no data\"}").unwrap();
// };
// json
// });
// // commands.spawn(ProgramJson(task));
// }

fn handle_tasks(mut _commands: Commands, mut _tasks: Query<(Entity, &mut ProgramJson)>) {
// TODO:
}
// fn handle_tasks(mut _commands: Commands, mut _tasks: Query<(Entity, &mut ProgramJson)>) {
// // TODO:
// }

/*
#[allow(dead_code)]
Expand Down Expand Up @@ -439,3 +450,33 @@ fn parse_json(json: &Value) -> Option<(Value, String)> {
}
None
}

#[derive(Debug, Resource)]
struct ReqestTicket(u8);

fn send_requests(mut commands: Commands, mut ch: ResMut<ReqestTicket>) {
if ch.0 == 0 {
return;
}
let Ok(base) = format!(
"https://api.nhk.or.jp/v2/pg/now/{}/{}.json?key={}",
400, "g1", "",
).as_str().try_into() else {
return;
};
let req = reqwest::Request::new(reqwest::Method::GET, base);
let req = ReqwestRequest(Some(req));
ch.0 = 0;
commands.spawn(req);
dbg!();
}

fn handle_responses(mut commands: Commands, results: Query<(Entity, &ReqwestBytesResult)>) {
for (e, res) in results.iter() {
let string = res.as_str().unwrap();
info!("{string}");

// Done with this entity
commands.entity(e).despawn_recursive();
}
}
104 changes: 104 additions & 0 deletions src/reqwest_plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use bevy::prelude::*;
use bevy::tasks::AsyncComputeTaskPool;
pub use reqwest;

use {bevy::tasks::Task, futures_lite::future};

#[derive(Resource)]
pub struct ReqwestClient(pub reqwest::Client);
impl Default for ReqwestClient {
fn default() -> Self {
Self(reqwest::Client::new())
}
}

/// we have to use an option to be able to ".take()" later
#[derive(Component, Deref)]
pub struct ReqwestRequest(pub Option<reqwest::Request>);

impl Into<ReqwestRequest> for reqwest::Request {
fn into(self) -> ReqwestRequest {
ReqwestRequest(Some(self))
}
}

#[derive(Component, Deref)]
struct ReqwestInflight(pub Task<reqwest::Result<bytes::Bytes>>);

#[derive(Component, Deref)]
pub struct ReqwestBytesResult(pub reqwest::Result<bytes::Bytes>);

impl ReqwestBytesResult {
pub fn as_str(&self) -> Option<&str> {
match &self.0 {
Ok(string) => Some(std::str::from_utf8(&string).ok()?),
Err(_) => None,
}
}
pub fn as_string(&mut self) -> Option<String> {
Some(self.as_str()?.into())
}
pub fn deserialize_json<'de, T: serde::Deserialize<'de>>(&'de mut self) -> Option<T> {
serde_json::from_str(self.as_str()?).ok()
}
}

pub struct ReqwestPlugin;
impl Plugin for ReqwestPlugin {
fn build(&self, app: &mut App) {
if !app.world.contains_resource::<ReqwestClient>() {
app.init_resource::<ReqwestClient>();
}
app.add_systems(Update, Self::start_handling_requests);
app.add_systems(Update, Self::poll_inflight_requests_to_bytes);
}
}

//TODO: Make type generic, and we can create systems for JSON and TEXT requests
impl ReqwestPlugin {
fn start_handling_requests(
mut commands: Commands,
http_client: ResMut<ReqwestClient>,
mut requests: Query<(Entity, &mut ReqwestRequest), Added<ReqwestRequest>>,
) {
let thread_pool = AsyncComputeTaskPool::get();
for (entity, mut request) in requests.iter_mut() {
bevy::log::debug!("Creating: {entity:?}");
// if we take the data, we can use it
if let Some(request) = request.0.take() {
let client = http_client.0.clone();

let task = {
thread_pool.spawn(async move {
let r = async_compat::Compat::new(async {
client.execute(request).await?.bytes().await
})
.await;
r
})
};
// put it as a component to be polled, and remove the request, it has been handled
commands.entity(entity).insert(ReqwestInflight(task));
commands.entity(entity).remove::<ReqwestRequest>();
}
}
}

fn poll_inflight_requests_to_bytes(
mut commands: Commands,
// Very important to have the Without, otherwise we get task failure upon completed task
mut requests: Query<(Entity, &mut ReqwestInflight), Without<ReqwestBytesResult>>,
) {
for (entity, mut request) in requests.iter_mut() {
bevy::log::debug!("polling: {entity:?}");

if let Some(result) = future::block_on(future::poll_once(&mut request.0)) {
// move the result over to a new component
commands
.entity(entity)
.insert(ReqwestBytesResult(result))
.remove::<ReqwestInflight>();
}
}
}
}

0 comments on commit b45d32e

Please sign in to comment.