Skip to content

Commit

Permalink
Removing handling of Ctrl+c (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
gr211 authored May 9, 2024
1 parent d409879 commit e90df9f
Show file tree
Hide file tree
Showing 9 changed files with 7 additions and 106 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
rust:
- 1.77.0 # MSRV
- 1.78.0 # MSRV

steps:
- name: Checkout
Expand Down Expand Up @@ -73,7 +73,7 @@ jobs:
centos:
runs-on: ubuntu-latest
needs: [ci]
needs: [ ci ]

steps:
- name: System dependencies
Expand All @@ -98,7 +98,7 @@ jobs:
arch:
runs-on: ubuntu-latest
needs: [ci]
needs: [ ci ]

steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
rust:
- 1.77.0 # MSRV
- 1.78.0 # MSRV

steps:
- name: Checkout
Expand Down
36 changes: 1 addition & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ base64 = "0"
chrono = { version = "0", features = ["clock", "std"] }
clap = { version = "4", features = ["derive"] }
colored = "2"
ctrlc-async = "3"
env_logger = "0"
humantime = "2"
log = "0"
Expand Down
7 changes: 0 additions & 7 deletions src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod stream {
use anyhow::Result;
use async_trait::async_trait;
use aws_config::Region;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
Expand Down Expand Up @@ -38,8 +37,6 @@ pub mod stream {
shard_id: &str,
) -> Result<GetShardIteratorOutput>;

fn get_region(&self) -> Option<&Region>;

fn aws_datetime(timestamp: &chrono::DateTime<Utc>) -> DateTime {
DateTime::from_millis(timestamp.timestamp_millis())
}
Expand Down Expand Up @@ -143,10 +140,6 @@ pub mod client {
.map_err(SdkError::into_service_error)
.map_err(Into::into)
}

fn get_region(&self) -> Option<&Region> {
self.client.config().region()
}
}

pub async fn create_client(
Expand Down
23 changes: 1 addition & 22 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use aws_sdk_kinesis::operation::get_records::GetRecordsError;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use chrono::prelude::*;
use chrono::{DateTime, Utc};
use chrono::Utc;
use log::{debug, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -266,27 +266,6 @@ where
Ok(())
}

fn has_records_beyond_end_ts(&self, records: &[RecordResult]) -> bool {
match self.get_config().to_datetime {
Some(end_ts) if !records.is_empty() => {
let epoch = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();

let find_most_recent_ts = |records: &[RecordResult]| -> DateTime<Utc> {
records.iter().fold(epoch, |current_ts, record| {
let record_ts = Utc.timestamp_nanos(record.datetime.as_nanos() as i64);

std::cmp::max(current_ts, record_ts)
})
};

let most_recent_ts = find_most_recent_ts(records);

most_recent_ts >= end_ts
}
_ => true,
}
}

fn records_before_end_ts(&self, records: Vec<RecordResult>) -> Vec<RecordResult> {
match self.get_config().to_datetime {
Some(end_ts) if !records.is_empty() => records
Expand Down
2 changes: 0 additions & 2 deletions src/kinesis/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,5 @@ pub trait ShardProcessor<K: StreamClient>: Send + Sync {
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()>;

fn has_records_beyond_end_ts(&self, records: &[RecordResult]) -> bool;

fn records_before_end_ts(&self, records: Vec<RecordResult>) -> Vec<RecordResult>;
}
18 changes: 1 addition & 17 deletions src/kinesis/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::{Arc, Mutex};

use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_kinesis::config::Region;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
Expand Down Expand Up @@ -32,7 +31,6 @@ async fn seed_shards_test() {
mpsc::channel::<ShardIteratorProgress>(1);

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand Down Expand Up @@ -101,7 +99,6 @@ async fn produced_record_is_processed() {
let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::<TickerMessage>(10);

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand Down Expand Up @@ -149,7 +146,6 @@ async fn beyond_to_timestamp_is_received() {
let (tx_ticker_updates, mut rx_ticker_updates) = mpsc::channel::<TickerMessage>(10);

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand Down Expand Up @@ -191,7 +187,6 @@ async fn has_records_beyond_end_ts_when_has_end_ts() {
let (tx_ticker_updates, _) = mpsc::channel::<TickerMessage>(10);

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand All @@ -211,7 +206,7 @@ async fn has_records_beyond_end_ts_when_has_end_ts() {
};

let records = vec![];
assert!(processor.has_records_beyond_end_ts(&records));
assert_eq!(processor.records_before_end_ts(records).len(), 0);

let record1 = RecordResult {
shard_id: Arc::new("shard_id".to_string()),
Expand Down Expand Up @@ -253,7 +248,6 @@ async fn has_records_beyond_end_ts_when_no_end_ts() {
let (tx_ticker_updates, _) = mpsc::channel::<TickerMessage>(10);

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand Down Expand Up @@ -300,7 +294,6 @@ async fn handle_iterator_refresh_ok() {
};

let client = TestKinesisClient {
region: Some(Region::new("us-east-1")),
done: Arc::new(Mutex::new(false)),
};

Expand Down Expand Up @@ -348,7 +341,6 @@ fn wait_secs_ok() {

#[derive(Clone, Debug)]
pub struct TestKinesisClient {
region: Option<Region>,
done: Arc<Mutex<bool>>,
}

Expand Down Expand Up @@ -422,10 +414,6 @@ impl StreamClient for TestKinesisClient {
.shard_iterator("shard_iterator_latest".to_string())
.build())
}

fn get_region(&self) -> Option<&Region> {
self.region.as_ref()
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -475,8 +463,4 @@ impl StreamClient for TestTimestampInFutureKinesisClient {
) -> Result<GetShardIteratorOutput> {
unimplemented!()
}

fn get_region(&self) -> Option<&Region> {
unimplemented!()
}
}
18 changes: 0 additions & 18 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ where
rx_records: Receiver<Result<ShardProcessorADT, ProcessError>>,
) -> Result<()>;

fn handle_termination(&self, tx_records: Sender<Result<ShardProcessorADT, ProcessError>>);

fn delimiter(&self, handle: &mut BufWriter<W>) -> Result<(), Error>;

fn format_nb_messages(&self, messages_processed: u32) -> String {
Expand Down Expand Up @@ -178,8 +176,6 @@ where
let mut total_records_processed = 0;
let mut active_shards_count = self.shard_count();

self.handle_termination(tx_records.clone());

while let Some(res) = rx_records.recv().await {
match res {
Ok(adt) => match adt {
Expand Down Expand Up @@ -269,20 +265,6 @@ where
self.run_inner(tx_records, rx_records, output).await
}

fn handle_termination(&self, tx_records: Sender<Result<ShardProcessorADT, ProcessError>>) {
// Note: the exit_after_termination check is to help
// with tests where only one handler can be registered.
if self.get_config().exit_after_termination {
ctrlc_async::set_async_handler(async move {
tx_records
.send(Ok(ShardProcessorADT::Termination))
.await
.unwrap();
})
.expect("Error setting Ctrl-C handler");
}
}

fn delimiter(&self, handle: &mut BufWriter<W>) -> Result<()> {
if self.get_config().print_delimiter {
writeln!(
Expand Down

0 comments on commit e90df9f

Please sign in to comment.