Skip to content

Commit

Permalink
Clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Aug 11, 2023
1 parent aa33d1e commit cf1965b
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 122 deletions.
26 changes: 13 additions & 13 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,29 @@ impl SourceConfig {
pub fn source_type(&self) -> &str {
match self.source_params {
SourceParams::File(_) => "file",
SourceParams::Kafka(_) => "kafka",
SourceParams::GcpPubSub(_) => "gcp_pubsub",
SourceParams::Kinesis(_) => "kinesis",
SourceParams::Vec(_) => "vec",
SourceParams::Void(_) => "void",
SourceParams::IngestApi => "ingest-api",
SourceParams::IngestCli => "ingest-cli",
SourceParams::Kafka(_) => "kafka",
SourceParams::Kinesis(_) => "kinesis",
SourceParams::Pulsar(_) => "pulsar",
SourceParams::Vec(_) => "vec",
SourceParams::Void(_) => "void",
}
}

// TODO: Remove after source factory refactor.
pub fn params(&self) -> JsonValue {
match &self.source_params {
SourceParams::File(params) => serde_json::to_value(params),
SourceParams::Kafka(params) => serde_json::to_value(params),
SourceParams::GcpPubSub(params) => serde_json::to_value(params),
SourceParams::Kinesis(params) => serde_json::to_value(params),
SourceParams::Vec(params) => serde_json::to_value(params),
SourceParams::Void(params) => serde_json::to_value(params),
SourceParams::IngestApi => serde_json::to_value(()),
SourceParams::IngestCli => serde_json::to_value(()),
SourceParams::Kafka(params) => serde_json::to_value(params),
SourceParams::Kinesis(params) => serde_json::to_value(params),
SourceParams::Pulsar(params) => serde_json::to_value(params),
SourceParams::Vec(params) => serde_json::to_value(params),
SourceParams::Void(params) => serde_json::to_value(params),
}
.unwrap()
}
Expand Down Expand Up @@ -203,16 +203,16 @@ impl FromStr for SourceInputFormat {
#[serde(tag = "source_type", content = "params", rename_all = "snake_case")]
pub enum SourceParams {
File(FileSourceParams),
Kafka(KafkaSourceParams),
GcpPubSub(GcpPubSubSourceParams),
Kinesis(KinesisSourceParams),
Pulsar(PulsarSourceParams),
Vec(VecSourceParams),
Void(VoidSourceParams),
#[serde(rename = "ingest-api")]
IngestApi,
#[serde(rename = "ingest-cli")]
IngestCli,
Kafka(KafkaSourceParams),
Kinesis(KinesisSourceParams),
Pulsar(PulsarSourceParams),
Vec(VecSourceParams),
Void(VoidSourceParams),
}

impl SourceParams {
Expand Down Expand Up @@ -293,12 +293,12 @@ pub struct GcpPubSubSourceParams {
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub enable_backfill_mode: bool,
/// GCP service account credentials (None will use default via GOOGLE_APPLICATION_CREDENTIALS)
/// GCP service account credentials (`None` will use default via
/// GOOGLE_APPLICATION_CREDENTIALS)
pub credentials: Option<String>,
/// How many threads spread pubsub pull requests over (default 10)
/// Higher values can mean higher throughput but higher overhead
/// Number of pull requests issued in parallel by the source (default 1)
pub pull_parallelism: Option<u64>,
/// The max messages to pull per pull request (default 1000)
/// Maximum number of messages returned by a pull request (default 1,000)
pub max_messages_per_pull: Option<i32>,
}

Expand Down
13 changes: 6 additions & 7 deletions quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,17 @@ impl SourceConfigForSerialization {
SourceParams::Kafka(_) | SourceParams::Kinesis(_) | SourceParams::Pulsar(_) => {
// TODO consider any validation opportunity
}
SourceParams::Vec(_)
| SourceParams::Void(_)
SourceParams::GcpPubSub(_)
| SourceParams::IngestApi
| SourceParams::IngestCli => {}
SourceParams::GcpPubSub(_) => {}
| SourceParams::IngestCli
| SourceParams::Vec(_)
| SourceParams::Void(_) => {}
}
match &self.source_params {
SourceParams::Kafka(_) => {}
SourceParams::GcpPubSub(_) => {}
SourceParams::GcpPubSub(_) | SourceParams::Kafka(_) => {}
_ => {
if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 {
bail!("Quickwit currently supports multiple pipelines only for Kafka sources. Open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types.");
bail!("Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. Open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types.");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ fail = { workspace = true }
flume = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
google-cloud-default = { workspace = true, optional = true }
google-cloud-googleapis = { workspace = true, optional = true }
google-cloud-pubsub = { workspace = true, optional = true }
itertools = { workspace = true }
libz-sys = { workspace = true, optional = true }
once_cell = { workspace = true }
Expand All @@ -46,9 +49,6 @@ ulid = { workspace = true }
utoipa = { workspace = true }
vrl = { workspace = true, optional = true }
vrl-stdlib = { workspace = true, optional = true }
google-cloud-pubsub = { workspace = true, optional = true }
google-cloud-default = { workspace = true, optional = true }
google-cloud-googleapis = { workspace = true, optional = true }

quickwit-actors = { workspace = true }
quickwit-aws = { workspace = true }
Expand All @@ -63,7 +63,7 @@ quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }

[features]
gcp_pubsub = ["dep:google-cloud-pubsub", "dep:google-cloud-default", "dep:google-cloud-googleapis"]
gcp-pubsub = ["dep:google-cloud-pubsub", "dep:google-cloud-default", "dep:google-cloud-googleapis"]
kafka = ["rdkafka", "backoff"]
kafka-broker-tests = []
vendored-kafka = ["kafka", "libz-sys/static", "openssl/vendored", "rdkafka/gssapi-vendored"]
Expand Down
Loading

0 comments on commit cf1965b

Please sign in to comment.