Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace ctor by linkme #14814

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
] }
ctor = "0.2"
downcast-rs = "1.2"
easy-ext = "1"
either = "1"
enum-as-inner = "0.6"
futures-async-stream = { workspace = true }
futures-util = "0.3"
itertools = "0.12"
linkme = { version = "0.3", features = ["used_linker"] }
moka = { version = "0.12", features = ["future"] }
num-traits = "0.2"
parse-display = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion src/expr/core/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

pub use async_trait::async_trait;
pub use ctor::ctor;
pub use futures_async_stream::try_stream;
pub use futures_util::stream::BoxStream;
pub use itertools::multizip;
pub use linkme;
2 changes: 1 addition & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(non_snake_case)] // for `ctor` generated code
#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(lazy_cell)]
#![feature(coroutines)]
#![feature(never_type)]
#![feature(error_generic_member_access)]
#![feature(used_with_arg)]

extern crate self as risingwave_expr;

Expand Down
31 changes: 7 additions & 24 deletions src/expr/core/src/sig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ use crate::table_function::BoxedTableFunction;
use crate::ExprError;

/// The global registry of all function signatures.
pub static FUNCTION_REGISTRY: LazyLock<FunctionRegistry> = LazyLock::new(|| unsafe {
// SAFETY: this function is called after all `#[ctor]` functions are called.
pub static FUNCTION_REGISTRY: LazyLock<FunctionRegistry> = LazyLock::new(|| {
let mut map = FunctionRegistry::default();
tracing::info!("found {} functions", FUNCTION_REGISTRY_INIT.len());
for sig in FUNCTION_REGISTRY_INIT.drain(..) {
map.insert(sig);
tracing::info!("found {} functions", FUNCTIONS.len());
for f in FUNCTIONS {
map.insert(f());
}
map
});
Expand Down Expand Up @@ -459,22 +458,6 @@ pub enum FuncBuilder {
Udf,
}

/// Register a function into global registry.
///
/// # Safety
///
/// This function must be called sequentially.
///
/// It is designed to be used by `#[function]` macro.
/// Users SHOULD NOT call this function.
#[doc(hidden)]
pub unsafe fn _register(sig: FuncSign) {
FUNCTION_REGISTRY_INIT.push(sig)
}

/// The global registry of function signatures on initialization.
///
/// `#[function]` macro will generate a `#[ctor]` function to register the signature into this
/// vector. The calls are guaranteed to be sequential. The vector will be drained and moved into
/// `FUNCTION_REGISTRY` on the first access of `FUNCTION_REGISTRY`.
static mut FUNCTION_REGISTRY_INIT: Vec<FuncSign> = Vec::new();
/// A static distributed slice of functions defined by `#[function]`.
#[linkme::distributed_slice]
pub static FUNCTIONS: [fn() -> FuncSign];
1 change: 1 addition & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ hex = "0.4"
icelake = { workspace = true }
itertools = "0.12"
jsonbb = "0.1.2"
linkme = { version = "0.3", features = ["used_linker"] }
md5 = "0.7"
num-traits = "0.2"
regex = "1"
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(test)]
#![feature(iter_array_chunks)]
#![feature(result_flattening)]
#![feature(used_with_arg)]

mod aggregate;
mod scalar;
Expand Down
30 changes: 15 additions & 15 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,20 @@ impl FunctionAttr {
let deprecated = self.deprecated;

Ok(quote! {
#[risingwave_expr::codegen::ctor]
fn #ctor_name() {
#[risingwave_expr::codegen::linkme::distributed_slice(risingwave_expr::sig::FUNCTIONS)]
fn #ctor_name() -> risingwave_expr::sig::FuncSign {
use risingwave_common::types::{DataType, DataTypeName};
use risingwave_expr::sig::{_register, FuncSign, SigDataType, FuncBuilder};
use risingwave_expr::sig::{FuncSign, SigDataType, FuncBuilder};

unsafe { _register(FuncSign {
FuncSign {
name: risingwave_pb::expr::expr_node::Type::#pb_type.into(),
inputs_type: vec![#(#args),*],
variadic: #variadic,
ret_type: #ret,
build: FuncBuilder::Scalar(#build_fn),
type_infer: #type_infer_fn,
deprecated: #deprecated,
}) };
}
}
})
}
Expand Down Expand Up @@ -615,12 +615,12 @@ impl FunctionAttr {
let deprecated = self.deprecated;

Ok(quote! {
#[risingwave_expr::codegen::ctor]
fn #ctor_name() {
#[risingwave_expr::codegen::linkme::distributed_slice(risingwave_expr::sig::FUNCTIONS)]
fn #ctor_name() -> risingwave_expr::sig::FuncSign {
use risingwave_common::types::{DataType, DataTypeName};
use risingwave_expr::sig::{_register, FuncSign, SigDataType, FuncBuilder};
use risingwave_expr::sig::{FuncSign, SigDataType, FuncBuilder};

unsafe { _register(FuncSign {
FuncSign {
name: risingwave_expr::aggregate::AggKind::#pb_type.into(),
inputs_type: vec![#(#args),*],
variadic: false,
Expand All @@ -633,7 +633,7 @@ impl FunctionAttr {
},
type_infer: #type_infer_fn,
deprecated: #deprecated,
}) };
}
}
})
}
Expand Down Expand Up @@ -946,20 +946,20 @@ impl FunctionAttr {
let deprecated = self.deprecated;

Ok(quote! {
#[risingwave_expr::codegen::ctor]
fn #ctor_name() {
#[risingwave_expr::codegen::linkme::distributed_slice(risingwave_expr::sig::FUNCTIONS)]
fn #ctor_name() -> risingwave_expr::sig::FuncSign {
use risingwave_common::types::{DataType, DataTypeName};
use risingwave_expr::sig::{_register, FuncSign, SigDataType, FuncBuilder};
use risingwave_expr::sig::{FuncSign, SigDataType, FuncBuilder};

unsafe { _register(FuncSign {
FuncSign {
name: risingwave_pb::expr::table_function::Type::#pb_type.into(),
inputs_type: vec![#(#args),*],
variadic: false,
ret_type: #ret,
build: FuncBuilder::Table(#build_fn),
type_infer: #type_infer_fn,
deprecated: #deprecated,
}) };
}
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ base64 = "0.21"
bk-tree = "0.5.0"
bytes = "1"
clap = { version = "4", features = ["derive"] }
ctor = "0.2"
downcast-rs = "1.2"
dyn-clone = "1.0.14"
easy-ext = "1"
Expand All @@ -40,6 +39,7 @@ futures-async-stream = { workspace = true }
iana-time-zone = "0.1"
icelake = { workspace = true }
itertools = "0.12"
linkme = { version = "0.3", features = ["used_linker"] }
maplit = "1"
md5 = "0.7.0"
num-integer = "0.1"
Expand Down
34 changes: 13 additions & 21 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,13 @@ pub fn get_sys_views_in_schema(schema_name: &str) -> Option<Vec<Arc<ViewCatalog>

/// The global registry of all builtin catalogs.
pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
// SAFETY: this function is called after all `#[ctor]` functions are called.
let mut table_by_schema_name = HashMap::new();
let mut table_name_by_id = HashMap::new();
let mut view_by_schema_name = HashMap::new();
tracing::info!("found {} catalogs", unsafe { SYS_CATALOGS_INIT.len() });
for (id, catalog) in unsafe { SYS_CATALOGS_INIT.drain(..) } {
tracing::info!("found {} catalogs", SYS_CATALOGS_SLICE.len());
for catalog in SYS_CATALOGS_SLICE {
let (id, catalog) = catalog();
assert!(id < NON_RESERVED_SYS_CATALOG_ID as u32);
match catalog {
BuiltinCatalog::Table(table) => {
let sys_table: SystemTableCatalog = table.into();
Expand All @@ -328,28 +329,19 @@ pub static SYS_CATALOGS: LazyLock<SystemCatalog> = LazyLock::new(|| {
}
});

pub static mut SYS_CATALOGS_INIT: Vec<(u32, BuiltinCatalog)> = vec![];

/// Register the catalog to global registry.
///
/// Note: The function is used by macro generated code. Don't call it directly.
#[doc(hidden)]
pub(super) unsafe fn _register(id: u32, builtin_catalog: BuiltinCatalog) {
assert!(id < NON_RESERVED_SYS_CATALOG_ID as u32);

SYS_CATALOGS_INIT.push((id, builtin_catalog));
}
#[linkme::distributed_slice]
pub static SYS_CATALOGS_SLICE: [fn() -> (u32, BuiltinCatalog)];

macro_rules! prepare_sys_catalog {
($( { $builtin_catalog:expr $(, $func:ident $($await:ident)?)? } ),* $(,)?) => {
paste::paste! {
$(
#[ctor::ctor]
unsafe fn [<register_${index()}>]() {
_register((${index()} + 1) as u32, $builtin_catalog);
$(
const _: () = {
#[linkme::distributed_slice(crate::catalog::system_catalog::SYS_CATALOGS_SLICE)]
fn catalog() -> (u32, BuiltinCatalog) {
(${index()} as u32 + 1, $builtin_catalog)
}
)*
}
};
)*

#[async_trait]
impl SysCatalogReader for SysCatalogReaderImpl {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#![feature(error_generic_member_access)]
#![feature(round_ties_even)]
#![feature(iterator_try_collect)]
#![feature(used_with_arg)]
#![recursion_limit = "256"]

#[cfg(test)]
Expand Down
Loading