Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate between different versions of prost-types #3

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,3 @@ fn main() {
// Emit the aforementioned stanzas.
tracker.emit_rerun_stanzas();
}

2 changes: 1 addition & 1 deletion src/sinks/gcp/bigquery/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use futures::FutureExt;
use http::Uri;
use indoc::indoc;
use tonic::transport::Channel;
use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use vector_lib::configurable::configurable_component;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
Expand Down
162 changes: 159 additions & 3 deletions src/sinks/gcp/bigquery/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use bytes::BytesMut;
use vector_lib::codecs::encoding::ProtobufSerializer;
use prost::Message;
use std::num::NonZeroUsize;
use tokio_util::codec::Encoder;
use vector_lib::request_metadata::RequestMetadata;
use vector_lib::codecs::encoding::ProtobufSerializer;
use vector_lib::event::Finalizable;
use vector_lib::request_metadata::RequestMetadata;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use super::service::BigqueryRequest;
Expand Down Expand Up @@ -48,7 +48,9 @@
) -> (NonZeroUsize, proto::append_rows_request::ProtoData) {
let proto_data = proto::append_rows_request::ProtoData {
writer_schema: Some(proto::ProtoSchema {
proto_descriptor: Some(self.protobuf_serializer.descriptor_proto().clone()),
proto_descriptor: Some(translate_descriptor_proto(
self.protobuf_serializer.descriptor_proto().clone(),
)),
}),
rows: Some(proto::ProtoRows { serialized_rows }),
};
Expand Down Expand Up @@ -136,6 +138,160 @@
}
}

/// Convert from `prost_reflect::prost_types::DescriptorProto` to `prost_types::DescriptorProto`
///
/// Someone upgraded `prost_reflect` without upgrading the other prost crates,
/// so the `prost_types` version used by `prost_reflect` is newer than the version used by vector.
///
/// This function discards any `UninterpretedOption`s.
///
/// "Why don't you just upgrade `prost_types` to match the version used by `prost_reflect`?
/// Ha. Hahaha. Hahahahahahaha. My branches are littered with the corpses of such attempts.

Check failure

Code scanning / check-spelling

Unrecognized Spelling Error

Hahaha is not a recognized word. (unrecognized-spelling)

Check failure

Code scanning / check-spelling

Unrecognized Spelling Error

Hahahahahahaha is not a recognized word. (unrecognized-spelling)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My branches are littered with the corpses of such attempts.

im sorry

fn translate_descriptor_proto(
old_descriptor: prost_reflect::prost_types::DescriptorProto,
) -> prost_types::DescriptorProto {
prost_types::DescriptorProto {
name: old_descriptor.name,
field: old_descriptor
.field
.into_iter()
.map(|field| prost_types::FieldDescriptorProto {
name: field.name,
number: field.number,
label: field.label,
r#type: field.r#type,
type_name: field.type_name,
extendee: field.extendee,
default_value: field.default_value,
oneof_index: field.oneof_index,
json_name: field.json_name,
options: field.options.map(|options| prost_types::FieldOptions {
ctype: options.ctype,
packed: options.packed,
jstype: options.jstype,
lazy: options.lazy,
deprecated: options.deprecated,
weak: options.weak,
uninterpreted_option: Default::default(),
}),
proto3_optional: field.proto3_optional,
})
.collect(),
extension: old_descriptor
.extension
.into_iter()
.map(|field| prost_types::FieldDescriptorProto {
name: field.name,
number: field.number,
label: field.label,
r#type: field.r#type,
type_name: field.type_name,
extendee: field.extendee,
default_value: field.default_value,
oneof_index: field.oneof_index,
json_name: field.json_name,
options: field.options.map(|options| prost_types::FieldOptions {
ctype: options.ctype,
packed: options.packed,
jstype: options.jstype,
lazy: options.lazy,
deprecated: options.deprecated,
weak: options.weak,
uninterpreted_option: Default::default(),
}),
proto3_optional: field.proto3_optional,
})
.collect(),
nested_type: old_descriptor
.nested_type
.into_iter()
.map(translate_descriptor_proto)
.collect(),
enum_type: old_descriptor
.enum_type
.into_iter()
.map(|enum_descriptor| prost_types::EnumDescriptorProto {
name: enum_descriptor.name,
value: enum_descriptor
.value
.into_iter()
.map(|value| prost_types::EnumValueDescriptorProto {
name: value.name,
number: value.number,
options: value.options.map(|options| prost_types::EnumValueOptions {
deprecated: options.deprecated,
uninterpreted_option: Default::default(),
}),
})
.collect(),
options: enum_descriptor
.options
.map(|options| prost_types::EnumOptions {
allow_alias: options.allow_alias,
deprecated: options.deprecated,
uninterpreted_option: Default::default(),
}),
reserved_range: enum_descriptor
.reserved_range
.into_iter()
.map(
|reserved_range| prost_types::enum_descriptor_proto::EnumReservedRange {
start: reserved_range.start,
end: reserved_range.end,
},
)
.collect(),
reserved_name: enum_descriptor.reserved_name,
})
.collect(),
extension_range: old_descriptor
.extension_range
.into_iter()
.map(
|extension_range| prost_types::descriptor_proto::ExtensionRange {
start: extension_range.start,
end: extension_range.end,
options: extension_range
.options
.map(|_| prost_types::ExtensionRangeOptions {
uninterpreted_option: Default::default(),
}),
},
)
.collect(),
oneof_decl: old_descriptor
.oneof_decl
.into_iter()
.map(|oneof| prost_types::OneofDescriptorProto {
name: oneof.name,
options: oneof.options.map(|_| prost_types::OneofOptions {
uninterpreted_option: Default::default(),
}),
})
.collect(),
options: old_descriptor
.options
.map(|options| prost_types::MessageOptions {
message_set_wire_format: options.message_set_wire_format,
no_standard_descriptor_accessor: options.no_standard_descriptor_accessor,
deprecated: options.deprecated,
map_entry: options.map_entry,
uninterpreted_option: Default::default(),
}),
reserved_range: old_descriptor
.reserved_range
.into_iter()
.map(
|reserved_range| prost_types::descriptor_proto::ReservedRange {
start: reserved_range.start,
end: reserved_range.end,
},
)
.collect(),
reserved_name: old_descriptor.reserved_name,
}
}

#[cfg(test)]
mod test {
use bytes::{BufMut, Bytes, BytesMut};
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/gcp/bigquery/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::{Request, Status};
use tower::Service;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::event::EventStatus;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::stream::DriverResponse;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
Expand Down Expand Up @@ -81,7 +81,9 @@ impl DriverResponse for BigqueryResponse {
// these errors can't be retried because the event payload is almost definitely bad
Ok(super::proto::third_party::google::rpc::Code::InvalidArgument)
| Ok(super::proto::third_party::google::rpc::Code::NotFound)
| Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => EventStatus::Rejected,
| Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => {
EventStatus::Rejected
}
// everything else can probably be retried
_ => EventStatus::Errored,
}
Expand Down
Loading