Skip to content

Commit

Permalink
Hudi fixes and exclusions for databricks (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored Nov 14, 2020
1 parent a10f6d5 commit 70dcc86
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
10 changes: 10 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@
<groupId>com.logicalclocks</groupId>
<artifactId>deequ</artifactId>
<version>${deequ.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
10 changes: 6 additions & 4 deletions java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,13 @@ private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimes
Map<String, String> readOptions) {
Map<String, String> hudiArgs = new HashMap<String, String>();

String hudiCommitStartTime = timeStampToHudiFormat(startTimestamp);
String hudiCommitEndTime = timeStampToHudiFormat(endTimestamp);
if (startTimestamp != null) {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, timeStampToHudiFormat(startTimestamp));
} else {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, timeStampToHudiFormat(0L));
}

hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, hudiCommitStartTime);
hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, hudiCommitEndTime);
hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, timeStampToHudiFormat(endTimestamp));
hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);

// Overwrite with user provided options if any
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/core/hudi_feature_group_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def __init__(
self,
feature_group,
alias,
left_feature_group_start_timestamp,
left_feature_group_end_timestamp,
left_feature_group_start_timestamp=0,
):
self._feature_group = feature_group_module.FeatureGroup.from_response_json(
feature_group
Expand Down

0 comments on commit 70dcc86

Please sign in to comment.