Commit e496d0f

Merge branch 'apache:master' into HUDI-4997
slfan1989 authored Oct 19, 2022
2 parents e27c8c7 + 048299e commit e496d0f
Showing 92 changed files with 3,113 additions and 615 deletions.
4 changes: 4 additions & 0 deletions .asf.yaml
@@ -34,6 +34,10 @@ github:
  issues: true
  projects: true
  discussions: true
+  enabled_merge_buttons:
+    squash: true
+    merge: false
+    rebase: false
notifications:
  commits: commits@hudi.apache.org
  issues: commits@hudi.apache.org
4 changes: 2 additions & 2 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -6,9 +6,9 @@ _Describe context and summary for this change. Highlight if any code was copied.

_Describe any public API or user-facing feature change or any performance impact._

-**Risk level: none | low | medium | high**
+### Risk level (write none, low medium or high below)

-_Choose one. If medium or high, explain what verification was done to mitigate the risks._
+_If medium or high, explain what verification was done to mitigate the risks._

### Documentation Update

8 changes: 8 additions & 0 deletions .github/workflows/README.md
@@ -0,0 +1,8 @@
## How to update the Pull Request Template

When updating the PR template, consider whether corresponding updates need to be made to scripts/pr_compliance.py.

## What are the files in workflows?
- bot.yml: runs the Hudi unit tests with various versions of Scala, Spark, and Flink
- pr_compliance.yml: checks PR titles and the main comment to make sure that everything is filled out and formatted properly
- update_pr_compliance.yml: runs the pr_compliance tests when scripts/pr_compliance.py is updated
21 changes: 21 additions & 0 deletions .github/workflows/pr_compliance.yml
@@ -0,0 +1,21 @@
name: validate pr
on:
  pull_request:
    types: [opened, edited, reopened, synchronize]
    branches:
      - master

jobs:
  validate-pr:
    runs-on: ubuntu-latest
    env:
      REQUEST_BODY: ${{ github.event.pull_request.body }}
      REQUEST_TITLE: ${{ github.event.pull_request.title }}
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v3
      - name: run script
        run: python3 scripts/pr_compliance.py



18 changes: 18 additions & 0 deletions .github/workflows/update_pr_compliance.yml
@@ -0,0 +1,18 @@
name: Update Pr Compliance

on:
  pull_request:
    types: [opened, edited, reopened, synchronize]
    branches:
      - master
    paths:
      - scripts/pr_compliance.py

jobs:
  run-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v3
      - name: run script
        run: python3 scripts/pr_compliance.py run-tests > test.log || { echo "::error::pr_compliance.py $(cat test.log)" && exit 1; }
5 changes: 5 additions & 0 deletions doap_HUDI.rdf
@@ -101,6 +101,11 @@
<created>2022-08-16</created>
<revision>0.12.0</revision>
</Version>
+<Version>
+  <name>Apache Hudi 0.12.1</name>
+  <created>2022-10-18</created>
+  <revision>0.12.1</revision>
+</Version>
</release>
<repository>
<GitRepository>
2 changes: 2 additions & 0 deletions docker/demo/sparksql-bootstrap-prep-source.commands
@@ -18,5 +18,7 @@
import org.apache.spark.sql.functions.col

val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+// TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
+// in the value. Currently it fails the tests due to slash encoding.
df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
System.exit(0)
2 changes: 1 addition & 1 deletion docker/setup_demo.sh
@@ -30,7 +30,7 @@ if [ "$HUDI_DEMO_ENV" != "dev" ]; then
HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
fi
sleep 5
-HUDI_WS=${WS_ROOT} docker-compose --verbose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
+HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
sleep 15

docker exec -it adhoc-1 /bin/bash /var/hoodie/ws/docker/demo/setup_demo_container.sh
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -99,9 +100,11 @@ public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
if (optionalRecordMetadata.isPresent()) {
String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
try {
-long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
-stat.setMinEventTime(eventTime);
-stat.setMaxEventTime(eventTime);
+if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
+  long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+  stat.setMinEventTime(eventTime);
+  stat.setMaxEventTime(eventTime);
+}
} catch (DateTimeException | IllegalArgumentException e) {
LOG.debug(String.format("Fail to parse event time value: %s", eventTimeVal), e);
}
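The change above wraps the event-time parse in a StringUtils.isNullOrEmpty guard, so records with no event-time metadata no longer trigger a parse attempt (and a logged exception) on every write. A minimal sketch of the same guard-then-parse pattern, using only the JDK rather than Hudi's DateTimeUtils (the class and field names here are illustrative, not Hudi's API):

import java.time.DateTimeException;
import java.time.Instant;

public class EventTimeGuard {
  // Illustrative stand-in for the write stat; not Hudi's HoodieWriteStat.
  static Long minEventTime = null;

  static void recordEventTime(String eventTimeVal) {
    try {
      // Skip parsing entirely when the value is absent, mirroring the
      // StringUtils.isNullOrEmpty guard added in this commit.
      if (eventTimeVal != null && !eventTimeVal.isEmpty()) {
        long eventTime = Instant.parse(eventTimeVal).toEpochMilli();
        minEventTime = eventTime;
      }
    } catch (DateTimeException | IllegalArgumentException e) {
      // Parse failures are logged and swallowed rather than failing the write.
      System.err.println("Fail to parse event time value: " + eventTimeVal);
    }
  }

  public static void main(String[] args) {
    recordEventTime(null);                   // no-op instead of a parse error
    recordEventTime("2022-10-19T00:00:00Z"); // parses to epoch millis
    System.out.println(minEventTime);
  }
}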
InProcessLockProvider.java
@@ -23,7 +23,8 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;

import org.apache.hadoop.conf.Configuration;
@@ -32,12 +33,15 @@
import org.jetbrains.annotations.NotNull;

import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* InProcess level lock. This {@link LockProvider} implementation is to
* guard table from concurrent operations happening in the local JVM process.
+ * A separate lock is maintained per "table basepath".
* <p>
* Note: This Lock provider implementation doesn't allow lock reentrancy.
* Attempting to reacquire the lock from the same thread will throw
Expand All @@ -47,22 +51,27 @@
public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable {

private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class);
-private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
+private static final Map<String, ReentrantReadWriteLock> LOCK_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
+private final ReentrantReadWriteLock lock;
+private final String basePath;
private final long maxWaitTimeMillis;

public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
TypedProperties typedProperties = lockConfiguration.getConfig();
+basePath = lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key());
+ValidationUtils.checkArgument(basePath != null);
+lock = LOCK_INSTANCE_PER_BASEPATH.computeIfAbsent(basePath, (ignore) -> new ReentrantReadWriteLock());
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
}

@Override
public void lock() {
LOG.info(getLogMessage(LockState.ACQUIRING));
-if (LOCK.isWriteLockedByCurrentThread()) {
+if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
-LOCK.writeLock().lock();
+lock.writeLock().lock();
LOG.info(getLogMessage(LockState.ACQUIRED));
}

@@ -74,13 +83,13 @@ public boolean tryLock() {
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) {
LOG.info(getLogMessage(LockState.ACQUIRING));
-if (LOCK.isWriteLockedByCurrentThread()) {
+if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}

boolean isLockAcquired;
try {
-isLockAcquired = LOCK.writeLock().tryLock(time, unit);
+isLockAcquired = lock.writeLock().tryLock(time, unit);
} catch (InterruptedException e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
}
@@ -93,8 +102,8 @@ public boolean tryLock(long time, @NotNull TimeUnit unit) {
public void unlock() {
LOG.info(getLogMessage(LockState.RELEASING));
try {
-if (LOCK.isWriteLockedByCurrentThread()) {
-  LOCK.writeLock().unlock();
+if (lock.isWriteLockedByCurrentThread()) {
+  lock.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
@@ -106,18 +115,19 @@ public void unlock() {

@Override
public ReentrantReadWriteLock getLock() {
-return LOCK;
+return lock;
}

@Override
public void close() {
-if (LOCK.isWriteLockedByCurrentThread()) {
-  LOCK.writeLock().unlock();
+if (lock.isWriteLockedByCurrentThread()) {
+  lock.writeLock().unlock();
}
LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
+LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}

private String getLogMessage(LockState state) {
-return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
-    state.name(), " in-process lock.");
+return String.format("Base Path %s, Lock Instance %s, Thread %s, In-process lock state %s", basePath, getLock().toString(), Thread.currentThread().getName(), state.name());
}
}
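The substantive change in this file replaces the single JVM-wide ReentrantReadWriteLock with one lock per table base path, held in a ConcurrentHashMap, so concurrent writers to different tables in the same process no longer block each other; close() also removes the map entry for its base path. A simplified model of the per-path lock registry pattern, using only the JDK rather than Hudi's LockProvider API (class and method names are illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PerPathLockRegistry {
  // One lock per base path, created lazily. computeIfAbsent is atomic on
  // ConcurrentHashMap, so two threads asking for the same path always
  // receive the same lock instance.
  private static final Map<String, ReentrantReadWriteLock> LOCKS = new ConcurrentHashMap<>();

  public static ReentrantReadWriteLock forPath(String basePath) {
    return LOCKS.computeIfAbsent(basePath, ignore -> new ReentrantReadWriteLock());
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock a1 = forPath("/tmp/table_a");
    ReentrantReadWriteLock a2 = forPath("/tmp/table_a");
    ReentrantReadWriteLock b = forPath("/tmp/table_b");
    System.out.println(a1 == a2); // true: same table, same lock
    System.out.println(a1 == b);  // false: different tables lock independently
  }
}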