diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 208609e..21c05e7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: rust: - - 1.77.0 # MSRV + - 1.78.0 # MSRV steps: - name: Checkout @@ -73,7 +73,7 @@ jobs: centos: runs-on: ubuntu-latest - needs: [ci] + needs: [ ci ] steps: - name: System dependencies @@ -98,7 +98,7 @@ jobs: arch: runs-on: ubuntu-latest - needs: [ci] + needs: [ ci ] steps: - name: Checkout diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index eca968a..dc76d00 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: rust: - - 1.77.0 # MSRV + - 1.78.0 # MSRV steps: - name: Checkout diff --git a/Cargo.lock b/Cargo.lock index e4e6ff3..82fd6cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -638,17 +638,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctrlc-async" -version = "3.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "598e9d68e769aa1283460a3b0ec0d049ccfb6170277aea37089fa3f58fd721a1" -dependencies = [ - "nix 0.23.2", - "tokio", - "winapi", -] - [[package]] name = "deranged" version = "0.3.11" @@ -1021,12 +1010,11 @@ dependencies = [ "chrono", "clap", "colored", - "ctrlc-async", "env_logger", "humantime", "log", "log4rs", - "nix 0.28.0", + "nix", "rand", "thiserror", "tokio", @@ -1103,15 +1091,6 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" -[[package]] -name = "memoffset" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" -dependencies = [ - "autocfg", -] - [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1132,19 +1111,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "nix" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c" -dependencies = [ - "bitflags 1.3.2", - "cc", - "cfg-if", - "libc", - "memoffset", -] - [[package]] name = "nix" version = "0.28.0" diff --git a/Cargo.toml b/Cargo.toml index 10da83d..3cc518e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ base64 = "0" chrono = { version = "0", features = ["clock", "std"] } clap = { version = "4", features = ["derive"] } colored = "2" -ctrlc-async = "3" env_logger = "0" humantime = "2" log = "0" diff --git a/src/aws.rs b/src/aws.rs index 7d87a26..b39c5f4 100644 --- a/src/aws.rs +++ b/src/aws.rs @@ -1,7 +1,6 @@ pub mod stream { use anyhow::Result; use async_trait::async_trait; - use aws_config::Region; use aws_sdk_kinesis::operation::get_records::GetRecordsOutput; use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; use aws_sdk_kinesis::operation::list_shards::ListShardsOutput; @@ -38,8 +37,6 @@ pub mod stream { shard_id: &str, ) -> Result; - fn get_region(&self) -> Option<&Region>; - fn aws_datetime(timestamp: &chrono::DateTime) -> DateTime { DateTime::from_millis(timestamp.timestamp_millis()) } @@ -143,10 +140,6 @@ pub mod client { .map_err(SdkError::into_service_error) .map_err(Into::into) } - - fn get_region(&self) -> Option<&Region> { - self.client.config().region() - } } pub async fn create_client( diff --git a/src/kinesis.rs b/src/kinesis.rs index ffd5ac2..7c6e1ae 100644 --- a/src/kinesis.rs +++ b/src/kinesis.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use aws_sdk_kinesis::operation::get_records::GetRecordsError; use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; use chrono::prelude::*; -use chrono::{DateTime, Utc}; +use chrono::Utc; use log::{debug, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -266,27 +266,6 @@ where Ok(()) } - fn has_records_beyond_end_ts(&self, records: &[RecordResult]) -> bool { - match self.get_config().to_datetime { - Some(end_ts) if !records.is_empty() => { - let epoch = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(); - - let find_most_recent_ts = |records: &[RecordResult]| -> DateTime { - records.iter().fold(epoch, |current_ts, record| { - let record_ts = Utc.timestamp_nanos(record.datetime.as_nanos() as i64); - - std::cmp::max(current_ts, record_ts) - }) - }; - - let most_recent_ts = find_most_recent_ts(records); - - most_recent_ts >= end_ts - } - _ => true, - } - } - fn records_before_end_ts(&self, records: Vec) -> Vec { match self.get_config().to_datetime { Some(end_ts) if !records.is_empty() => records diff --git a/src/kinesis/models.rs b/src/kinesis/models.rs index 53549d2..dac6625 100644 --- a/src/kinesis/models.rs +++ b/src/kinesis/models.rs @@ -115,7 +115,5 @@ pub trait ShardProcessor: Send + Sync { tx_shard_iterator_progress: Sender, ) -> Result<()>; - fn has_records_beyond_end_ts(&self, records: &[RecordResult]) -> bool; - fn records_before_end_ts(&self, records: Vec) -> Vec; } diff --git a/src/kinesis/tests.rs b/src/kinesis/tests.rs index 4d891e4..a423e69 100644 --- a/src/kinesis/tests.rs +++ b/src/kinesis/tests.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, Mutex}; use anyhow::Result; use async_trait::async_trait; -use aws_sdk_kinesis::config::Region; use aws_sdk_kinesis::operation::get_records::GetRecordsOutput; use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; use aws_sdk_kinesis::operation::list_shards::ListShardsOutput; @@ -32,7 +31,6 @@ async fn seed_shards_test() { mpsc::channel::(1); let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -101,7 +99,6 @@ async fn produced_record_is_processed() { let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::(10); let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -149,7 +146,6 @@ async fn beyond_to_timestamp_is_received() { let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::(10); let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -191,7 +187,6 @@ async fn has_records_beyond_end_ts_when_has_end_ts() { let (tx_ticker_updates, _) = mpsc::channel::(10); let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -211,7 +206,7 @@ async fn has_records_beyond_end_ts_when_has_end_ts() { }; let records = vec![]; - assert!(processor.has_records_beyond_end_ts(&records)); + assert_eq!(processor.records_before_end_ts(records).len(), 0); let record1 = RecordResult { shard_id: Arc::new("shard_id".to_string()), @@ -253,7 +248,6 @@ async fn has_records_beyond_end_ts_when_no_end_ts() { let (tx_ticker_updates, _) = mpsc::channel::(10); let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -300,7 +294,6 @@ async fn handle_iterator_refresh_ok() { }; let client = TestKinesisClient { - region: Some(Region::new("us-east-1")), done: Arc::new(Mutex::new(false)), }; @@ -348,7 +341,6 @@ fn wait_secs_ok() { #[derive(Clone, Debug)] pub struct TestKinesisClient { - region: Option, done: Arc>, } @@ -422,10 +414,6 @@ impl StreamClient for TestKinesisClient { .shard_iterator("shard_iterator_latest".to_string()) .build()) } - - fn get_region(&self) -> Option<&Region> { - self.region.as_ref() - } } #[derive(Clone, Debug)] @@ -475,8 +463,4 @@ impl StreamClient for TestTimestampInFutureKinesisClient { ) -> Result { unimplemented!() } - - fn get_region(&self) -> Option<&Region> { - unimplemented!() - } } diff --git a/src/sink.rs b/src/sink.rs index a01c675..a342d0a 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -132,8 +132,6 @@ where rx_records: Receiver>, ) -> Result<()>; - fn handle_termination(&self, tx_records: Sender>); - fn delimiter(&self, handle: &mut BufWriter) -> Result<(), Error>; fn format_nb_messages(&self, messages_processed: u32) -> String { @@ -178,8 +176,6 @@ where let mut total_records_processed = 0; let mut active_shards_count = self.shard_count(); - self.handle_termination(tx_records.clone()); - while let Some(res) = rx_records.recv().await { match res { Ok(adt) => match adt { @@ -269,20 +265,6 @@ where self.run_inner(tx_records, rx_records, output).await } - fn handle_termination(&self, tx_records: Sender>) { - // Note: the exit_after_termination check is to help - // with tests where only one handler can be registered. - if self.get_config().exit_after_termination { - ctrlc_async::set_async_handler(async move { - tx_records - .send(Ok(ShardProcessorADT::Termination)) - .await - .unwrap(); - }) - .expect("Error setting Ctrl-C handler"); - } - } - fn delimiter(&self, handle: &mut BufWriter) -> Result<()> { if self.get_config().print_delimiter { writeln!(