Skip to content

Commit

Permalink
Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
huberylee committed Jun 6, 2022
1 parent 5d18b80 commit bb5c112
Show file tree
Hide file tree
Showing 11 changed files with 896 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.index;

import java.util.Arrays;
import java.util.Map;

public class SecondaryIndex {
private String indexName;
private String[] colNames;
private SecondaryIndexType indexType;
private Map<String, Map<String, String>> colOptions;
private Map<String, String> options;

public SecondaryIndex() {
}

public SecondaryIndex(
String indexName,
String[] colNames,
SecondaryIndexType indexType,
Map<String, Map<String, String>> colOptions,
Map<String, String> options) {
this.indexName = indexName;
this.colNames = colNames;
this.indexType = indexType;
this.colOptions = colOptions;
this.options = options;
}

public String getIndexName() {
return indexName;
}

public String[] getColNames() {
return colNames;
}

public SecondaryIndexType getIndexType() {
return indexType;
}

public Map<String, Map<String, String>> getColOptions() {
return colOptions;
}

public Map<String, String> getOptions() {
return options;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return "SecondaryIndex{"
+ "indexName='" + indexName + '\''
+ ", colNames='" + Arrays.toString(colNames) + '\''
+ ", indexType=" + indexType
+ ", colOptions=" + colOptions
+ ", options=" + options
+ '}';
}

public static class Builder {
private String indexName;
private String[] colNames;
private SecondaryIndexType indexType;
private Map<String, Map<String, String>> colOptions;
private Map<String, String> options;

public Builder setIndexName(String indexName) {
this.indexName = indexName;
return this;
}

public Builder setColNames(String[] colNames) {
this.colNames = colNames;
return this;
}

public Builder setIndexType(String indexType) {
this.indexType = SecondaryIndexType.of(indexType);
return this;
}

public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
this.colOptions = colOptions;
return this;
}

public Builder setOptions(Map<String, String> options) {
this.options = options;
return this;
}

public SecondaryIndex build() {
return new SecondaryIndex(indexName, colNames, indexType, colOptions, options);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.index;

import org.apache.hudi.exception.HoodieSecondaryIndexException;

import java.util.Arrays;

public enum SecondaryIndexType {
LUCENE((byte) 1);

private final byte type;

SecondaryIndexType(byte type) {
this.type = type;
}

public byte getValue() {
return type;
}

public static SecondaryIndexType of(byte indexType) {
return Arrays.stream(SecondaryIndexType.values())
.filter(t -> t.type == indexType)
.findAny()
.orElseThrow(() ->
new HoodieSecondaryIndexException("Unknown secondary index type:" + indexType)
);
}

public static SecondaryIndexType of(String indexType) {
return Arrays.stream(SecondaryIndexType.values())
.filter(t -> t.name().equals(indexType.toUpperCase()))
.findAny()
.orElseThrow(() ->
new HoodieSecondaryIndexException("Unknown secondary index type:" + indexType)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.index.SecondaryIndex;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
Expand All @@ -39,9 +40,14 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSecondaryIndexException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -234,6 +240,11 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Comma-separated list of metadata partitions that have been completely built and in-sync with data table. "
+ "These partitions are ready for use by the readers");

public static final ConfigProperty<String> SECONDARY_INDEXES = ConfigProperty
.key("hoodie.table.secondary.indexes")
.noDefaultValue()
.withDocumentation("The secondary indexes' info");

private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>

public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
Expand Down Expand Up @@ -498,6 +509,19 @@ public Option<String[]> getPartitionFields() {
return Option.empty();
}

public Option<SecondaryIndex[]> getSecondaryIndexes() {
if (contains(SECONDARY_INDEXES)) {
String indexesStr = getString(SECONDARY_INDEXES);
try {
return Option.ofNullable(fromJsonString(indexesStr, SecondaryIndex[].class));
} catch (Exception e) {
throw new HoodieSecondaryIndexException("Fail to parse secondary indexes", e);
}
}

return Option.empty();
}

/**
* @returns the partition field prop.
*/
Expand Down Expand Up @@ -640,6 +664,21 @@ public Map<String, String> propsMap() {
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
}

private static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
if (jsonStr == null || jsonStr.isEmpty()) {
return clazz.newInstance();
}

return getObjectMapper().readValue(jsonStr, clazz);
}

private static ObjectMapper getObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
return mapper;
}

/**
* @deprecated Use {@link #BASE_FILE_FORMAT} and its methods.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -741,6 +741,7 @@ public static class PropertyBuilder {
private Boolean shouldDropPartitionColumns;
private String metadataPartitions;
private String inflightMetadataPartitions;
private String secondaryIndexFields;

/**
* Persist the configs that is written at the first time, and should not be changed.
Expand Down Expand Up @@ -875,6 +876,11 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
return this;
}

public PropertyBuilder setSecondaryIndexFields(String secondaryIndexFields) {
this.secondaryIndexFields = secondaryIndexFields;
return this;
}

public PropertyBuilder set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
Expand All @@ -883,7 +889,7 @@ public PropertyBuilder set(String key, Object value) {
}

public PropertyBuilder set(Map<String, Object> props) {
for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) {
for (String key : HoodieTableConfig.PERSISTED_CONFIG_LIST) {
Object value = props.get(key);
if (value != null) {
set(key, value);
Expand Down Expand Up @@ -982,6 +988,9 @@ public PropertyBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
}
if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES)) {
setSecondaryIndexFields(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES));
}
return this;
}

Expand Down Expand Up @@ -1072,6 +1081,9 @@ public Properties build() {
if (null != inflightMetadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
}
if (null != secondaryIndexFields) {
tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES, secondaryIndexFields);
}
return tableConfig.getProps();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.exception;

public class HoodieSecondaryIndexException extends HoodieException {
public HoodieSecondaryIndexException(String message) {
super(message);
}

public HoodieSecondaryIndexException(String message, Throwable t) {
super(message, t);
}
}
Loading

0 comments on commit bb5c112

Please sign in to comment.