Skip to content

Commit

Permalink
remove duplicate BatchBuilder in kafka source
Browse files Browse the repository at this point in the history
  • Loading branch information
AyWa committed Aug 11, 2023
1 parent 0690dc2 commit aa33d1e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 45 deletions.
9 changes: 4 additions & 5 deletions quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use google_cloud_default::WithAuthExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::subscription::Subscription;
use quickwit_actors::{ActorContext, ActorExitStatus, Mailbox};
use quickwit_common::rand::append_random_suffix;
use quickwit_config::GcpPubSubSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -116,8 +117,8 @@ impl GcpPubSubSource {
state: GcpPubSubSourceState::default(),
subscription_source,
backfill_mode_enabled,
// TODO: get the real value
partition_id: "<node_id>/<pipeline_ord>".to_string(),
// TODO: replace with "<node_id>/<pipeline_ord>"
partition_id: append_random_suffix("gcp_pubsub"),
// pull_parallelism,
max_messages_per_pull,
})
Expand Down Expand Up @@ -158,11 +159,9 @@ impl Source for GcpPubSubSource {
ctx.record_progress();
}

// TODO: wait for all the IDs to be acked to guarantee at-least-once delivery
if self.should_exit() {
info!(subscription = %self.subscription, "Reached end of subscription.");
ctx.ask(doc_processor_mailbox, batch.build_force())
.await
.context("Failed to force commit last batch!")?;
ctx.send_exit_with_success(doc_processor_mailbox).await?;
return Err(ActorExitStatus::Success);
}
Expand Down
40 changes: 6 additions & 34 deletions quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use itertools::Itertools;
use oneshot;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_config::KafkaSourceParams;
use quickwit_metastore::checkpoint::{
PartitionId, Position, SourceCheckpoint, SourceCheckpointDelta,
};
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use quickwit_proto::IndexUid;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{
Expand All @@ -48,8 +46,10 @@ use tokio::time;
use tracing::{debug, info, warn};

use crate::actors::DocProcessor;
use crate::models::{NewPublishLock, PublishLock, RawDocBatch};
use crate::source::{Source, SourceContext, SourceExecutionContext, TypedSourceFactory};
use crate::models::{NewPublishLock, PublishLock};
use crate::source::{
BatchBuilder, Source, SourceContext, SourceExecutionContext, TypedSourceFactory,
};

/// Number of bytes after which we cut a new batch.
///
Expand Down Expand Up @@ -446,34 +446,6 @@ impl KafkaSource {
}
}

/// Accumulates raw documents and a checkpoint delta until `build` turns them
/// into a `RawDocBatch`.
#[derive(Debug, Default)]
struct BatchBuilder {
/// Documents appended through `push`, in insertion order.
docs: Vec<Bytes>,
/// Running sum of the `num_bytes` values passed to `push`.
num_bytes: u64,
/// Checkpoint delta moved into the batch by `build` and reset to its
/// default value by `clear`.
checkpoint_delta: SourceCheckpointDelta,
}

impl BatchBuilder {
    /// Consumes the builder and returns a `RawDocBatch` carrying the
    /// accumulated documents and checkpoint delta.
    ///
    /// The resulting batch always has `force_commit` set to `false`; the byte
    /// counter is discarded.
    fn build(self) -> RawDocBatch {
        let Self {
            docs,
            checkpoint_delta,
            ..
        } = self;
        RawDocBatch {
            docs,
            checkpoint_delta,
            force_commit: false,
        }
    }

    /// Empties the builder in place: removes every document, zeroes the byte
    /// counter and replaces the checkpoint delta with its default value.
    fn clear(&mut self) {
        self.docs.clear();
        self.num_bytes = 0;
        self.checkpoint_delta = Default::default();
    }

    /// Appends `doc` to the pending documents and adds `num_bytes` to the
    /// running byte counter.
    fn push(&mut self, doc: Bytes, num_bytes: u64) {
        self.docs.push(doc);
        self.num_bytes += num_bytes;
    }
}

#[async_trait]
impl Source for KafkaSource {
async fn initialize(
Expand Down Expand Up @@ -782,7 +754,7 @@ mod kafka_broker_tests {

use super::*;
use crate::new_split_id;
use crate::source::{quickwit_supported_sources, SourceActor};
use crate::source::{quickwit_supported_sources, RawDocBatch, SourceActor};

fn create_admin_client() -> anyhow::Result<AdminClient<DefaultClientContext>> {
let admin_client = ClientConfig::new()
Expand Down
11 changes: 5 additions & 6 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,11 @@ impl BatchBuilder {
force_commit: false,
}
}
pub fn build_force(self) -> RawDocBatch {
RawDocBatch {
docs: self.docs,
checkpoint_delta: self.checkpoint_delta,
force_commit: true,
}

pub fn clear(&mut self) {
self.docs.clear();
self.num_bytes = 0;
self.checkpoint_delta = SourceCheckpointDelta::default();
}

pub fn push(&mut self, doc: Bytes, num_bytes: u64) {
Expand Down

0 comments on commit aa33d1e

Please sign in to comment.