Skip to content

Commit

Permalink
Make hive temporary staging directory location configurable
Browse files Browse the repository at this point in the history
In some deployments, /tmp cannot be used to write temporary files.
  • Loading branch information
kokosing committed Jan 30, 2019
1 parent 68393b0 commit 9227286
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class HiveClientConfig
private int s3SelectPushdownMaxConnections = 500;

private boolean isTemporaryStagingDirectoryEnabled = true;
private String temporaryStagingDirectoryPath = "/tmp/presto-${USER}";

public int getMaxInitialSplits()
{
Expand Down Expand Up @@ -1205,4 +1206,18 @@ public boolean isTemporaryStagingDirectoryEnabled()
{
return isTemporaryStagingDirectoryEnabled;
}

@Config("hive.temporary-staging-directory-path")
@ConfigDescription("Location of temporary staging directory for write operations. Use ${USER} placeholder to use different location for each user.")
public HiveClientConfig setTemporaryStagingDirectoryPath(String temporaryStagingDirectoryPath)
{
this.temporaryStagingDirectoryPath = temporaryStagingDirectoryPath;
return this;
}

@NotNull
public String getTemporaryStagingDirectoryPath()
{
return temporaryStagingDirectoryPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, Conn
}

if (shouldUseTemporaryDirectory(session, context, targetPath)) {
Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
Path writePath = createTemporaryPath(session, context, hdfsEnvironment, targetPath);
return new LocationHandle(targetPath, writePath, false, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
else {
Expand All @@ -76,7 +76,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
Path targetPath = new Path(table.getStorage().getLocation());

if (shouldUseTemporaryDirectory(session, context, targetPath)) {
Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
Path writePath = createTemporaryPath(session, context, hdfsEnvironment, targetPath);
return new LocationHandle(targetPath, writePath, true, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class HiveSessionProperties
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -302,6 +303,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
TEMPORARY_STAGING_DIRECTORY_ENABLED,
"Should use temporary staging directory for write operations",
hiveClientConfig.isTemporaryStagingDirectoryEnabled(),
false),
stringProperty(
TEMPORARY_STAGING_DIRECTORY_PATH,
"Temporary staging directory location",
hiveClientConfig.getTemporaryStagingDirectoryPath(),
false));
}

Expand Down Expand Up @@ -506,6 +512,11 @@ public static boolean isTemporaryStagingDirectoryEnabled(ConnectorSession sessio
return session.getProperty(TEMPORARY_STAGING_DIRECTORY_ENABLED, Boolean.class);
}

public static String getTemporaryStagingDirectoryPath(ConnectorSession session)
{
return session.getProperty(TEMPORARY_STAGING_DIRECTORY_PATH, String.class);
}

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_SERDE_NOT_FOUND;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
import static io.prestosql.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.prestosql.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath;
import static io.prestosql.plugin.hive.HiveUtil.checkCondition;
import static io.prestosql.plugin.hive.HiveUtil.isArrayType;
import static io.prestosql.plugin.hive.HiveUtil.isMapType;
Expand Down Expand Up @@ -546,10 +547,11 @@ private static boolean isDirectory(HdfsContext context, HdfsEnvironment hdfsEnvi
}
}

public static Path createTemporaryPath(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath)
public static Path createTemporaryPath(ConnectorSession session, HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath)
{
// use a per-user temporary directory to avoid permission problems
String temporaryPrefix = "/tmp/presto-" + context.getIdentity().getUser();
String temporaryPrefix = getTemporaryStagingDirectoryPath(session)
.replace("${USER}", context.getIdentity().getUser());

// use relative temporary directory on ViewFS
if (isViewFileSystem(context, hdfsEnvironment, targetPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void testDefaults()
.setCollectColumnStatisticsOnWrite(false)
.setS3SelectPushdownEnabled(false)
.setS3SelectPushdownMaxConnections(500)
.setTemporaryStagingDirectoryEnabled(true));
.setTemporaryStagingDirectoryEnabled(true)
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}"));
}

@Test
Expand Down Expand Up @@ -201,6 +202,7 @@ public void testExplicitPropertyMappings()
.put("hive.s3select-pushdown.enabled", "true")
.put("hive.s3select-pushdown.max-connections", "1234")
.put("hive.temporary-staging-directory-enabled", "false")
.put("hive.temporary-staging-directory-path", "updated")
.build();

HiveClientConfig expected = new HiveClientConfig()
Expand Down Expand Up @@ -282,7 +284,8 @@ public void testExplicitPropertyMappings()
.setCollectColumnStatisticsOnWrite(true)
.setS3SelectPushdownEnabled(true)
.setS3SelectPushdownMaxConnections(1234)
.setTemporaryStagingDirectoryEnabled(false);
.setTemporaryStagingDirectoryEnabled(false)
.setTemporaryStagingDirectoryPath("updated");

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -3552,9 +3553,9 @@ public void testPrunePartitionFailure()
}

@Test
public void testTemporaryStagingDirectorySessionProperty()
public void testTemporaryStagingDirectorySessionProperties()
{
String tableName = "test_temporary_staging_directory_session_property";
String tableName = "test_temporary_staging_directory_session_properties";
assertUpdate(format("CREATE TABLE %s(i int)", tableName));

Session session = Session.builder(getSession())
Expand All @@ -3564,6 +3565,15 @@ public void testTemporaryStagingDirectorySessionProperty()
HiveInsertTableHandle hiveInsertTableHandle = getHiveInsertTableHandle(session, tableName);
assertEquals(hiveInsertTableHandle.getLocationHandle().getWritePath(), hiveInsertTableHandle.getLocationHandle().getTargetPath());

session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "temporary_staging_directory_enabled", "true")
.setCatalogSessionProperty("hive", "temporary_staging_directory_path", "/tmp/custom/temporary-${USER}")
.build();

hiveInsertTableHandle = getHiveInsertTableHandle(session, tableName);
assertNotEquals(hiveInsertTableHandle.getLocationHandle().getWritePath(), hiveInsertTableHandle.getLocationHandle().getTargetPath());
assertTrue(hiveInsertTableHandle.getLocationHandle().getWritePath().toString().startsWith("file:/tmp/custom/temporary-"));

assertUpdate("DROP TABLE " + tableName);
}

Expand Down

0 comments on commit 9227286

Please sign in to comment.