diff --git a/eden/common/telemetry/CMakeLists.txt b/eden/common/telemetry/CMakeLists.txt index f60d1b96..6ddfd5f3 100644 --- a/eden/common/telemetry/CMakeLists.txt +++ b/eden/common/telemetry/CMakeLists.txt @@ -18,7 +18,6 @@ target_link_libraries( Folly::folly ) - target_include_directories( edencommon_telemetry PUBLIC diff --git a/eden/common/telemetry/ScribeLogger.h b/eden/common/telemetry/ScribeLogger.h new file mode 100644 index 00000000..3af00a3b --- /dev/null +++ b/eden/common/telemetry/ScribeLogger.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +namespace facebook::eden { + +/** + * An interface to a scribe logger implementation. + * + * Subclasses must override either of the log overloads. + * + * Messages must not contain newlines. Messages are not durable. They may be + * dropped under load or for other reasons. + */ +class ScribeLogger { + public: + virtual ~ScribeLogger() = default; + virtual void log(folly::StringPiece message) { + return log(message.str()); + } + virtual void log(std::string message) { + return log(folly::StringPiece{message}); + } +}; + +} // namespace facebook::eden diff --git a/eden/common/telemetry/SubprocessScribeLogger.cpp b/eden/common/telemetry/SubprocessScribeLogger.cpp new file mode 100644 index 00000000..c064b0c1 --- /dev/null +++ b/eden/common/telemetry/SubprocessScribeLogger.cpp @@ -0,0 +1,172 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "eden/common/telemetry/SubprocessScribeLogger.h" + +#include +#include + +namespace { +/** + * If the writer process is backed up, limit the message queue size to the + * following bytes. + */ +constexpr size_t kQueueLimitBytes = 128 * 1024; + +constexpr std::chrono::seconds kFlushTimeout{1}; +constexpr std::chrono::seconds kProcessExitTimeout{1}; +constexpr std::chrono::seconds kProcessTerminateTimeout{1}; +} // namespace + +namespace facebook::eden { + +SubprocessScribeLogger::SubprocessScribeLogger( + const char* executable, + folly::StringPiece category) + : SubprocessScribeLogger{ + std::vector{executable, category.str()}} {} + +SubprocessScribeLogger::SubprocessScribeLogger( + const std::vector& argv, + FileDescriptor stdoutFd) { + SpawnedProcess::Options options; + options.pipeStdin(); + + if (stdoutFd) { + options.dup2(std::move(stdoutFd), STDOUT_FILENO); + } else { + options.nullStdout(); + } + + if (!folly::kIsWindows) { + // Forward stderr to the edenfs log. + // Ensure that no cwd directory handles are held open. + // + // TODO: Not enabled on Windows due to SpawnedProcess removing the UNC + // prefix, making CWD be "" which CreateProcess on Windows refuses. Once + // Mercurial is taught to deal with UNC correctly (D42282703), this can be + // enabled on Windows. + options.chdir(kRootAbsPath); + } + + process_ = SpawnedProcess{argv, std::move(options)}; + + SCOPE_FAIL { + closeProcess(); + }; + + writerThread_ = std::thread([this] { + folly::setThreadName("ScribeLoggerWriter"); + writerThread(); + }); +} + +SubprocessScribeLogger::~SubprocessScribeLogger() { + { + auto state = state_.lock(); + state->shouldStop = true; + } + newMessageOrStop_.notify_one(); + + { + auto until = std::chrono::steady_clock::now() + kFlushTimeout; + auto state = state_.lock(); + allMessagesWritten_.wait_until( + state.as_lock(), until, [&] { return state->didStop; }); + } + + closeProcess(); + writerThread_.join(); +} + +void SubprocessScribeLogger::closeProcess() { + // Close the pipe, which should trigger the process to quit. + process_.closeParentFd(STDIN_FILENO); + + // The writer thread might be blocked writing to a stuck process, so wait + // until the process is dead to join the thread. + process_.waitOrTerminateOrKill(kProcessExitTimeout, kProcessTerminateTimeout); +} + +void SubprocessScribeLogger::log(std::string message) { + size_t messageSize = message.size(); + + { + auto state = state_.lock(); + XCHECK(!state->shouldStop) << "log() called during destruction - that's UB"; + if (state->didStop) { + return; + } + if (state->totalBytes + messageSize > kQueueLimitBytes) { + XLOG_EVERY_MS(DBG7, 10000) << "ScribeLogger queue full, dropping message"; + // queue full, dropping! + return; + } + + // This order is important in order to be atomic under std::bad_alloc. + state->messages.emplace_back(std::move(message)); + state->totalBytes += messageSize; + } + newMessageOrStop_.notify_one(); +} + +void SubprocessScribeLogger::writerThread() { + auto fd = process_.stdinFd(); + + for (;;) { + std::string message; + + { + auto state = state_.lock(); + newMessageOrStop_.wait(state.as_lock(), [&] { + return state->shouldStop || !state->messages.empty(); + }); + if (!state->messages.empty()) { + XCHECK_LE(state->messages.front().size(), state->totalBytes) + << "totalSize accounting fell out of sync!"; + + // The below statements are all noexcept. + std::swap(message, state->messages.front()); + state->messages.pop_front(); + state->totalBytes -= message.size(); + } else { + // If the predicate succeeded but we have no messages, then we're + // shutting down cleanly. + assert(state->shouldStop); + XCHECK_EQ(0ul, state->totalBytes) + << "totalSize accounting fell out of sync!"; + state->didStop = true; + state.unlock(); + allMessagesWritten_.notify_one(); + return; + } + } + + char newline = '\n'; + std::array iov; + iov[0].iov_base = message.data(); + iov[0].iov_len = message.size(); + iov[1].iov_base = &newline; + iov[1].iov_len = sizeof(newline); + if (fd.writevFull(iov.data(), iov.size()).hasException()) { + // TODO: We could attempt to restart the process here. + XLOG(ERR) << "Failed to writev to logger process stdin: " + << folly::errnoStr(errno) << ". Giving up!"; + // Give up. Allow the ScribeLogger class to be destroyed. + { + auto state = state_.lock(); + state->didStop = true; + state->messages.clear(); + state->totalBytes = 0; + } + allMessagesWritten_.notify_one(); + return; + } + } +} + +} // namespace facebook::eden diff --git a/eden/common/telemetry/SubprocessScribeLogger.h b/eden/common/telemetry/SubprocessScribeLogger.h new file mode 100644 index 00000000..4086a535 --- /dev/null +++ b/eden/common/telemetry/SubprocessScribeLogger.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +#include "eden/common/telemetry/ScribeLogger.h" +#include "eden/common/utils/SpawnedProcess.h" + +namespace facebook::eden { + +/** + * SubprocessScribeLogger manages an external unix process and asynchronously + * forwards newline-delimited messages to its stdin. + */ +class SubprocessScribeLogger : public ScribeLogger { + public: + /** + * Launch `executable` with `category` as the first argument. + */ + SubprocessScribeLogger(const char* executable, folly::StringPiece category); + + /** + * Launch the process specified at argv[0] with the given argv, and forward + * its stdout to `stdoutFd`, if non-negative. Otherwise, output goes to + * /dev/null. + */ + explicit SubprocessScribeLogger( + const std::vector& argv, + FileDescriptor stdoutFd = FileDescriptor()); + + /** + * Waits for the managed process to exit. If it is hung and doesn't complete, + * terminates the process. Either way, this destructor will completed within a + * bounded amount of time. + */ + ~SubprocessScribeLogger(); + + /** + * Forwards a log message to the external process. Must not contain newlines, + * since that is how the process distinguishes between messages. + * + * If the writer process is not keeping up, messages are dropped. + */ + void log(std::string message) override; + using ScribeLogger::log; + + private: + void closeProcess(); + void writerThread(); + + struct State { + bool shouldStop = false; + bool didStop = false; + + /// Sum of sizes of queued messages. + size_t totalBytes = 0; + /// Invariant: empty if didStop is true + std::list messages; + }; + + SpawnedProcess process_; + std::thread writerThread_; + + folly::Synchronized state_; + std::condition_variable newMessageOrStop_; + std::condition_variable allMessagesWritten_; +}; + +} // namespace facebook::eden diff --git a/eden/common/telemetry/test/SubprocessScribeLoggerTest.cpp b/eden/common/telemetry/test/SubprocessScribeLoggerTest.cpp new file mode 100644 index 00000000..06c553c1 --- /dev/null +++ b/eden/common/telemetry/test/SubprocessScribeLoggerTest.cpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "eden/common/telemetry/SubprocessScribeLogger.h" + +#include +#include +#include +#include + +using namespace facebook::eden; +using namespace folly::string_piece_literals; + +TEST(ScribeLogger, log_messages_are_written_with_newlines) { + folly::test::TemporaryFile output; + + { + SubprocessScribeLogger logger{ + std::vector{"/bin/cat"}, + FileDescriptor( + ::dup(output.fd()), "dup", FileDescriptor::FDType::Generic)}; + logger.log("foo"_sp); + logger.log("bar"_sp); + } + + folly::checkUnixError(lseek(output.fd(), 0, SEEK_SET)); + std::string contents; + folly::readFile(output.fd(), contents); + EXPECT_EQ("foo\nbar\n", contents); +}