Skip to content

Commit

Permalink
Merge branch 'main' into scalar-subqueries
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 21, 2022
2 parents eb9bfea + 8970988 commit a384a3b
Show file tree
Hide file tree
Showing 49 changed files with 906 additions and 122 deletions.
11 changes: 9 additions & 2 deletions .github/actions/artifact_download/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ inputs:
description: "default to ./target/{inputs.profile}/"
required: false
default: ""
outputs:
path:
description: ""
value: ${{ steps.info.outputs.path }}
src:
description: ""
value: ${{ steps.info.outputs.src }}
runs:
using: "composite"
steps:
Expand All @@ -30,14 +37,14 @@ runs:
else
path="${{ inputs.path }}"
fi
echo "::set-output name=path::${path}"
echo "path=${path}" >> $GITHUB_OUTPUT
if [[ "${{ inputs.profile }}" == "debug" && "${{ inputs.target }}" =~ "linux" ]]; then
src="s3"
else
src="github"
fi
echo "::set-output name=src::${src}"
echo "src=${src}" >> $GITHUB_OUTPUT
- uses: actions/download-artifact@v2
if: steps.info.outputs.src == 'github'
Expand Down
13 changes: 5 additions & 8 deletions .github/actions/artifact_upload/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,23 @@ inputs:
description: "default/hive"
required: false
default: default
outputs:
src:
description: ""
value: ${{ steps.info.outputs.src }}
runs:
using: "composite"
steps:
- name: Get Upload Info
id: info
shell: bash
run: |
if [[ -z "${{ inputs.path }}" ]]; then
path="./target/${{ inputs.profile }}"
else
path="${{ inputs.path }}"
fi
echo "::set-output name=path::${path}"
if [[ "${{ inputs.profile }}" == "debug" && "${{ inputs.target }}" =~ "linux" ]]; then
src="s3"
else
src="github"
fi
echo "::set-output name=src::${src}"
echo "src=${src}" >> $GITHUB_OUTPUT
- name: Upload artifact to github
if: steps.info.outputs.src == 'github'
Expand Down
6 changes: 3 additions & 3 deletions .github/actions/setup_docker/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ runs:
shell: bash
id: registry
run: |
echo "::set-output name=dockerhub::datafuselabs/${{ inputs.repo }}"
echo "::set-output name=ecr::public.ecr.aws/i7g1w5q7/${{ inputs.repo }}"
echo "::set-output name=acr::registry.cn-beijing.aliyuncs.com/datafuselabs/${{ inputs.repo }}"
echo 'dockerhub=datafuselabs/${{ inputs.repo }}' >> $GITHUB_OUTPUT
echo 'ecr=public.ecr.aws/i7g1w5q7/${{ inputs.repo }}' >> $GITHUB_OUTPUT
echo 'acr=registry.cn-beijing.aliyuncs.com/datafuselabs/${{ inputs.repo }}' >> $GITHUB_OUTPUT
6 changes: 3 additions & 3 deletions .github/workflows/build-tool.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
id: toolchain
run: |
version=$(awk -F'[ ="]+' '$1 == "channel" { print $2 }' scripts/setup/rust-toolchain.toml)
echo "::set-output name=TOOLCHAIN::${version}"
echo "TOOLCHAIN=${version}" >> $GITHUB_OUTPUT
- name: Build and publish databend build base image
uses: docker/build-push-action@v3
Expand Down Expand Up @@ -71,7 +71,7 @@ jobs:
id: toolchain
run: |
version=$(awk -F'[ ="]+' '$1 == "channel" { print $2 }' scripts/setup/rust-toolchain.toml)
echo "::set-output name=TOOLCHAIN::${version}"
echo "TOOLCHAIN=${version}" >> $GITHUB_OUTPUT
- name: Build and publish databend build image
uses: docker/build-push-action@v3
Expand Down Expand Up @@ -107,7 +107,7 @@ jobs:
id: toolchain
run: |
version=$(awk -F'[ ="]+' '$1 == "channel" { print $2 }' scripts/setup/rust-toolchain.toml)
echo "::set-output name=TOOLCHAIN::${version}"
echo "TOOLCHAIN=${version}" >> $GITHUB_OUTPUT
- name: Build and publish databend build base image
uses: docker/build-push-action@v3
Expand Down
16 changes: 8 additions & 8 deletions .github/workflows/databend-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Get latest tag
id: get-latest-tag
run: |
echo "::set-output name=tag::`gh release list -L 1 | cut -f 1`"
echo "tag=`gh release list -L 1 | cut -f 1`" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Bump version
Expand Down Expand Up @@ -94,7 +94,7 @@ jobs:
fetch-depth: 0
- name: Get target
id: target
run: echo ::set-output name=target::${{ matrix.arch }}-apple-darwin
run: echo 'target=${{ matrix.arch }}-apple-darwin' >> $GITHUB_OUTPUT
- name: Rust setup
run: |
bash ./scripts/setup/dev_setup.sh -yb
Expand Down Expand Up @@ -161,10 +161,10 @@ jobs:
fetch-depth: 0
- name: Get the version
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
run: echo "VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_OUTPUT
- name: Get target
id: target
run: echo ::set-output name=target::${{ matrix.arch }}-unknown-linux-${{ matrix.platform }}
run: echo 'target=${{ matrix.arch }}-unknown-linux-${{ matrix.platform }}' >> $GITHUB_OUTPUT
- name: Setup Build Tool
uses: ./.github/actions/setup_build_tool
with:
Expand Down Expand Up @@ -224,7 +224,7 @@ jobs:
uses: actions/checkout@v3
- name: Get the version
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
run: echo "VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_OUTPUT
- name: Upload to github release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -243,7 +243,7 @@ jobs:

