Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(pulsar sink): Refactor to use StreamSink #14345

Merged
merged 58 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
08688d9
enhancement(pulsar sink): Refactor to use StreamSink
addisonj Sep 8, 2022
4839a04
Update src/internal_events/pulsar.rs
addisonj Sep 12, 2022
44d156f
Update src/sinks/pulsar/config.rs
addisonj Sep 12, 2022
548fdce
Update src/sinks/pulsar/config.rs
addisonj Sep 12, 2022
f26685d
Update src/sinks/pulsar/config.rs
addisonj Sep 12, 2022
dfcac00
Update src/sinks/pulsar/config.rs
addisonj Sep 12, 2022
63a7965
Update src/sinks/pulsar/config.rs
addisonj Sep 12, 2022
fb244fa
cargo fmt
fuchsnj Sep 13, 2022
32bca39
Merge branch 'master' into pulsar_sink_improve
Nov 18, 2022
573b9ee
Update src/sinks/pulsar/config.rs
addisonj Nov 18, 2022
0514d8f
WIP: Refactor to use new abstractions, not compiling
Nov 18, 2022
e20a60a
fix all errors and pass tests
Nov 20, 2022
20d6160
update cargo.lock
Nov 20, 2022
918fb9a
Merge branch 'master' into pulsar_sink_improve
Nov 20, 2022
38af926
address pr feedback
Nov 22, 2022
a4e6d11
Merge branch 'master' into pulsar_sink_improve
Nov 22, 2022
638c1fc
fix fmt
Nov 23, 2022
5ab5d08
Merge branch 'master' into pulsar_sink_improve
Nov 23, 2022
69e60de
update cargo.lock
neuronull Nov 23, 2022
289cf6b
update cargo.lock
neuronull Nov 29, 2022
4f0de94
Merge branch 'pulsar_sink_improve' of github.com:addisonj/vector into…
neuronull Nov 29, 2022
c151b06
generate-component-docs
neuronull Nov 29, 2022
a472df3
Merge branch 'master' into pulsar_sink_improve
fuchsnj Nov 30, 2022
1e6238e
Merge branch 'master' into pulsar_sink_improve
neuronull Nov 30, 2022
af47c41
changes to adapt to pulsar crate 5.0.0
neuronull Dec 9, 2022
6b182e6
remove unused module
neuronull Dec 9, 2022
4463a79
restore Cargo.lock
neuronull Dec 13, 2022
89773f1
regenerate cargo.lock
neuronull Dec 14, 2022
ea1f903
Merge branch 'master' into pulsar_sink_improve
neuronull Mar 15, 2023
500217b
start to work out compilation errors
neuronull Mar 15, 2023
f4ad1f9
feedback bg
neuronull Mar 15, 2023
1dc2b7b
more edits toward convergence
neuronull Mar 15, 2023
a5b2c5e
fixing things
neuronull Mar 17, 2023
f366fd6
found more things
neuronull Mar 17, 2023
1f33668
add protocol
neuronull Mar 17, 2023
b6b6e36
clippy
neuronull Mar 17, 2023
85c1e2d
gen docs
neuronull Mar 17, 2023
e4aedce
spell check
neuronull Mar 17, 2023
77a9c08
feedback sg
neuronull Mar 20, 2023
e05f8ff
feedback sg
neuronull Mar 21, 2023
19f9487
feedback sg- simplify the producer in the Service
neuronull Mar 21, 2023
5da6321
auto gen docs
neuronull Mar 21, 2023
ea1e133
Revert "feedback sg- simplify the producer in the Service"
neuronull Mar 22, 2023
f0eaa9a
multi producer magic?
neuronull Mar 22, 2023
ec48acb
two too many
neuronull Mar 23, 2023
3de6bfd
make topic a Template in config
neuronull Mar 23, 2023
bad9281
batch settings one line
neuronull Mar 23, 2023
b0841b5
batch comment
neuronull Mar 23, 2023
da06b92
add retry logic to client builder
neuronull Mar 23, 2023
5288029
use optionaltargetpath
neuronull Mar 23, 2023
6d4368b
fix int tests
neuronull Mar 23, 2023
91ed921
Merge branch 'master' into pulsar_sink_improve
neuronull Mar 23, 2023
906f6a1
autogen docs
neuronull Mar 23, 2023
c3ed18b
reduce scope of the lock being held
neuronull Apr 10, 2023
0d9815f
Merge branch 'master' into pulsar_sink_improve
neuronull Apr 10, 2023
9e85fe9
remove unecessary clone
neuronull Apr 10, 2023
12111d2
check lock readiness in poll_ready()
neuronull Apr 10, 2023
f1a45e8
add comments about the mutex
neuronull Apr 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 167 additions & 185 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ ordered-float = { version = "3.4.0", default-features = false }
percent-encoding = { version = "2.2.0", default-features = false }
pin-project = { version = "1.0.12", default-features = false }
postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true }
pulsar = { version = "5.0.0", default-features = false, features = ["tokio-runtime", "auth-oauth2"], optional = true }
pulsar = { version = "5.0.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "lz4", "flate2", "zstd", "snap"], optional = true }
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
rand_distr = { version = "0.4.3", default-features = false }
rdkafka = { version = "0.29.0", default-features = false, features = ["tokio", "libz", "ssl", "zstd"], optional = true }
Expand Down Expand Up @@ -680,7 +680,7 @@ sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap", "dep:serde_with"]
sinks-pulsar = ["dep:avro-rs", "dep:pulsar"]
sinks-pulsar = ["dep:avro-rs", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
sinks-socket = ["sinks-utils-udp"]
Expand Down
23 changes: 23 additions & 0 deletions src/internal_events/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,26 @@ impl InternalEvent for PulsarSendingError {
});
}
}

pub struct PulsarPropertyExtractionError<'a> {
pub property_field: &'a str,
}

impl InternalEvent for PulsarPropertyExtractionError<'_> {
fn emit(self) {
error!(
message = "Failed to extract properties. Value should be a map of String -> Bytes.",
error_code = "extracting_property",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
neuronull marked this conversation as resolved.
Show resolved Hide resolved
property_field = self.property_field,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_code" => "extracing_property",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::PROCESSING,
);
}
}
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ pub enum Sinks {

/// Apache Pulsar.
#[cfg(feature = "sinks-pulsar")]
Pulsar(#[configurable(derived)] pulsar::PulsarSinkConfig),
Pulsar(#[configurable(derived)] pulsar::config::PulsarSinkConfig),

/// Redis.
#[cfg(feature = "sinks-redis")]
Expand Down
Loading