Skip to content

Commit

Permalink
Update if/else calls. (#17)
Browse files Browse the repository at this point in the history
* Update if/else calls.
Update private fields.
Update single line java docs.

* Fix indentation.

* Add null read test.
  • Loading branch information
milos-colic authored Jun 23, 2022
1 parent 3df60d6 commit 878ddb5
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
import io.delta.sharing.spark.model.DeltaTableFiles;
import io.delta.sharing.spark.model.DeltaTableMetadata;
import io.delta.sharing.spark.model.Table;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import scala.Option$;
import scala.Some$;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -24,14 +33,6 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import scala.Option$;
import scala.Some$;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
* A wrapper class for {@link io.delta.sharing.spark.DeltaSharingRestClient}
Expand Down Expand Up @@ -78,12 +79,12 @@ public Path getCheckpointPath() {
* Constructor.
*
* @param profileProvider An instance of {@link DeltaSharingProfileProvider}.
* @param checkpointPath A path to a temporary checkpoint location.
* @param checkpointPath A path to a temporary checkpoint location.
* @throws IOException Transitive due to the call to
* {@link Files#createTempDirectory(String, FileAttribute[])}.
* {@link Files#createTempDirectory(String, FileAttribute[])}.
*/
public DeltaSharing(final DeltaSharingProfileProvider profileProvider,
final Path checkpointPath) throws IOException {
final Path checkpointPath) throws IOException {

if (!Files.exists(checkpointPath)) {
Files.createDirectory(checkpointPath);
Expand All @@ -102,15 +103,15 @@ public DeltaSharing(final DeltaSharingProfileProvider profileProvider,
/**
* Constructor.
*
* @param providerConf A valid JSON document corresponding to
* {@link DeltaSharingProfileProvider}.
* @param providerConf A valid JSON document corresponding to
* {@link DeltaSharingProfileProvider}.
* @param checkpointLocation A string containing a path to be used as a
* checkpoint location.
* checkpoint location.
* @throws IOException Transitive due to the call to
* {@link Files#createDirectories(Path, FileAttribute[])}.
* {@link Files#createDirectories(Path, FileAttribute[])}.
*/
public DeltaSharing(final String providerConf,
final String checkpointLocation) throws IOException {
final String checkpointLocation) throws IOException {
this(new DeltaSharingJsonProvider(providerConf),
Paths.get(checkpointLocation));
}
Expand All @@ -122,7 +123,7 @@ public DeltaSharing(final String providerConf,
*
* @return A list of all tables.
* @implNote The unnecessary-local-variable warning is suppressed because the
* Scala to Java conversion call is kept decoupled from the return call.
* Scala to Java conversion call is kept decoupled from the return call.
*/
@SuppressWarnings("UnnecessaryLocalVariable")
public List<Table> listAllTables() {
Expand Down Expand Up @@ -156,11 +157,11 @@ public long getTableVersion(Table table) {
*
* @return A list of files corresponding to a table.
* @implNote The unnecessary-local-variable warning is suppressed because the
* Scala to Java conversion call is kept decoupled from the return call.
* Scala to Java conversion call is kept decoupled from the return call.
*/
@SuppressWarnings("UnnecessaryLocalVariable")
public List<AddFile> getFiles(Table table, List<String> predicates,
Integer limit) {
Integer limit) {
Seq<String> predicatesSeq = JavaConverters
.asScalaIteratorConverter(predicates.iterator()).asScala().toSeq();
DeltaTableFiles deltaTableFiles;
Expand All @@ -181,7 +182,7 @@ public List<AddFile> getFiles(Table table, List<String> predicates,
*
* @return A list of files corresponding to a table.
* @implNote Suppress unnecessary local variable is done to remove warnings
* for a decoupled Scala to Java conversion call and a return call.
* for a decoupled Scala to Java conversion call and a return call.
*/
public List<AddFile> getFiles(Table table, List<String> predicates) {
return getFiles(table, predicates, null);
Expand All @@ -199,11 +200,11 @@ public String getCoordinates(Table table) {
String coords = table.share() + "." + table.schema() + "." + table.name();
Matcher matcher = pattern.matcher(coords);
boolean matchFound = matcher.find();
if (matchFound) {
return coords;
} else {
if (!matchFound) {
throw new IllegalArgumentException("Invalid format for coordinates");
}

return coords;
}

/**
Expand All @@ -218,13 +219,13 @@ private Path getFileCheckpointPath(AddFile file) {
Pattern pattern = Pattern.compile("[a-zA-Z0-9]*", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(fileId);
boolean matchFound = matcher.find();
if (matchFound) {
String path = String.format("%s/%s.parquet", this.tempDir, file.id());
return Paths.get(path);
} else {
if (!matchFound) {
throw new IllegalArgumentException(
"Invalid format for file id. The id contains special characters.");
}

String path = String.format("%s/%s.parquet", this.tempDir, file.id());
return Paths.get(path);
}

/**
Expand All @@ -234,7 +235,7 @@ private Path getFileCheckpointPath(AddFile file) {
* @param files Files for which we are generating the checkpoint file copies.
* @return A list of fully qualified paths for the checkpoint file copies.
* @throws IOException Transitive exception due to the call to
* {@link Files#write(Path, byte[], OpenOption...)}.
* {@link Files#write(Path, byte[], OpenOption...)}.
*/
private List<Path> writeCheckpointFiles(List<AddFile> files)
throws IOException, URISyntaxException {
Expand Down Expand Up @@ -274,9 +275,9 @@ private List<Path> getCheckpointPaths(List<AddFile> files) {
*
* @param table Table whose reader is requested.
* @return An instance of {@link TableReader} that will manage the reads from
* the table.
* the table.
* @throws IOException Transitive due to the call to
* {@link TableReader#TableReader(List)}.
* {@link TableReader#TableReader(List)}.
*/
@SuppressWarnings("UnnecessaryLocalVariable")
public TableReader<GenericRecord> getTableReader(Table table)
Expand All @@ -286,8 +287,27 @@ public TableReader<GenericRecord> getTableReader(Table table)
fs.setConf(new Configuration());
}
String uniqueRef = getCoordinates(table);
List<Path> paths;
DeltaTableMetadata newMetadata = this.getMetadata(table);

List<Path> paths = getPaths(uniqueRef, files, newMetadata);

TableReader<GenericRecord> tableReader = new TableReader<>(paths);
return tableReader;
}

/**
* Fetches the list of file paths from the checkpoint location.
* Files whose metadata has drifted are updated.
*
* @param uniqueRef Reference via table coordinates.
* @param files A list of add files for the table.
* @param newMetadata A new value of the metadata for the table.
* @return A list of paths to checkpoint files.
* @throws IOException Read/Write errors can occur if temp directory has been altered outside the JVM.
* @throws URISyntaxException URI exception can be thrown by writeCheckpointFiles method call.
*/
private List<Path> getPaths(String uniqueRef, List<AddFile> files, DeltaTableMetadata newMetadata) throws IOException, URISyntaxException {
List<Path> paths;
if (this.metadataMap.containsKey(uniqueRef)) {
DeltaTableMetadata metadata = this.metadataMap.get(uniqueRef);
if (!newMetadata.equals(metadata)) {
Expand All @@ -300,9 +320,7 @@ public TableReader<GenericRecord> getTableReader(Table table)
paths = writeCheckpointFiles(files);
this.metadataMap.put(uniqueRef, newMetadata);
}

TableReader<GenericRecord> tableReader = new TableReader<>(paths);
return tableReader;
return paths;
}

/**
Expand All @@ -312,7 +330,7 @@ public TableReader<GenericRecord> getTableReader(Table table)
* @param table An instance of {@link Table} whose records we are reading.
* @return A list of records from the table instance.
* @throws IOException Transitive due to the call to
* {@link TableReader#read()}
* {@link TableReader#read()}
*/
public List<GenericRecord> getAllRecords(Table table)
throws IOException, URISyntaxException {
Expand All @@ -333,12 +351,12 @@ public List<GenericRecord> getAllRecords(Table table)
* {@link DeltaSharing#getTableReader(Table)} to access the reader and then
* use {@link TableReader#readN(Integer)} to read blocks of records.
*
* @param table An instance of {@link Table} whose records we are reading.
* @param table An instance of {@link Table} whose records we are reading.
* @param numRec Number of records to be read at most.
* @return A list of records from the table instance. If fewer records are
* available, only the available records will be returned.
* available, only the available records will be returned.
* @throws IOException Transitive due to the call to
* {@link TableReader#read()}
* {@link TableReader#read()}
*/
public List<GenericRecord> getNumRecords(Table table, int numRec)
throws IOException, URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* <p/>
*
* @see DeltaSharingProfileAdaptor DeltaSharingProfileAdaptor used for object
* mapping when parsing JSON.
* mapping when parsing JSON.
* @since 0.1.0
*/
public class DeltaSharingJsonProvider implements DeltaSharingProfileProvider {
private String configuration;
private DeltaSharingProfile profile;
private final DeltaSharingProfile profile;

/**
* Constructor method that expects Configuration JSON. JSON file must contain
Expand All @@ -30,10 +29,9 @@ public class DeltaSharingJsonProvider implements DeltaSharingProfileProvider {
*/
public DeltaSharingJsonProvider(String conf) throws JsonProcessingException {
try {
configuration = conf;
ObjectMapper mapper = new ObjectMapper();
DeltaSharingProfileAdaptor profileAdaptor =
mapper.readValue(configuration, DeltaSharingProfileAdaptor.class);
mapper.readValue(conf, DeltaSharingProfileAdaptor.class);
profile = profileAdaptor.toProfile();
} catch (Exception e) {
System.out.print(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,74 @@
* Update when the format of profile file changes.
*
* @implNote A warning suppression is added because all the getters and setters are required to be explicitly
* created for Jackson to parse JSON correctly. However, warnings are shown since the getters
* and setters are not explicitly tested.
* created for Jackson to parse JSON correctly. However, warnings are shown since the getters
* and setters are not explicitly tested.
* @since 0.1.0
*/
public class DeltaSharingProfileAdaptor {
int shareCredentialsVersion;
String endpoint;
String bearerToken;
String expirationTime;
private int shareCredentialsVersion;
private String endpoint;
private String bearerToken;
private String expirationTime;

/** Default constructor. */
public DeltaSharingProfileAdaptor() {}
/**
* Default constructor.
*/
public DeltaSharingProfileAdaptor() {
}

/** Getter for shareCredentialsVersion. */
/**
* Returns the share credentials version held by this adaptor.
*
* @return The current value of the {@code shareCredentialsVersion} field.
*/
public int getShareCredentialsVersion() {
return shareCredentialsVersion;
}

/** Setter for shareCredentialsVersion. */
/**
* Stores the share credentials version on this adaptor.
*
* @param shareCredentialsVersion Value assigned to the
* {@code shareCredentialsVersion} field.
*/
public void setShareCredentialsVersion(int shareCredentialsVersion) {
this.shareCredentialsVersion = shareCredentialsVersion;
}

/** Getter for endpoint. */
/**
* Returns the endpoint held by this adaptor.
*
* @return The current value of the {@code endpoint} field.
*/
public String getEndpoint() {
return endpoint;
}

/** Setter for endpoint. */
/**
* Stores the endpoint on this adaptor.
*
* @param endpoint Value assigned to the {@code endpoint} field.
*/
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

/** Getter for bearerToken. */
/**
* Returns the bearer token held by this adaptor.
*
* @return The current value of the {@code bearerToken} field.
*/
public String getBearerToken() {
return bearerToken;
}

/** Setter for bearerToken. */
/**
* Stores the bearer token on this adaptor.
*
* @param bearerToken Value assigned to the {@code bearerToken} field.
*/
public void setBearerToken(String bearerToken) {
this.bearerToken = bearerToken;
}

/** Getter for expirationTime. */
/**
* Returns the expiration time held by this adaptor, as a string.
*
* @return The current value of the {@code expirationTime} field.
*/
public String getExpirationTime() {
return expirationTime;
}

/** Setter for expirationTime. */
/**
* Stores the expiration time on this adaptor.
*
* @param expirationTime Value assigned to the {@code expirationTime} field.
*/
public void setExpirationTime(String expirationTime) {
this.expirationTime = expirationTime;
}
Expand All @@ -66,7 +85,7 @@ public void setExpirationTime(String expirationTime) {
* from cross language APIs.
*
* @return An equivalent instance of Scala {@link DeltaSharingProfile}
* class.
* class.
*/
public DeltaSharingProfile toProfile() {
return DeltaSharingProfile.apply(Option.apply(shareCredentialsVersion), endpoint, bearerToken,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package com.databricks.labs.delta.sharing.java.format.parquet;

import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

/**
* Parquet InputFile with a local java.nio.Path. Adapted from https://github.com/benwatson528/intellij-avro-parquet-plugin/blob/master/src/main/java/uk/co/hadoopathome/intellij/viewer/fileformat/LocalInputFile.java This
* class is required to instantiate {@link org.apache.parquet.avro.AvroParquetReader} instances.
* @implNote https://github.com/benwatson528/intellij-avro-parquet-plugin is licensed under Apache 2.0.
*
* @implNote https://github.com/benwatson528/intellij-avro-parquet-plugin is licensed under Apache 2.0.
*/
public class LocalInputFile implements InputFile {
/** Local file object. */

private final RandomAccessFile input;

/**
Expand Down Expand Up @@ -161,7 +163,7 @@ private int readDirectBuffer(ByteBuffer byteBuffer, byte[] page, ByteBufferReade
}

private static void readFullyDirectBuffer(ByteBuffer byteBuffer, byte[] page,
ByteBufferReader reader) throws IOException {
ByteBufferReader reader) throws IOException {
int nextReadLength = Math.min(byteBuffer.remaining(), page.length);
int bytesRead = 0;

Expand Down
Loading

0 comments on commit 878ddb5

Please sign in to comment.