Skip to content

Commit

Permalink
chore(dev): Add wrapper for file-source in vector-lib (vectordotd…
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg authored and AndrooTheChen committed Sep 23, 2024
1 parent 46bcecb commit 039053d
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ pin-project.workspace = true
# Internal libs
dnsmsg-parser = { path = "lib/dnsmsg-parser", optional = true }
fakedata = { path = "lib/fakedata", optional = true }
file-source = { path = "lib/file-source", optional = true }
lookup = { package = "vector-lookup", path = "lib/vector-lookup" }
portpicker = { path = "lib/portpicker" }
prometheus-parser = { path = "lib/prometheus-parser", optional = true }
Expand Down Expand Up @@ -525,7 +524,7 @@ sources-dnstap = ["dep:base64", "dep:trust-dns-proto", "dep:dnsmsg-parser", "pro
sources-docker_logs = ["docker"]
sources-eventstoredb_metrics = []
sources-exec = []
sources-file = ["dep:file-source"]
sources-file = ["vector-lib/file-source"]
sources-file-descriptor = ["tokio-util/io"]
sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost-types", "protobuf-build", "dep:tonic"]
Expand All @@ -537,7 +536,7 @@ sources-internal_logs = []
sources-internal_metrics = []
sources-journald = []
sources-kafka = ["dep:rdkafka"]
sources-kubernetes_logs = ["dep:file-source", "kubernetes", "transforms-reduce"]
sources-kubernetes_logs = ["vector-lib/file-source", "kubernetes", "transforms-reduce"]
sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"]
sources-mongodb_metrics = ["dep:mongodb"]
sources-nats = ["dep:async-nats", "dep:nkeys"]
Expand Down
2 changes: 2 additions & 0 deletions lib/vector-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false
[dependencies]
codecs = { path = "../codecs", default-features = false }
enrichment = { path = "../enrichment" }
file-source = { path = "../file-source", optional = true }
vector-buffers = { path = "../vector-buffers", default-features = false }
vector-common = { path = "../vector-common" }
vector-config = { path = "../vector-config" }
Expand All @@ -17,6 +18,7 @@ vector-stream = { path = "../vector-stream" }
[features]
api = ["vector-core/api"]
lua = ["vector-core/lua"]
file-source = ["dep:file-source"]
syslog = ["codecs/syslog"]
test = ["vector-core/test"]
vrl = ["vector-core/vrl"]
2 changes: 2 additions & 0 deletions lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use codecs;
pub use enrichment;
#[cfg(feature = "file-source")]
pub use file_source;
pub use vector_buffers as buffers;
pub use vector_common::{
assert_event_data_eq, btreemap, byte_size_of, byte_size_of::ByteSizeOf, conversion,
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ impl<'a, P: std::fmt::Debug> InternalEvent for FileIoError<'a, P> {
mod source {
use std::{io::Error, path::Path, time::Duration};

use file_source::FileSourceInternalEvents;
use metrics::counter;
use vector_lib::file_source::FileSourceInternalEvents;

use super::{FileOpen, InternalEvent};
use crate::emit;
Expand Down
12 changes: 6 additions & 6 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ use std::{convert::TryInto, future, path::PathBuf, time::Duration};

use bytes::Bytes;
use chrono::Utc;
use file_source::{
calculate_ignore_before,
paths_provider::glob::{Glob, MatchOptions},
Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom,
ReadFromConfig,
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
use regex::bytes::Regex;
Expand All @@ -17,6 +11,12 @@ use tokio::{sync::oneshot, task::spawn_blocking};
use tracing::{Instrument, Span};
use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::file_source::{
calculate_ignore_before,
paths_provider::glob::{Glob, MatchOptions},
Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom,
ReadFromConfig,
};
use vector_lib::finalizer::OrderedFinalizer;
use vector_lib::{
config::{LegacyKey, LogNamespace},
Expand Down
2 changes: 1 addition & 1 deletion src/sources/kubernetes_logs/k8s_paths_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use std::path::PathBuf;

use file_source::paths_provider::PathsProvider;
use k8s_openapi::api::core::v1::{Namespace, Pod};
use kube::runtime::reflector::{store::Store, ObjectRef};
use vector_lib::file_source::paths_provider::PathsProvider;

use super::path_helpers::build_pod_logs_directory;
use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
Expand Down
8 changes: 4 additions & 4 deletions src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ use std::{path::PathBuf, time::Duration};

use bytes::Bytes;
use chrono::Utc;
use file_source::{
calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy,
Fingerprinter, Line, ReadFrom, ReadFromConfig,
};
use futures::{future::FutureExt, stream::StreamExt};
use futures_util::Stream;
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
Expand All @@ -27,6 +23,10 @@ use lookup::{lookup_v2::OptionalTargetPath, owned_value_path, path, OwnedTargetP
use serde_with::serde_as;
use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::file_source::{
calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy,
Fingerprinter, Line, ReadFrom, ReadFromConfig,
};
use vector_lib::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
use vector_lib::{
internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
Expand Down
8 changes: 4 additions & 4 deletions src/sources/kubernetes_logs/util.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{error::Error, future::Future, time::Duration};

use file_source::{
paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown,
FileSourceInternalEvents, Line,
};
use futures::{
future::{select, Either},
pin_mut, FutureExt, Sink,
};
use tokio::task::spawn_blocking;
use vector_lib::file_source::{
paths_provider::PathsProvider, Checkpointer, FileServer, FileServerShutdown,
FileSourceInternalEvents, Line,
};

/// A tiny wrapper around a [`FileServer`] that runs it as a [`spawn_blocking`]
/// task.
Expand Down

0 comments on commit 039053d

Please sign in to comment.