chore(query): remove unused parquet2 codes #16282

Merged 6 commits on Aug 20, 2024
4 changes: 2 additions & 2 deletions Cargo.lock


4 changes: 0 additions & 4 deletions src/common/storage/src/lib.rs
@@ -49,10 +49,6 @@ mod column_node;
pub use column_node::ColumnNode;
pub use column_node::ColumnNodes;

mod parquet2;
pub use parquet2::infer_schema_with_extension;
pub use parquet2::read_parquet_metas_in_parallel;

pub mod parquet_rs;
pub use parquet_rs::read_metadata_async;
pub use parquet_rs::read_parquet_schema_async_rs;
132 changes: 0 additions & 132 deletions src/common/storage/src/parquet2.rs

This file was deleted.

53 changes: 53 additions & 0 deletions src/common/storage/src/parquet_rs.rs
@@ -41,6 +41,15 @@ pub async fn read_parquet_schema_async_rs(
    infer_schema_with_extension(meta.file_metadata())
}

pub fn read_parquet_schema_sync_rs(
    operator: &Operator,
    path: &str,
    file_size: Option<u64>,
) -> Result<ArrowSchema> {
    let meta = read_metadata_sync(path, operator, file_size)?;
    infer_schema_with_extension(meta.file_metadata())
}

pub fn infer_schema_with_extension(meta: &FileMetaData) -> Result<ArrowSchema> {
    let mut arrow_schema = parquet_to_arrow_schema(meta.schema_descr(), meta.key_value_metadata())?;
    // Convert data types to extension types using meta information.
@@ -131,6 +140,50 @@ pub async fn read_metadata_async(
    }
}

pub fn read_metadata_sync(
    path: &str,
    operator: &Operator,
    file_size: Option<u64>,
) -> Result<ParquetMetaData> {
    let blocking = operator.blocking();
    let file_size = match file_size {
        None => blocking.stat(path)?.content_length(),
        Some(n) => n,
    };

    check_footer_size(file_size)?;

    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
    let default_end_len = DEFAULT_FOOTER_READ_SIZE.min(file_size);
    let buffer = blocking
        .read_with(path)
        .range((file_size - default_end_len)..file_size)
        .call()?
        .to_vec();
    let buffer_len = buffer.len();
    let metadata_len = decode_footer(
        &buffer[(buffer_len - FOOTER_SIZE as usize)..]
            .try_into()
            .unwrap(),
    )? as u64;
    check_meta_size(file_size, metadata_len)?;

    let footer_len = FOOTER_SIZE + metadata_len;
    if (footer_len as usize) <= buffer_len {
        // The whole metadata is in the bytes we already read
        let offset = buffer_len - footer_len as usize;
        Ok(decode_metadata(&buffer[offset..])?)
    } else {
        let mut metadata = blocking
            .read_with(path)
            .range((file_size - footer_len)..(file_size - buffer_len as u64))
            .call()?
            .to_vec();
        metadata.extend(buffer);
        Ok(decode_metadata(&metadata)?)
    }
}
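For reviewers: a minimal usage sketch of the two new sync helpers, assuming an opendal Operator for the backing storage (the caller function below is hypothetical and not part of this PR). Passing None for file_size makes the helpers stat() the object themselves.

use databend_common_exception::Result;
use databend_common_storage::parquet_rs::read_metadata_sync;
use databend_common_storage::parquet_rs::read_parquet_schema_sync_rs;
use opendal::Operator;

// Hypothetical caller: inspect a parquet object from a sync (non-async) code path.
fn inspect_parquet(op: &Operator, path: &str) -> Result<()> {
    // None => the helper stats the object via the blocking operator to learn its size.
    let metadata = read_metadata_sync(path, op, None)?;
    // The schema helper reads the footer again; pass Some(len) when the size is already known.
    let schema = read_parquet_schema_sync_rs(op, path, None)?;
    println!(
        "{} row group(s), {} field(s)",
        metadata.num_row_groups(),
        schema.fields().len()
    );
    Ok(())
}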

/// check file is large enough to hold footer
fn check_footer_size(file_size: u64) -> Result<()> {
    if file_size < FOOTER_SIZE {
@@ -14,14 +14,17 @@

use std::sync::Arc;

use arrow_schema::Schema;
use databend_common_expression::TableSchema;
use databend_common_meta_app::schema::TableInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct ResultScanTableInfo {
    pub table_info: TableInfo,
    pub query_id: String,
    pub block_raw_data: Vec<u8>,
    pub location: String,
    pub schema: Schema,
    pub file_size: u64,
}
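A hedged sketch of how the reworked struct is presumably populated now that raw result blocks are no longer embedded (the bindings below are illustrative):

let info = ResultScanTableInfo {
    table_info,
    query_id: query_id.clone(),
    location,   // path of the cached result file written by the result cache
    schema,     // arrow_schema::Schema of the cached result, presumably resolved when the ResultScan table is created
    file_size,  // length of the cached file, so readers can avoid an extra stat() call
};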

impl ResultScanTableInfo {
@@ -130,7 +130,7 @@ pub async fn do_refresh_virtual_column(
    let all_generated = if let Some(schema) = schema {
        virtual_exprs
            .iter()
            .all(|virtual_name| schema.fields.iter().any(|f| &f.name == virtual_name))
            .all(|virtual_name| schema.fields.iter().any(|f| f.name() == virtual_name))
    } else {
        false
    };
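(The change from &f.name to f.name() here, and in the test below, reflects moving from arrow2's Field, which exposes name as a public field, to arrow-rs's Field, which exposes it through the name() accessor.)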
@@ -84,8 +84,8 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
assert!(schema.is_some());
let schema = schema.unwrap();
assert_eq!(schema.fields.len(), 2);
assert_eq!(schema.fields[0].name, "v['a']");
assert_eq!(schema.fields[1].name, "v[0]");
assert_eq!(schema.fields[0].name(), "v['a']");
assert_eq!(schema.fields[1].name(), "v[0]");
}
}

3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -150,6 +150,9 @@ pub struct QueryContext {
}

impl QueryContext {
    // Each table will create a new QueryContext,
    // so the partition_queue can be independent in each table context.
    // See `builder_join.rs` for more details.
    pub fn create_from(other: Arc<QueryContext>) -> Arc<QueryContext> {
        QueryContext::create_from_shared(other.shared.clone())
    }
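A small illustration of the pattern the comment describes (the two derived contexts below are hypothetical):

// Each table source derives its own context from the shared session state,
// so per-context members such as the partition queue stay independent.
let build_ctx = QueryContext::create_from(ctx.clone());
let probe_ctx = QueryContext::create_from(ctx.clone());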
@@ -34,9 +34,7 @@ use databend_common_expression::types::NumberScalar;
use databend_common_expression::FunctionKind;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_storage::DataOperator;
use databend_common_storages_result_cache::ResultCacheMetaManager;
use databend_common_storages_result_cache::ResultCacheReader;
use databend_common_storages_result_cache::ResultScan;
use databend_common_users::UserApiProvider;

@@ -187,22 +185,16 @@ impl Binder {
databend_common_base::runtime::block_on(async move {
let result_cache_mgr = ResultCacheMetaManager::create(kv_store, 0);
let meta_key = meta_key.unwrap();
let (table_schema, block_raw_data) = match result_cache_mgr
.get(meta_key.clone())
.await?
{
Some(value) => {
let op = DataOperator::instance().operator();
ResultCacheReader::read_table_schema_and_data(op, &value.location).await?
}
let location = match result_cache_mgr.get(meta_key.clone()).await? {
Some(value) => value.location,
None => {
return Err(ErrorCode::EmptyData(format!(
"`RESULT_SCAN` failed: Unable to fetch cached data for query ID '{}'. The data may have exceeded its TTL or been cleaned up. Cache key: '{}'",
query_id, meta_key
)).set_span(*span));
}
};
let table = ResultScan::try_create(table_schema, query_id, block_raw_data)?;
let table = ResultScan::try_create(query_id, location).await?;
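With this change the binder only resolves the cached result's location; reading the cached data (and its schema) is presumably deferred to ResultScan itself, which is why try_create is now async and takes the location rather than pre-read raw blocks.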

let table_alias_name = if let Some(table_alias) = alias {
Some(normalize_identifier(&table_alias.name, &self.name_resolution_ctx).name)
@@ -49,6 +49,7 @@ impl AggIndexReader {
.iter()
.all(|c| c.pages.iter().map(|p| p.num_values).sum::<u64>() == num_rows)
);

let columns_meta = metadata
.into_iter()
.enumerate()
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_arrow::arrow::io::parquet::read as pread;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_storage::parquet_rs::read_metadata_sync;
use databend_common_storage::read_metadata_async;
use log::debug;

use super::AggIndexReader;
@@ -32,20 +33,12 @@ impl AggIndexReader {
) -> Option<(PartInfoPtr, MergeIOReadResult)> {
let op = self.reader.operator.blocking();
match op.stat(loc) {
Ok(meta) => {
let mut reader = op
.reader(loc)
.ok()?
.into_std_read(0..meta.content_length())
.ok()?;
let metadata = pread::read_metadata(&mut reader)
.inspect_err(|e| {
debug!("Read aggregating index `{loc}`'s metadata failed: {e}")
})
.ok()?;
debug_assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
Ok(_meta) => {
let metadata = read_metadata_sync(loc, &self.reader.operator, None).ok()?;
debug_assert_eq!(metadata.num_row_groups(), 1);
let row_group = &metadata.row_groups()[0];
let columns_meta = build_columns_meta(row_group);

let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
@@ -80,16 +73,12 @@
loc: &str,
) -> Option<(PartInfoPtr, MergeIOReadResult)> {
match self.reader.operator.stat(loc).await {
Ok(meta) => {
let reader = self.reader.operator.reader(loc).await.ok()?;
let metadata = pread::read_metadata_async(reader, meta.content_length())
Ok(_meta) => {
let metadata = read_metadata_async(loc, &self.reader.operator, None)
.await
.inspect_err(|e| {
debug!("Read aggregating index `{loc}`'s metadata failed: {e}")
})
.ok()?;
debug_assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
debug_assert_eq!(metadata.num_row_groups(), 1);
let row_group = &metadata.row_groups()[0];
let columns_meta = build_columns_meta(row_group);
let res = self
.reader