Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote fdb #54

Draft
wants to merge 217 commits into
base: develop
Choose a base branch
from
Draft

Remote fdb #54

wants to merge 217 commits into from

Conversation

Ozaq
Copy link
Contributor

@Ozaq Ozaq commented Nov 26, 2024

No description provided.

src/fdb5/CMakeLists.txt Outdated Show resolved Hide resolved

public: // methods

TypesRegistry();
TypesRegistry(eckit::Stream& s);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs explicit to prevent unexpected type coversion

Comment on lines +39 to +45
s >> numTypes;
for (size_t i=0; i<numTypes; i++) {
s >> name;
s >> type;
types_[name] = type;
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably throw / or report an error otherwise if the stream cannot be read from any longer, e.g the other side hung up unexpectedly.

@@ -47,6 +47,8 @@ class DistFDB : public FDBBase {

ListIterator list(const FDBToolRequest& request) override;

AxesIterator axesIterator(const FDBToolRequest& request, int level=3) override { NOTIMP; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this warrants an explanation why this FDB type does not allow axes iteration.

src/fdb5/api/FDB.h Outdated Show resolved Hide resolved
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoteFDB only supports a subset of the interface and throws a NotImplemented exception that will reach caller code. I think this is a surprising design as the Exception is unexpected (nothing in the interface indicates that behavior) and also not very descriptive. Not implemented can have several meanings, at a bare minimum the exception should be translated into a "NotSupportedForThisDBType" exception. It would be nice to be able to programmatically inspect for capabilities, e.g. have something such has bool supports_wipe().


eckit::DataHandle* dataHandle(const FieldLocation& fieldLocation);
eckit::DataHandle* dataHandle(const FieldLocation& fieldLocation, const Key& remapKey);
~RemoteFDB() {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be virtual/override

}
};
}
// TODO move the endpoint replacement to the server side ()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done before merging?

Comment on lines 90 to 97
class FDBEndpoint {
public:
eckit::net::Endpoint fieldLocationEndpoint_;
bool localFieldLocation_;

// TODO: Abstract this dictionary into a RemoteConfiguration object, which
// understands how to do the negotiation, etc, but uses Value (i.e.
// essentially JSON) over the wire for flexibility.
s << availableFunctionality().get();
FDBEndpoint(const eckit::net::Endpoint& fieldLocationEndpoint, bool localFieldLocation) :
fieldLocationEndpoint_(fieldLocationEndpoint), localFieldLocation_(localFieldLocation) {}
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unused

Comment on lines 224 to 233
new AsyncIterator([messageQueue, remoteFDB](eckit::Queue<ValueType>& queue) {
StoredMessage msg = std::make_pair(remote::MessageHeader{}, eckit::Buffer{0});
eckit::Buffer msg{0};
while (true) {
if (messageQueue->pop(msg) == -1) {
break;
} else {
MemoryStream s(msg.second);
MemoryStream s(msg);
queue.emplace(HelperClass::valueFromStream(s, remoteFDB));
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting seems broken.

@Ozaq
Copy link
Contributor Author

Ozaq commented Nov 26, 2024

I think this change also needs a couple of high level system tests, e.g. 2 FDB instances one as local, L. And another one as remote R, where R talks to L.

@codecov-commenter
Copy link

codecov-commenter commented Nov 26, 2024

Codecov Report

Attention: Patch coverage is 19.50839% with 2063 lines in your changes missing coverage. Please review.

Project coverage is 61.24%. Comparing base (c6e8b9e) to head (92e37c7).

Files with missing lines Patch % Lines
src/fdb5/remote/server/ServerConnection.cc 0.00% 331 Missing ⚠️
src/fdb5/remote/client/ClientConnection.cc 0.00% 276 Missing ⚠️
src/fdb5/remote/server/CatalogueHandler.cc 0.00% 264 Missing ⚠️
src/fdb5/remote/client/RemoteStore.cc 0.00% 227 Missing ⚠️
src/fdb5/remote/server/StoreHandler.cc 0.00% 149 Missing ⚠️
src/fdb5/api/RemoteFDB.cc 0.00% 128 Missing ⚠️
src/fdb5/remote/client/RemoteCatalogue.cc 0.00% 98 Missing ⚠️
src/fdb5/remote/Connection.cc 0.00% 89 Missing ⚠️
src/fdb5/remote/client/ClientConnectionRouter.cc 0.00% 66 Missing ⚠️
src/fdb5/remote/RemoteFieldLocation.cc 0.00% 44 Missing ⚠️
... and 52 more
Additional details and impacted files
@@             Coverage Diff             @@
##           develop      #54      +/-   ##
===========================================
- Coverage    64.50%   61.24%   -3.27%     
===========================================
  Files          238      264      +26     
  Lines        13844    14924    +1080     
  Branches      1334     1474     +140     
===========================================
+ Hits          8930     9140     +210     
- Misses        4914     5784     +870     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

@tbkr tbkr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite a lot obsolete comment/commented-out code remarks.

There are some comments on changed logic as well, which may slipped through, while writing the code.

if (location_) {
if (withLocation) {
out << sep << *location_;
} else if (withLength) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic changed here:

Now only true if location_ and withLength are both true.

@@ -27,9 +27,9 @@ ControlVisitor::ControlVisitor(eckit::Queue<ControlElement>& queue,
identifiers_(identifiers) {}


bool ControlVisitor::visitDatabase(const Catalogue& catalogue, const Store& store) {
bool ControlVisitor::visitDatabase(const Catalogue& catalogue) { //, const Store& store) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unneeded comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -37,7 +37,8 @@ class ControlVisitor : public QueryVisitor<ControlElement> {
bool visitIndexes() override { return false; }
bool visitEntries() override { return false; }

bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
bool visitDatabase(const Catalogue& catalogue) override;
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -44,7 +44,8 @@ class DumpVisitor : public QueryVisitor<DumpElement> {
bool visitIndexes() override { return false; }
bool visitEntries() override { return false; }

bool visitDatabase(const Catalogue& catalogue, const Store& store) override {
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -43,14 +43,16 @@ struct ListVisitor : public QueryVisitor<ListElement> {

/// Make a note of the current database. Subtract its key from the current
/// request so we can test request is used in its entirety
bool visitDatabase(const Catalogue& catalogue, const Store& store) override {
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, below as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -32,7 +32,8 @@ class TocPurgeVisitor : public PurgeVisitor, public TocStatsReportVisitor {
TocPurgeVisitor(const TocCatalogue& catalogue, const Store& store);
~TocPurgeVisitor() override;

bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -218,7 +218,8 @@ TocStatsReportVisitor::TocStatsReportVisitor(const TocCatalogue& catalogue, bool

TocStatsReportVisitor::~TocStatsReportVisitor() {}

bool TocStatsReportVisitor::visitDatabase(const Catalogue& catalogue, const Store& store) {
//bool TocStatsReportVisitor::visitDatabase(const Catalogue& catalogue, const Store& store) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -159,7 +159,8 @@ class TocStatsReportVisitor : public virtual StatsReportVisitor {

private: // methods

bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -100,14 +100,16 @@ TocWipeVisitor::TocWipeVisitor(const TocCatalogue& catalogue,
TocWipeVisitor::~TocWipeVisitor() {}


bool TocWipeVisitor::visitDatabase(const Catalogue& catalogue, const Store& store) {
// bool TocWipeVisitor::visitDatabase(const Catalogue& catalogue, const Store& store) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, also below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -37,7 +37,8 @@ class TocWipeVisitor : public WipeVisitor {

private: // methods

bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
// bool visitDatabase(const Catalogue& catalogue, const Store& store) override;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -47,7 +47,7 @@ class Store {

virtual std::string type() const = 0;
virtual bool open() = 0;
virtual void flush() = 0;
virtual size_t flush() = 0;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks innocent at first, but ti me this implies that I now have to check the result here and ensure it is what the calling code expects.

EDIT: Ok I have looked at the use and I think this should be changed back to return void. The use I see is only to add an assert to ensure the flush call to the store matches (in terms of number of flushed locations).

src/fdb5/database/Archiver.cc Outdated Show resolved Hide resolved
public:
ConnectionError(const int);
ConnectionError(const int, const eckit::net::Endpoint&);
static size_t bufferSize() { return 1024*1024; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems to be a bit excessive, from looking at the mars request logs from one day (2024-11-11) I can see that <1% requests exceed 4Kb (as text) and the largest is ~245Kb. This probably should not be a one size fits all approach. It would be very nice if a MarsRequest type would be able to give an upper bound / exact size for its serialization methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this was increased from 4096 over the summer when we needed to list/read from 40 years of climate-dt data on the lumi databridge.

Probably would be sensible to have a smaller buffer, and transfer larger requests in chunks if need be.

@@ -57,7 +62,7 @@ class EntryVisitor : public eckit::NonCopyable {

// n.b. non-owning
const Catalogue* currentCatalogue_ = nullptr;
const Store* currentStore_ = nullptr;
mutable Store* currentStore_ = nullptr;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this now gets deleted, the above comment does not longer fit. Also this should be a unique_ptr as it is even initialized from the release of a unique_ptr in EntryVisitor.cc:46

Copy link
Contributor Author

@Ozaq Ozaq Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed comments, created a ticket for the unique_ptr

Store& EntryVisitor::store() const {
if (!currentStore_) {
ASSERT(currentCatalogue_);
currentStore_ = currentCatalogue_->buildStore().release();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be a raw pointer but a unique_ptr

Comment on lines +128 to +132
eckit::Buffer buf = controlWriteReadResponse(remote::Message::Stores, generateRequestID());
eckit::MemoryStream s(buf);
size_t numStores;
s >> numStores;
ASSERT(numStores > 0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we deserialize from a stream here although we send a specific message on the control channel. I would expect that there is a specific response for each request and hence the response should arrive back at the caller deserialised

E.g.

StoreResponse response = controlWriteReadResponse(remote::Message::Stores, generateRequestID())
if(response.stores == 0) {
// and so on


if (!archiveFuture_.valid()) {
// Client
bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The control parameter is unused

Comment on lines +358 to +386
if (hdr.clientID()) {
bool handled = false;

ASSERT(hdr.control() || single_);

auto pp = promises_.find(hdr.requestID);
if (pp != promises_.end()) {
std::lock_guard<std::mutex> lock(promisesMutex_);
if (hdr.payloadSize == 0) {
ASSERT(hdr.message == Message::Received);
pp->second.set_value(eckit::Buffer(0));
} else {
pp->second.set_value(std::move(payload));
}
promises_.erase(pp);
handled = true;
} else {
Client* client = nullptr;
{
std::lock_guard<std::mutex> lock(clientsMutex_);

auto it = clients_.find(hdr.clientID());
if (it == clients_.end()) {
std::stringstream ss;
ss << "ERROR: connection=" << controlEndpoint_ << " received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl;
ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING";
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the end of this code block it is considered to be a serious bug if the client id is unknown, however we only enter this code if there is ANY client ID. If there is no client ID the code will just continue with the next iteration of the for-ever loop. Is this intentional? I would argue that no client id is the same error case as an unknown client id.

src/fdb5/remote/client/RemoteCatalogue.h Outdated Show resolved Hide resolved
void checkUID() const override;
eckit::URI uri() const override;

void sendArchiveData(uint32_t id, const Key& key, std::unique_ptr<FieldLocation> fieldLocation);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is unused as far as I can tell.

src/fdb5/remote/client/RemoteCatalogue.cc Outdated Show resolved Hide resolved
Comment on lines 45 to 58
Buffer keyBuffer(4096);
MemoryStream keyStream(keyBuffer);
keyStream << currentIndexKey_;
keyStream << key;

Buffer locBuffer(4096);
MemoryStream locStream(locBuffer);
locStream << *fieldLocation;

std::vector<std::pair<const void*, uint32_t>> payloads;
payloads.push_back(std::pair<const void*, uint32_t>{keyBuffer, keyStream.position()});
payloads.push_back(std::pair<const void*, uint32_t>{locBuffer, locStream.position()});

dataWrite(Message::Blob, id, payloads);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is serialization and deserialization code in many places and I think this needs to be abstracted away into the read/write methods.

With the current implementation I see problems if we ever want to evolve the protocol, either by changing the serialization format or by introducing new / deprecating old fields.

I would like to see something along the lines of:

struct MessageBlob {
  // Contains everything the message entails
};

auto msg = createMessage(...) // Or use a builder
write(msg) // This takes care of buffer handling & serialization

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants