
Commit

Merge branch 'master' into test-parallelism-3
tdas committed Apr 25, 2024
2 parents ebeeb0b + 1e2c74f commit fdeeebb
Showing 47 changed files with 2,091 additions and 651 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/spark_master_test.yaml
@@ -44,6 +44,7 @@ jobs:
sudo apt install libedit-dev
if: steps.git-diff.outputs.diff
- name: Run Spark Master tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
run: |
build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
if: steps.git-diff.outputs.diff
1 change: 1 addition & 0 deletions .github/workflows/spark_test.yaml
@@ -64,6 +64,7 @@ jobs:
pipenv run pip install numpy==1.20.3
if: steps.git-diff.outputs.diff
- name: Run Scala/Java and Python tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
run: |
TEST_PARALLELISM_COUNT=3 pipenv run python run-tests.py --group spark
cd examples/scala && build/sbt "++ $SCALA_VERSION compile"
33 changes: 33 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/CheckpointAlreadyExistsException.java
@@ -0,0 +1,33 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel;

import static java.lang.String.format;

import io.delta.kernel.annotation.Evolving;

/**
* Thrown when trying to create a checkpoint at version {@code v}, but there already exists
* a checkpoint at version {@code v}.
*
* @since 3.2.0
*/
@Evolving
public class CheckpointAlreadyExistsException extends IllegalArgumentException {
public CheckpointAlreadyExistsException(long version) {
super(format("Checkpoint for given version %d already exists in the table", version));
}
}
15 changes: 15 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
@@ -15,6 +15,8 @@
*/
package io.delta.kernel;

import java.io.IOException;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.client.TableClient;

@@ -111,4 +113,17 @@ Snapshot getSnapshotAsOfVersion(TableClient tableClient, long versionId)
*/
Snapshot getSnapshotAsOfTimestamp(TableClient tableClient, long millisSinceEpochUTC)
throws TableNotFoundException;

/**
* Checkpoint the table at the given version. It writes a single checkpoint file.
*
* @param tableClient {@link TableClient} instance to use.
* @param version Version to checkpoint.
* @throws TableNotFoundException if the table is not found
* @throws CheckpointAlreadyExistsException if a checkpoint already exists at the given version
* @throws IOException for any I/O error.
* @since 3.2.0
*/
void checkpoint(TableClient tableClient, long version)
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException;
}
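
To make the new API concrete, here is a hedged usage sketch. It assumes the kernel's existing `Table.forPath(TableClient, String)` factory; the table path and version are placeholders:

import java.io.IOException;

import io.delta.kernel.CheckpointAlreadyExistsException;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;

class CheckpointUsageSketch {
    // Checkpoint a table at a known version, tolerating an already-existing checkpoint.
    static void checkpointIfAbsent(TableClient tableClient, String tablePath, long version)
            throws TableNotFoundException, IOException {
        Table table = Table.forPath(tableClient, tablePath);
        try {
            table.checkpoint(tableClient, version);
        } catch (CheckpointAlreadyExistsException e) {
            // A checkpoint for this version already exists; nothing more to do.
        }
    }
}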
kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
@@ -25,9 +25,11 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import static io.delta.kernel.internal.TableConfig.TOMBSTONE_RETENTION;

/**
* Implementation of {@link Snapshot}.
@@ -38,6 +40,7 @@ public class SnapshotImpl implements Snapshot {
private final LogReplay logReplay;
private final Protocol protocol;
private final Metadata metadata;
private final LogSegment logSegment;

public SnapshotImpl(
Path logPath,
@@ -49,6 +52,7 @@ public SnapshotImpl(
Optional<SnapshotHint> snapshotHint) {
this.dataPath = dataPath;
this.version = version;
this.logSegment = logSegment;
this.logReplay = new LogReplay(
logPath,
dataPath,
@@ -90,6 +94,17 @@ public Protocol getProtocol() {
return protocol;
}

public CreateCheckpointIterator getCreateCheckpointIterator(
TableClient tableClient) {
long minFileRetentionTimestampMillis =
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);
return new CreateCheckpointIterator(
tableClient,
logSegment,
minFileRetentionTimestampMillis
);
}

/**
* Get the latest transaction version for given <i>applicationId</i>. This information comes
* from the transactions identifiers stored in Delta transaction log. This API is not a public
86 changes: 86 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
@@ -0,0 +1,86 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal;

import java.util.function.Function;
import java.util.function.Predicate;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.IntervalParserUtils;

/**
* Represents the table properties. Also provides methods to access the property values
* from the table metadata.
*/
public class TableConfig<T> {
/**
* The shortest duration we have to keep logically deleted data files around before deleting
* them physically.
*
* Note: this value should be large enough:
* <ul>
* <li>It should be larger than the longest possible duration of a job if you decide to
* run "VACUUM" when there are concurrent readers or writers accessing the table.</li>
* <li>If you are running a streaming query reading from the table, you should make sure
* the query doesn't stop longer than this value. Otherwise, the query may not be able to
* restart as it still needs to read old files.</li>
* </ul>
*/
public static final TableConfig<Long> TOMBSTONE_RETENTION = new TableConfig<>(
"delta.deletedFileRetentionDuration",
"interval 1 week",
IntervalParserUtils::safeParseIntervalAsMillis,
value -> value >= 0,
"needs to be provided as a calendar interval such as '2 weeks'. Months" +
" and years are not accepted. You may specify '365 days' for a year instead."
);

private final String key;
private final String defaultValue;
private final Function<String, T> fromString;
private final Predicate<T> validator;
private final String helpMessage;

private TableConfig(
String key,
String defaultValue,
Function<String, T> fromString,
Predicate<T> validator,
String helpMessage) {
this.key = key;
this.defaultValue = defaultValue;
this.fromString = fromString;
this.validator = validator;
this.helpMessage = helpMessage;
}

/**
* Returns the value of the table property from the given metadata.
*
* @param metadata the table metadata
* @return the value of the table property
*/
public T fromMetadata(Metadata metadata) {
T value = fromString.apply(metadata.getConfiguration().getOrDefault(key, defaultValue));
if (!validator.test(value)) {
throw new IllegalArgumentException(
String.format("Invalid value for table property '%s': '%s'. %s",
key, value, helpMessage));
}
return value;
}
}
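
A short sketch of how this resolution behaves, assuming a `Metadata` instance (as `fromMetadata` expects) is in scope; the sample values are illustrative:

// Resolve the retention window from a snapshot's metadata.
long retentionMillis = TableConfig.TOMBSTONE_RETENTION.fromMetadata(metadata);
// Key unset         -> default "interval 1 week" = 604_800_000 ms.
// "interval 2 days" -> 172_800_000 ms.
// A value that parses to a negative duration fails the validator above and
// raises IllegalArgumentException carrying the help message.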

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java
@@ -65,4 +65,10 @@ public Snapshot getSnapshotAsOfTimestamp(TableClient tableClient, long millisSin
throws TableNotFoundException {
return snapshotManager.getSnapshotForTimestamp(tableClient, millisSinceEpochUTC);
}

@Override
public void checkpoint(TableClient tableClient, long version)
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException {
snapshotManager.checkpoint(tableClient, version);
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java
@@ -29,6 +29,10 @@ public class AddFile {
true /* nullable */
);

/**
* Schema of the {@code add} action in the Delta Log without stats. Used when constructing
* a table snapshot to read data from the table.
*/
public static final StructType SCHEMA_WITHOUT_STATS = new StructType()
.add("path", StringType.STRING, false /* nullable */)
.add("partitionValues",
@@ -41,4 +45,16 @@

public static final StructType SCHEMA_WITH_STATS = SCHEMA_WITHOUT_STATS
.add(JSON_STATS_FIELD);

/**
* Full schema of the {@code add} action in the Delta Log.
*/
public static final StructType FULL_SCHEMA = SCHEMA_WITHOUT_STATS
.add("stats", StringType.STRING, true /* nullable */)
.add(
"tags",
new MapType(StringType.STRING, StringType.STRING, true),
true /* nullable */);
// There are more fields that are added when row-id tracking and clustering are enabled.
// When Kernel starts supporting row-ids and clustering, we should add those fields here.
}
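
For orientation, a sketch of how the three add-action schemas relate (some columns of SCHEMA_WITHOUT_STATS are elided in the hunk above):

StructType scanSchema = AddFile.SCHEMA_WITHOUT_STATS;  // snapshot reads that skip stats
StructType scanWithStats = AddFile.SCHEMA_WITH_STATS;  // adds the JSON stats column
StructType checkpointSchema = AddFile.FULL_SCHEMA;     // stats + tags, for writing checkpoints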
44 changes: 44 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RemoveFile.java
@@ -0,0 +1,44 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.actions;

import io.delta.kernel.types.*;

/**
* Metadata about the {@code remove} action in the Delta Log.
*/
public class RemoveFile {
/**
* Full schema of the {@code remove} action in the Delta Log.
*/
public static final StructType FULL_SCHEMA = new StructType()
.add("path", StringType.STRING, false /* nullable */)
.add("deletionTimestamp", LongType.LONG, true /* nullable */)
.add("dataChange", BooleanType.BOOLEAN, false /* nullable*/)
.add("extendedFileMetadata", BooleanType.BOOLEAN, true /* nullable */)
.add("partitionValues",
new MapType(StringType.STRING, StringType.STRING, true),
true /* nullable*/)
.add("size", LongType.LONG, true /* nullable*/)
.add("stats", StringType.STRING, true /* nullable */)
.add(
"tags",
new MapType(StringType.STRING, StringType.STRING, true),
true /* nullable */)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
// There are more fields which are added when row-id tracking is enabled. When Kernel
// starts supporting row-ids, we should add those fields here.
}
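
This schema pairs with the retention cutoff computed in SnapshotImpl above. As a sketch of one plausible policy (the actual CreateCheckpointIterator logic is not part of this diff), a tombstone is carried into a new checkpoint only while its deletion timestamp is within the retention window:

// Ordinal 1 is deletionTimestamp per FULL_SCHEMA above; accessors are from
// io.delta.kernel.data.Row. A missing timestamp is treated as 0, i.e. expired.
static boolean keepTombstone(io.delta.kernel.data.Row removeRow,
                             long minFileRetentionTimestampMillis) {
    long deletionTimestamp = removeRow.isNullAt(1) ? 0L : removeRow.getLong(1);
    return deletionTimestamp >= minFileRetentionTimestampMillis;
}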
33 changes: 33 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java
@@ -0,0 +1,33 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.actions;

import io.delta.kernel.types.StructType;

public class SingleAction {
/**
* Schema for reading entries from Delta log delta and checkpoint files when constructing
* a new checkpoint.
*/
public static StructType CHECKPOINT_SCHEMA = new StructType()
.add("txn", SetTransaction.READ_SCHEMA)
.add("add", AddFile.FULL_SCHEMA)
.add("remove", RemoveFile.FULL_SCHEMA)
.add("metaData", Metadata.READ_SCHEMA)
.add("protocol", Protocol.READ_SCHEMA);
// Once we start supporting updates to tables with CDC or domain metadata enabled, we
// should add the schemas for those fields here.
}
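
Each log row read with this schema wraps at most one action; the remaining top-level columns are null. A small probe, with ordinals following the declaration order above (`Row` is io.delta.kernel.data.Row):

static String actionKind(io.delta.kernel.data.Row row) {
    String[] names = {"txn", "add", "remove", "metaData", "protocol"};
    for (int ordinal = 0; ordinal < names.length; ordinal++) {
        if (!row.isNullAt(ordinal)) {
            return names[ordinal];
        }
    }
    return "none"; // e.g. an action type outside this pruned schema
}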
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/CheckpointMetaData.java
@@ -15,12 +15,14 @@
*/
package io.delta.kernel.internal.checkpoints;

import java.util.Optional;
import java.util.*;

import io.delta.kernel.data.Row;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.data.GenericRow;

public class CheckpointMetaData {
public static CheckpointMetaData fromRow(Row row) {
return new CheckpointMetaData(
@@ -45,6 +47,15 @@ public CheckpointMetaData(long version, long size, Optional<Long> parts) {
this.parts = parts;
}

public Row toRow() {
Map<Integer, Object> dataMap = new HashMap<>();
dataMap.put(0, version);
dataMap.put(1, size);
parts.ifPresent(aLong -> dataMap.put(2, aLong));

return new GenericRow(READ_SCHEMA, dataMap);
}

@Override
public String toString() {
return "CheckpointMetaData{" +
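
A round-trip sketch tying the new `toRow` to the existing `fromRow` at the top of this file (the values are placeholders):

CheckpointMetaData metaData =
    new CheckpointMetaData(10L /* version */, 250L /* size */, Optional.empty());
Row row = metaData.toRow();  // ordinal 0 = version, 1 = size; 2 (parts) only when present
CheckpointMetaData roundTripped = CheckpointMetaData.fromRow(row);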
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java
@@ -34,7 +34,7 @@
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;

/**
* Class to load the {@link CheckpointMetaData} from `_last_checkpoint` file.
* Class to load and write the {@link CheckpointMetaData} from/to the `_last_checkpoint` file.
*/
public class Checkpointer {
private static final Logger logger = LoggerFactory.getLogger(Checkpointer.class);
@@ -185,6 +185,22 @@ public Optional<CheckpointMetaData> readLastCheckpointFile(TableClient tableClie
return loadMetadataFromFile(tableClient, 0 /* tries */);
}

/**
* Write the given checkpoint metadata to the last checkpoint metadata file.
* @param tableClient {@link TableClient} instance to use for writing
* @param checkpointMetaData Checkpoint metadata to write
* @throws IOException For any I/O issues.
*/
public void writeLastCheckpointFile(
TableClient tableClient,
CheckpointMetaData checkpointMetaData) throws IOException {
tableClient.getJsonHandler()
.writeJsonFileAtomically(
lastCheckpointFilePath.toString(),
singletonCloseableIterator(checkpointMetaData.toRow()),
true /* overwrite */);
}

/**
* Loads the checkpoint metadata from the _last_checkpoint file.
* <p>
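
A hedged usage sketch of the new writer; the `Checkpointer` constructor argument is assumed from context rather than shown in this hunk:

// After the checkpoint file for `version` is written, record it in
// `_last_checkpoint` so readers can locate the latest checkpoint without
// listing the whole log directory.
Checkpointer checkpointer = new Checkpointer(tableLogPath); // constructor assumed
checkpointer.writeLastCheckpointFile(
    tableClient,
    new CheckpointMetaData(version, numActionsInCheckpoint, Optional.empty()));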