Core: Implement NaN counts in ORC #1790
Conversation
 * exceptions when they are accessed.
 */
public class ParquetFieldMetrics extends FieldMetrics {
I thought the reason for having `ParquetFieldMetrics` extend `FieldMetrics` is to allow ORC and Parquet to diverge if needed. If we are moving them back to a common class, why not just move everything back to `FieldMetrics`?
It's actually to allow Parquet and ORC to diverge from Avro, so we will use the actual `FieldMetrics` in Avro.
I am a bit confused; in that case, why not have 3 separate classes: `ParquetFieldMetrics`, `ORCFieldMetrics`, and `AvroFieldMetrics`?
Yeah, that's also an alternative approach. I wasn't sure if I wanted to directly duplicate the code in `ParquetFieldMetrics` to create an ORC version, though, since I don't think the Parquet/ORC libraries will support NaN natively soon.
I don't see a need for 3 classes when 2 of them would be nearly identical. I like what is in this PR, with the one note about naming.
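For illustration, the shape being discussed might look roughly like the sketch below: a common `FieldMetrics` base that carries per-field counts, and a Parquet/ORC subclass that only tracks the NaN count itself and fails loudly for metrics the file format's own statistics provide (the "exceptions when they are accessed" behavior mentioned in the diff context). Names, fields, and signatures here are assumptions, not the PR's exact code.

```java
// Illustrative sketch only; fields and names are assumptions, not the PR's exact code.
public class FieldMetrics {
  private final int id;
  private final long valueCount;
  private final long nullValueCount;
  private final long nanValueCount;

  public FieldMetrics(int id, long valueCount, long nullValueCount, long nanValueCount) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
    this.nanValueCount = nanValueCount;
  }

  public int id() { return id; }
  public long valueCount() { return valueCount; }
  public long nullValueCount() { return nullValueCount; }
  public long nanValueCount() { return nanValueCount; }
}

// Parquet/ORC variant: only the NaN count is tracked by the writer itself, because the
// other metrics come from the format's own column statistics; their accessors throw
// instead of returning misleading zeros.
public class ParquetFieldMetrics extends FieldMetrics {
  public ParquetFieldMetrics(int id, long nanValueCount) {
    super(id, 0L, 0L, nanValueCount);
  }

  @Override
  public long valueCount() {
    throw new UnsupportedOperationException("Value count is tracked by Parquet/ORC statistics");
  }

  @Override
  public long nullValueCount() {
    throw new UnsupportedOperationException("Null count is tracked by Parquet/ORC statistics");
  }
}
```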
}

return fieldMetrics
    .filter(metrics -> {
Since we have this util class, can we decompose this function into methods like `metricsColumnName(FieldMetrics, Schema)` and `metricsMode(FieldMetrics, MetricsConfig)`? They might be useful in other classes, and would also make the lambda chain cleaner.
Sure, I'll break this down
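A minimal sketch of how that decomposition might look, assuming `Schema.findColumnName(int)` and `MetricsConfig.columnMode(String)` are the lookups used; a `Schema` parameter is added to `metricsMode` just to resolve the column name, so treat the exact signatures as illustrative rather than what the PR ends up with.

```java
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.Schema;

// Sketch of the suggested helpers; FieldMetrics is the class added in this PR.
public class MetricsUtil {

  // Resolve the column name for the field id this metric was collected for.
  public static String metricsColumnName(FieldMetrics metrics, Schema schema) {
    return schema.findColumnName(metrics.id());
  }

  // Metrics modes are configured per column name, so resolve the name first.
  // (The Schema parameter exists only for that lookup; the comment above proposed
  // metricsMode(FieldMetrics, MetricsConfig).)
  public static MetricsModes.MetricsMode metricsMode(FieldMetrics metrics, MetricsConfig metricsConfig, Schema schema) {
    return metricsConfig.columnMode(metricsColumnName(metrics, schema));
  }
}
```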
private MetricsUtil() {
}

public static Map<Integer, Long> getNanValueCounts(
nit: seems like Iceberg prefers method names without `get`.
I was aware of that, but couldn't come up with a good name without `get` in this case (things like `generateCount`/`createCount` are longer and sound like they exist just to work around the word "get"), and later assumed it was fine since it's not an ordinary getter. Do you have a recommendation?
The problem with `get` in this case is that it isn't clear where the return value is coming from. I think that `create` is a better option because it is clear that the return value is built from the input arguments.
An example of where the value may come from somewhere else is `IPUtil.getHostName(String iface)`. The input value is used, but the actual return value would come from an external source.
Makes sense, I'll update both here and in #1829.
@@ -109,6 +112,10 @@ private WriteBuilder() {
        iPrimitive, primitive));
    }
  }

  private int getFieldId(TypeDescription typeDescription) {
nit: the private method feels redundant since the body is also just one line
I personally think it's easier to read when the abstract logic of what's being done here is separated from the underlying implementation, so I created the helper; do you feel strongly about this? I'll see if other people have the same comment.
I agree with @jackye1995. I probably wouldn't separate this out into its own method.
@@ -129,5 +139,9 @@ private WriteBuilder() {
        "Invalid iceberg type %s corresponding to Flink logical type %s", iPrimitive, flinkPrimitive));
    }
  }

  private int getFieldId(TypeDescription typeDescription) {
nit: same comment as before, do we need this private method for the one-line call?
@@ -46,8 +50,8 @@ private FlinkOrcWriter(RowType rowType, Schema iSchema) {
    }
  }

  public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema iSchema) {
    return new FlinkOrcWriter(rowType, iSchema);
  public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema iSchema, TypeDescription schema) {
Is it necessary to pass in the type description? Can we get the id from the Iceberg schema?
Yeah, I tried to avoid changing the signature but wasn't able to find id information in the schema. I'll see if people more familiar with the project have comments on this.
The field IDs are kept in `NestedField`, not primitives. That's probably why you didn't find one that was usable in the `primitive` method. What we do in other visitors is add methods to the visitor that are called before and after visiting a struct field, array element, map key, and map value. Those methods are passed the field. Then the visitor just needs to implement the before/after to maintain a stack of field IDs.
Here's a `visit` method with the callbacks: https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/types/TypeUtil.java#L334
And here's an example of using them to get the field IDs: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java#L75-L83
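For illustration, the stack-of-field-ids bookkeeping those callbacks enable might look roughly like the sketch below. The class and method names are made up; the real callbacks live in the linked `TypeUtil.visit` and visitors such as `TypeToSchema`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Illustrative sketch of the before/after callback pattern described above.
abstract class FieldIdTrackingVisitor<T> {
  private final Deque<Integer> fieldIds = new ArrayDeque<>();

  // The generic visit() would call these before/after descending into a struct field,
  // list element, map key, or map value, passing the NestedField that carries the id.
  public void beforeField(Types.NestedField field) {
    fieldIds.push(field.fieldId());
  }

  public void afterField(Types.NestedField field) {
    fieldIds.pop();
  }

  // Leaf handlers (e.g. primitive()) can ask for the id of the field being visited.
  protected Integer currentFieldId() {
    return fieldIds.peek();
  }

  public abstract T primitive(Type.PrimitiveType primitive);
}
```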
Thanks for the info! Yeah, earlier I noticed that the ids exist in `NestedField` but wasn't able to find a good way to extract them without larger changes to the signature. I tried to replace `StructType`/`NestedType` with `NestedField`, but that would result in losing other information. I'll update to use the before/after pattern.
@@ -115,12 +120,30 @@ public short getShort(int ordinal) {

  @Override
  public int getInt(int ordinal) {
    return struct.get(ordinal, Integer.class);
    Object integer = struct.get(ordinal, Object.class);
For this issue about handling date and timestamp, can it be a separate PR? This PR is already very big.
This is needed for `TestMergingMetrics` to work for ORC (also mentioned in the PR description), so I think it makes more sense to have it here.
If so, should we publish a separate PR for this? My biggest concern is that people might have different opinions about the way to handle these data types. When embedded as part of a big PR, it gets less attention from people who are interested.
I did a similar change in this class in an earlier PR, and I think this change is specific to the test itself, so I'm a bit reluctant to move it out of the current context; without this context the change would be confusing to understand:
- Currently Spark supports dates and times the way this class already handles them, since Spark represents dates/times with internal long/int values (ref1, ref2), so the new code introduced here won't affect the behavior of this class: Spark will continue to write and read these types with the same internal numeric types. And this class is only used for loading metadata tables, so the performance of the extra if-else check is probably not a big concern either.
- However, in `TestMergingMetrics` itself, the data is created by a random record generator which uses the actual `LocalDate` to populate the fields, and we wrap the generated records with this class for writing, so we have to handle them specially here (see the sketch below). But I don't think this usage pattern (writing Spark rows by wrapping Iceberg records with this class) will be used in production, since the Spark engine will use the actual Spark `InternalRow`.
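A minimal sketch of the kind of special-casing being described for `getInt`, assuming the wrapped value may be either Spark's internal epoch-day `int` or the generator's `java.time.LocalDate`; this is illustrative, not the exact diff.

```java
// Illustrative sketch (assumes: import java.time.LocalDate). When the wrapped Iceberg
// record was built by the random generator it holds a LocalDate, not Spark's internal int.
@Override
public int getInt(int ordinal) {
  Object integer = struct.get(ordinal, Object.class);

  if (integer instanceof Integer) {
    // Spark's internal representation: a date is already stored as epoch days in an int.
    return (Integer) integer;
  } else if (integer instanceof LocalDate) {
    // Test data uses LocalDate to populate date fields; convert to epoch days.
    return (int) ((LocalDate) integer).toEpochDay();
  } else {
    throw new IllegalStateException("Unknown type for int field. Type name: " + integer.getClass().getName());
  }
}
```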
I guess since I'll send out a separate PR anyway, I'll include the change in this class as part of that PR.
This is okay with me.
This touches a lot of files. Can we separate the refactoring out into a separate PR and focus on just ORC implementations here?
Sure, I'll send out a PR that only contains refactoring, and once that's merged I'll update this to depend on that one, unless you want it the other way around?
 * @param id field id being tracked by the writer
 * @param nanValueCount number of NaN values, will only be non-0 for double or float field.
 */
public ParquetFieldMetrics(int id,
public NaNOnlyFieldMetrics(int id,
I would probably change this to `NaNFieldMetrics` instead because it is likely that we will be adding lower/upper bounds to this in the near future. That would avoid another rename, but it's up to you.
Sounds good, I'll update both here and in #1829.
@@ -17,51 +17,50 @@
 * under the License.
 */

package org.apache.iceberg.parquet;
Does this need to be in API or could it be in `core` instead?
If the class is never returned to users, then I would keep it in core. The classes in `api` are primarily those that users would interact with.
Sounds good, I'll move both this and `FieldMetrics` to core, both here and in #1829.
@@ -77,12 +80,12 @@ private GenericOrcWriters() {
    return LongWriter.INSTANCE;
  }

  public static OrcValueWriter<Float> floats() {
    return FloatWriter.INSTANCE;
  public static OrcValueWriter<Float> floats(Integer id) {
The id is required, right? If so, then I think it could be `int` instead. It's an `int` in the constructor that gets called.
This could add a precondition to check that the id is non-null, but I think it would be better to do that before calling this method because the caller would probably know the field name rather than just the ID. Using the field name would produce a better error message.
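A small sketch of the call-site check being suggested; `fieldName` is a hypothetical local, and the nullable `Integer` return of `ORCSchemaUtil.fieldId` is an assumption. The point is only that validating where the name is known gives a better error than a bare unboxing failure.

```java
// Illustrative only: validate the id where the field name is known, then pass a primitive
// int into floats(int). Preconditions is the relocated Guava class Iceberg already uses;
// fieldName is a hypothetical local for the column being visited.
Integer fieldId = ORCSchemaUtil.fieldId(primitive);
Preconditions.checkNotNull(fieldId, "Cannot find field id for float column: %s", fieldName);
return SparkOrcValueWriters.floats(fieldId);
```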
abstract class FlinkSchemaVisitor<T> {

  static <T> T visit(RowType flinkType, Schema schema, FlinkSchemaVisitor<T> visitor) {
    return visit(flinkType, schema.asStruct(), visitor);
  static <T> T visit(RowType flinkType, Schema schema, TypeDescription typeDesc, FlinkSchemaVisitor<T> visitor) {
I think it would be cleaner to add the `beforeField`/`afterField` callbacks instead of the `typeDesc`.
@@ -114,6 +120,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
    columnSizes,
    valueCounts,
    nullCounts,
    Maps.newHashMap(),
Why not use null when metrics are missing?
I think I missed updating this during refactoring. Will update!
 * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
 * <p>
 * Since ORC keeps track of most metrics via column statistics, for now OrcRowWriter only keeps track of NaN
 * counters for double or float columns.
I don't think this paragraph needs to be here because it is a snapshot of how another component works. It could get stale really easily.
Makes sense, I'll remove similar instances here and in `OrcValueWriter` and `ParquetValueWriter`.
Looking close to ready, but I made a few comments.
@@ -98,9 +106,9 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript
      case LONG:
        return SparkOrcValueWriters.longs();
      case FLOAT:
        return SparkOrcValueWriters.floats();
        return SparkOrcValueWriters.floats(getFieldId(primitive));
nit: `getFieldId` is not used anywhere else, why not just use `ORCSchemaUtil.fieldId`?
/**
 * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
 */
Stream<FieldMetrics> metrics();
Why do some of the `metrics` method signatures have a default implementation, while others below do not?
Currently the value writers all have a default, because we only track this metric for float and wrapper types; declaring a default saves the other types from having to declare an empty stream. This row writer, on the other hand, handles row writing and will always need to read from the value writers' metrics, so a default wouldn't help much there. That said, I guess we do want to avoid breaking people's code if they implement their own row writer, so I'll add a default here to avoid that.
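A simplified sketch of the default being described; the `write` signature is abbreviated and the interface shape is an assumption beyond what the diff shows.

```java
import java.util.stream.Stream;

// Simplified sketch: non-float/double value writers inherit an empty stream, so only the
// writers that actually track NaN counts need to override metrics().
interface OrcValueWriter<T> {
  void write(int rowId, T data /* , output column vector omitted for brevity */);

  default Stream<FieldMetrics> metrics() {
    return Stream.empty();
  }
}
```

The row writer's `metrics()` could carry the same `default` for the reason given above (not breaking custom implementations), even though the built-in row writers would always override it to aggregate their value writers' metrics.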
visitor.beforeListElement(elementField);
try {
  element = visit(listType.getElementType(), iListType.elementType(), visitor);
} finally {
An error should be logged if we catch anything; same for the try-finally block above.
I think we are not catching any exceptions here, so we don't have anything to log? Did I miss something?
I see. I was thinking about logging the exception once in the catch block, but it seems unnecessary.
Looks good now. Thanks for the latest update, I think it is a bit cleaner now that it doesn't pass the type description.
`TestMergingMetrics` didn't handle `Date`/`Timestamp` types as expected: they don't have a matching type in `StructInternalRow.get` and the logic defaults the value to null. This didn't trigger an issue earlier since in the test case they were declared as optional fields; the ORC Spark writer handles null values differently and revealed this problem.

Edit: To reduce the size of this PR I separated some of the changes into #1829, and will rebase this on top of #1829 once that is merged. This PR still contains some of the changes from #1829 to ensure the code can compile and tests can run.