Skip to content

Commit

Permalink
Set a UUID when building a Schema object. (#32399)
Browse files Browse the repository at this point in the history
* Set a UUID when building a Schema object.

Schemas are immutable so this meets the guarantee that a consistent UUID is used for matching schemas.

Clean up some cases that set a random UUID after creating a Schema.
Fix a case where the same UUID was assigned to a different Schema after sorting.
Use immutable data structures to enforce immutability.
Update OneOfType, which was using serialized proto equality; that was incorrect if there was a uuid or encoding positions. Instead we can use a null Row using the generated oneof schema.
  • Loading branch information
scwhittle authored Sep 24, 2024
1 parent c503ec4 commit a79e710
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -90,7 +91,12 @@ public String toString() {
}
}
// A mapping between field names and indices.
private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
private final BiMap<String, Integer> fieldIndices;

// Encoding positions can be used to maintain encoded byte compatibility between schemas with
// different field ordering or with added/removed fields. Such positions affect the encoding
// and decoding of Rows performed by RowCoderGenerator. They are stored within Schemas to
// facilitate plumbing to coders, display data etc but do not affect schema equality / uuid etc.
private Map<String, Integer> encodingPositions = Maps.newHashMap();
private boolean encodingPositionsOverridden = false;

Expand Down Expand Up @@ -312,17 +318,20 @@ public Schema(List<Field> fields) {
}

public Schema(List<Field> fields, Options options) {
this.fields = fields;
this.fields = ImmutableList.copyOf(fields);
int index = 0;
for (Field field : fields) {
BiMap<String, Integer> fieldIndicesMutable = HashBiMap.create();
for (Field field : this.fields) {
Preconditions.checkArgument(
fieldIndices.get(field.getName()) == null,
fieldIndicesMutable.get(field.getName()) == null,
"Duplicate field " + field.getName() + " added to schema");
encodingPositions.put(field.getName(), index);
fieldIndices.put(field.getName(), index++);
fieldIndicesMutable.put(field.getName(), index++);
}
this.hashCode = Objects.hash(fieldIndices, fields);
this.fieldIndices = ImmutableBiMap.copyOf(fieldIndicesMutable);
this.options = options;
this.hashCode = Objects.hash(this.fieldIndices, this.fields, this.options);
this.uuid = UUID.randomUUID();
}

public static Schema of(Field... fields) {
Expand All @@ -334,29 +343,24 @@ public static Schema of(Field... fields) {
* fields.
*/
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
// {fields, uuid, options}
// Note: encoding positions are not copied over because generally they should align with the
// ordering of field indices. Otherwise, problems may occur when encoding/decoding Rows of
// this schema.
Schema sortedSchema =
this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSortedSchema = innerType.getRowSchema().sorted();
innerType = innerType.toBuilder().setRowSchema(innerSortedSchema).build();
return field.toBuilder().setType(innerType).build();
}
return field;
})
.collect(Schema.toSchema())
.withOptions(getOptions());
sortedSchema.setUUID(getUUID());

return sortedSchema;
// Create a new schema and copy over the appropriate Schema object attributes: {fields, options}
// Note: uuid is not copied as the Schema field ordering is changed. Encoding positions are not
// copied over because generally they should align with the ordering of field indices.
// Otherwise, problems may occur when encoding/decoding Rows of this schema.
return this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSortedSchema = innerType.getRowSchema().sorted();
innerType = innerType.toBuilder().setRowSchema(innerSortedSchema).build();
return field.toBuilder().setType(innerType).build();
}
return field;
})
.collect(Schema.toSchema())
.withOptions(getOptions());
}

