Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give the full server struct to the FDW when instanciating it #323

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
use std::collections::HashMap;
use std::ffi::CStr;

use crate::prelude::*;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::*;

pub struct ForeignServer {
pub server_name: String,
pub server_type: Option<String>,
pub server_version: Option<String>,
pub options: HashMap<String, String>,
}

// create a fdw instance from its id
pub(super) unsafe fn create_fdw_instance_from_server_id<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
fserver_id: pg_sys::Oid,
) -> W {
let to_string = |raw: *mut std::ffi::c_char| -> Option<String> {
if raw.is_null() {
return None;
}
let c_str = CStr::from_ptr(raw);
let value = c_str
.to_str()
.map_err(|_| {
OptionsError::OptionValueIsInvalidUtf8(
String::from_utf8_lossy(c_str.to_bytes()).to_string(),
)
})
.report_unwrap()
.to_string();
Some(value)
};
let fserver = pg_sys::GetForeignServer(fserver_id);
let fserver_opts = options_to_hashmap((*fserver).options).report_unwrap();
let wrapper = W::new(&fserver_opts);
let server = ForeignServer {
server_name: to_string((*fserver).servername).unwrap(),
server_type: to_string((*fserver).servertype),
server_version: to_string((*fserver).serverversion),
options: options_to_hashmap((*fserver).options).report_unwrap(),
};
let wrapper = W::new(server);
wrapper.report_unwrap()
}

Expand Down
3 changes: 2 additions & 1 deletion supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Provides interface types and trait to develop Postgres foreign data wrapper
//!

use crate::instance::ForeignServer;
use crate::FdwRoutine;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::{Date, Timestamp, TimestampWithTimeZone};
Expand Down Expand Up @@ -502,7 +503,7 @@ pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
/// You can do any initalization in this function, like saving connection
/// info or API url in an variable, but don't do heavy works like database
/// connection or API call.
fn new(options: &HashMap<String, String>) -> Result<Self, E>
fn new(server: ForeignServer) -> Result<Self, E>
where
Self: Sized;

