Skip to content

Commit

Permalink
feat: max-txn-ops limit
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Mar 7, 2024
1 parent a218f12 commit 67ed364
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 27 deletions.
4 changes: 3 additions & 1 deletion src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ pub struct BenchTableMetadataCommand {

impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();

let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl UpgradeCommand {
etcd_addr: &self.etcd_addr,
})?;
let tool = MigrateTableMetadata {
etcd_store: EtcdStore::with_etcd_client(client),
etcd_store: EtcdStore::with_etcd_client(client, 128),
dryrun: self.dryrun,
skip_catalog_keys: self.skip_catalog_keys,
skip_table_global_keys: self.skip_table_global_keys,
Expand Down
8 changes: 7 additions & 1 deletion src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long, default_value = "")]
store_key_prefix: String,
/// The max operations per txn
#[clap(long)]
max_txn_ops: Option<usize>,
}

impl StartCommand {
Expand Down Expand Up @@ -181,6 +183,10 @@ impl StartCommand {
opts.store_key_prefix = self.store_key_prefix.clone()
}

if let Some(max_txn_ops) = self.max_txn_ops {
opts.max_txn_ops = max_txn_ops;
}

// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;

Expand Down
9 changes: 9 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
EtcdTxnFailed {
max_operations: usize,
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence { err_msg: String, location: Location },

Expand Down Expand Up @@ -400,6 +408,7 @@ impl ErrorExt for Error {
IllegalServerState { .. }
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. } => StatusCode::Internal,

SerdeJson { .. }
Expand Down
33 changes: 32 additions & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl TableMetadataManager {
pub fn max_logical_tables_per_batch(&self) -> usize {
// The batch size is max_txn_size / 3 because the size of the `tables_data`
// is 3 times the size of the `tables_data`.
self.kv_backend.max_txn_size() / 3
self.kv_backend.max_txn_ops() / 3
}

/// Creates metadata for multiple logical tables and return an error if different metadata exists.
Expand Down Expand Up @@ -860,6 +860,7 @@ mod tests {
use bytes::Bytes;
use common_time::util::current_time_millis;
use futures::TryStreamExt;
use store_api::storage::RegionId;
use table::metadata::{RawTableInfo, TableInfo};

use super::datanode_table::DatanodeTableKey;
Expand Down Expand Up @@ -1056,6 +1057,36 @@ mod tests {
);
}

#[tokio::test]
async fn test_create_many_logical_tables_metadata() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(kv_backend);

let mut tables_data = vec![];
for i in 0..table_metadata_manager.max_logical_tables_per_batch() as u32 {
let table_id = i + 1;
let regin_number = table_id * 3;
let region_id = RegionId::new(table_id, regin_number);
let region_route = new_region_route(region_id.as_u64(), 2);
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
table_id,
&format!("my_table_{}", table_id),
region_routes.iter().map(|r| r.region.id.region_number()),
)
.into();
let table_route_value = TableRouteValue::physical(region_routes.clone());

tables_data.push((table_info, table_route_value));
}

// creates metadata.
table_metadata_manager
.create_logical_tables_metadata(tables_data)
.await
.unwrap();
}

#[tokio::test]
async fn test_delete_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
Expand Down
11 changes: 9 additions & 2 deletions src/common/meta/src/key/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};

pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
table_id: TableId,
table_name: &str,
region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
Expand Down Expand Up @@ -50,8 +51,14 @@ pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name("mytable")
.name(table_name)
.meta(meta)
.build()
.unwrap()
}
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
new_test_table_info_with_name(table_id, "table_name", region_numbers)
}
32 changes: 18 additions & 14 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,22 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;

// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
const MAX_TXN_SIZE: usize = 128;

fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
let (key, value) = kv.into_key_value();
KeyValue { key, value }
}

pub struct EtcdStore {
client: Client,
// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
max_txn_ops: usize,
}

impl EtcdStore {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvBackendRef>
pub async fn with_endpoints<E, S>(endpoints: S, max_txn_ops: usize) -> Result<KvBackendRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -58,15 +57,18 @@ impl EtcdStore {
.await
.context(error::ConnectEtcdSnafu)?;

Ok(Self::with_etcd_client(client))
Ok(Self::with_etcd_client(client, max_txn_ops))
}

pub fn with_etcd_client(client: Client) -> KvBackendRef {
Arc::new(Self { client })
pub fn with_etcd_client(client: Client, max_txn_ops: usize) -> KvBackendRef {
Arc::new(Self {
client,
max_txn_ops,
})
}

async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
let max_txn_size = self.max_txn_size();
let max_txn_size = self.max_txn_ops();
if txn_ops.len() < max_txn_size {
// fast path
let _timer = METRIC_META_TXN_REQUEST
Expand Down Expand Up @@ -311,18 +313,20 @@ impl TxnService for EtcdStore {
.with_label_values(&["etcd", "txn"])
.start_timer();

let max_operations = txn.max_operations();

let etcd_txn: Txn = txn.into();
let txn_res = self
.client
.kv_client()
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
.context(error::EtcdTxnFailedSnafu { max_operations })?;
txn_res.try_into()
}

fn max_txn_size(&self) -> usize {
MAX_TXN_SIZE
fn max_txn_ops(&self) -> usize {
self.max_txn_ops
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;

use common_error::ext::ErrorExt;

use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
Expand All @@ -27,7 +29,7 @@ pub trait TxnService: Sync + Send {
}

/// Maximum number of operations permitted in a transaction.
fn max_txn_size(&self) -> usize {
fn max_txn_ops(&self) -> usize {
usize::MAX
}
}
Expand Down Expand Up @@ -192,6 +194,12 @@ impl Txn {
self.req.failure = operations.into();
self
}

#[inline]
pub fn max_operations(&self) -> usize {
let opc = max(self.req.compare.len(), self.req.success.len());
max(opc, self.req.failure.len())
}
}

impl From<Txn> for TxnRequest {
Expand Down
4 changes: 3 additions & 1 deletion src/meta-srv/examples/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ fn main() {

#[tokio::main]
async fn run() {
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap();
let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"], 128)
.await
.unwrap();

// put
let put_req = PutRequest {
Expand Down
3 changes: 2 additions & 1 deletion src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ pub async fn metasrv_builder(
(None, false) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
let etcd_backend =
EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
if !opts.store_key_prefix.is_empty() {
Arc::new(ChrootKvBackend::new(
opts.store_key_prefix.clone().into_bytes(),
Expand Down
8 changes: 8 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ pub struct MetaSrvOptions {
pub wal: MetaSrvWalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: String,
/// The max operations per txn
///
/// If using etcd as the store, this value should be less than or equal to the
/// `--max-txn-ops` option value of etcd. If using another store, this option
/// can be ignored.
/// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
pub max_txn_ops: usize,
}

impl MetaSrvOptions {
Expand Down Expand Up @@ -112,6 +119,7 @@ impl Default for MetaSrvOptions {
wal: MetaSrvWalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: String::new(),
max_txn_ops: 100,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn mock_with_memstore() -> MockInfo {
}

pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr]).await.unwrap();
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(Default::default(), kv_backend, None, None).await
}

Expand Down
2 changes: 0 additions & 2 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,6 @@ impl Inserter {
&req.table_name,
);

info!("Logical table `{table_ref}` does not exist, try creating table");

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl GreptimeDbClusterBuilder {
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let backend = EtcdStore::with_endpoints(endpoints)
let backend = EtcdStore::with_endpoints(endpoints, 128)
.await
.expect("malformed endpoints");
// Each retry requires a new isolation namespace.
Expand Down

0 comments on commit 67ed364

Please sign in to comment.