Skip to content

Commit

Permalink
chore: pass SinkReplyBuilder and Transaction explicitly. Part3 (#3966)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Oct 23, 2024
1 parent e24f697 commit bf42eb0
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 125 deletions.
53 changes: 25 additions & 28 deletions src/server/hll_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,26 @@ using namespace facade;

namespace {

template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) {
template <typename T>
void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {
static_assert(std::is_integral<T>::value,
"we are only handling types that are integral types in the return types from "
"here");
if (result) {
cntx->SendLong(result.value());
builder->SendLong(result.value());
} else {
switch (result.status()) {
case OpStatus::WRONG_TYPE:
cntx->SendError(kWrongTypeErr);
builder->SendError(kWrongTypeErr);
break;
case OpStatus::OUT_OF_MEMORY:
cntx->SendError(kOutOfMemory);
builder->SendError(kOutOfMemory);
break;
case OpStatus::INVALID_VALUE:
cntx->SendError(HllFamily::kInvalidHllErr);
builder->SendError(HllFamily::kInvalidHllErr);
break;
default:
cntx->SendLong(0); // in case we don't have the value we should just send 0
builder->SendLong(0); // in case we don't have the value we should just send 0
break;
}
}
Expand Down Expand Up @@ -124,17 +125,16 @@ OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values
return std::min(updated, 1);
}

void PFAdd(CmdArgList args, ConnectionContext* cntx) {
void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);

auto cb = [&](Transaction* t, EngineShard* shard) {
return AddToHll(t->GetOpArgs(shard), key, args);
};

Transaction* trans = cntx->transaction;
OpResult<int> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<int> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
Expand Down Expand Up @@ -204,7 +204,7 @@ vector<HllBufferPtr> ConvertShardVector(const vector<vector<string>>& hlls) {
return ptrs;
}

OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
OpResult<int64_t> PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
vector<vector<string>> hlls;
hlls.resize(shard_set->size());

Expand All @@ -218,8 +218,7 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
return result.status();
};

Transaction* trans = cntx->transaction;
trans->ScheduleSingleHop(std::move(cb));
tx->ScheduleSingleHop(std::move(cb));

vector<HllBufferPtr> ptrs = ConvertShardVector(hlls);
int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size());
Expand All @@ -230,22 +229,21 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
}
}

void PFCount(CmdArgList args, ConnectionContext* cntx) {
void PFCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
if (args.size() == 1) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) {
return CountHllsSingle(t->GetOpArgs(shard), key);
};

Transaction* trans = cntx->transaction;
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
} else {
HandleOpValueResult(PFCountMulti(args, cntx), cntx);
HandleOpValueResult(PFCountMulti(args, tx, builder), builder);
}
}

OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
OpResult<int> PFMergeInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
vector<vector<string>> hlls;
hlls.resize(shard_set->size());

Expand All @@ -262,11 +260,10 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
return result.status();
};

Transaction* trans = cntx->transaction;
trans->Execute(std::move(cb), false);
tx->Execute(std::move(cb), false);

if (!success) {
trans->Conclude();
tx->Conclude();
return OpStatus::INVALID_VALUE;
}

Expand All @@ -292,21 +289,21 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {

return OpStatus::OK;
};
trans->Execute(std::move(set_cb), true);
tx->Execute(std::move(set_cb), true);

return result;
}

void PFMerge(CmdArgList args, ConnectionContext* cntx) {
OpResult<int> result = PFMergeInternal(args, cntx);
void PFMerge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
OpResult<int> result = PFMergeInternal(args, tx, builder);
if (result.ok()) {
if (result.value() == 0) {
cntx->SendOk();
builder->SendOk();
} else {
cntx->SendError(HllFamily::kInvalidHllErr);
builder->SendError(HllFamily::kInvalidHllErr);
}
} else {
HandleOpValueResult(result, cntx);
HandleOpValueResult(result, builder);
}
}

Expand Down
Loading

0 comments on commit bf42eb0

Please sign in to comment.