diff --git a/build.rs b/build.rs index 9009d7c314c91..d5b53ef4702db 100644 --- a/build.rs +++ b/build.rs @@ -271,4 +271,3 @@ fn main() { // Emit the aforementioned stanzas. tracker.emit_rerun_stanzas(); } - diff --git a/src/sinks/gcp/bigquery/config.rs b/src/sinks/gcp/bigquery/config.rs index 1b38e82e53a55..a49affd1f1397 100644 --- a/src/sinks/gcp/bigquery/config.rs +++ b/src/sinks/gcp/bigquery/config.rs @@ -1,8 +1,8 @@ -use vector_lib::codecs::encoding::ProtobufSerializerConfig; use futures::FutureExt; use http::Uri; use indoc::indoc; use tonic::transport::Channel; +use vector_lib::codecs::encoding::ProtobufSerializerConfig; use vector_lib::configurable::configurable_component; use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto; diff --git a/src/sinks/gcp/bigquery/request_builder.rs b/src/sinks/gcp/bigquery/request_builder.rs index 350d82046fede..880f02761e7a5 100644 --- a/src/sinks/gcp/bigquery/request_builder.rs +++ b/src/sinks/gcp/bigquery/request_builder.rs @@ -1,10 +1,10 @@ use bytes::BytesMut; -use vector_lib::codecs::encoding::ProtobufSerializer; use prost::Message; use std::num::NonZeroUsize; use tokio_util::codec::Encoder; -use vector_lib::request_metadata::RequestMetadata; +use vector_lib::codecs::encoding::ProtobufSerializer; use vector_lib::event::Finalizable; +use vector_lib::request_metadata::RequestMetadata; use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto; use super::service::BigqueryRequest; @@ -48,7 +48,9 @@ impl BigqueryRequestBuilder { ) -> (NonZeroUsize, proto::append_rows_request::ProtoData) { let proto_data = proto::append_rows_request::ProtoData { writer_schema: Some(proto::ProtoSchema { - proto_descriptor: Some(self.protobuf_serializer.descriptor_proto().clone()), + proto_descriptor: Some(translate_descriptor_proto( + self.protobuf_serializer.descriptor_proto().clone(), + )), }), rows: Some(proto::ProtoRows { serialized_rows }), }; @@ -136,6 +138,160 @@ impl IncrementalRequestBuilder> for BigqueryRequestBuilder { } } +/// Convert from `prost_reflect::prost_types::DescriptorProto` to `prost_types::DescriptorProto` +/// +/// Someone upgraded `prost_reflect` without upgrading the other prost crates, +/// so the `prost_types` version used by `prost_reflect` is newer than the version used by vector. +/// +/// This function discards any `UninterpretedOption`s. +/// +/// "Why don't you just upgrade `prost_types` to match the version used by `prost_reflect`? +/// Ha. Hahaha. Hahahahahahaha. My branches are littered with the corpses of such attempts. +fn translate_descriptor_proto( + old_descriptor: prost_reflect::prost_types::DescriptorProto, +) -> prost_types::DescriptorProto { + prost_types::DescriptorProto { + name: old_descriptor.name, + field: old_descriptor + .field + .into_iter() + .map(|field| prost_types::FieldDescriptorProto { + name: field.name, + number: field.number, + label: field.label, + r#type: field.r#type, + type_name: field.type_name, + extendee: field.extendee, + default_value: field.default_value, + oneof_index: field.oneof_index, + json_name: field.json_name, + options: field.options.map(|options| prost_types::FieldOptions { + ctype: options.ctype, + packed: options.packed, + jstype: options.jstype, + lazy: options.lazy, + deprecated: options.deprecated, + weak: options.weak, + uninterpreted_option: Default::default(), + }), + proto3_optional: field.proto3_optional, + }) + .collect(), + extension: old_descriptor + .extension + .into_iter() + .map(|field| prost_types::FieldDescriptorProto { + name: field.name, + number: field.number, + label: field.label, + r#type: field.r#type, + type_name: field.type_name, + extendee: field.extendee, + default_value: field.default_value, + oneof_index: field.oneof_index, + json_name: field.json_name, + options: field.options.map(|options| prost_types::FieldOptions { + ctype: options.ctype, + packed: options.packed, + jstype: options.jstype, + lazy: options.lazy, + deprecated: options.deprecated, + weak: options.weak, + uninterpreted_option: Default::default(), + }), + proto3_optional: field.proto3_optional, + }) + .collect(), + nested_type: old_descriptor + .nested_type + .into_iter() + .map(translate_descriptor_proto) + .collect(), + enum_type: old_descriptor + .enum_type + .into_iter() + .map(|enum_descriptor| prost_types::EnumDescriptorProto { + name: enum_descriptor.name, + value: enum_descriptor + .value + .into_iter() + .map(|value| prost_types::EnumValueDescriptorProto { + name: value.name, + number: value.number, + options: value.options.map(|options| prost_types::EnumValueOptions { + deprecated: options.deprecated, + uninterpreted_option: Default::default(), + }), + }) + .collect(), + options: enum_descriptor + .options + .map(|options| prost_types::EnumOptions { + allow_alias: options.allow_alias, + deprecated: options.deprecated, + uninterpreted_option: Default::default(), + }), + reserved_range: enum_descriptor + .reserved_range + .into_iter() + .map( + |reserved_range| prost_types::enum_descriptor_proto::EnumReservedRange { + start: reserved_range.start, + end: reserved_range.end, + }, + ) + .collect(), + reserved_name: enum_descriptor.reserved_name, + }) + .collect(), + extension_range: old_descriptor + .extension_range + .into_iter() + .map( + |extension_range| prost_types::descriptor_proto::ExtensionRange { + start: extension_range.start, + end: extension_range.end, + options: extension_range + .options + .map(|_| prost_types::ExtensionRangeOptions { + uninterpreted_option: Default::default(), + }), + }, + ) + .collect(), + oneof_decl: old_descriptor + .oneof_decl + .into_iter() + .map(|oneof| prost_types::OneofDescriptorProto { + name: oneof.name, + options: oneof.options.map(|_| prost_types::OneofOptions { + uninterpreted_option: Default::default(), + }), + }) + .collect(), + options: old_descriptor + .options + .map(|options| prost_types::MessageOptions { + message_set_wire_format: options.message_set_wire_format, + no_standard_descriptor_accessor: options.no_standard_descriptor_accessor, + deprecated: options.deprecated, + map_entry: options.map_entry, + uninterpreted_option: Default::default(), + }), + reserved_range: old_descriptor + .reserved_range + .into_iter() + .map( + |reserved_range| prost_types::descriptor_proto::ReservedRange { + start: reserved_range.start, + end: reserved_range.end, + }, + ) + .collect(), + reserved_name: old_descriptor.reserved_name, + } +} + #[cfg(test)] mod test { use bytes::{BufMut, Bytes, BytesMut}; diff --git a/src/sinks/gcp/bigquery/service.rs b/src/sinks/gcp/bigquery/service.rs index 62fccbdb2f51c..148f4140ad2d6 100644 --- a/src/sinks/gcp/bigquery/service.rs +++ b/src/sinks/gcp/bigquery/service.rs @@ -7,8 +7,8 @@ use tonic::service::Interceptor; use tonic::transport::Channel; use tonic::{Request, Status}; use tower::Service; -use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::event::EventStatus; +use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; use vector_lib::stream::DriverResponse; use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto; @@ -81,7 +81,9 @@ impl DriverResponse for BigqueryResponse { // these errors can't be retried because the event payload is almost definitely bad Ok(super::proto::third_party::google::rpc::Code::InvalidArgument) | Ok(super::proto::third_party::google::rpc::Code::NotFound) - | Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => EventStatus::Rejected, + | Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => { + EventStatus::Rejected + } // everything else can probably be retried _ => EventStatus::Errored, }