Skip to content

Commit

Permalink
Merge pull request #6253 from zhang2014/fix/ISSUE-6237
Browse files Browse the repository at this point in the history
fix(processor): show correctly progress in cluster mode
  • Loading branch information
BohuTANG authored Jul 4, 2022
2 parents 295480b + da297fa commit 3380242
Show file tree
Hide file tree
Showing 27 changed files with 1,099 additions and 573 deletions.
16 changes: 16 additions & 0 deletions common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,25 @@ impl Progress {
.fetch_add(progress_values.bytes, Ordering::Relaxed);
}

pub fn fetch(&self) -> ProgressValues {
let rows = self.rows.fetch_min(0, Ordering::SeqCst);
let bytes = self.bytes.fetch_min(0, Ordering::SeqCst);

ProgressValues { rows, bytes }
}

pub fn get_values(&self) -> ProgressValues {
let rows = self.rows.load(Ordering::Relaxed) as usize;
let bytes = self.bytes.load(Ordering::Relaxed) as usize;
ProgressValues { rows, bytes }
}
}

impl ProgressValues {
pub fn diff(&self, other: &Self) -> ProgressValues {
ProgressValues {
rows: self.rows - other.rows,
bytes: self.bytes - other.bytes,
}
}
}
45 changes: 45 additions & 0 deletions common/exception/src/exception_flight.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_arrow::arrow_format::flight::data::FlightData;

use crate::ErrorCode;
use crate::Result;

impl From<ErrorCode> for FlightData {
fn from(error: ErrorCode) -> Self {
// TODO: has stack trace
FlightData {
app_metadata: vec![0x02],
data_body: error.message().into_bytes(),
data_header: error.code().to_be_bytes().to_vec(),
flight_descriptor: None,
}
}
}

impl TryFrom<FlightData> for ErrorCode {
type Error = ErrorCode;

fn try_from(flight_data: FlightData) -> Result<Self> {
match flight_data.data_header.try_into() {
Err(_) => Err(ErrorCode::BadBytes("Cannot parse inf usize.")),
Ok(slice) => {
let code = u16::from_be_bytes(slice);
let message = String::from_utf8(flight_data.data_body)?;
Ok(ErrorCode::create(code, message, None, None))
}
}
}
}
1 change: 1 addition & 0 deletions common/exception/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

pub mod exception;
mod exception_code;
mod exception_flight;
mod exception_into;

pub use exception::ErrorCode;
Expand Down
8 changes: 4 additions & 4 deletions query/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ pub use rpc::DataExchange;
pub use rpc::DataExchangeManager;
pub use rpc::DatabendQueryFlightDispatcher;
pub use rpc::DatabendQueryFlightService;
pub use rpc::ExecutePacket;
pub use rpc::ExecutorPacket;
pub use rpc::ExecutePartialQueryPacket;
pub use rpc::FlightAction;
pub use rpc::FlightClient;
pub use rpc::FlightTicket;
pub use rpc::FragmentPacket;
pub use rpc::FragmentPlanPacket;
pub use rpc::InitNodesChannelPacket;
pub use rpc::MergeExchange;
pub use rpc::PrepareChannel;
pub use rpc::QueryFragmentsPlanPacket;
pub use rpc::ShuffleAction;
pub use rpc::ShuffleDataExchange;
pub use rpc::StreamTicket;
Expand Down
2 changes: 1 addition & 1 deletion query/src/api/rpc/exchange/exchange_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use async_channel::Sender;
use common_arrow::arrow_format::flight::data::FlightData;
use common_exception::Result;

use crate::api::rpc::packet::DataPacket;
use crate::api::rpc::packets::DataPacket;

// Different from async_channel::Sender
// It is allowed to close the channel when has one reference.
Expand Down
20 changes: 13 additions & 7 deletions query/src/api/rpc/exchange/exchange_channel_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use common_exception::Result;
use futures::future::Either;

use crate::api::rpc::exchange::exchange_channel::FragmentReceiver;
use crate::api::rpc::packet::DataPacket;
use crate::api::rpc::packets::DataPacket;
use crate::api::rpc::packets::FragmentData;
use crate::sessions::QueryContext;

pub struct ExchangeReceiver {
Expand Down Expand Up @@ -99,6 +100,13 @@ impl ExchangeReceiver {
}
}
Either::Right((_notified, _recv)) => {
while let Ok(recv_data) = rx.try_recv() {
let txs = &mut fragments_receiver;
if let Err(_cause) = this.on_packet(recv_data, txs).await {
break;
}
}

break;
}
};
Expand Down Expand Up @@ -201,7 +209,7 @@ impl ExchangeReceiver {
fragments_receiver: &mut [Option<FragmentReceiver>],
) -> Result<()> {
match packet? {
DataPacket::Data(fragment_id, flight_data) => {
DataPacket::FragmentData(FragmentData::Data(fragment_id, flight_data)) => {
if let Some(tx) = &fragments_receiver[fragment_id] {
if let Err(_cause) = tx.send(Ok(flight_data)).await {
common_tracing::tracing::warn!(
Expand All @@ -217,7 +225,7 @@ impl ExchangeReceiver {

Ok(())
}
DataPacket::EndFragment(fragment_id) => {
DataPacket::FragmentData(FragmentData::End(fragment_id)) => {
if fragment_id < fragments_receiver.len() {
if let Some(tx) = fragments_receiver[fragment_id].take() {
drop(tx);
Expand All @@ -228,10 +236,8 @@ impl ExchangeReceiver {
Ok(())
}
DataPacket::ErrorCode(error_code) => Err(error_code),
DataPacket::Progress(ref values) => {
self.ctx.get_scan_progress().incr(values);
Ok(())
}
DataPacket::Progress(progress_info) => progress_info.inc(&self.ctx),
DataPacket::PrecommitBlock(precommit_block) => precommit_block.precommit(&self.ctx),
}
}
}
Loading

0 comments on commit 3380242

Please sign in to comment.