feat: max-txn-ops option #3458

Merged 2 commits on Mar 8, 2024
4 changes: 3 additions & 1 deletion src/cmd/src/cli/bench.rs
@@ -62,7 +62,9 @@ pub struct BenchTableMetadataCommand {

impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();

let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));

2 changes: 1 addition & 1 deletion src/cmd/src/cli/upgrade.rs
@@ -70,7 +70,7 @@ impl UpgradeCommand {
etcd_addr: &self.etcd_addr,
})?;
let tool = MigrateTableMetadata {
etcd_store: EtcdStore::with_etcd_client(client),
etcd_store: EtcdStore::with_etcd_client(client, 128),
dryrun: self.dryrun,
skip_catalog_keys: self.skip_catalog_keys,
skip_table_global_keys: self.skip_table_global_keys,
8 changes: 7 additions & 1 deletion src/cmd/src/metasrv.rs
@@ -117,10 +117,12 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long, default_value = "")]
store_key_prefix: String,
/// The maximum number of operations permitted in a txn.
#[clap(long)]
max_txn_ops: Option<usize>,
}

impl StartCommand {
@@ -181,6 +183,10 @@ impl StartCommand {
opts.store_key_prefix = self.store_key_prefix.clone()
}

if let Some(max_txn_ops) = self.max_txn_ops {
opts.max_txn_ops = max_txn_ops;
}

// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;

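A side note on the CLI wiring above: because the flag is an `Option<usize>`, only an explicitly passed `--max-txn-ops` overrides the value coming from the configured defaults. A minimal, self-contained sketch of that pattern (the struct and field names are illustrative stand-ins, and it assumes the same clap derive setup the diff already uses):

```rust
use clap::Parser;

/// Illustrative stand-in for MetaSrvOptions; the real default is also 128.
struct Options {
    max_txn_ops: usize,
}

impl Default for Options {
    fn default() -> Self {
        Self { max_txn_ops: 128 }
    }
}

#[derive(Parser)]
struct StartCommand {
    /// The max operations per txn; omitting the flag keeps the config default.
    #[clap(long)]
    max_txn_ops: Option<usize>,
}

fn main() {
    let cmd = StartCommand::parse();
    let mut opts = Options::default();
    // Only an explicitly passed --max-txn-ops overrides the default.
    if let Some(max_txn_ops) = cmd.max_txn_ops {
        opts.max_txn_ops = max_txn_ops;
    }
    println!("effective max_txn_ops = {}", opts.max_txn_ops);
}
```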
9 changes: 9 additions & 0 deletions src/common/meta/src/error.rs
@@ -67,6 +67,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
EtcdTxnFailed {
max_operations: usize,
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence { err_msg: String, location: Location },

@@ -400,6 +408,7 @@ impl ErrorExt for Error {
IllegalServerState { .. }
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. } => StatusCode::Internal,

SerdeJson { .. }
33 changes: 32 additions & 1 deletion src/common/meta/src/key.rs
@@ -464,7 +464,7 @@ impl TableMetadataManager {
pub fn max_logical_tables_per_batch(&self) -> usize {
// The batch size is max_txn_ops / 3 because each logical table in `tables_data`
// produces 3 txn operations when its metadata is created.
self.kv_backend.max_txn_size() / 3
self.kv_backend.max_txn_ops() / 3
}

/// Creates metadata for multiple logical tables and return an error if different metadata exists.
@@ -860,6 +860,7 @@ mod tests {
use bytes::Bytes;
use common_time::util::current_time_millis;
use futures::TryStreamExt;
use store_api::storage::RegionId;
use table::metadata::{RawTableInfo, TableInfo};

use super::datanode_table::DatanodeTableKey;
@@ -1056,6 +1057,36 @@
);
}

#[tokio::test]
async fn test_create_many_logical_tables_metadata() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(kv_backend);

let mut tables_data = vec![];
for i in 0..128 {
let table_id = i + 1;
let region_number = table_id * 3;
let region_id = RegionId::new(table_id, region_number);
let region_route = new_region_route(region_id.as_u64(), 2);
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
table_id,
&format!("my_table_{}", table_id),
region_routes.iter().map(|r| r.region.id.region_number()),
)
.into();
let table_route_value = TableRouteValue::physical(region_routes.clone());

tables_data.push((table_info, table_route_value));
}

// creates metadata.
table_metadata_manager
.create_logical_tables_metadata(tables_data)
.await
.unwrap();
}

#[tokio::test]
async fn test_delete_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
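For context on the `max_txn_ops() / 3` batching in key.rs above: with the etcd default of 128 operations per txn, `max_logical_tables_per_batch()` evaluates to 42, while the trait's `usize::MAX` default (used by backends that don't override it, such as the in-memory store in the new test) effectively removes the limit. A quick sketch of the arithmetic; the free function below mirrors the method but is not the real API:

```rust
fn max_logical_tables_per_batch(max_txn_ops: usize) -> usize {
    // Each logical table's metadata expands to 3 txn operations,
    // so one txn can hold at most max_txn_ops / 3 tables.
    max_txn_ops / 3
}

fn main() {
    // etcd's default --max-txn-ops of 128 => 42 logical tables per batch.
    assert_eq!(max_logical_tables_per_batch(128), 42);
    // The TxnService trait default of usize::MAX effectively removes the limit.
    assert!(max_logical_tables_per_batch(usize::MAX) >= 128);
    println!("ok");
}
```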
11 changes: 9 additions & 2 deletions src/common/meta/src/key/test_utils.rs
@@ -19,8 +19,9 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};

pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
table_id: TableId,
table_name: &str,
region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
@@ -50,8 +51,14 @@
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name("mytable")
.name(table_name)
.meta(meta)
.build()
.unwrap()
}
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
new_test_table_info_with_name(table_id, "mytable", region_numbers)
}
36 changes: 20 additions & 16 deletions src/common/meta/src/kv_backend/etcd.rs
@@ -33,23 +33,22 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;

// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
const MAX_TXN_SIZE: usize = 128;

fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
let (key, value) = kv.into_key_value();
KeyValue { key, value }
}

pub struct EtcdStore {
client: Client,
// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
max_txn_ops: usize,
}

impl EtcdStore {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvBackendRef>
pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
@@ -58,16 +57,19 @@ impl EtcdStore {
.await
.context(error::ConnectEtcdSnafu)?;

Ok(Self::with_etcd_client(client))
Ok(Self::with_etcd_client(client, max_txn_ops))
}

pub fn with_etcd_client(client: Client) -> KvBackendRef {
Arc::new(Self { client })
pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
Arc::new(Self {
client,
max_txn_ops,
})
}

async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
let max_txn_size = self.max_txn_size();
if txn_ops.len() < max_txn_size {
let max_txn_ops = self.max_txn_ops();
if txn_ops.len() < max_txn_ops {
// fast path
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
@@ -83,7 +85,7 @@ impl EtcdStore {
}

let txns = txn_ops
.chunks(max_txn_size)
.chunks(max_txn_ops)
.map(|part| async move {
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
@@ -311,18 +313,20 @@ impl TxnService for EtcdStore {
.with_label_values(&["etcd", "txn"])
.start_timer();

let max_operations = txn.max_operations();

let etcd_txn: Txn = txn.into();
let txn_res = self
.client
.kv_client()
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
.context(error::EtcdTxnFailedSnafu { max_operations })?;
txn_res.try_into()
}

fn max_txn_size(&self) -> usize {
MAX_TXN_SIZE
fn max_txn_ops(&self) -> usize {
self.max_txn_ops
}
}

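To make the `do_multi_txn` change above concrete: once the queued operations reach the configured `max_txn_ops`, the store splits them into chunks and submits one etcd txn per chunk, so no single txn exceeds the server's `--max-txn-ops`. A self-contained sketch of just that chunking shape, with plain integers standing in for `TxnOp`s (the real code maps each chunk to an async etcd txn call):

```rust
fn chunk_txn_ops(txn_ops: Vec<u32>, max_txn_ops: usize) -> Vec<Vec<u32>> {
    // Mirrors the shape of EtcdStore::do_multi_txn: each chunk becomes one
    // etcd txn, so no single txn exceeds the server-side operation limit.
    txn_ops
        .chunks(max_txn_ops)
        .map(|part| part.to_vec())
        .collect()
}

fn main() {
    // 300 operations with the etcd default limit of 128 => 3 txns.
    let ops: Vec<u32> = (0..300).collect();
    let txns = chunk_txn_ops(ops, 128);
    assert_eq!(txns.len(), 3);
    assert_eq!(txns[0].len(), 128);
    assert_eq!(txns[2].len(), 44);
    println!("split into {} txns", txns.len());
}
```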
10 changes: 9 additions & 1 deletion src/common/meta/src/kv_backend/txn.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;

use common_error::ext::ErrorExt;

use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
@@ -27,7 +29,7 @@ pub trait TxnService: Sync + Send {
}

/// Maximum number of operations permitted in a transaction.
fn max_txn_size(&self) -> usize {
fn max_txn_ops(&self) -> usize {
usize::MAX
}
}
@@ -192,6 +194,12 @@ impl Txn {
self.req.failure = operations.into();
self
}

#[inline]
pub fn max_operations(&self) -> usize {
let opc = max(self.req.compare.len(), self.req.success.len());
max(opc, self.req.failure.len())
}
}

impl From<Txn> for TxnRequest {
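One detail of the new `Txn::max_operations` above that is easy to misread: it reports the largest of the compare, success, and failure lists, not their sum, and that figure is what the `EtcdTxnFailed` error message surfaces. A toy mirror of the computation; the field names follow `TxnRequest` in the diff, but the types here are simplified stand-ins:

```rust
use std::cmp::max;

// Simplified stand-in for TxnRequest: only the list lengths matter here.
struct TxnRequest {
    compare: Vec<&'static str>,
    success: Vec<&'static str>,
    failure: Vec<&'static str>,
}

impl TxnRequest {
    fn max_operations(&self) -> usize {
        // Same shape as Txn::max_operations in the diff above.
        let opc = max(self.compare.len(), self.success.len());
        max(opc, self.failure.len())
    }
}

fn main() {
    let req = TxnRequest {
        compare: vec!["key exists"],
        success: vec!["put a", "put b", "put c"],
        failure: vec!["get a"],
    };
    // 1 compare, 3 success ops, 1 failure op => reported as 3, not 5.
    assert_eq!(req.max_operations(), 3);
    println!("max_operations = {}", req.max_operations());
}
```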
4 changes: 3 additions & 1 deletion src/meta-srv/examples/kv_store.rs
@@ -24,7 +24,9 @@ fn main() {

#[tokio::main]
async fn run() {
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap();
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"], 128)
.await
.unwrap();

// put
let put_req = PutRequest {
3 changes: 2 additions & 1 deletion src/meta-srv/src/bootstrap.rs
@@ -193,7 +193,8 @@ pub async fn metasrv_builder(
(None, false) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
let etcd_backend =
EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
if !opts.store_key_prefix.is_empty() {
Arc::new(ChrootKvBackend::new(
opts.store_key_prefix.clone().into_bytes(),
12 changes: 12 additions & 0 deletions src/meta-srv/src/metasrv.rs
@@ -79,6 +79,17 @@ pub struct MetaSrvOptions {
pub wal: MetaSrvWalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: String,
/// The maximum number of operations permitted in a txn.
///
/// This value is usually limited by the store backing the `KvBackend`.
/// For example, when etcd is the store, this value should be less than or
/// equal to etcd's `--max-txn-ops` option value.
///
/// TODO(jeremy): Currently, this option only affects the etcd store, but it may
/// also affect other stores in the future. In other words, each store needs to
/// limit the number of operations in a txn, because an infinitely large txn could
/// potentially block other operations.
pub max_txn_ops: usize,
}

impl MetaSrvOptions {
@@ -112,6 +123,7 @@ impl Default for MetaSrvOptions {
wal: MetaSrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
max_txn_ops: 128,
}
}
}
2 changes: 1 addition & 1 deletion src/meta-srv/src/mocks.rs
@@ -42,7 +42,7 @@ pub async fn mock_with_memstore() -> MockInfo {
}

pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr]).await.unwrap();
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(Default::default(), kv_backend, None, None).await
}

2 changes: 0 additions & 2 deletions src/operator/src/insert.rs
@@ -468,8 +468,6 @@ impl Inserter {
&req.table_name,
);

info!("Logical table `{table_ref}` does not exist, try creating table");

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

2 changes: 1 addition & 1 deletion tests-integration/src/cluster.rs
@@ -94,7 +94,7 @@ impl GreptimeDbClusterBuilder {
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let backend = EtcdStore::with_endpoints(endpoints)
let backend = EtcdStore::with_endpoints(endpoints, 128)
.await
.expect("malformed endpoints");
// Each retry requires a new isolation namespace.