Expand Down
7 changes: 4 additions & 3 deletions supabase-wrappers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@
//! type HelloWorldFdwResult<T> = Result<T, HelloWorldFdwError>;
//!
//! impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
//! fn new(options: &HashMap<String, String>) -> HelloWorldFdwResult<Self> {
//! // 'options' is the key-value pairs defined in `CREATE SERVER` SQL, for example,
//! fn new(server: ForeignServer) -> HelloWorldFdwResult<Self> {
//! // 'server.options' is the key-value pairs defined in `CREATE SERVER` SQL, for example,
//! //
//! // create server my_helloworld_server
//! // foreign data wrapper wrappers_helloworld
Expand Down Expand Up @@ -172,7 +172,7 @@
//! }
//!
//! impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
//! fn new(options: &HashMap<String, String>) -> Result<Self, HelloWorldFdwError> {
//! fn new(server: ForeignServer) -> Result<Self, HelloWorldFdwError> {
//! Ok(Self {
//! row_cnt: 0,
//! tgt_cols: Vec::new(),
Expand Down Expand Up @@ -299,6 +299,7 @@ pub mod utils;
/// The prelude includes all necessary imports to make Wrappers work
pub mod prelude {
pub use crate::import_foreign_schema::*;
pub use crate::instance::ForeignServer;
pub use crate::interface::*;
pub use crate::options::*;
pub use crate::utils::*;
Expand Down
9 changes: 5 additions & 4 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,17 @@ impl AirtableFdw {

// TODO Add support for INSERT, UPDATE, DELETE
impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
fn new(options: &HashMap<String, String>) -> AirtableFdwResult<Self> {
let base_url = options
fn new(server: ForeignServer) -> AirtableFdwResult<Self> {
let base_url = server
.options
.get("api_url")
.map(|t| t.to_owned())
.unwrap_or_else(|| "https://api.airtable.com/v0".to_string());

let client = match options.get("api_key") {
let client = match server.options.get("api_key") {
Some(api_key) => Some(create_client(api_key)?),
None => {
let key_id = require_option("api_key_id", options)?;
let key_id = require_option("api_key_id", &server.options)?;
if let Some(api_key) = get_vault_secret(key_id) {
Some(create_client(&api_key)?)
} else {
Expand Down
9 changes: 5 additions & 4 deletions wrappers/src/fdw/auth0_fdw/auth0_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ impl ForeignDataWrapper<Auth0FdwError> for Auth0Fdw {
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.

fn new(options: &HashMap<String, String>) -> Result<Self, Auth0FdwError> {
let url = require_option("url", options)?.to_string();
let api_key = if let Some(api_key) = options.get("api_key") {
fn new(server: ForeignServer) -> Result<Self, Auth0FdwError> {
let url = require_option("url", &server.options)?.to_string();
let api_key = if let Some(api_key) = server.options.get("api_key") {
api_key.clone()
} else {
let api_key_id = options
let api_key_id = server
.options
.get("api_key_id")
.expect("`api_key_id` must be set if `api_key` is not");
get_vault_secret(api_key_id).ok_or(Auth0FdwError::SecretNotFound(api_key_id.clone()))?
Expand Down
16 changes: 9 additions & 7 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ impl BigQueryFdw {
}

impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, BigQueryFdwError> {
fn new(server: ForeignServer) -> Result<Self, BigQueryFdwError> {
let mut ret = BigQueryFdw {
rt: create_async_runtime()?,
client: None,
project_id: require_option("project_id", options)?.to_string(),
dataset_id: require_option("dataset_id", options)?.to_string(),
project_id: require_option("project_id", &server.options)?.to_string(),
dataset_id: require_option("dataset_id", &server.options)?.to_string(),
table: "".to_string(),
rowid_col: "".to_string(),
tgt_cols: Vec::new(),
Expand All @@ -160,13 +160,15 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
};

// Is authentication mocked
let mock_auth: bool = options
let mock_auth: bool = server
.options
.get("mock_auth")
.map(|t| t.to_owned())
.unwrap_or_else(|| "false".to_string())
== *"true";

let api_endpoint = options
let api_endpoint = server
.options
.get("api_endpoint")
.map(|t| t.to_owned())
.unwrap_or_else(|| "https://bigquery.googleapis.com/bigquery/v2".to_string());
Expand All @@ -182,10 +184,10 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
serde_json::to_string_pretty(&dummy_auth_config)
.expect("dummy auth config should not fail to serialize")
}
false => match options.get("sa_key") {
false => match server.options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = require_option("sa_key_id", options)?;
let sa_key_id = require_option("sa_key_id", &server.options)?;
match get_vault_secret(sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ impl ClickHouseFdw {
}

impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
fn new(options: &HashMap<String, String>) -> ClickHouseFdwResult<Self> {
fn new(server: ForeignServer) -> ClickHouseFdwResult<Self> {
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
let conn_str = match server.options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => {
let conn_str_id = require_option("conn_string_id", options)?;
let conn_str_id = require_option("conn_string_id", &server.options)?;
get_vault_secret(conn_str_id).unwrap_or_default()
}
};
Expand Down
15 changes: 8 additions & 7 deletions wrappers/src/fdw/cognito_fdw/cognito_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,17 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.

fn new(options: &HashMap<String, String>) -> Result<Self, CognitoFdwError> {
let user_pool_id = require_option("user_pool_id", options)?.to_string();
let aws_region = require_option("region", options)?.to_string();
fn new(server: ForeignServer) -> Result<Self, CognitoFdwError> {
let user_pool_id = require_option("user_pool_id", &server.options)?.to_string();
let aws_region = require_option("region", &server.options)?.to_string();

let aws_access_key_id = require_option("aws_access_key_id", options)?.to_string();
let aws_access_key_id = require_option("aws_access_key_id", &server.options)?.to_string();
let aws_secret_access_key =
if let Some(aws_secret_access_key) = options.get("aws_secret_access_key") {
if let Some(aws_secret_access_key) = server.options.get("aws_secret_access_key") {
aws_secret_access_key.clone()
} else {
let aws_secret_access_key = options
let aws_secret_access_key = server
.options
.get("api_key_id")
.expect("`api_key_id` must be set if `aws_secret_access_key` is not");
get_vault_secret(aws_secret_access_key).ok_or(CognitoFdwError::SecretNotFound(
Expand All @@ -122,7 +123,7 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

let mut builder = config.to_builder();
if let Some(endpoint_url) = options.get("endpoint_url") {
if let Some(endpoint_url) = server.options.get("endpoint_url") {
if !endpoint_url.is_empty() {
builder.set_endpoint_url(Some(endpoint_url.clone()));
}
Expand Down
10 changes: 5 additions & 5 deletions wrappers/src/fdw/firebase_fdw/firebase_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,23 +211,23 @@ impl FirebaseFdw {
}

impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
fn new(options: &HashMap<String, String>) -> FirebaseFdwResult<Self> {
fn new(server: ForeignServer) -> FirebaseFdwResult<Self> {
let mut ret = Self {
rt: create_async_runtime()?,
project_id: require_option("project_id", options)?.to_string(),
project_id: require_option("project_id", &server.options)?.to_string(),
client: None,
scan_result: Vec::default(),
};

// get oauth2 access token if it is directly defined in options
let token = if let Some(access_token) = options.get("access_token") {
let token = if let Some(access_token) = server.options.get("access_token") {
access_token.to_owned()
} else {
// otherwise, get it from the options or Vault
let sa_key = match options.get("sa_key") {
let sa_key = match server.options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = require_option("sa_key_id", options)?;
let sa_key_id = require_option("sa_key_id", &server.options)?;
match get_vault_secret(sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
// You can do any initalization in this new() function, like saving connection
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.
fn new(_options: &HashMap<String, String>) -> HelloWorldFdwResult<Self> {
fn new(_server: ForeignServer) -> HelloWorldFdwResult<Self> {
Ok(Self {
row_cnt: 0,
tgt_cols: Vec::new(),
Expand Down
9 changes: 5 additions & 4 deletions wrappers/src/fdw/logflare_fdw/logflare_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ impl LogflareFdw {
}

impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
fn new(options: &HashMap<String, String>) -> LogflareFdwResult<Self> {
let base_url = options
fn new(server: ForeignServer) -> LogflareFdwResult<Self> {
let base_url = server
.options
.get("api_url")
.map(|t| t.to_owned())
.map(|s| {
Expand All @@ -205,10 +206,10 @@ impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
}
})
.unwrap_or_else(|| LogflareFdw::BASE_URL.to_string());
let client = match options.get("api_key") {
let client = match server.options.get("api_key") {
Some(api_key) => Some(create_client(api_key)),
None => {
let key_id = require_option("api_key_id", options)?;
let key_id = require_option("api_key_id", &server.options)?;
get_vault_secret(key_id).map(|api_key| create_client(&api_key))
}
}
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/mssql_fdw/mssql_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ impl MssqlFdw {
}

impl ForeignDataWrapper<MssqlFdwError> for MssqlFdw {
fn new(options: &HashMap<String, String>) -> MssqlFdwResult<Self> {
fn new(server: ForeignServer) -> MssqlFdwResult<Self> {
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
let conn_str = match server.options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => {
let conn_str_id = require_option("conn_string_id", options)?;
let conn_str_id = require_option("conn_string_id", &server.options)?;
get_vault_secret(conn_str_id).unwrap_or_default()
}
};
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/redis_fdw/redis_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ impl RedisFdw {
}

impl ForeignDataWrapper<RedisFdwError> for RedisFdw {
fn new(options: &HashMap<String, String>) -> RedisFdwResult<Self> {
let conn_url = match options.get("conn_url") {
fn new(server: ForeignServer) -> RedisFdwResult<Self> {
let conn_url = match server.options.get("conn_url") {
Some(url) => url.to_owned(),
None => {
let conn_url_id = require_option("conn_url_id", options)?;
let conn_url_id = require_option("conn_url_id", &server.options)?;
get_vault_secret(conn_url_id).unwrap_or_default()
}
};
Expand Down
17 changes: 9 additions & 8 deletions wrappers/src/fdw/s3_fdw/s3_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl S3Fdw {
}

impl ForeignDataWrapper<S3FdwError> for S3Fdw {
fn new(options: &HashMap<String, String>) -> S3FdwResult<Self> {
fn new(server: ForeignServer) -> S3FdwResult<Self> {
// cannot use create_async_runtime() as the runtime needs to be created
// for multiple threads
let rt = tokio::runtime::Runtime::new()
Expand All @@ -128,27 +128,27 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
};

// get is_mock flag
let is_mock: bool = options.get("is_mock") == Some(&"true".to_string());
let is_mock: bool = server.options.get("is_mock") == Some(&"true".to_string());

// get credentials
let creds = if is_mock {
// LocalStack uses hardcoded credentials
Some(("test".to_string(), "test".to_string()))
} else {
match options.get("vault_access_key_id") {
match server.options.get("vault_access_key_id") {
Some(vault_access_key_id) => {
// if using credentials stored in Vault
let vault_secret_access_key =
require_option("vault_secret_access_key", options)?;
require_option("vault_secret_access_key", &server.options)?;
get_vault_secret(vault_access_key_id)
.zip(get_vault_secret(vault_secret_access_key))
}
None => {
// if using credentials directly specified
let aws_access_key_id =
require_option("aws_access_key_id", options)?.to_string();
require_option("aws_access_key_id", &server.options)?.to_string();
let aws_secret_access_key =
require_option("aws_secret_access_key", options)?.to_string();
require_option("aws_secret_access_key", &server.options)?.to_string();
Some((aws_access_key_id, aws_secret_access_key))
}
}
Expand All @@ -163,7 +163,8 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
let region = if is_mock {
default_region
} else {
options
server
.options
.get("aws_region")
.map(|t| t.to_owned())
.unwrap_or(default_region)
Expand All @@ -177,7 +178,7 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
let mut config_loader = aws_config::defaults(BehaviorVersion::latest());

// endpoint_url not supported as env var in rust https://github.com/awslabs/aws-sdk-rust/issues/932
if let Some(endpoint_url) = options.get("endpoint_url") {
if let Some(endpoint_url) = server.options.get("endpoint_url") {
config_loader = config_loader.endpoint_url(endpoint_url);
}

Expand Down
Loading
Loading