diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp index 62e591a6fd7..418373bea85 100644 --- a/src/client_lib/pegasus_scanner_impl.cpp +++ b/src/client_lib/pegasus_scanner_impl.cpp @@ -170,7 +170,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal() // valid data got std::string hash_key, sort_key, value; uint32_t expire_ts_seconds = 0; - // _kv_count == -1 means req just want to get data counts, not include data value + // _kv_count > -1 means req just want to get data counts, not include data value if (_kv_count == -1) { pegasus_restore_key(_kvs[_p].key, hash_key, sort_key); value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length()); diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 04f0fe01822..172a14e56d4 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1187,8 +1187,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) limiter->add_count(); - auto state = append_key_value_for_scan( - resp.kvs, + auto state = validate_key_value_for_scan( it->key(), it->value(), request.hash_key_filter_type, @@ -1196,13 +1195,15 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) request.sort_key_filter_type, request.sort_key_filter_pattern, epoch_now, - request.no_value, - request.__isset.validate_partition_hash ? request.validate_partition_hash : true, - return_expire_ts, - !request.only_return_count); + request.__isset.validate_partition_hash ? request.validate_partition_hash : true); + switch (state) { case range_iteration_state::kNormal: count++; + if (!request.only_return_count) { + append_key_value( + resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts); + } break; case range_iteration_state::kExpired: expire_count++; @@ -1303,10 +1304,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) _pfc_recent_filter_count->add(filter_count); } - // abandon calculate capacity unit - if (!request.only_return_count) { - _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - } + _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); _pfc_scan_latency->set(dsn_now_ns() - start_time); } @@ -1366,21 +1364,21 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) limiter->add_count(); - auto state = append_key_value_for_scan(resp.kvs, - it->key(), - it->value(), - hash_key_filter_type, - hash_key_filter_pattern, - sort_key_filter_type, - sort_key_filter_pattern, - epoch_now, - no_value, - validate_hash, - return_expire_ts, - !only_return_count); + auto state = validate_key_value_for_scan(it->key(), + it->value(), + hash_key_filter_type, + hash_key_filter_pattern, + sort_key_filter_type, + sort_key_filter_pattern, + epoch_now, + validate_hash); + switch (state) { case range_iteration_state::kNormal: count++; + if (!only_return_count) { + append_key_value(resp.kvs, it->key(), it->value(), no_value, return_expire_ts); + } break; case range_iteration_state::kExpired: expire_count++; @@ -1401,7 +1399,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) it->Next(); } - if (!only_return_count) { + if (only_return_count) { resp.kvs.emplace_back(::dsn::apps::key_value()); resp.__set_kv_count(count); } @@ -1466,10 +1464,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) resp.error = rocksdb::Status::Code::kNotFound; } - // abandon calculate capacity unit - if (only_return_count) { - _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - } + _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); _pfc_scan_latency->set(dsn_now_ns() - start_time); } @@ -2283,19 +2278,15 @@ bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_ return false; } -range_iteration_state -pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs, - const rocksdb::Slice &key, - const rocksdb::Slice &value, - ::dsn::apps::filter_type::type hash_key_filter_type, - const ::dsn::blob &hash_key_filter_pattern, - ::dsn::apps::filter_type::type sort_key_filter_type, - const ::dsn::blob &sort_key_filter_pattern, - uint32_t epoch_now, - bool no_value, - bool request_validate_hash, - bool request_expire_ts, - bool fill_value) +range_iteration_state pegasus_server_impl::validate_key_value_for_scan( + const rocksdb::Slice &key, + const rocksdb::Slice &value, + ::dsn::apps::filter_type::type hash_key_filter_type, + const ::dsn::blob &hash_key_filter_pattern, + ::dsn::apps::filter_type::type sort_key_filter_type, + const ::dsn::blob &sort_key_filter_pattern, + uint32_t epoch_now, + bool request_validate_hash) { if (check_if_record_expired(epoch_now, value)) { if (_verbose_log) { @@ -2335,29 +2326,36 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu return range_iteration_state::kFiltered; } } - if (fill_value) { - ::dsn::apps::key_value kv; - std::shared_ptr key_buf(::dsn::utils::make_shared_array(raw_key.length())); - ::memcpy(key_buf.get(), raw_key.data(), raw_key.length()); - kv.key.assign(std::move(key_buf), 0, raw_key.length()); + return range_iteration_state::kNormal; +} - // extract expire ts if necessary - if (request_expire_ts) { - auto expire_ts_seconds = - pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value)); - kv.__set_expire_ts_seconds(static_cast(expire_ts_seconds)); - } +void pegasus_server_impl::append_key_value(std::vector<::dsn::apps::key_value> &kvs, + const rocksdb::Slice &key, + const rocksdb::Slice &value, + bool no_value, + bool request_expire_ts) +{ + ::dsn::apps::key_value kv; + ::dsn::blob raw_key(key.data(), 0, key.size()); + std::shared_ptr key_buf(::dsn::utils::make_shared_array(raw_key.length())); + ::memcpy(key_buf.get(), raw_key.data(), raw_key.length()); + kv.key.assign(std::move(key_buf), 0, raw_key.length()); - // extract value - if (!no_value) { - std::string value_buf(value.data(), value.size()); - pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value); - } - kvs.emplace_back(std::move(kv)); + // extract expire ts if necessary + if (request_expire_ts) { + auto expire_ts_seconds = + pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value)); + kv.__set_expire_ts_seconds(static_cast(expire_ts_seconds)); } - return range_iteration_state::kNormal; + // extract value + if (!no_value) { + std::string value_buf(value.data(), value.size()); + pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value); + } + + kvs.emplace_back(std::move(kv)); } range_iteration_state pegasus_server_impl::append_key_value_for_multi_get( diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 7f304dd213f..f3dbf5d9c61 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -224,19 +224,21 @@ class pegasus_server_impl : public pegasus_read_service void set_last_durable_decree(int64_t decree) { _last_durable_decree.store(decree); } + void append_key_value(std::vector<::dsn::apps::key_value> &kvs, + const rocksdb::Slice &key, + const rocksdb::Slice &value, + bool no_value, + bool request_expire_ts); + range_iteration_state - append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs, - const rocksdb::Slice &key, - const rocksdb::Slice &value, - ::dsn::apps::filter_type::type hash_key_filter_type, - const ::dsn::blob &hash_key_filter_pattern, - ::dsn::apps::filter_type::type sort_key_filter_type, - const ::dsn::blob &sort_key_filter_pattern, - uint32_t epoch_now, - bool no_value, - bool request_validate_hash, - bool request_expire_ts, - bool fill_value); + validate_key_value_for_scan(const rocksdb::Slice &key, + const rocksdb::Slice &value, + ::dsn::apps::filter_type::type hash_key_filter_type, + const ::dsn::blob &hash_key_filter_pattern, + ::dsn::apps::filter_type::type sort_key_filter_type, + const ::dsn::blob &sort_key_filter_pattern, + uint32_t epoch_now, + bool request_validate_hash); range_iteration_state append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,