/** Returns a copy of the Schema with the options set. */
Expand Down Expand Up @@ -405,11 +409,14 @@ public boolean equals(@Nullable Object o) {
return false;
}
Schema other = (Schema) o;
// If both schemas have a UUID set, we can simply compare the UUIDs.
if (uuid != null && other.uuid != null) {
if (Objects.equals(uuid, other.uuid)) {
return true;
}
// If both schemas have a UUID set, we can short-circuit deep comparison if the
// UUIDs are equal.
if (uuid != null && other.uuid != null && Objects.equals(uuid, other.uuid)) {
return true;
}
// Utilize hash-code pre-calculation for cheap negative comparison.
if (this.hashCode != other.hashCode) {
return false;
}
return Objects.equals(fieldIndices, other.fieldIndices)
&& Objects.equals(getFields(), other.getFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,62 +115,80 @@ private static String getLogicalTypeUrn(String identifier) {
.build();

public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
return schemaToProto(schema, serializeLogicalType, true);
}

public static SchemaApi.Schema schemaToProto(
Schema schema, boolean serializeLogicalType, boolean serializeUUID) {
String uuid = schema.getUUID() != null && serializeUUID ? schema.getUUID().toString() : "";
SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder().setId(uuid);
for (Field field : schema.getFields()) {
SchemaApi.Field protoField =
fieldToProto(
field,
schema.indexOf(field.getName()),
schema.getEncodingPositions().get(field.getName()),
serializeLogicalType);
serializeLogicalType,
serializeUUID);
builder.addFields(protoField);
}
builder.addAllOptions(optionsToProto(schema.getOptions()));
return builder.build();
}

private static SchemaApi.Field fieldToProto(
Field field, int fieldId, int position, boolean serializeLogicalType) {
Field field, int fieldId, int position, boolean serializeLogicalType, boolean serializeUUID) {
return SchemaApi.Field.newBuilder()
.setName(field.getName())
.setDescription(field.getDescription())
.setType(fieldTypeToProto(field.getType(), serializeLogicalType))
.setType(fieldTypeToProto(field.getType(), serializeLogicalType, serializeUUID))
.setId(fieldId)
.setEncodingPosition(position)
.addAllOptions(optionsToProto(field.getOptions()))
.build();
}

@VisibleForTesting
static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) {
static SchemaApi.FieldType fieldTypeToProto(
FieldType fieldType, boolean serializeLogicalType, boolean serializeUUID) {
SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder();
switch (fieldType.getTypeName()) {
case ROW:
builder.setRowType(
SchemaApi.RowType.newBuilder()
.setSchema(schemaToProto(fieldType.getRowSchema(), serializeLogicalType)));
.setSchema(
schemaToProto(fieldType.getRowSchema(), serializeLogicalType, serializeUUID)));
break;

case ARRAY:
builder.setArrayType(
SchemaApi.ArrayType.newBuilder()
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
fieldTypeToProto(
fieldType.getCollectionElementType(),
serializeLogicalType,
serializeUUID)));
break;

case ITERABLE:
builder.setIterableType(
SchemaApi.IterableType.newBuilder()
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
fieldTypeToProto(
fieldType.getCollectionElementType(),
serializeLogicalType,
serializeUUID)));
break;

case MAP:
builder.setMapType(
SchemaApi.MapType.newBuilder()
.setKeyType(fieldTypeToProto(fieldType.getMapKeyType(), serializeLogicalType))
.setValueType(fieldTypeToProto(fieldType.getMapValueType(), serializeLogicalType))
.setKeyType(
fieldTypeToProto(
fieldType.getMapKeyType(), serializeLogicalType, serializeUUID))
.setValueType(
fieldTypeToProto(
fieldType.getMapValueType(), serializeLogicalType, serializeUUID))
.build());
break;

Expand All @@ -186,12 +204,14 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
.setUrn(logicalType.getIdentifier())
.setPayload(ByteString.copyFrom(((UnknownLogicalType) logicalType).getPayload()))
.setRepresentation(
fieldTypeToProto(logicalType.getBaseType(), serializeLogicalType));
fieldTypeToProto(
logicalType.getBaseType(), serializeLogicalType, serializeUUID));

