Skip to content

Commit

Permalink
clippy fixes and minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
pront authored and Daniel599 committed Jul 27, 2023
1 parent a4cf12e commit a9ef83a
Showing 1 changed file with 38 additions and 30 deletions.
68 changes: 38 additions & 30 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct ProtobufDeserializerConfig {
impl ProtobufDeserializerConfig {
/// Build the `ProtobufDeserializer` from this configuration.
pub fn build(&self) -> ProtobufDeserializer {
Into::<ProtobufDeserializer>::into(self)
ProtobufDeserializer::try_from(self).unwrap()
}

/// Return the type of event build by this deserializer.
Expand Down Expand Up @@ -84,17 +84,17 @@ impl ProtobufDeserializer {
Self { message_descriptor }
}

pub fn get_message_descriptor(
fn get_message_descriptor(
desc_file: String,
message_type: String,
) -> vector_common::Result<MessageDescriptor> {
let b = fs::read(desc_file.clone())
.map_err(|e| format!("Failed to open protobuf desc file '{desc_file}': {e}"))?;
let pool = DescriptorPool::decode(b.as_slice())
.map_err(|e| format!("Failed to parse protobuf desc file '{desc_file}': {e}"))?;
Ok(pool.get_message_by_name(&message_type).expect(&format!(
"The message type '{message_type}' could not be found in '{desc_file}'"
)))
Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| {
panic!("The message type '{message_type}' could not be found in '{desc_file}'")
}))
}
}

Expand Down Expand Up @@ -129,14 +129,14 @@ impl Deserializer for ProtobufDeserializer {
}
}

impl From<&ProtobufDeserializerConfig> for ProtobufDeserializer {
fn from(config: &ProtobufDeserializerConfig) -> Self {
impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
type Error = vector_common::Error;
fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
config.desc_file.clone(),
config.message_type.clone(),
)
.unwrap();
Self::new(message_descriptor)
)?;
Ok(Self::new(message_descriptor))
}
}

Expand All @@ -145,29 +145,31 @@ fn to_vrl(
field_descriptor: Option<&prost_reflect::FieldDescriptor>,
) -> vector_common::Result<vrl::value::Value> {
let vrl_value = match prost_reflect_value {
prost_reflect::Value::Bool(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::I32(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::I64(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::U32(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::U64(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::Bool(v) => vrl::value::Value::from(*v),
prost_reflect::Value::I32(v) => vrl::value::Value::from(*v),
prost_reflect::Value::I64(v) => vrl::value::Value::from(*v),
prost_reflect::Value::U32(v) => vrl::value::Value::from(*v),
prost_reflect::Value::U64(v) => vrl::value::Value::from(*v),
prost_reflect::Value::F32(v) => vrl::value::Value::Float(
NotNan::new(f64::from(v.clone()))
.map_err(|_e| format!("Float number cannot be Nan"))?,
),
prost_reflect::Value::F64(v) => vrl::value::Value::Float(
NotNan::new(v.clone()).map_err(|_e| format!("F64 number cannot be Nan"))?,
NotNan::new(f64::from(*v)).map_err(|_e| "Float number cannot be Nan")?,
),
prost_reflect::Value::F64(v) => {
vrl::value::Value::Float(NotNan::new(*v).map_err(|_e| "F64 number cannot be Nan")?)
}
prost_reflect::Value::String(v) => vrl::value::Value::from(v.as_str()),
prost_reflect::Value::Bytes(v) => vrl::value::Value::from(v.clone()),
prost_reflect::Value::EnumNumber(v) => {
if let Some(field_descriptor) = field_descriptor {
let kind = field_descriptor.kind();
let enum_desc = kind
.as_enum()
.ok_or_else(|| format!("Internal error while parsing protobuf enum"))?;
let enum_desc = kind.as_enum().ok_or_else(|| {
format!(
"Internal error while parsing protobuf enum. Field descriptor: {:?}",
field_descriptor
)
})?;
vrl::value::Value::from(
enum_desc
.get_value(v.clone())
.get_value(*v)
.ok_or_else(|| {
format!("The number {} cannot be in '{}'", v, enum_desc.name())
})?
Expand All @@ -188,24 +190,30 @@ fn to_vrl(
}
prost_reflect::Value::List(v) => {
let vec = v
.into_iter()
.iter()
.map(|o| to_vrl(o, field_descriptor))
.collect::<Result<Vec<_>, vector_common::Error>>()?;
vrl::value::Value::from(vec)
}
prost_reflect::Value::Map(v) => {
if let Some(field_descriptor) = field_descriptor {
let kind = field_descriptor.kind();
let message_desc = kind
.as_message()
.ok_or_else(|| format!("Internal error while parsing protobuf message"))?;
let message_desc = kind.as_message().ok_or_else(|| {
format!(
"Internal error while parsing protobuf field descriptor: {:?}",
field_descriptor
)
})?;
vrl::value::Value::from(
v.into_iter()
v.iter()
.map(|kv| {
Ok((
kv.0.as_str()
.ok_or_else(|| {
format!("Internal error while parsing protobuf map")
format!(
"Internal error while parsing protobuf map. Field descriptor: {:?}",
field_descriptor
)
})?
.to_string(),
to_vrl(kv.1, Some(&message_desc.map_entry_value_field()))?,
Expand Down

0 comments on commit a9ef83a

Please sign in to comment.