Skip to content

Commit

Permalink
[HUDI-3654] Preparations for hudi metastore. (#5572)
Browse files Browse the repository at this point in the history
* [HUDI-3654] Preparations for hudi metastore.

Co-authored-by: gengxiaoyu <gengxiaoyu@bytedance.com>
  • Loading branch information
minihippo and gengxiaoyu authored May 17, 2022
1 parent a7a42e4 commit ad773b3
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoa
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
}

public Option<EmbeddedTimelineService> getTimelineServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {

private Stream<HoodieInstant> getInstantsToArchive() {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
}

// For archiving and cleaning instants, we need to include intermediate state files if they exist
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
Expand Down Expand Up @@ -495,6 +496,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig;
private HoodieMetastoreConfig metastoreConfig;
private HoodieCommonConfig commonConfig;
private EngineType engineType;

Expand Down Expand Up @@ -886,6 +888,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
}

Expand Down Expand Up @@ -2100,6 +2103,13 @@ public HoodieStorageLayout.LayoutType getLayoutType() {
return HoodieStorageLayout.LayoutType.valueOf(getString(HoodieLayoutConfig.LAYOUT_TYPE));
}

/**
 * Metastore configs.
 *
 * @return the value reported by the metastore config's {@code enableMetastore()} for this write config.
 */
