Skip to content

Commit

Permalink
Move SubprocessScribeLogger from eden to edencommon
Browse files Browse the repository at this point in the history
Summary:
To support better telemetry and logging in watchman we want to use Eden's components. Lets migrate and detangle the needed pieces.

This change moves SubprocessScribeLogger from eden to edencommon.

Reviewed By: genevievehelsel

Differential Revision: D54556070

fbshipit-source-id: cafe845c7bf11b1bcb595c01dc3c4f3a905c7c7e
  • Loading branch information
jdelliot authored and facebook-github-bot committed Mar 7, 2024
1 parent b16e58b commit 0d176a9
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 1 deletion.
1 change: 0 additions & 1 deletion eden/common/telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ target_link_libraries(
Folly::folly
)


target_include_directories(
edencommon_telemetry
PUBLIC
Expand Down
34 changes: 34 additions & 0 deletions eden/common/telemetry/ScribeLogger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/Range.h>
#include <string>

namespace facebook::eden {

/**
* An interface to a scribe logger implementation.
*
* Subclasses must override either of the log overloads.
*
* Messages must not contain newlines. Messages are not durable. They may be
* dropped under load or for other reasons.
*/
class ScribeLogger {
public:
virtual ~ScribeLogger() = default;
virtual void log(folly::StringPiece message) {
return log(message.str());
}
virtual void log(std::string message) {
return log(folly::StringPiece{message});
}
};

} // namespace facebook::eden
172 changes: 172 additions & 0 deletions eden/common/telemetry/SubprocessScribeLogger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "eden/common/telemetry/SubprocessScribeLogger.h"

#include <folly/logging/xlog.h>
#include <folly/system/ThreadName.h>

namespace {
/**
* If the writer process is backed up, limit the message queue size to the
* following bytes.
*/
constexpr size_t kQueueLimitBytes = 128 * 1024;

constexpr std::chrono::seconds kFlushTimeout{1};
constexpr std::chrono::seconds kProcessExitTimeout{1};
constexpr std::chrono::seconds kProcessTerminateTimeout{1};
} // namespace

namespace facebook::eden {

SubprocessScribeLogger::SubprocessScribeLogger(
const char* executable,
folly::StringPiece category)
: SubprocessScribeLogger{
std::vector<std::string>{executable, category.str()}} {}

SubprocessScribeLogger::SubprocessScribeLogger(
const std::vector<std::string>& argv,
FileDescriptor stdoutFd) {
SpawnedProcess::Options options;
options.pipeStdin();

if (stdoutFd) {
options.dup2(std::move(stdoutFd), STDOUT_FILENO);
} else {
options.nullStdout();
}

if (!folly::kIsWindows) {
// Forward stderr to the edenfs log.
// Ensure that no cwd directory handles are held open.
//
// TODO: Not enabled on Windows due to SpawnedProcess removing the UNC
// prefix, making CWD be "" which CreateProcess on Windows refuses. Once
// Mercurial is taught to deal with UNC correctly (D42282703), this can be
// enabled on Windows.
options.chdir(kRootAbsPath);
}

process_ = SpawnedProcess{argv, std::move(options)};

SCOPE_FAIL {
closeProcess();
};

writerThread_ = std::thread([this] {
folly::setThreadName("ScribeLoggerWriter");
writerThread();
});
}

SubprocessScribeLogger::~SubprocessScribeLogger() {
{
auto state = state_.lock();
state->shouldStop = true;
}
newMessageOrStop_.notify_one();

{
auto until = std::chrono::steady_clock::now() + kFlushTimeout;
auto state = state_.lock();
allMessagesWritten_.wait_until(
state.as_lock(), until, [&] { return state->didStop; });
}

closeProcess();
writerThread_.join();
}

void SubprocessScribeLogger::closeProcess() {
// Close the pipe, which should trigger the process to quit.
process_.closeParentFd(STDIN_FILENO);

// The writer thread might be blocked writing to a stuck process, so wait
// until the process is dead to join the thread.
process_.waitOrTerminateOrKill(kProcessExitTimeout, kProcessTerminateTimeout);
}

void SubprocessScribeLogger::log(std::string message) {
size_t messageSize = message.size();

{
auto state = state_.lock();
XCHECK(!state->shouldStop) << "log() called during destruction - that's UB";
if (state->didStop) {
return;
}
if (state->totalBytes + messageSize > kQueueLimitBytes) {
XLOG_EVERY_MS(DBG7, 10000) << "ScribeLogger queue full, dropping message";
// queue full, dropping!
return;
}

// This order is important in order to be atomic under std::bad_alloc.
state->messages.emplace_back(std::move(message));
state->totalBytes += messageSize;
}
newMessageOrStop_.notify_one();
}

void SubprocessScribeLogger::writerThread() {
auto fd = process_.stdinFd();

for (;;) {
std::string message;

{
auto state = state_.lock();
newMessageOrStop_.wait(state.as_lock(), [&] {
return state->shouldStop || !state->messages.empty();
});
if (!state->messages.empty()) {
XCHECK_LE(state->messages.front().size(), state->totalBytes)
<< "totalSize accounting fell out of sync!";

// The below statements are all noexcept.
std::swap(message, state->messages.front());
state->messages.pop_front();
state->totalBytes -= message.size();
} else {
// If the predicate succeeded but we have no messages, then we're
// shutting down cleanly.
assert(state->shouldStop);
XCHECK_EQ(0ul, state->totalBytes)
<< "totalSize accounting fell out of sync!";
state->didStop = true;
state.unlock();
allMessagesWritten_.notify_one();
return;
}
}

char newline = '\n';
std::array<iovec, 2> iov;
iov[0].iov_base = message.data();
iov[0].iov_len = message.size();
iov[1].iov_base = &newline;
iov[1].iov_len = sizeof(newline);
if (fd.writevFull(iov.data(), iov.size()).hasException()) {
// TODO: We could attempt to restart the process here.
XLOG(ERR) << "Failed to writev to logger process stdin: "
<< folly::errnoStr(errno) << ". Giving up!";
// Give up. Allow the ScribeLogger class to be destroyed.
{
auto state = state_.lock();
state->didStop = true;
state->messages.clear();
state->totalBytes = 0;
}
allMessagesWritten_.notify_one();
return;
}
}
}

} // namespace facebook::eden
76 changes: 76 additions & 0 deletions eden/common/telemetry/SubprocessScribeLogger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/Synchronized.h>
#include <list>

#include "eden/common/telemetry/ScribeLogger.h"
#include "eden/common/utils/SpawnedProcess.h"

namespace facebook::eden {

/**
* SubprocessScribeLogger manages an external unix process and asynchronously
* forwards newline-delimited messages to its stdin.
*/
class SubprocessScribeLogger : public ScribeLogger {
public:
/**
* Launch `executable` with `category` as the first argument.
*/
SubprocessScribeLogger(const char* executable, folly::StringPiece category);

/**
* Launch the process specified at argv[0] with the given argv, and forward
* its stdout to `stdoutFd`, if non-negative. Otherwise, output goes to
* /dev/null.
*/
explicit SubprocessScribeLogger(
const std::vector<std::string>& argv,
FileDescriptor stdoutFd = FileDescriptor());

/**
* Waits for the managed process to exit. If it is hung and doesn't complete,
* terminates the process. Either way, this destructor will completed within a
* bounded amount of time.
*/
~SubprocessScribeLogger();

/**
* Forwards a log message to the external process. Must not contain newlines,
* since that is how the process distinguishes between messages.
*
* If the writer process is not keeping up, messages are dropped.
*/
void log(std::string message) override;
using ScribeLogger::log;

private:
void closeProcess();
void writerThread();

struct State {
bool shouldStop = false;
bool didStop = false;

/// Sum of sizes of queued messages.
size_t totalBytes = 0;
/// Invariant: empty if didStop is true
std::list<std::string> messages;
};

SpawnedProcess process_;
std::thread writerThread_;

folly::Synchronized<State, std::mutex> state_;
std::condition_variable newMessageOrStop_;
std::condition_variable allMessagesWritten_;
};

} // namespace facebook::eden
34 changes: 34 additions & 0 deletions eden/common/telemetry/test/SubprocessScribeLoggerTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "eden/common/telemetry/SubprocessScribeLogger.h"

#include <folly/Exception.h>
#include <folly/FileUtil.h>
#include <folly/experimental/TestUtil.h>
#include <folly/portability/GTest.h>

using namespace facebook::eden;
using namespace folly::string_piece_literals;

TEST(ScribeLogger, log_messages_are_written_with_newlines) {
folly::test::TemporaryFile output;

{
SubprocessScribeLogger logger{
std::vector<std::string>{"/bin/cat"},
FileDescriptor(
::dup(output.fd()), "dup", FileDescriptor::FDType::Generic)};
logger.log("foo"_sp);
logger.log("bar"_sp);
}

folly::checkUnixError(lseek(output.fd(), 0, SEEK_SET));
std::string contents;
folly::readFile(output.fd(), contents);
EXPECT_EQ("foo\nbar\n", contents);
}

0 comments on commit 0d176a9

Please sign in to comment.