More config options for HDFS storage (e.g. Kerberos) #57

Merged
merged 1 commit into from Sep 11, 2024
15 changes: 11 additions & 4 deletions README.md
@@ -314,14 +314,22 @@ These are the options for `DruidSource`, to be passed with `write.options()`.

| Property | Description |
| --- |--- |
| `druid.segment_storage.s3.bucket` | S3 bucket name for the Deep Storage | |
| `druid.segment_storage.s3.basekey` | S3 key prefix for the Deep Storage. No trailing slashes. | |
| `druid.segment_storage.s3.bucket` | S3 bucket name for the Deep Storage |
| `druid.segment_storage.s3.basekey` | S3 key prefix for the Deep Storage. No trailing slashes. |

2. **If Deep Storage is `local`:**

| Property | Description |
| --- |--- |
| `druid.segment_storage.local.dir` | For local Deep Storage, absolute path to segment directory | |
| `druid.segment_storage.local.dir` | For local Deep Storage, absolute path to segment directory |

3. **If Deep Storage is `hdfs`:**

| Property | Description |
| --- | --- |
| `druid.segment_storage.hdfs.dir` | For hdfs Deep Storage, absolute path to segment directory |
| `druid.segment_storage.hdfs.security.kerberos.principal` | Kerberos principal |
| `druid.segment_storage.hdfs.security.kerberos.keytab` | Kerberos keytab |

#### Optional properties

@@ -336,7 +344,6 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`, `hdfs`. | `s3` |
| `druid.segment_storage.s3.disableacl` | Whether to disable ACL in S3 config. | `false` |
| `druid.segment_storage.hdfs.dir` | Hdfs segment storage location | `""` |
| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource is marked as unused. | `false` |
| `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
| `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |
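For illustration, a minimal sketch of passing the new HDFS properties through `write.options()` from the Java Spark API. The option keys and the `druid.segment_storage.type` value come from the README tables above; the format name `com.rovio.ingest.DruidSource`, the dataset preparation, and the remaining required DruidSource options (datasource name, time column, metadata database) are assumptions and placeholders, not part of this diff.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class HdfsDeepStorageWriteSketch {
    // Minimal sketch: write a prepared Dataset to Druid using HDFS deep storage
    // with Kerberos authentication. Paths and the principal below are placeholders.
    public static void write(Dataset<Row> dataset) {
        dataset.write()
                .format("com.rovio.ingest.DruidSource")  // assumed format name
                .mode(SaveMode.Overwrite)
                .option("druid.segment_storage.type", "hdfs")
                .option("druid.segment_storage.hdfs.dir", "/druid/segments")
                .option("druid.segment_storage.hdfs.security.kerberos.principal", "druid@EXAMPLE.COM")
                .option("druid.segment_storage.hdfs.security.kerberos.keytab", "/etc/security/keytabs/druid.keytab")
                // ...plus the other required DruidSource options (datasource, time column,
                // metadata database connection), which are unchanged by this PR.
                .save();
    }
}
```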
49 changes: 40 additions & 9 deletions src/main/java/com/rovio/ingest/WriterContext.java
@@ -55,6 +55,12 @@ public class WriterContext implements Serializable {
private final String s3BaseKey;
private final boolean s3DisableAcl;
private final String localDir;
private final String hdfsDir;
private final String hdfsCoreSitePath;
private final String hdfsHdfsSitePath;
private final String hdfsDefaultFS;
private final String hdfsSecurityKerberosPrincipal;
private final String hdfsSecurityKerberosKeytab;
private final String deepStorageType;
private final boolean initDataSource;
private final String version;
@@ -64,7 +70,6 @@ public class WriterContext implements Serializable {
private final String dimensionsSpec;
private final String metricsSpec;
private final String transformSpec;
private String getHdfsStorageDir;

private WriterContext(CaseInsensitiveStringMap options, String version) {
this.dataSource = getOrThrow(options, ConfKeys.DATA_SOURCE);
@@ -97,7 +102,12 @@ private WriterContext(CaseInsensitiveStringMap options, String version) {
this.s3BaseKey = options.getOrDefault(ConfKeys.DEEP_STORAGE_S3_BASE_KEY, null);
this.s3DisableAcl = options.getBoolean(ConfKeys.DEEP_STORAGE_S3_DISABLE_ACL, false);
this.localDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_LOCAL_DIRECTORY, null);
this.getHdfsStorageDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DIRECTORY, null);
this.hdfsDir = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_STORAGE_DIRECTORY, null);
this.hdfsCoreSitePath = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_CORE_SITE_PATH, null);
this.hdfsHdfsSitePath = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_HDFS_SITE_PATH, null);
this.hdfsDefaultFS = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_DEFAULT_FS, null);
Comment on lines +106 to +108 (Collaborator Author):

These 3 conf options are left hidden – no mention in README.

  • DEEP_STORAGE_HDFS_DEFAULT_FS has a "hidden" implementation
  • X_SITE_PATHs are not implemented (an exception is thrown if these are set).

This is how it was in fabricebaranski@9c6eb40. We can keep them as a starting point for possible implementation in the future.

this.hdfsSecurityKerberosPrincipal = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL, null);
this.hdfsSecurityKerberosKeytab = options.getOrDefault(ConfKeys.DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB, null);

this.deepStorageType = options.getOrDefault(ConfKeys.DEEP_STORAGE_TYPE, DEFAULT_DRUID_DEEP_STORAGE_TYPE);
Preconditions.checkArgument(Arrays.asList("s3", "local", "hdfs").contains(this.deepStorageType),
@@ -194,8 +204,28 @@ public String getLocalDir() {
return localDir;
}

public String getDeepStorageType() {
return deepStorageType;
public String getHdfsDir() {
return hdfsDir;
}

public String getHdfsCoreSitePath() {
return hdfsCoreSitePath;
}

public String getHdfsHdfsSitePath() {
return hdfsHdfsSitePath;
}

public String getHdfsDefaultFS() {
return hdfsDefaultFS;
}

public String getHdfsSecurityKerberosPrincipal() {
return hdfsSecurityKerberosPrincipal;
}

public String getHdfsSecurityKerberosKeytab() {
return hdfsSecurityKerberosKeytab;
}

public boolean isInitDataSource() {
@@ -238,10 +268,6 @@ public String getTransformSpec() {
return transformSpec;
}

public String getHdfsStorageDir() {
return getHdfsStorageDir;
}

public static class ConfKeys {
public static final String DATASOURCE_INIT = "druid.datasource.init";
// Segment config
@@ -274,6 +300,11 @@ public static class ConfKeys {
// Local config (only for testing)
public static final String DEEP_STORAGE_LOCAL_DIRECTORY = "druid.segment_storage.local.dir";
// HDFS config
public static final String DEEP_STORAGE_HDFS_DIRECTORY = "druid.segment_storage.hdfs.dir";
public static final String DEEP_STORAGE_HDFS_STORAGE_DIRECTORY = "druid.segment_storage.hdfs.dir";
public static final String DEEP_STORAGE_HDFS_CORE_SITE_PATH = "druid.segment_storage.hdfs.core.site.path";
public static final String DEEP_STORAGE_HDFS_HDFS_SITE_PATH = "druid.segment_storage.hdfs.hdfs.site.path";
public static final String DEEP_STORAGE_HDFS_DEFAULT_FS = "druid.segment_storage.hdfs.default.fs";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_PRINCIPAL = "druid.segment_storage.hdfs.security.kerberos.principal";
public static final String DEEP_STORAGE_HDFS_SECURITY_KERBEROS_KEYTAB = "druid.segment_storage.hdfs.security.kerberos.keytab";
}
}
52 changes: 46 additions & 6 deletions src/main/java/com/rovio/ingest/util/SegmentStorageUpdater.java
@@ -28,6 +28,8 @@
import org.apache.druid.storage.hdfs.HdfsDataSegmentKiller;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.druid.storage.hdfs.HdfsKerberosConfig;
import org.apache.druid.storage.hdfs.HdfsStorageAuthentication;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3DataSegmentKiller;
import org.apache.druid.storage.s3.S3DataSegmentPusher;
@@ -47,7 +49,16 @@ public static DataSegmentPusher createPusher(WriterContext param) {
if (param.isLocalDeepStorage()) {
return new LocalDataSegmentPusher(getLocalConfig(param.getLocalDir()));
} else if (param.isHdfsDeepStorage()) {
return new HdfsDataSegmentPusher(getHdfsConfig(param.getHdfsStorageDir()), new Configuration(), MAPPER);
return new HdfsDataSegmentPusher(
getHdfsConfig(
param.getHdfsDir(),
param.getHdfsSecurityKerberosPrincipal(),
param.getHdfsSecurityKerberosKeytab(),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS())
),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS()),
MAPPER
);
} else {
ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3 = getAmazonS3().get();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -64,7 +75,15 @@ public static DataSegmentKiller createKiller(WriterContext param) {
if (param.isLocalDeepStorage()) {
return new LocalDataSegmentKiller(getLocalConfig(param.getLocalDir()));
} else if (param.isHdfsDeepStorage()) {
return new HdfsDataSegmentKiller(new Configuration(), getHdfsConfig(param.getHdfsStorageDir()));
return new HdfsDataSegmentKiller(
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS()),
getHdfsConfig(
param.getHdfsDir(),
param.getHdfsSecurityKerberosPrincipal(),
param.getHdfsSecurityKerberosKeytab(),
getHdfsHadoopConfiguration(param.getHdfsCoreSitePath(), param.getHdfsHdfsSitePath(), param.getHdfsDefaultFS())
)
);
} else {
Supplier<ServerSideEncryptingAmazonS3> serverSideEncryptingAmazonS3 = getAmazonS3();
S3DataSegmentPusherConfig s3Config = new S3DataSegmentPusherConfig();
@@ -92,11 +111,32 @@ private static LocalDataSegmentPusherConfig getLocalConfig(String localDir) {
}).get();
}

private static HdfsDataSegmentPusherConfig getHdfsConfig(String hdfsStorageDir) {
private static HdfsDataSegmentPusherConfig getHdfsConfig(String hdfsDir, String kerberosPrincipal, String kerberosKeytab, Configuration conf) {
return Suppliers.memoize(() -> {
HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
hdfsSegmentPusherConfig.setStorageDirectory(hdfsStorageDir);
return hdfsSegmentPusherConfig;
HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
if (hdfsDir != null) {
config.setStorageDirectory(hdfsDir);
}
if (kerberosPrincipal != null && kerberosKeytab != null) {
HdfsKerberosConfig hdfsKerberosConfig = new HdfsKerberosConfig(kerberosPrincipal, kerberosKeytab);
HdfsStorageAuthentication hdfsAuth = new HdfsStorageAuthentication(hdfsKerberosConfig, conf);
hdfsAuth.authenticate();
}
return config;
}).get();
}

private static Configuration getHdfsHadoopConfiguration(String hdfsCoreSitePath, String hdfsHdfsSitePath, String defaultFS) {
return Suppliers.memoize(() -> {
if (hdfsCoreSitePath == null || hdfsHdfsSitePath == null) {
Configuration configuration = new Configuration(true);
if (defaultFS != null) {
configuration.set("fs.defaultFS", defaultFS);
}
return configuration;
} else {
// Custom core-site/hdfs-site files are not supported yet; fail fast if they are set.
throw new UnsupportedOperationException("Custom hdfs site configuration is not implemented");
}
}).get();
}
}
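For context, a rough sketch of what the keytab login behind the `HdfsStorageAuthentication.authenticate()` call above amounts to on the Hadoop side. This is an illustrative sketch using plain Hadoop UGI APIs, not the PR's code; the class name and the principal/keytab values are hypothetical.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical illustration of a keytab-based Kerberos login, roughly the effect
// that the authenticate() call in getHdfsConfig is relied on to produce.
public final class KerberosLoginSketch {
    public static void login(String principal, String keytab) throws IOException {
        Configuration conf = new Configuration(true);
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
        // After a successful login, subsequent HDFS calls (segment push/kill)
        // run as the logged-in principal.
        System.out.println("Logged in as: " + UserGroupInformation.getCurrentUser());
    }
}
```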