From 89bfae39d7b2b57d647ea11daa89fa396cffcd9c Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Fri, 23 Aug 2024 20:01:19 +0800
Subject: [PATCH 1/7] using hdfs to test atomic write dir

---
 .github/workflows/service_test_hdfs.yml | 32 +++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index c4447c2d459..19009dc946e 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -101,11 +101,39 @@ jobs:
           OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
           OPENDAL_HDFS_ENABLE_APPEND: true

-  hdfs-default-with-atomic-write-dir:
+  hdfs-cluster-with-atomic-write-dir:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4

+      - name: Configure Hdfs
+        # namenode will use ports: 9870, 9000, 8020
+        # datanode will use ports: 9864
+        run: |
+          docker run -d \
+            --name namenode \
+            --network host \
+            -e CLUSTER_NAME=test \
+            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
+            -e CORE_CONF_hadoop_http_staticuser_user=root \
+            -e HDFS_CONF_dfs_permissions_enabled=false \
+            -e HDFS_CONF_dfs_support_append=true \
+            -e HDFS_CONF_dfs_replication=1 \
+            bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
+
+          docker run -d \
+            --name datanode \
+            --network host \
+            -e CLUSTER_NAME=test \
+            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
+            -e CORE_CONF_hadoop_http_staticuser_user=root \
+            -e HDFS_CONF_dfs_permissions_enabled=false \
+            -e HDFS_CONF_dfs_support_append=true \
+            -e HDFS_CONF_dfs_replication=1 \
+            bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
+
+          curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870
+
       - name: Setup Rust toolchain
         uses: ./.github/actions/setup
         with:
@@ -135,5 +163,5 @@ jobs:
           OPENDAL_TEST: hdfs
           OPENDAL_HDFS_ROOT: /tmp/opendal/
           OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
-          OPENDAL_HDFS_NAME_NODE: default
+          OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
           OPENDAL_HDFS_ENABLE_APPEND: false

From 386d8a2255c2a08fb4a135e3bda2abee1d1162da Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Fri, 23 Aug 2024 23:47:02 +0800
Subject: [PATCH 2/7] fix write when atomic_write_dir is set

---
 core/src/services/hdfs/backend.rs | 89 ++++++++++++++-----------------
 1 file changed, 39 insertions(+), 50 deletions(-)

diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index d5c15eccbec..07d75c747e5 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -22,7 +22,6 @@ use std::io::SeekFrom;
 use std::path::PathBuf;
 use std::sync::Arc;

-use futures::AsyncWriteExt;
 use log::debug;
 use uuid::Uuid;

@@ -290,40 +289,34 @@ impl Access for HdfsBackend {
     }

     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
-            let target_path = build_rooted_abs_path(&self.root, path);
-            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
-
-            // If the target file exists, we should append to the end of it directly.
-            if op.append() && self.client.metadata(&target_path).is_ok() {
-                (target_path, None)
-            } else {
-                (target_path, Some(tmp_path))
+        let target_path = build_rooted_abs_path(&self.root, path);
+        let target_exists = match self.client.metadata(&target_path) {
+            Ok(_) => true,
+            Err(err) => {
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+                false
             }
-        } else {
-            let p = build_rooted_abs_path(&self.root, path);
-            (p, None)
         };

-        if let Err(err) = self.client.metadata(&target_path) {
-            // Early return if other error happened.
-            if err.kind() != io::ErrorKind::NotFound {
-                return Err(new_std_io_error(err));
+        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && target_exists {
+                None
+            } else {
+                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
             }
+        });

+        if !target_exists {
             let parent = get_parent(&target_path);
-
             self.client.create_dir(parent).map_err(new_std_io_error)?;
-
-            let mut f = self
-                .client
-                .open_file()
-                .create(true)
-                .write(true)
-                .async_open(&target_path)
-                .await
+        } else if tmp_path.is_some() {
+            // we must delete the target_path, otherwise the rename_file operation will fail
+            self.client
+                .remove_file(&target_path)
                 .map_err(new_std_io_error)?;
-            f.close().await.map_err(new_std_io_error)?;
         }

         let mut open_options = self.client.open_file();
@@ -489,37 +482,33 @@ impl Access for HdfsBackend {
     }

     fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
-            let target_path = build_rooted_abs_path(&self.root, path);
-            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
-
-            // If the target file exists, we should append to the end of it directly.
-            if op.append() && self.client.metadata(&target_path).is_ok() {
-                (target_path, None)
-            } else {
-                (target_path, Some(tmp_path))
+        let target_path = build_rooted_abs_path(&self.root, path);
+        let target_exists = match self.client.metadata(&target_path) {
+            Ok(_) => true,
+            Err(err) => {
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+                false
             }
-        } else {
-            let p = build_rooted_abs_path(&self.root, path);
-
-            (p, None)
         };

-        if let Err(err) = self.client.metadata(&target_path) {
-            // Early return if other error happened.
-            if err.kind() != io::ErrorKind::NotFound {
-                return Err(new_std_io_error(err));
+        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && target_exists {
+                None
+            } else {
+                Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
             }
+        });

+        if !target_exists {
             let parent = get_parent(&target_path);
-
             self.client.create_dir(parent).map_err(new_std_io_error)?;
-
+        } else if tmp_path.is_some() {
+            // we must delete the target_path, otherwise the rename_file operation will fail
             self.client
-                .open_file()
-                .create(true)
-                .write(true)
-                .open(&target_path)
+                .remove_file(&target_path)
                 .map_err(new_std_io_error)?;
         }


From 074bea667a32968a5d1e0145f1913c2c483cd0d1 Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Sun, 25 Aug 2024 00:25:18 +0800
Subject: [PATCH 3/7] add hdfs action with atomic write dir

---
 .../action.yml                                | 33 ++++++++++
 .../action.yml                                | 30 +++++++++
 .github/workflows/service_test_hdfs.yml       | 64 -------------------
 fixtures/hdfs/docker-compose-hdfs-cluster.yml | 26 ++++++++
 4 files changed, 89 insertions(+), 64 deletions(-)
 create mode 100644 .github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
 create mode 100644 .github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
 create mode 100644 fixtures/hdfs/docker-compose-hdfs-cluster.yml

diff --git a/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
new file mode 100644
index 00000000000..69aa2c6ed0f
--- /dev/null
+++ b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
@@ -0,0 +1,33 @@
+name: hdfs_cluster_with_atomic_write_dir
+description: 'Behavior test for hdfs cluster with atomic write dir'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup HDFS cluster
+      shell: bash
+      working-directory: fixtures/hdfs
+      run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait
+    - name: Setup java env
+      uses: actions/setup-java@v4
+      with:
+        distribution: temurin
+        java-version: "11"
+    - name: Setup hadoop env
+      shell: bash
+      run: |
+        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+        export HADOOP_HOME=/home/runner/hadoop-3.3.5
+        echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV
+        echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV
+        echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV
+        cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
+    - name: Setup opendal env
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_HDFS_ROOT=/tmp/opendal/
+        OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/
+        OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020
+        OPENDAL_HDFS_ENABLE_APPEND=false
+        EOF
\ No newline at end of file
diff --git a/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
new file mode 100644
index 00000000000..96eab6f4fef
--- /dev/null
+++ b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
@@ -0,0 +1,30 @@
+name: hdfs_default_with_atomic_write_dir
+description: 'Behavior test for hdfs default with atomic write dir'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup java env
+      uses: actions/setup-java@v4
+      with:
+        distribution: temurin
+        java-version: "11"
+    - name: Setup
+      shell: bash
+      run: |
+        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+        export HADOOP_HOME="/home/runner/hadoop-3.3.5"
+        export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+
+        cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
+
+        cat << EOF >> $GITHUB_ENV
+        HADOOP_HOME=${HADOOP_HOME}
+        CLASSPATH=${CLASSPATH}
+        LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
+        OPENDAL_HDFS_ROOT=/tmp/opendal/
+        OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/
+        OPENDAL_HDFS_NAME_NODE=default
+        OPENDAL_HDFS_ENABLE_APPEND=false
+        EOF
\ No newline at end of file
diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 19009dc946e..75440c59e49 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -101,67 +101,3 @@ jobs:
           OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
           OPENDAL_HDFS_ENABLE_APPEND: true

-  hdfs-cluster-with-atomic-write-dir:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Configure Hdfs
-        # namenode will use ports: 9870, 9000, 8020
-        # datanode will use ports: 9864
-        run: |
-          docker run -d \
-            --name namenode \
-            --network host \
-            -e CLUSTER_NAME=test \
-            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-            -e CORE_CONF_hadoop_http_staticuser_user=root \
-            -e HDFS_CONF_dfs_permissions_enabled=false \
-            -e HDFS_CONF_dfs_support_append=true \
-            -e HDFS_CONF_dfs_replication=1 \
-            bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
-
-          docker run -d \
-            --name datanode \
-            --network host \
-            -e CLUSTER_NAME=test \
-            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-            -e CORE_CONF_hadoop_http_staticuser_user=root \
-            -e HDFS_CONF_dfs_permissions_enabled=false \
-            -e HDFS_CONF_dfs_support_append=true \
-            -e HDFS_CONF_dfs_replication=1 \
-            bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
-
-          curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870
-
-      - name: Setup Rust toolchain
-        uses: ./.github/actions/setup
-        with:
-          need-nextest: true
-
-      - name: Setup java env
-        uses: actions/setup-java@v4
-        with:
-          distribution: temurin
-          java-version: "11"
-      - name: Setup hadoop env
-        shell: bash
-        run: |
-          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
-
-      - name: Test
-        shell: bash
-        working-directory: core
-        run: |
-          export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
-          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
-          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
-
-          cargo test behavior --features tests,services-hdfs
-        env:
-          HADOOP_HOME: "/home/runner/hadoop-3.3.5"
-          OPENDAL_TEST: hdfs
-          OPENDAL_HDFS_ROOT: /tmp/opendal/
-          OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
-          OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
-          OPENDAL_HDFS_ENABLE_APPEND: false
diff --git a/fixtures/hdfs/docker-compose-hdfs-cluster.yml b/fixtures/hdfs/docker-compose-hdfs-cluster.yml
new file mode 100644
index 00000000000..71a9b3b9ac8
--- /dev/null
+++ b/fixtures/hdfs/docker-compose-hdfs-cluster.yml
@@ -0,0 +1,26 @@
+version: '3.8'
+
+services:
+  namenode:
+    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
+    container_name: namenode
+    network_mode: "host"
+    environment:
+      CLUSTER_NAME: test
+      WEBHDFS_CONF_dfs_webhdfs_enabled: true
+      CORE_CONF_hadoop_http_staticuser_user: root
+      HDFS_CONF_dfs_permissions_enabled: false
+      HDFS_CONF_dfs_support_append: true
+      HDFS_CONF_dfs_replication: 1
+  datanode:
+    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
+    container_name: datanode
+    network_mode: "host"
+    environment:
+      CLUSTER_NAME: test
+      WEBHDFS_CONF_dfs_webhdfs_enabled: true
+      CORE_CONF_hadoop_http_staticuser_user: root
+      HDFS_CONF_dfs_permissions_enabled: false
+      HDFS_CONF_dfs_support_append: true
+      HDFS_CONF_dfs_replication: 1
+

From ca83fe006e544f0df90f90fa94b5f5531c074096 Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Sun, 25 Aug 2024 00:55:44 +0800
Subject: [PATCH 4/7] migrate service_test_hdfs to test planner

---
 .github/services/hdfs/hdfs_cluster/action.yml |  32 ++++++
 .github/workflows/service_test_hdfs.yml       | 103 ------------------
 2 files changed, 32 insertions(+), 103 deletions(-)
 create mode 100644 .github/services/hdfs/hdfs_cluster/action.yml
 delete mode 100644 .github/workflows/service_test_hdfs.yml

diff --git a/.github/services/hdfs/hdfs_cluster/action.yml b/.github/services/hdfs/hdfs_cluster/action.yml
new file mode 100644
index 00000000000..1e2d01cbde5
--- /dev/null
+++ b/.github/services/hdfs/hdfs_cluster/action.yml
@@ -0,0 +1,32 @@
+name: hdfs_cluster
+description: 'Behavior test for hdfs cluster'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup HDFS cluster
+      shell: bash
+      working-directory: fixtures/hdfs
+      run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait
+    - name: Setup java env
+      uses: actions/setup-java@v4
+      with:
+        distribution: temurin
+        java-version: "11"
+    - name: Setup hadoop env
+      shell: bash
+      run: |
+        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+        export HADOOP_HOME=/home/runner/hadoop-3.3.5
+        echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV
+        echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV
+        echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV
+        cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
+    - name: Setup opendal env
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_HDFS_ROOT=/tmp/opendal/
+        OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020
+        OPENDAL_HDFS_ENABLE_APPEND=true
+        EOF
\ No newline at end of file
diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
deleted file mode 100644
index 75440c59e49..00000000000
--- a/.github/workflows/service_test_hdfs.yml
+++ /dev/null
@@ -1,103 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name: Service Test HDFS
-
-on:
-  push:
-    branches:
-      - main
-  pull_request:
-    branches:
-      - main
-    paths:
-      - "core/src/**"
-      - "core/tests/**"
-      - "!core/src/docs/**"
-      - "!core/src/services/**"
-      - "core/src/services/hdfs/**"
-      - ".github/workflows/service_test_hdfs.yml"
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
-  cancel-in-progress: true
-
-jobs:
-  hdfs-cluster:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Configure Hdfs
-        # namenode will use ports: 9870, 9000, 8020
-        # datanode will use ports: 9864
-        run: |
-          docker run -d \
-            --name namenode \
-            --network host \
-            -e CLUSTER_NAME=test \
-            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-            -e CORE_CONF_hadoop_http_staticuser_user=root \
-            -e HDFS_CONF_dfs_permissions_enabled=false \
-            -e HDFS_CONF_dfs_support_append=true \
-            -e HDFS_CONF_dfs_replication=1 \
-            bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
-
-          docker run -d \
-            --name datanode \
-            --network host \
-            -e CLUSTER_NAME=test \
-            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
-            -e CORE_CONF_hadoop_http_staticuser_user=root \
-            -e HDFS_CONF_dfs_permissions_enabled=false \
-            -e HDFS_CONF_dfs_support_append=true \
-            -e HDFS_CONF_dfs_replication=1 \
-            bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
-
-          curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870
-
-      - name: Setup Rust toolchain
-        uses: ./.github/actions/setup
-        with:
-          need-nextest: true
-
-      - name: Setup java env
-        uses: actions/setup-java@v4
-        with:
-          distribution: temurin
-          java-version: "11"
-      - name: Setup hadoop env
-        shell: bash
-        run: |
-          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz | tar zxf - -C /home/runner
-
-      - name: Test
-        shell: bash
-        working-directory: core
-        run: |
-          export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
-          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
-          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
-
-          cargo test behavior --features tests,services-hdfs
-        env:
-          HADOOP_HOME: "/home/runner/hadoop-3.2.4"
-          OPENDAL_TEST: hdfs
-          OPENDAL_HDFS_ROOT: /tmp/opendal/
-          OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
-          OPENDAL_HDFS_ENABLE_APPEND: true
-

From b03bea1213c79a18b9cd22bd412a92303e9edbc3 Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Sun, 25 Aug 2024 02:44:53 +0800
Subject: [PATCH 5/7] fix append logic

---
 core/src/services/hdfs/backend.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index 07d75c747e5..58e4b18c4a7 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -300,9 +300,10 @@ impl Access for HdfsBackend {
             }
         };

+        let should_append = op.append() && target_exists;
         let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
             // If the target file exists, we should append to the end of it directly.
-            if op.append() && target_exists {
+            if should_append {
                 None
             } else {
                 Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
@@ -321,7 +322,7 @@ impl Access for HdfsBackend {

         let mut open_options = self.client.open_file();
         open_options.create(true);
-        if op.append() {
+        if should_append {
             open_options.append(true);
         } else {
             open_options.write(true);
@@ -493,9 +494,10 @@ impl Access for HdfsBackend {
             }
         };

+        let should_append = op.append() && target_exists;
         let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
             // If the target file exists, we should append to the end of it directly.
-            if op.append() && target_exists {
+            if should_append {
                 None
             } else {
                 Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)))
@@ -514,7 +516,7 @@ impl Access for HdfsBackend {

         let mut open_options = self.client.open_file();
         open_options.create(true);
-        if op.append() {
+        if should_append {
             open_options.append(true);
         } else {
             open_options.write(true);

From 5c0bfc6cdbf6757e9ad1a898edda8b1b47773418 Mon Sep 17 00:00:00 2001
From: meteorgan
Date: Sun, 25 Aug 2024 02:54:56 +0800
Subject: [PATCH 6/7] add license

---
 .github/services/hdfs/hdfs_cluster/action.yml | 17 +++++++++++++++++
 .../action.yml                                | 17 +++++++++++++++++
 .../action.yml                                | 17 +++++++++++++++++
 fixtures/hdfs/docker-compose-hdfs-cluster.yml | 17 +++++++++++++++++
 4 files changed, 68 insertions(+)

diff --git a/.github/services/hdfs/hdfs_cluster/action.yml b/.github/services/hdfs/hdfs_cluster/action.yml
index 1e2d01cbde5..f54624264b0 100644
--- a/.github/services/hdfs/hdfs_cluster/action.yml
+++ b/.github/services/hdfs/hdfs_cluster/action.yml
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 name: hdfs_cluster
 description: 'Behavior test for hdfs cluster'

diff --git a/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
index 69aa2c6ed0f..860b6137a14 100644
--- a/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
+++ b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 name: hdfs_cluster_with_atomic_write_dir
 description: 'Behavior test for hdfs cluster with atomic write dir'

diff --git a/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
index 96eab6f4fef..b8de8671611 100644
--- a/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
+++ b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 name: hdfs_default_with_atomic_write_dir
 description: 'Behavior test for hdfs default with atomic write dir'

diff --git a/fixtures/hdfs/docker-compose-hdfs-cluster.yml b/fixtures/hdfs/docker-compose-hdfs-cluster.yml
index 71a9b3b9ac8..376e8ed4868 100644
--- a/fixtures/hdfs/docker-compose-hdfs-cluster.yml
+++ b/fixtures/hdfs/docker-compose-hdfs-cluster.yml
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ version: '3.8' services: From ae8a6aa09903e240e1c16477069b05c96aacd2f3 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Tue, 3 Sep 2024 16:42:52 +0800 Subject: [PATCH 7/7] delete target file before rename_file --- core/src/services/hdfs/backend.rs | 26 ++++++++++++++------------ core/src/services/hdfs/writer.rs | 15 +++++++++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 58e4b18c4a7..4d87d24a108 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -313,11 +313,6 @@ impl Access for HdfsBackend { if !target_exists { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; - } else if tmp_path.is_some() { - // we must delete the target_path, otherwise the rename_file operation will fail - self.client - .remove_file(&target_path) - .map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); @@ -335,7 +330,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } @@ -507,11 +508,6 @@ impl Access for HdfsBackend { if !target_exists { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; - } else if tmp_path.is_some() { - // we must delete the target_path, otherwise the rename_file operation will fail - self.client - .remove_file(&target_path) - .map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); @@ -528,7 +524,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 0f60014f410..e4123861a0f 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -29,6 +29,7 @@ pub struct HdfsWriter { tmp_path: Option, f: Option, client: Arc, + target_path_exists: bool, } /// # Safety @@ -42,12 +43,14 @@ impl HdfsWriter { tmp_path: Option, f: F, client: Arc, + target_path_exists: bool, ) -> Self { Self { target_path, tmp_path, f: Some(f), client, + target_path_exists, } } } @@ -70,6 +73,12 @@ impl oio::Write for HdfsWriter { // TODO: we need to make rename async. if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)? @@ -102,6 +111,12 @@ impl oio::BlockingWrite for HdfsWriter { f.flush().map_err(new_std_io_error)?; if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)?;
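
Note on the scheme this series converges on: with `atomic_write_dir` set and the write not appending to an existing file, patches 2, 5, and 7 stage data in a temporary file and only move it to `target_path` when the writer closes. The sketch below is a minimal illustration of that flow, not code from the patches; the `atomic_write` helper name is hypothetical, and it assumes the backend's `client` is an `hdrs::Client` exposing the same calls used throughout the diffs (`metadata`, `open_file`, `remove_file`, `rename_file`), all returning `std::io::Result`.

use std::io::Write;

fn atomic_write(client: &hdrs::Client, target: &str, tmp: &str, data: &[u8]) -> std::io::Result<()> {
    // Stage the full payload under the atomic write dir first.
    let mut f = client.open_file().create(true).write(true).open(tmp)?;
    f.write_all(data)?;
    f.flush()?;

    // rename_file fails if the destination exists, so remove it first.
    // Patch 7 defers exactly this step to the writer's close path, so the
    // old file stays visible until the replacement content is complete.
    if client.metadata(target).is_ok() {
        client.remove_file(target)?;
    }

    // Publish in one step: readers observe either the old file or the
    // complete new one, never a partially written target.
    client.rename_file(tmp, target)
}

In the writer itself (patch 7), the remove + rename pair runs only when a tmp_path was actually used, which leaves in-place appends untouched.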