Skip to content

Commit

Permalink
Account for other configuration parameters (#172)
Browse files Browse the repository at this point in the history
Extend setup of JDBC storage connectors to allow for additional
parameters. Mainly isolationLevel and batchsize used to tweak writing to
the online feature store
  • Loading branch information
SirOibaf committed Dec 1, 2020
1 parent ac6c24d commit 8b8dbec
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 21 deletions.
19 changes: 3 additions & 16 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import lombok.ToString;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,23 +72,12 @@ public class StorageConnector {
@Setter
private StorageConnectorType storageConnectorType;

public Map<String, String> getSparkOptions() throws FeatureStoreException {
List<String[]> args = Arrays.stream(arguments.split(","))
public Map<String, String> getSparkOptions() {
Map<String, String> options = Arrays.stream(arguments.split(","))
.map(arg -> arg.split("="))
.collect(Collectors.toList());
.collect(Collectors.toMap(a -> a[0], a -> a[1]));

String user = args.stream().filter(arg -> arg[0].equalsIgnoreCase(Constants.JDBC_USER))
.findFirst()
.orElseThrow(() -> new FeatureStoreException("No user provided for storage connector"))[1];

String password = args.stream().filter(arg -> arg[0].equalsIgnoreCase(Constants.JDBC_PWD))
.findFirst()
.orElseThrow(() -> new FeatureStoreException("No password provided for storage connector"))[1];

Map<String, String> options = new HashMap<>();
options.put(Constants.JDBC_URL, connectionString);
options.put(Constants.JDBC_USER, user);
options.put(Constants.JDBC_PWD, password);
return options;
}
}
8 changes: 3 additions & 5 deletions python/hsfs/storage_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def spark_options(self):
arguments.
"""
args = [arg.split("=") for arg in self._arguments.split(",")]
options = {a[0]: a[1] for a in args}
options["url"] = self._connection_string

return {
"url": self._connection_string,
"user": [arg[1] for arg in args if arg[0] == "user"][0],
"password": [arg[1] for arg in args if arg[0] == "password"][0],
}
return options

0 comments on commit 8b8dbec

Please sign in to comment.