- name: Get the version
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
run: echo "VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_OUTPUT

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down Expand Up @@ -298,7 +298,7 @@ jobs:

- name: Get the version
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}
run: echo "VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_OUTPUT

- name: Set up QEMU
uses: docker/setup-qemu-action@v1
Expand Down Expand Up @@ -342,7 +342,7 @@ jobs:
_tags="${_tags},${{ steps.login.outputs.ecr_repo }}:latest"
_tags="${_tags},${{ steps.login.outputs.acr_repo }}:latest"
fi
echo ::set-output name=IMAGE_TAGS::${_tags}
echo "IMAGE_TAGS=${_tags}" >> $GITHUB_OUTPUT
- name: push service image
uses: docker/build-push-action@v3
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/binaries/meta/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
}

// Join a raft cluster only after all service started.
meta_node
let join_res = meta_node
.join_cluster(&conf.raft_config, conf.grpc_api_address.clone())
.await?;

info!("join result: {:?}", join_res);

// Print information to users.
println!("Databend Metasrv");
println!();
Expand Down
4 changes: 1 addition & 3 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1864,9 +1864,7 @@ impl<KV: KVApi> SchemaApi for KV {
txn_cond_seq(&lock_key, Eq, lock_key_seq),
];
let mut if_then = vec![
// every copied files changed, change tbid seq to make all table child consistent.
txn_op_put(&tbid, serialize_struct(&tb_meta.unwrap())?), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
];
for (file, file_info) in req.file_info.iter() {
let key = TableCopiedFileNameIdent {
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async-trait = "0.1.57"
backon = "0.2.0"
clap = { workspace = true }
futures = "0.3.24"
itertools = "0.10.5"
metrics = "0.20.1"
once_cell = "1.15.0"
poem = { version = "1", features = ["rustls"] }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/meta_service/meta_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl RaftService for RaftServiceImpl {
return Ok(tonic::Response::new(raft_reply));
}

#[tracing::instrument(level = "debug", skip(self))]
#[tracing::instrument(level = "debug", skip_all)]
async fn forward(
&self,
request: tonic::Request<RaftRequest>,
Expand Down
78 changes: 61 additions & 17 deletions src/meta/service/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use common_meta_raft_store::state_machine::StateMachine;
use common_meta_sled_store::openraft;
use common_meta_sled_store::openraft::error::AddLearnerError;
use common_meta_sled_store::openraft::DefensiveCheck;
use common_meta_sled_store::openraft::RaftStorage;
use common_meta_sled_store::openraft::StorageError;
use common_meta_sled_store::openraft::StoreExt;
use common_meta_sled_store::SledKeySpace;
use common_meta_stoerr::MetaStorageError;
Expand All @@ -59,6 +61,7 @@ use common_meta_types::MetaOperationError;
use common_meta_types::MetaStartupError;
use common_meta_types::Node;
use common_meta_types::NodeId;
use itertools::Itertools;
use openraft::Config;
use openraft::LogId;
use openraft::Raft;
Expand Down Expand Up @@ -508,6 +511,7 @@ impl MetaNode {
return Ok(false);
};

let mut errors = vec![];
let addrs = &conf.leave_via;
info!("node-{} about to leave cluster via {:?}", leave_id, addrs);

Expand All @@ -518,10 +522,14 @@ impl MetaNode {
let conn_res = RaftServiceClient::connect(format!("http://{}", addr)).await;
let mut raft_client = match conn_res {
Ok(c) => c,
Err(err) => {
Err(e) => {
error!(
"fail connecting to {} while leaving cluster, err: {:?}",
addr, err
addr, e
);
errors.push(
AnyError::new(&e)
.add_context(|| format!("leave {} via: {}", leave_id, addr.clone())),
);
continue;
}
Expand All @@ -546,12 +554,18 @@ impl MetaNode {
}
Err(s) => {
error!("leaving cluster via {} fail: {:?}", addr, s);
errors.push(
AnyError::new(&s)
.add_context(|| format!("leave {} via: {}", leave_id, addr.clone())),
);
}
};
}
Err(MetaManagementError::Leave(AnyError::error(format!(
"fail to leave {} cluster via {:?}",
leave_id, addrs
"fail to leave {} cluster via {:?}, caused by errors: {}",
leave_id,
addrs,
errors.into_iter().map(|e| e.to_string()).join(", ")
))))
}

Expand All @@ -561,19 +575,25 @@ impl MetaNode {
&self,
conf: &RaftConfig,
grpc_api_addr: String,
) -> Result<(), MetaManagementError> {
) -> Result<Result<(), &'static str>, MetaManagementError> {
if conf.join.is_empty() {
info!("'--join' is empty, do not need joining cluster");
return Ok(());
return Ok(Err("Did not join: --join is empty"));
}

// Try to join a cluster only when this node is just created.
// Try to join a cluster only when this node has no log.
// Joining a node with log has risk messing up the data in this node and in the target cluster.
if self.is_opened() {
info!("meta node is already initialized, skip joining it to a cluster");
return Ok(());
let to_join = self
.can_join()
.await
.map_err(|e| MetaManagementError::Join(AnyError::new(&e)))?;

if !to_join {
info!("meta node has log, skip joining");
return Ok(Err("Did not join: node already has log"));
}

let mut errors = vec![];
let addrs = &conf.join;

// Joining cluster has to use advertise host instead of listen host.
Expand All @@ -596,6 +616,9 @@ impl MetaNode {
Ok(c) => c,
Err(e) => {
error!("connect to {} join cluster fail: {:?}", addr, e);
errors.push(
AnyError::new(&e).add_context(|| format!("join via: {}", addr.clone())),
);
continue;
}
};
Expand All @@ -616,24 +639,45 @@ impl MetaNode {
match join_res {
Ok(r) => {
let reply = r.into_inner();
if !reply.data.is_empty() {
info!("join cluster via {} success: {:?}", addr, reply.data);
return Ok(());
} else {
error!("join cluster via {} fail: {:?}", addr, reply.error);

let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
}
}
Err(s) => {
error!("join cluster via {} fail: {:?}", addr, s);
errors.push(
AnyError::new(&s).add_context(|| format!("join via: {}", addr.clone())),
);
}
};
}
Err(MetaManagementError::Join(AnyError::error(format!(
"fail to join {} cluster via {:?}",
self.sto.id, addrs
"fail to join {} cluster via {:?}, caused by errors: {}",
self.sto.id,
addrs,
errors.into_iter().map(|e| e.to_string()).join(", ")
))))
}

/// Check meta-node state to see if it's appropriate to join to a cluster.
async fn can_join(&self) -> Result<bool, StorageError> {
let l = self.sto.get_log_state().await?;
info!("check can_join: log_state: {:?}", l);
Ok(l.last_log_id.is_none())
}

async fn do_start(conf: &MetaConfig) -> Result<Arc<MetaNode>, MetaStartupError> {
let raft_conf = &conf.raft_config;

Expand Down
Loading

0 comments on commit a384a3b

Please sign in to comment.