Skip to content

Commit

Permalink
Merge pull request #5300 from datafuselabs/revert-5294-meta-runtime
Browse files Browse the repository at this point in the history
Revert "chore(query): introduce meta Runtime"
  • Loading branch information
BohuTANG authored May 11, 2022
2 parents 883e768 + 617afb7 commit 641dd5e
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 63 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/developing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ jobs:
- uses: actions/checkout@v2
- uses: ./.github/actions/test_stateless_standalone_linux

test_stateless_standalone_macos:
runs-on: macos-11
needs: build_macos
steps:
- uses: actions/checkout@v2
- uses: ./.github/actions/test_stateless_standalone_macos
# https://github.com/datafuselabs/databend/issues/5287
#test_stateless_standalone_macos:
# runs-on: macos-11
# needs: build_macos
# steps:
# - uses: actions/checkout@v2
# - uses: ./.github/actions/test_stateless_standalone_macos

test_stateless_cluster_linux:
runs-on: ubuntu-latest
Expand Down
16 changes: 8 additions & 8 deletions .github/workflows/production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ jobs:
with:
profile: release

test_stateless_standalone_macos:
runs-on: macos-11
needs: build_macos
steps:
- uses: actions/checkout@v2
- uses: ./.github/actions/test_stateless_standalone_macos
with:
profile: release
# test_stateless_standalone_macos:
# runs-on: macos-11
# needs: build_macos
# steps:
# - uses: actions/checkout@v2
# - uses: ./.github/actions/test_stateless_standalone_macos
# with:
# profile: release

test_stateless_cluster_linux:
runs-on: ubuntu-latest
Expand Down
28 changes: 0 additions & 28 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,6 @@ impl QueryContext {
self.shared.session.session_mgr.get_storage_runtime()
}

pub fn get_meta_runtime(&self) -> Arc<Runtime> {
self.shared.session.session_mgr.get_meta_runtime()
}

pub async fn reload_config(&self) -> Result<()> {
self.shared.reload_config().await
}
Expand Down Expand Up @@ -428,30 +424,6 @@ impl QueryContext {
})?;
Ok(FunctionContext { tz })
}

pub fn block_on_meta<F>(&self, future: F) -> Result<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let rt = self.get_meta_runtime();
let (tx, rx): (
std::sync::mpsc::Sender<F::Output>,
std::sync::mpsc::Receiver<F::Output>,
) = std::sync::mpsc::channel();

rt.try_spawn(async move {
let res = future.await;
tx.send(res).unwrap();
})?;
match rx.recv() {
Ok(v) => Ok(v),
Err(cause) => Err(ErrorCode::LogicalError(format!(
"Logical error, receive error. {:?}",
cause
))),
}
}
}

impl TrySpawn for QueryContext {
Expand Down
14 changes: 0 additions & 14 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ pub struct SessionManager {
pub status: Arc<RwLock<SessionManagerStatus>>,
storage_operator: RwLock<Operator>,
storage_runtime: Arc<Runtime>,
meta_runtime: Arc<Runtime>,
_guards: Vec<WorkerGuard>,
}

Expand All @@ -91,14 +90,6 @@ impl SessionManager {
Runtime::with_worker_threads(storage_num_cpus, Some("IO-worker".to_owned()))?
};

let meta_runtime = {
let meta_num_cpus = std::cmp::max(1, num_cpus::get() / 4);
Arc::new(Runtime::with_worker_threads(
meta_num_cpus,
Some("META-worker".to_owned()),
)?)
};

// NOTE: Magic happens here. We will add a layer upon original storage operator
// so that all underlying storage operations will send to storage runtime.
let storage_operator = Self::init_storage_operator(&conf)
Expand Down Expand Up @@ -134,7 +125,6 @@ impl SessionManager {
status,
storage_operator: RwLock::new(storage_operator),
storage_runtime: Arc::new(storage_runtime),
meta_runtime,
_guards,
}))
}
Expand Down Expand Up @@ -176,10 +166,6 @@ impl SessionManager {
self.storage_runtime.clone()
}

pub fn get_meta_runtime(&self) -> Arc<Runtime> {
self.meta_runtime.clone()
}

pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef> {
// TODO: maybe deadlock
let config = self.get_conf();
Expand Down
10 changes: 3 additions & 7 deletions query/src/sql/statements/query/query_normalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,9 @@ pub struct QueryNormalizer {

/// Replace alias in query and collect aggregate functions
impl QueryNormalizer {
fn try_create(ctx: Arc<QueryContext>) -> Result<QueryNormalizer> {
async fn try_create(ctx: Arc<QueryContext>) -> Result<QueryNormalizer> {
let tenant = ctx.get_tenant();
let manager = ctx.get_user_manager();
let udfs = ctx.block_on_meta(async move {
let res = manager.get_udfs(&tenant);
res.await
})??;
let udfs = ctx.get_user_manager().get_udfs(&tenant).await?;
Ok(QueryNormalizer {
expression_analyzer: ExpressionAnalyzer::create_with_udfs_support(ctx, udfs),
aliases_map: HashMap::new(),
Expand All @@ -64,7 +60,7 @@ impl QueryNormalizer {
}

pub async fn normalize(ctx: Arc<QueryContext>, v: &DfQueryStatement) -> Result<QueryASTIR> {
let query_normalizer = QueryNormalizer::try_create(ctx)?;
let query_normalizer = QueryNormalizer::try_create(ctx).await?;
query_normalizer.transform(v).await
}

Expand Down
1 change: 1 addition & 0 deletions query/src/users/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl UserApiProvider {
let client = MetaClientProvider::new(MetaGrpcClientConf::from(&conf.meta))
.try_get_kv_client()
.await?;

Ok(Arc::new(UserApiProvider { client }))
}

Expand Down

0 comments on commit 641dd5e

Please sign in to comment.