Skip to content

Commit

Permalink
feat: Graceful shutdown options (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Mar 23, 2022
1 parent 7da5dd1 commit a5c3f0f
Show file tree
Hide file tree
Showing 22 changed files with 171 additions and 95 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]


[dependencies]
pallas = "0.7.0"
pallas = "0.8.0-alpha.0"
# pallas = { path = "../pallas/pallas" }
hex = "0.4.3"
net2 = "0.2.37"
Expand Down
1 change: 1 addition & 0 deletions src/bin/oura/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
since: None,
intersect,
retry_policy: None,
finalize: None,
}),
PeerMode::AsClient => DumpSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
Expand Down
1 change: 1 addition & 0 deletions src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
since: None,
intersect,
retry_policy: None,
finalize: None,
}),
PeerMode::AsClient => WatchSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
Expand Down
28 changes: 14 additions & 14 deletions src/filters/fingerprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,22 @@ impl FilterProvider for Config {

let seed = self.seed.unwrap_or(0);

let handle = thread::spawn(move || loop {
let mut msg = input.recv().expect("error receiving message");

let fingerprint = build_fingerprint(&msg, seed);

match fingerprint {
Ok(value) => {
debug!("computed fingerprint {}", value);
msg.fingerprint = Some(value);
}
Err(err) => {
warn!("failed to compute fingerprint: {}, event: {:?}", err, msg);
let handle = thread::spawn(move || {
for mut msg in input.iter() {
let fingerprint = build_fingerprint(&msg, seed);

match fingerprint {
Ok(value) => {
debug!("computed fingerprint {}", value);
msg.fingerprint = Some(value);
}
Err(err) => {
warn!("failed to compute fingerprint: {}, event: {:?}", err, msg);
}
}
}

output_tx.send(msg).expect("error sending filter message");
output_tx.send(msg).expect("error sending filter message");
}
});

Ok((handle, output_rx))
Expand Down
7 changes: 4 additions & 3 deletions src/filters/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ impl FilterProvider for Config {
fn bootstrap(&self, input: StageReceiver) -> PartialBootstrapResult {
let (output_tx, output_rx) = new_inter_stage_channel(None);

let handle = thread::spawn(move || loop {
let msg = input.recv().expect("error receiving message");
output_tx.send(msg).expect("error sending filter message");
let handle = thread::spawn(move || {
for msg in input.iter() {
output_tx.send(msg).expect("error sending filter message");
}
});

Ok((handle, output_rx))
Expand Down
9 changes: 5 additions & 4 deletions src/filters/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ impl FilterProvider for Config {

let check = self.check.clone();

let handle = thread::spawn(move || loop {
let event = input.recv().expect("error receiving message");
if check.event_matches(&event) {
output_tx.send(event).expect("error sending filter message");
let handle = thread::spawn(move || {
for event in input.iter() {
if check.event_matches(&event) {
output_tx.send(event).expect("error sending filter message");
}
}
});

Expand Down
2 changes: 2 additions & 0 deletions src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ pub enum EventData {
block_slot: u64,
block_hash: String,
},
// // flow-control event to end the pipeline
// Finalize,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/assert/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ pub fn assertion_loop(
) -> Result<(), Error> {
let mut state = State::default();

loop {
let event = input.recv()?;

for event in input.iter() {
// notify pipeline about the progress
utils.track_sink_progress(&event);

Expand All @@ -86,4 +84,6 @@ pub fn assertion_loop(
run_check!(&config, &state, tx_records_matches_block_count);
run_check!(&config, &state, tx_has_input_and_output);
}

Ok(())
}
8 changes: 4 additions & 4 deletions src/sinks/aws_lambda/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ pub fn writer_loop(
.enable_io()
.build()?;

loop {
let event = input.recv().unwrap();

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

Expand All @@ -48,7 +46,9 @@ pub fn writer_loop(

if let Err(err) = result {
log::error!("unrecoverable error invoking lambda funcion: {:?}", err);
break Err(err);
return Err(err);
}
}

Ok(())
}
8 changes: 4 additions & 4 deletions src/sinks/aws_sqs/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ pub fn writer_loop(
.enable_io()
.build()?;

loop {
let event = input.recv().unwrap();

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

Expand All @@ -60,7 +58,9 @@ pub fn writer_loop(

if let Err(err) = result {
log::error!("unrecoverable error sending message to SQS: {:?}", err);
break Err(err);
return Err(err);
}
}

Ok(())
}
19 changes: 10 additions & 9 deletions src/sinks/elastic/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,25 @@ pub fn writer_loop(
.enable_io()
.build()?;

loop {
let event = input.recv().unwrap();

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

let index = index.to_owned();
let client = client.clone();

rt.block_on(async move {
let result = match idempotency {
let result = rt.block_on(async move {
match idempotency {
true => index_event_with_id(client, &index, event).await,
false => index_event_without_id(client, &index, event).await,
};

if let Err(err) = result {
warn!("error indexing record in Elasticsearch: {}", err);
}
});

if let Err(err) = result {
log::error!("error indexing record in Elasticsearch: {}", err);
return Err(err);
}
}

Ok(())
}
6 changes: 3 additions & 3 deletions src/sinks/kafka/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ pub fn producer_loop(
partitioning: PartitionStrategy,
utils: Arc<Utils>,
) -> Result<(), Error> {
loop {
let event = input.recv()?;

for event in input.iter() {
// notify the pipeline where we are
utils.track_sink_progress(&event);

Expand All @@ -43,4 +41,6 @@ pub fn producer_loop(

debug!("pushed event to kafka: {:?}", &event);
}

Ok(())
}
6 changes: 3 additions & 3 deletions src/sinks/logs/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ pub fn jsonl_writer_loop(
output: &mut impl Write,
utils: Arc<Utils>,
) -> Result<(), Error> {
loop {
let evt = input.recv()?;

for evt in input.iter() {
// notify pipeline about the progress
utils.track_sink_progress(&evt);

let buf = json!(evt).to_string();
output.write_all(buf.as_bytes())?;
output.write_all(b"\n")?;
}

Ok(())
}
6 changes: 3 additions & 3 deletions src/sinks/stdout/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ pub fn jsonl_writer_loop(
output: &mut impl Write,
utils: Arc<Utils>,
) -> Result<(), Error> {
loop {
let evt = input.recv()?;

for evt in input.iter() {
// notify pipeline about the progress
utils.track_sink_progress(&evt);

let buf = json!(evt).to_string();
output.write_all(buf.as_bytes())?;
output.write_all(b"\n")?;
}

Ok(())
}
5 changes: 3 additions & 2 deletions src/sinks/terminal/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ pub fn reducer_loop(
"Oura terminal output started, waiting for chain data\n".with(Color::DarkGrey),
))?;

loop {
for evt in input.iter() {
let (width, _) = crossterm::terminal::size()?;
let evt = input.recv()?;

// notify progress to the pipeline
utils.track_sink_progress(&evt);
Expand All @@ -37,4 +36,6 @@ pub fn reducer_loop(
let line = LogLine::new(evt, width as usize);
stdout.execute(Print(line))?;
}

Ok(())
}
6 changes: 3 additions & 3 deletions src/sinks/webhook/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ pub(crate) fn request_loop(
backoff_delay: Duration,
utils: Arc<Utils>,
) -> Result<(), Error> {
loop {
let event = input.recv().unwrap();

for event in input.iter() {
// notify progress to the pipeline
utils.track_sink_progress(&event);

let body = RequestBody::from(event);

execute_fallible_request(client, url, &body, error_policy, max_retries, backoff_delay)?;
}

Ok(())
}
Loading

0 comments on commit a5c3f0f

Please sign in to comment.