Merge pull request #5895 from sandflee/run_hive
feat(hive) add support to query simple hive table
mergify[bot] authored Jun 20, 2022
2 parents 55c3e4f + 713daf2 commit be7f251
Showing 16 changed files with 655 additions and 39 deletions.
24 changes: 17 additions & 7 deletions .github/actions/test_stateful_hive_standalone/action.yml
@@ -30,12 +30,13 @@ runs:
run: |
chmod +x ./target/debug/databend-*
# hive cluster setup
- name: Hive Cluster Setup
shell: bash
run: |
docker-compose -f "./docker/it-hive/hive-docker-compose.yml" up -d
- name: Checkout java env
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: '11'

# hive cluster setup
- name: Hive Cluster Setup
shell: bash
run: |
@@ -51,15 +52,24 @@ runs:
docker-compose -f "./docker/it-hive/hive-docker-compose.yml" exec -T hive-server bash -c "timeout 50 bash -c 'until nc -z localhost 10000; do sleep 1; echo "waiting..."; done'"
# hive test data setup, to be refined
- name: Hive Testing Data
# haven't found a way to read hdfs data from the docker isolated network yet, so use the local fs rather than hdfs for ci
# hive data is loaded to hdfs://namenode:8020/user/hive/warehouse/t_1/; we copy a mirror of that data to the local fs at .databend/stateless_test_data/user/hive/warehouse/t_1/
# databend actually reads the data from the local fs (the mapping is sketched after this step)
- name: Hive Create Table & Load Data
shell: bash
run: |
docker-compose -f "./docker/it-hive/hive-docker-compose.yml" exec -T hive-server bash -c "/opt/hive/bin/beeline -u jdbc:hive2://127.0.0.1:10000 -e 'CREATE TABLE if not exists pokes (foo INT);'"
docker-compose -f "./docker/it-hive/hive-docker-compose.yml" exec -T hive-server bash -c "/opt/hive/bin/beeline -u jdbc:hive2://127.0.0.1:10000 -e 'CREATE TABLE t_1 (lo_orderkey integer, lo_orderpriority varchar(15)) stored as parquet;'"
docker-compose -f "./docker/it-hive/hive-docker-compose.yml" exec -T hive-server bash -c "/opt/hive/bin/beeline -u jdbc:hive2://127.0.0.1:10000 -e 'load data local inpath \"/databend-data/t_1.parquet\" OVERWRITE into table t_1;'"
mkdir -p .databend/stateless_test_data/user/hive/warehouse/t_1/
cp tests/data/hive/t_1.parquet .databend/stateless_test_data/user/hive/warehouse/t_1/
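The comments in this step boil down to a simple prefix rewrite: the location the metastore reports under hdfs://namenode:8020 is re-rooted under the local mirror directory. A minimal sketch of that mapping, with the helper name and the fixed authority assumed for illustration (not part of this PR):

```rust
// Hypothetical helper mirroring the comment above: strip the hdfs
// scheme + authority and re-root the path under the local test dir.
fn to_local_mirror(hdfs_location: &str, local_root: &str) -> String {
    let path = hdfs_location
        .strip_prefix("hdfs://namenode:8020")
        .unwrap_or(hdfs_location);
    format!("{}{}", local_root.trim_end_matches('/'), path)
}

#[test]
fn mirrors_warehouse_path() {
    assert_eq!(
        to_local_mirror(
            "hdfs://namenode:8020/user/hive/warehouse/t_1/",
            ".databend/stateless_test_data",
        ),
        ".databend/stateless_test_data/user/hive/warehouse/t_1/"
    );
}
```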
- name: Run Stateful Tests with Standalone mode
shell: bash
run: |
bash ./scripts/ci/ci-run-stateful-hive-tests-standalone-embed-meta.sh
env:
LD_LIBRARY_PATH: ${{ env.JAVA_HOME }}/lib/server:${{ env.LD_LIBRARY_PATH }}


- name: Stop containers
if: always()
3 changes: 3 additions & 0 deletions common/exception/src/exception_code.rs
@@ -155,6 +155,9 @@ build_exceptions! {

// Async insert error codes
AsyncInsertTimeoutError(1105),

TableInfoError(1106),
ReadTableDataError(1107),
}

// Metasvr errors [2001, 3000].
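For context, a sketch of how the two codes added above would be raised; it assumes the per-variant constructor that build_exceptions! generates (ErrorCode::TableInfoError(msg) carrying code 1106), and require_location is an invented example:

```rust
use common_exception::ErrorCode;
use common_exception::Result;

// Invented example, not part of this PR: fail with the new 1106 code
// when a hive table comes back without a storage location.
fn require_location(location: Option<String>) -> Result<String> {
    location.ok_or_else(|| ErrorCode::TableInfoError("hive table info has no storage location"))
}
```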
2 changes: 2 additions & 0 deletions docker/it-hive/hive-docker-compose.yml
@@ -28,6 +28,8 @@ services:
environment:
HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
SERVICE_PRECONDITION: "hive-metastore:9083"
volumes:
- ../../tests/data/hive/:/databend-data/
ports:
- "10000:10000"
hive-metastore:
2 changes: 1 addition & 1 deletion query/Cargo.toml
@@ -23,7 +23,7 @@ simd = ["common-arrow/simd"]
tokio-console = ["common-tracing/console", "common-base/tracing"]
memory-profiling = ["common-base/memory-profiling", "tempfile"]
storage-hdfs = ["opendal/services-hdfs", "common-io/storage-hdfs"]
hive = ["common-hive-meta-store", "thrift"]
hive = ["common-hive-meta-store", "thrift", "storage-hdfs"]

[dependencies]
# Workspace dependencies
54 changes: 47 additions & 7 deletions query/src/catalogs/hive/converters.rs
@@ -15,7 +15,11 @@
use std::sync::Arc;

use common_datavalues::chrono::Utc;
use common_datavalues::type_primitive::Int16Type;
use common_datavalues::type_primitive::Int32Type;
use common_datavalues::type_primitive::Int64Type;
use common_datavalues::type_primitive::Int8Type;
use common_datavalues::type_string::StringType;
use common_datavalues::DataField;
use common_datavalues::DataSchema;
use common_datavalues::DataTypeImpl;
@@ -33,6 +37,7 @@ use common_meta_app::schema::TableMeta;
use super::hive_database::HiveDatabase;
use super::hive_database::HIVE_DATABASE_ENGIE;
use super::hive_table::HIVE_TABLE_ENGIE;
use super::hive_table_options::HiveTableOptions;

///! Skeleton of mappers
impl From<hms::Database> for HiveDatabase {
@@ -59,9 +64,34 @@ pub fn try_into_table_info(
fields: Vec<hms::FieldSchema>,
) -> Result<TableInfo> {
let schema = Arc::new(try_into_schema(fields)?);
let partition_keys = if let Some(partitions) = &hms_table.partition_keys {
let r = partitions
.iter()
.filter_map(|field| field.name.clone())
.collect();
Some(r)
} else {
None
};

let location = if let Some(storage) = &hms_table.sd {
storage
.location
.as_ref()
.map(|location| location.to_string())
} else {
None
};

let table_options = HiveTableOptions {
partition_keys,
location,
};

let meta = TableMeta {
schema,
engine: HIVE_TABLE_ENGIE.to_owned(),
engine_options: table_options.into(),
created_on: Utc::now(),
..Default::default()
};
Expand Down Expand Up @@ -91,14 +121,24 @@ fn try_into_schema(hive_fields: Vec<hms::FieldSchema>) -> Result<DataSchema> {
}

fn try_from_filed_type_name(type_name: impl AsRef<str>) -> Result<DataTypeImpl> {
let name = type_name.as_ref();
let name = type_name.as_ref().to_uppercase();
// TODO more mappings go here
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types
match name.to_uppercase().as_str() {
"INT" => Ok(DataTypeImpl::Int32(Int32Type::default())),
_ => Err(ErrorCode::IllegalDataType(format!(
"unknown hive data type [{}]",
name
))),

// Hive string data type could be varchar(n), where n is the maximum number of characters
if name.starts_with("VARCHAR") {
Ok(DataTypeImpl::String(StringType::default()))
} else {
match name.as_str() {
"TINYINT" => Ok(DataTypeImpl::Int8(Int8Type::default())),
"SMALLINT" => Ok(DataTypeImpl::Int16(Int16Type::default())),
"INT" => Ok(DataTypeImpl::Int32(Int32Type::default())),
"BIGINT" => Ok(DataTypeImpl::Int64(Int64Type::default())),
"BINARY" | "STRING" => Ok(DataTypeImpl::String(StringType::default())),
_ => Err(ErrorCode::IllegalDataType(format!(
"unknown hive data type [{}]",
name
))),
}
}
}
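A quick illustration of the mapping above, written as a same-module test (assumed test harness, not part of the diff): varchar lengths are discarded, and names without a mapping surface as IllegalDataType.

```rust
#[test]
fn maps_basic_hive_types() -> common_exception::Result<()> {
    // varchar(n) collapses to the plain String type; the length is dropped.
    assert!(matches!(
        try_from_filed_type_name("varchar(15)")?,
        DataTypeImpl::String(_)
    ));
    assert!(matches!(
        try_from_filed_type_name("bigint")?,
        DataTypeImpl::Int64(_)
    ));
    // Types with no mapping yet, e.g. decimals, are an IllegalDataType error.
    assert!(try_from_filed_type_name("decimal(10, 2)").is_err());
    Ok(())
}
```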
4 changes: 2 additions & 2 deletions query/src/catalogs/hive/hive_catalog.rs
@@ -127,7 +127,7 @@ impl Catalog for HiveCatalog {
}

fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let res: Arc<dyn Table> = Arc::new(HiveTable::create(table_info.clone()));
let res: Arc<dyn Table> = Arc::new(HiveTable::try_create(table_info.clone())?);
Ok(res)
}

@@ -155,7 +155,7 @@ impl Catalog for HiveCatalog {
.get_schema(db_name.to_owned(), table_name.to_owned())
.map_err(from_thrift_error)?;
let table_info: TableInfo = super::converters::try_into_table_info(table_meta, fields)?;
let res: Arc<dyn Table> = Arc::new(HiveTable::create(table_info));
let res: Arc<dyn Table> = Arc::new(HiveTable::try_create(table_info)?);
Ok(res)
}

55 changes: 55 additions & 0 deletions query/src/catalogs/hive/hive_partition.rs
@@ -0,0 +1,55 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_planners::PartInfo;
use common_planners::PartInfoPtr;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
pub struct HivePartInfo {
pub location: String,
}

#[typetag::serde(name = "hive")]
impl PartInfo for HivePartInfo {
fn as_any(&self) -> &dyn Any {
self
}

fn equals(&self, info: &Box<dyn PartInfo>) -> bool {
match info.as_any().downcast_ref::<HivePartInfo>() {
None => false,
Some(other) => self == other,
}
}
}

impl HivePartInfo {
pub fn create(location: String) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(HivePartInfo { location }))
}

pub fn from_part(info: &PartInfoPtr) -> Result<&HivePartInfo> {
match info.as_any().downcast_ref::<HivePartInfo>() {
Some(part_ref) => Ok(part_ref),
None => Err(ErrorCode::LogicalError(
"Cannot downcast from PartInfo to HivePartInfo.",
)),
}
}
}
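How this is meant to be used, sketched under the assumption that PartInfoPtr is the Arc<Box<dyn PartInfo>> that create returns: the planner hands out type-erased parts, and the read path downcasts them back with from_part. demo is illustrative only:

```rust
use common_exception::Result;
use common_planners::PartInfoPtr;

// Illustrative round trip, not part of this PR.
fn part_location(part: &PartInfoPtr) -> Result<String> {
    let hive_part = HivePartInfo::from_part(part)?;
    Ok(hive_part.location.clone())
}

fn demo() -> Result<()> {
    let part: PartInfoPtr =
        HivePartInfo::create("/user/hive/warehouse/t_1/f.parquet".to_string());
    assert_eq!(part_location(&part)?, "/user/hive/warehouse/t_1/f.parquet");
    Ok(())
}
```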