if (logicalType.getArgumentType() != null) {
logicalTypeBuilder
.setArgumentType(
fieldTypeToProto(logicalType.getArgumentType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getArgumentType(), serializeLogicalType, serializeUUID))
.setArgument(
fieldValueToProto(logicalType.getArgumentType(), logicalType.getArgument()));
}
Expand All @@ -200,13 +220,15 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
logicalTypeBuilder =
SchemaApi.LogicalType.newBuilder()
.setRepresentation(
fieldTypeToProto(logicalType.getBaseType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getBaseType(), serializeLogicalType, serializeUUID))
.setUrn(urn);
if (logicalType.getArgumentType() != null) {
logicalTypeBuilder =
logicalTypeBuilder
.setArgumentType(
fieldTypeToProto(logicalType.getArgumentType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getArgumentType(), serializeLogicalType, serializeUUID))
.setArgument(
fieldValueToProto(
logicalType.getArgumentType(), logicalType.getArgument()));
Expand All @@ -226,7 +248,8 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_MILLIS_INSTANT)
.setRepresentation(fieldTypeToProto(FieldType.INT64, serializeLogicalType))
.setRepresentation(
fieldTypeToProto(FieldType.INT64, serializeLogicalType, serializeUUID))
.build());
break;
case DECIMAL:
Expand All @@ -235,7 +258,8 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DECIMAL)
.setRepresentation(fieldTypeToProto(FieldType.BYTES, serializeLogicalType))
.setRepresentation(
fieldTypeToProto(FieldType.BYTES, serializeLogicalType, serializeUUID))
.build());
break;
case BYTE:
Expand Down Expand Up @@ -288,14 +312,14 @@ public static Schema schemaFromProto(SchemaApi.Schema protoSchema) {
Schema schema = builder.build();

Preconditions.checkState(encodingLocationMap.size() == schema.getFieldCount());
long dinstictEncodingPositions = encodingLocationMap.values().stream().distinct().count();
Preconditions.checkState(dinstictEncodingPositions <= schema.getFieldCount());
if (dinstictEncodingPositions < schema.getFieldCount() && schema.getFieldCount() > 0) {
long distinctEncodingPositions = encodingLocationMap.values().stream().distinct().count();
Preconditions.checkState(distinctEncodingPositions <= schema.getFieldCount());
if (distinctEncodingPositions < schema.getFieldCount() && schema.getFieldCount() > 0) {
// This means that encoding positions were not specified in the proto. Generally, we don't
// expect this to happen,
// but if it does happen, we expect none to be specified - in which case they should all be
// zero.
Preconditions.checkState(dinstictEncodingPositions == 1);
Preconditions.checkState(distinctEncodingPositions == 1);
} else if (protoSchema.getEncodingPositionsSet()) {
schema.setEncodingPositions(encodingLocationMap);
}
Expand Down Expand Up @@ -771,7 +795,8 @@ private static List<SchemaApi.Option> optionsToProto(Schema.Options options) {
protoOptions.add(
SchemaApi.Option.newBuilder()
.setName(name)
.setType(fieldTypeToProto(Objects.requireNonNull(options.getType(name)), false))
.setType(
fieldTypeToProto(Objects.requireNonNull(options.getType(name)), false, false))
.setValue(
fieldValueToProto(
Objects.requireNonNull(options.getType(name)), options.getValue(name)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ private OneOfType(List<Field> fields, @Nullable Map<String, Integer> enumMap) {
enumerationType = EnumerationType.create(enumValues);
}
oneOfSchema = Schema.builder().addFields(nullableFields).build();
schemaProtoRepresentation = SchemaTranslation.schemaToProto(oneOfSchema, false).toByteArray();
schemaProtoRepresentation =
SchemaTranslation.schemaToProto(oneOfSchema, false, false).toByteArray();
}

/** Create an {@link OneOfType} logical type. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void toAndFromProto() throws Exception {
public static class FromProtoToProtoTest {
@Parameters(name = "{index}: {0}")
public static Iterable<SchemaApi.Schema> data() {
ImmutableList.Builder<SchemaApi.Schema> listBuilder = ImmutableList.builder();
SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder();
// A go 'int'
builder.addFields(
Expand All @@ -232,6 +233,9 @@ public static Iterable<SchemaApi.Schema> data() {
.setId(0)
.setEncodingPosition(0)
.build());
SchemaApi.Schema singleFieldSchema = builder.build();
listBuilder.add(singleFieldSchema);

// A pickled python object
builder.addFields(
SchemaApi.Field.newBuilder()
Expand Down Expand Up @@ -294,21 +298,51 @@ public static Iterable<SchemaApi.Schema> data() {
.setId(2)
.setEncodingPosition(2)
.build());
SchemaApi.Schema unknownLogicalTypeSchema = builder.build();
SchemaApi.Schema multipleFieldSchema = builder.build();
listBuilder.add(multipleFieldSchema);

return ImmutableList.<SchemaApi.Schema>builder().add(unknownLogicalTypeSchema).build();
builder.clear();
builder.addFields(
SchemaApi.Field.newBuilder()
.setName("nested")
.setType(
SchemaApi.FieldType.newBuilder()
.setRowType(
SchemaApi.RowType.newBuilder().setSchema(singleFieldSchema).build())
.build())
.build());
SchemaApi.Schema nestedSchema = builder.build();
listBuilder.add(nestedSchema);

return listBuilder.build();
}

@Parameter(0)
public SchemaApi.Schema schemaProto;

/**
 * Clears the id on the given schema builder, then recurses into every schema that is nested
 * directly as the row type of one of its fields.
 *
 * <p>Only fields whose type is a row type carrying a schema are descended into; other field
 * types are left untouched.
 *
 * @param builder the schema builder whose id (and nested row-schema ids) are cleared in place
 */
private void clearIds(SchemaApi.Schema.Builder builder) {
  builder.clearId();
  for (SchemaApi.Field.Builder fieldBuilder : builder.getFieldsBuilderList()) {
    boolean hasNestedRowSchema =
        fieldBuilder.hasType()
            && fieldBuilder.getType().hasRowType()
            && fieldBuilder.getType().getRowType().hasSchema();
    if (!hasNestedRowSchema) {
      continue;
    }
    // Walk through the nested builders so the id is cleared on the schema held by this field.
    clearIds(fieldBuilder.getTypeBuilder().getRowTypeBuilder().getSchemaBuilder());
  }
}

@Test
public void fromProtoAndToProto() throws Exception {
Schema decodedSchema = SchemaTranslation.schemaFromProto(schemaProto);

SchemaApi.Schema reencodedSchemaProto = SchemaTranslation.schemaToProto(decodedSchema, true);
SchemaApi.Schema.Builder builder = reencodedSchemaProto.toBuilder();
clearIds(builder);
assertThat(builder.build(), equalTo(schemaProto));

assertThat(reencodedSchemaProto, equalTo(schemaProto));
SchemaApi.Schema reencodedSchemaProtoWithoutUUID =
SchemaTranslation.schemaToProto(decodedSchema, true, false);
assertThat(reencodedSchemaProtoWithoutUUID, equalTo(schemaProto));
}
}

Expand Down Expand Up @@ -432,8 +466,8 @@ public static Iterable<Schema.FieldType> data() {
public Schema.FieldType fieldType;

@Test
public void testLogicalTypeSerializeDeserilizeCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true);
public void testLogicalTypeSerializeDeserializeCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true, false);
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);

assertThat(
Expand All @@ -451,7 +485,7 @@ public void testLogicalTypeSerializeDeserilizeCorrectly() {

@Test
public void testLogicalTypeFromToProtoCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, false);
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, false, false);
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);

if (STANDARD_LOGICAL_TYPES.containsKey(translated.getLogicalType().getIdentifier())) {
Expand Down
Loading

0 comments on commit a79e710

Please sign in to comment.