public boolean isMetastoreEnabled() {
  return this.metastoreConfig.enableMetastore();
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.config;

import javax.annotation.concurrent.Immutable;
import java.util.Properties;

/**
 * Configurations used by the Hudi Metastore.
 *
 * <p>All keys share the {@link #METASTORE_PREFIX} prefix. Instances are normally obtained through
 * {@link #newBuilder()}, whose {@link Builder#build()} applies the declared defaults.
 */
@Immutable
@ConfigClassProperty(name = "Metastore Configs",
    groupName = ConfigGroups.Names.WRITE_CLIENT,
    description = "Configurations used by the Hudi Metastore.")
public class HoodieMetastoreConfig extends HoodieConfig {

  /** Common prefix of every metastore related config key. */
  public static final String METASTORE_PREFIX = "hoodie.metastore";

  public static final ConfigProperty<Boolean> METASTORE_ENABLE = ConfigProperty
      .key(METASTORE_PREFIX + ".enable")
      .defaultValue(false)
      .withDocumentation("Use metastore server to store hoodie table metadata");

  public static final ConfigProperty<String> METASTORE_URLS = ConfigProperty
      .key(METASTORE_PREFIX + ".uris")
      .defaultValue("thrift://localhost:9090")
      .withDocumentation("Metastore server uris");

  public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRIES = ConfigProperty
      .key(METASTORE_PREFIX + ".connect.retries")
      .defaultValue(3)
      .withDocumentation("Number of retries while opening a connection to metastore");

  public static final ConfigProperty<Integer> METASTORE_CONNECTION_RETRY_DELAY = ConfigProperty
      .key(METASTORE_PREFIX + ".connect.retry.delay")
      .defaultValue(1)
      .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");

  /**
   * @return a fresh builder for {@link HoodieMetastoreConfig}.
   */
  public static HoodieMetastoreConfig.Builder newBuilder() {
    return new HoodieMetastoreConfig.Builder();
  }

  /**
   * @return whether the metastore is enabled; resolved with the same "or default" lookup the other
   *         getters of this class use, so that it does not depend on {@code setDefaults} having run.
   */
  public boolean enableMetastore() {
    return getBooleanOrDefault(METASTORE_ENABLE);
  }

  /**
   * @return the configured metastore server uris, or the declared default.
   */
  public String getMetastoreUris() {
    return getStringOrDefault(METASTORE_URLS);
  }

  /**
   * @return the configured number of connection retries, or the declared default.
   */
  public int getConnectionRetryLimit() {
    return getIntOrDefault(METASTORE_CONNECTION_RETRIES);
  }

  /**
   * @return the configured delay (documented in seconds) between connection attempts, or the declared default.
   */
  public int getConnectionRetryDelay() {
    return getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY);
  }

  /**
   * Builder for {@link HoodieMetastoreConfig}.
   */
  public static class Builder {
    private final HoodieMetastoreConfig config = new HoodieMetastoreConfig();

    /**
     * Copies all entries of {@code props} into the config under construction.
     *
     * @param props properties to copy; {@code null} is treated as "nothing to copy"
     * @return this builder
     */
    public Builder fromProperties(Properties props) {
      // Callers may not have any properties at hand; tolerate null rather than failing in putAll.
      if (props != null) {
        this.config.getProps().putAll(props);
      }
      return this;
    }

    /**
     * Sets the metastore server uris.
     *
     * @param uris value stored under {@link #METASTORE_URLS}
     * @return this builder
     */
    public Builder setUris(String uris) {
      config.setValue(METASTORE_URLS, uris);
      return this;
    }

    /**
     * Applies the defaults declared on {@link HoodieMetastoreConfig} and returns the config.
     *
     * @return the config held by this builder
     */
    public HoodieMetastoreConfig build() {
      config.setDefaults(HoodieMetastoreConfig.class.getName());
      return config;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.table;

import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
Expand All @@ -38,6 +39,7 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -98,21 +100,22 @@ public class HoodieTableMetaClient implements Serializable {
// NOTE: Since those two parameters lay on the hot-path of a lot of computations, we
// use tailored extension of the {@code Path} class allowing to avoid repetitive
// computations secured by its immutability
private SerializablePath basePath;
private SerializablePath metaPath;
protected SerializablePath basePath;
protected SerializablePath metaPath;

private transient HoodieWrapperFileSystem fs;
private boolean loadActiveTimelineOnLoad;
private SerializableConfiguration hadoopConf;
protected SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private TimelineLayoutVersion timelineLayoutVersion;
private HoodieTableConfig tableConfig;
private HoodieActiveTimeline activeTimeline;
protected HoodieTableConfig tableConfig;
protected HoodieActiveTimeline activeTimeline;
private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
protected HoodieMetastoreConfig metastoreConfig;

private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
protected HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
Expand Down Expand Up @@ -367,6 +370,13 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() {
return archivedTimeline;
}

/**
 * Returns the metastore config of this meta client, creating a fallback instance on first access
 * when none has been set.
 *
 * <p>The fallback is created through the builder rather than the bare constructor so that
 * {@code build()} applies the declared defaults before the config is queried.
 *
 * @return the metastore config, never {@code null}
 */
public HoodieMetastoreConfig getMetastoreConfig() {
  if (metastoreConfig == null) {
    metastoreConfig = HoodieMetastoreConfig.newBuilder().build();
  }
  return metastoreConfig;
}

/**
* Returns fresh new archived commits as a timeline from startTs (inclusive).
*
Expand Down Expand Up @@ -451,7 +461,8 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado
HoodieTableConfig.create(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath)
.setProperties(props).build();
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
return metaClient;
}
Expand Down Expand Up @@ -620,6 +631,21 @@ public void initializeBootstrapDirsIfNotExists() throws IOException {
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
}

/**
 * Creates the meta client implementation selected by the metastore settings in {@code props}.
 *
 * <p>When the metastore is not enabled (including when {@code props} is {@code null} and the
 * built config reports it as disabled) a plain {@link HoodieTableMetaClient} is constructed.
 * Otherwise the metastore-backed client is instantiated reflectively by class name, using the
 * database and table names read from {@code props}.
 */
private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
    ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
    String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig, Properties props) {
  HoodieMetastoreConfig.Builder configBuilder = new HoodieMetastoreConfig.Builder();
  if (props != null) {
    configBuilder.fromProperties(props);
  }
  HoodieMetastoreConfig resolvedConfig = configBuilder.build();

  // Default path: regular file-system based meta client.
  if (!resolvedConfig.enableMetastore()) {
    return new HoodieTableMetaClient(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig,
        layoutVersion, payloadClassName, fileSystemRetryConfig);
  }

  // Metastore path: the implementation is referenced by name only and loaded reflectively.
  Class<?>[] constructorArgTypes = {
      Configuration.class, ConsistencyGuardConfig.class, FileSystemRetryConfig.class,
      String.class, String.class, HoodieMetastoreConfig.class};
  String databaseName = props.getProperty(HoodieTableConfig.DATABASE_NAME.key());
  String tableName = props.getProperty(HoodieTableConfig.NAME.key());
  return (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetastoreClient",
      constructorArgTypes,
      conf, consistencyGuardConfig, fileSystemRetryConfig, databaseName, tableName, resolvedConfig);
}

/**
 * @return a new, empty {@link Builder} for assembling a {@link HoodieTableMetaClient}.
 */
public static Builder builder() {
  return new HoodieTableMetaClient.Builder();
}
Expand All @@ -636,6 +662,7 @@ public static class Builder {
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
private Properties props;

public Builder setConf(Configuration conf) {
this.conf = conf;
Expand Down Expand Up @@ -672,11 +699,16 @@ public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
return this;
}

/**
 * Stores the properties that are handed to the meta client factory when {@code build()} is called.
 *
 * @param properties properties to remember; stored as given, without copying
 * @return this builder
 */
public Builder setProperties(Properties properties) {
  props = properties;
  return this;
}

/**
 * Builds the meta client from the values collected by this builder.
 *
 * <p>Fails the argument check when either the configuration or the base path has not been set.
 *
 * @return the created {@link HoodieTableMetaClient}
 */
public HoodieTableMetaClient build() {
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
// NOTE(review): two consecutive returns appear here; presumably the first is the pre-change line and
// the second (which also forwards props to newMetaClient) is its replacement - confirm only one remains.
return new HoodieTableMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
return newMetaClient(conf, basePath,
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig, props);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void deleteInstantFileIfExists(HoodieInstant instant) {
}
}

private void deleteInstantFile(HoodieInstant instant) {
protected void deleteInstantFile(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
try {
Expand Down Expand Up @@ -536,7 +536,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
transitionState(fromInstant, toInstant, data, false);
}

private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
boolean allowRedundantTransitions) {
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
try {
Expand Down Expand Up @@ -566,7 +566,7 @@ private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
}
}

private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
protected void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
Path inFlightCommitFilePath = getInstantFileNamePath(inflight.getFileName());
Path commitFilePath = getInstantFileNamePath(completed.getFileName());
Expand Down Expand Up @@ -632,7 +632,7 @@ public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> cont
}

/**
* Saves content for inflight/requested REPLACE instant.
* Saves content for requested REPLACE instant.
*/
public void saveToPendingReplaceCommit(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
Expand Down Expand Up @@ -719,7 +719,7 @@ public void saveToPendingIndexAction(HoodieInstant instant, Option<byte[]> conte
createFileInMetaPath(instant.getFileName(), content, false);
}

private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
protected void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = getInstantFileNamePath(filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetastoreConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableSupplier;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function2;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
Expand Down Expand Up @@ -59,6 +61,8 @@
public class FileSystemViewManager {
private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class);

private static final String HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetastoreFileSystemView";

private final SerializableConfiguration conf;
// The View Storage config used to store file-system views
private final FileSystemViewStorageConfig viewStorageConfig;
Expand Down Expand Up @@ -165,6 +169,11 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
metadataSupplier.get());
}
if (metaClient.getMetastoreConfig().enableMetastore()) {
return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class},
metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
}
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
}

Expand All @@ -184,6 +193,11 @@ public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline
if (metadataConfig.enabled()) {
return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig);
}
if (metaClient.getMetastoreConfig().enableMetastore()) {
  // The last constructor argument is the HoodieMetastoreConfig returned by getMetastoreConfig(), so the
  // declared parameter type must be HoodieMetastoreConfig (not HoodieMetadataConfig) for the reflective
  // constructor signature to agree with the arguments actually passed.
  return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS,
      new Class<?>[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class},
      metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig());
}
return new HoodieTableFileSystemView(metaClient, timeline);
}

Expand Down

0 comments on commit ad773b3

Please sign in to comment.