Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Rename to data operator to make space for cache #8314

Merged
merged 5 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ pub struct StorageMokaConfig {}

static SHARE_TABLE_CONFIG: OnceCell<Singleton<ShareTableConfig>> = OnceCell::new();

// TODO: This config should be moved out of common-storage crate.
#[derive(Clone)]
pub struct ShareTableConfig {
pub share_endpoint_address: Option<String>,
Expand Down
16 changes: 14 additions & 2 deletions src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@

//! `common_storage` will provide storage related types and functions.
//!
//! This crate will return `std::io::Result`.
//! Databend Query will have three kinds of storage operators, visit
//! [RFC: Cache](https://databend.rs/doc/contributing/rfcs/cache) for
//! more detailed information.
//!
//! - persist operator: All data will be persisted until users delete them.
//! - cache operator: Backends could have their GC or background auto eviction logic, which means cache services is non-persist.
//! - temporary operator: Backend will be configured with TTL and timely delete old data.
//!
//! Users can use different operator based on their own needs, for example:
//!
//! - Users table data must be accessed via persist operator
//! - Table snapshots, segments cache must be stored accessed via cache operator.
//! - Intermediate data generated by query could be stored by temporary operator.

mod config;
pub use config::ShareTableConfig;
Expand All @@ -38,7 +50,7 @@ pub use config::STORAGE_S3_DEFAULT_ENDPOINT;
mod operator;
pub use operator::init_operator;
pub use operator::CacheOperator;
pub use operator::StorageOperator;
pub use operator::PersistOperator;

mod location;
pub use location::parse_uri_location;
Expand Down
54 changes: 42 additions & 12 deletions src/common/storage/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,40 +279,63 @@ fn init_moka_operator(_: &StorageMokaConfig) -> Result<Operator> {
Ok(Operator::new(builder.build()?))
}

/// PersistOperator is the operator to access persist services.
///
/// # Notes
///
/// All data accessed via this operator will be persisted.
#[derive(Clone, Debug)]
pub struct StorageOperator {
pub struct PersistOperator {
operator: Operator,
params: StorageParams,
}

impl Deref for StorageOperator {
impl Deref for PersistOperator {
type Target = Operator;

fn deref(&self) -> &Self::Target {
&self.operator
}
}

static STORAGE_OPERATOR: OnceCell<Singleton<StorageOperator>> = OnceCell::new();
static PERSIST_OPERATOR: OnceCell<Singleton<PersistOperator>> = OnceCell::new();

impl PersistOperator {
/// Create a new persist operator.
pub fn new(op: Operator, params: StorageParams) -> Self {
Self {
operator: op,
params,
}
}

/// Get the operator from PersistOperator
pub fn operator(&self) -> Operator {
self.operator.clone()
}

/// Get the params from PersistOperator
pub fn params(&self) -> &StorageParams {
&self.params
}

impl StorageOperator {
pub async fn init(
conf: &StorageConfig,
v: Singleton<StorageOperator>,
v: Singleton<PersistOperator>,
) -> common_exception::Result<()> {
v.init(Self::try_create(conf).await?)?;

STORAGE_OPERATOR.set(v).ok();
PERSIST_OPERATOR.set(v).ok();
Ok(())
}

pub async fn try_create(conf: &StorageConfig) -> common_exception::Result<StorageOperator> {
pub async fn try_create(conf: &StorageConfig) -> common_exception::Result<PersistOperator> {
Self::try_create_with_storage_params(&conf.params).await
}

pub async fn try_create_with_storage_params(
sp: &StorageParams,
) -> common_exception::Result<StorageOperator> {
) -> common_exception::Result<PersistOperator> {
let operator = init_operator(sp)?;

// OpenDAL will send a real request to underlying storage to check whether it works or not.
Expand All @@ -332,14 +355,14 @@ impl StorageOperator {
)));
}

Ok(StorageOperator {
Ok(PersistOperator {
operator,
params: sp.clone(),
})
}

pub fn instance() -> StorageOperator {
match STORAGE_OPERATOR.get() {
pub fn instance() -> PersistOperator {
match PERSIST_OPERATOR.get() {
None => panic!("StorageOperator is not init"),
Some(storage_operator) => storage_operator.get(),
}
Expand All @@ -350,7 +373,14 @@ impl StorageOperator {
}
}

/// The operator for cache.
/// CacheOperator is the operator to access cache services.
///
/// # Notes
///
/// As described in [RFC: Cache](https://databend.rs/doc/contributing/rfcs/cache):
///
/// All data stored in cache operator should be non-persist and could be GC or
/// background auto evict at any time.
#[derive(Clone, Debug)]
pub struct CacheOperator {
op: Operator,
Expand Down
1 change: 0 additions & 1 deletion src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ common-storage = { path = "../../common/storage" }

async-trait = "0.1.57"
dyn-clone = "1.0.9"
opendal = { version = "0.19", features = ["layers-retry"] }
8 changes: 3 additions & 5 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use common_legacy_planners::Partitions;
use common_legacy_planners::ReadDataSourcePlan;
use common_meta_types::UserInfo;
use common_settings::Settings;
use common_storage::StorageParams;
use opendal::Operator;
use common_storage::PersistOperator;

use crate::catalog::Catalog;
use crate::cluster_info::Cluster;
Expand Down Expand Up @@ -91,9 +90,8 @@ pub trait TableContext: Send + Sync {
fn get_query_str(&self) -> String;
/// Get the kind of session running query.
fn get_query_kind(&self) -> String;
// Get the storage data accessor operator from the session manager.
fn get_storage_operator(&self) -> Result<Operator>;
fn get_storage_params(&self) -> StorageParams;
// Get the persist storage data accessor operator from the session manager.
fn get_persist_operator(&self) -> Result<PersistOperator>;
fn get_dal_context(&self) -> &DalContext;
fn push_precommit_block(&self, block: DataBlock);
fn consume_precommit_blocks(&self) -> Vec<DataBlock>;
Expand Down
14 changes: 7 additions & 7 deletions src/query/service/src/global_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use common_catalog::catalog::CatalogManager;
use common_config::Config;
use common_exception::Result;
use common_fuse_meta::caches::CacheManager;
use common_storage::PersistOperator;
use common_storage::ShareTableConfig;
use common_storage::StorageOperator;
use common_tracing::QueryLogger;
use common_users::RoleCacheManager;
use common_users::UserApiProvider;
Expand All @@ -38,7 +38,7 @@ pub struct GlobalServices {
global_runtime: UnsafeCell<Option<Arc<Runtime>>>,
query_logger: UnsafeCell<Option<Arc<QueryLogger>>>,
cluster_discovery: UnsafeCell<Option<Arc<ClusterDiscovery>>>,
storage_operator: UnsafeCell<Option<StorageOperator>>,
storage_operator: UnsafeCell<Option<PersistOperator>>,
cache_manager: UnsafeCell<Option<Arc<CacheManager>>>,
catalog_manager: UnsafeCell<Option<Arc<CatalogManager>>>,
http_query_manager: UnsafeCell<Option<Arc<HttpQueryManager>>>,
Expand Down Expand Up @@ -79,7 +79,7 @@ impl GlobalServices {
// Cluster discovery.
ClusterDiscovery::init(config.clone(), global_services.clone()).await?;

StorageOperator::init(&config.storage, global_services.clone()).await?;
PersistOperator::init(&config.storage, global_services.clone()).await?;

ShareTableConfig::init(
&config.query.share_endpoint_address,
Expand Down Expand Up @@ -155,8 +155,8 @@ impl SingletonImpl<Arc<ClusterDiscovery>> for GlobalServices {
}
}

impl SingletonImpl<StorageOperator> for GlobalServices {
fn get(&self) -> StorageOperator {
impl SingletonImpl<PersistOperator> for GlobalServices {
fn get(&self) -> PersistOperator {
unsafe {
match &*self.storage_operator.get() {
None => panic!("StorageOperator is not init"),
Expand All @@ -165,9 +165,9 @@ impl SingletonImpl<StorageOperator> for GlobalServices {
}
}

fn init(&self, value: StorageOperator) -> Result<()> {
fn init(&self, value: PersistOperator) -> Result<()> {
unsafe {
*(self.storage_operator.get() as *mut Option<StorageOperator>) = Some(value);
*(self.storage_operator.get() as *mut Option<PersistOperator>) = Some(value);
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Interpreter for AlterShareTenantsInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand All @@ -80,7 +80,7 @@ impl Interpreter for AlterShareTenantsInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Interpreter for CreateShareInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Interpreter for DropShareInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Interpreter for GrantShareObjectInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Interpreter for RevokeShareObjectInterpreter {

save_share_spec(
&self.ctx.get_tenant(),
self.ctx.get_storage_operator()?,
self.ctx.get_persist_operator()?.operator(),
resp.spec_vec,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Interpreter for CreateUserStageInterpreter {

if user_stage.stage_type == StageType::Internal {
let prefix = format!("stage/{}/", user_stage.stage_name);
let op = self.ctx.get_storage_operator()?;
let op = self.ctx.get_persist_operator()?.operator();
op.object(&prefix).create().await?
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl HttpQueryHandle {
schema: result_schema.clone(),
user: ctx.get_current_user()?.identity(),
};
let data_accessor = ctx.get_storage_operator()?;
let data_accessor = ctx.get_persist_operator()?.operator();

let sink = ResultTableSink::create(
input.clone(),
Expand Down
16 changes: 7 additions & 9 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ use common_legacy_planners::SourceInfo;
use common_legacy_planners::StageTableInfo;
use common_meta_app::schema::TableInfo;
use common_meta_types::UserInfo;
use common_storage::StorageParams;
use opendal::Operator;
use common_storage::PersistOperator;
use parking_lot::RwLock;
use tracing::debug;

Expand Down Expand Up @@ -310,14 +309,13 @@ impl TableContext for QueryContext {
}

// Get the storage data accessor operator from the session manager.
fn get_storage_operator(&self) -> Result<Operator> {
// deref from `StorageOperator` to `opendal::Operator` first.
let operator = (*self.shared.storage_operator).clone();
fn get_persist_operator(&self) -> Result<PersistOperator> {
let pop = self.shared.persist_operator.clone();

Ok(operator.layer(self.shared.dal_ctx.as_ref().clone()))
}
fn get_storage_params(&self) -> StorageParams {
self.shared.get_storage_params()
Ok(PersistOperator::new(
pop.operator().layer(self.shared.dal_ctx.as_ref().clone()),
pop.params().clone(),
))
}
fn get_dal_context(&self) -> &DalContext {
self.shared.dal_ctx.as_ref()
Expand Down
8 changes: 4 additions & 4 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::UserInfo;
use common_storage::StorageOperator;
use common_storage::PersistOperator;
use common_storage::StorageParams;
use parking_lot::Mutex;
use parking_lot::RwLock;
Expand Down Expand Up @@ -79,7 +79,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) auth_manager: Arc<AuthMgr>,
pub(in crate::sessions) affect: Arc<Mutex<Option<QueryAffect>>>,
pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
pub(in crate::sessions) storage_operator: StorageOperator,
pub(in crate::sessions) persist_operator: PersistOperator,
pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
pub(in crate::sessions) created_time: SystemTime,
Expand All @@ -96,7 +96,7 @@ impl QueryContextShared {
cluster_cache,
config: config.clone(),
catalog_manager: CatalogManager::instance(),
storage_operator: StorageOperator::instance(),
persist_operator: PersistOperator::instance(),
init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())),
scan_progress: Arc::new(Progress::create()),
result_progress: Arc::new(Progress::create()),
Expand Down Expand Up @@ -166,7 +166,7 @@ impl QueryContextShared {
}

pub fn get_storage_params(&self) -> StorageParams {
self.storage_operator.get_storage_params()
self.persist_operator.get_storage_params()
}

pub fn get_tenant(&self) -> String {
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/sql/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use common_planner::plans::ShowCreateTablePlan;
use common_planner::plans::TruncateTablePlan;
use common_planner::plans::UndropTablePlan;
use common_storage::parse_uri_location;
use common_storage::StorageOperator;
use common_storage::PersistOperator;
use common_storage::UriLocation;
use tracing::debug;

Expand Down Expand Up @@ -378,7 +378,7 @@ impl<'a> Binder {
let (sp, _) = parse_uri_location(&uri)?;

// create a temporary op to check if params is correct
StorageOperator::try_create_with_storage_params(&sp).await?;
PersistOperator::try_create_with_storage_params(&sp).await?;

Some(sp)
}
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/storages/result/block_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ impl BlockBufferWriterWithResultTable {
.collect::<Vec<usize>>();
let projection = Projection::Columns(indices);

let reader = BlockReader::create(ctx.get_storage_operator()?, schema, projection)?;
let reader =
BlockReader::create(ctx.get_persist_operator()?.operator(), schema, projection)?;
Ok(Box::new(Self {
buffer,
reader,
Expand Down
Loading