Skip to content

Commit

Permalink
8229517: Support for optional asynchronous/buffered logging
Browse files Browse the repository at this point in the history
Reviewed-by: ysuenaga, simonis, stuefe, phh, dholmes, ayang
  • Loading branch information
Xin Liu authored and Paul Hohensee committed May 27, 2021
1 parent 7c85f35 commit 41185d3
Show file tree
Hide file tree
Showing 23 changed files with 713 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/hotspot/os/windows/os_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ bool os::create_thread(Thread* thread, ThreadType thr_type,
case os::vm_thread:
case os::pgc_thread:
case os::cgc_thread:
case os::asynclog_thread:
case os::watcher_thread:
if (VMThreadStackSize > 0) stack_size = (size_t)(VMThreadStackSize * K);
break;
Expand Down
194 changes: 194 additions & 0 deletions src/hotspot/share/logging/logAsyncWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#include "precompiled.hpp"
#include "logging/logAsyncWriter.hpp"
#include "logging/logConfiguration.hpp"
#include "logging/logFileOutput.hpp"
#include "logging/logHandle.hpp"
#include "runtime/atomic.hpp"

Semaphore AsyncLogWriter::_sem(0);
Semaphore AsyncLogWriter::_io_sem(1);

class AsyncLogLocker : public StackObj {
private:
static Semaphore _lock;
public:
AsyncLogLocker() {
_lock.wait();
}

~AsyncLogLocker() {
_lock.signal();
}
};

Semaphore AsyncLogLocker::_lock(1);

void AsyncLogWriter::enqueue_locked(const AsyncLogMessage& msg) {
if (_buffer.size() >= _buffer_max_size) {
bool p_created;
uint32_t* counter = _stats.add_if_absent(msg.output(), 0, &p_created);
*counter = *counter + 1;
// drop the enqueueing message.
return;
}

assert(_buffer.size() < _buffer_max_size, "_buffer is over-sized.");
_buffer.push_back(msg);
_sem.signal();
}

void AsyncLogWriter::enqueue(LogFileOutput& output, const LogDecorations& decorations, const char* msg) {
AsyncLogMessage m(output, decorations, os::strdup(msg));

{ // critical area
AsyncLogLocker lock;
enqueue_locked(m);
}
}

// LogMessageBuffer consists of a multiple-part/multiple-line messsage.
// The lock here guarantees its integrity.
void AsyncLogWriter::enqueue(LogFileOutput& output, LogMessageBuffer::Iterator msg_iterator) {
AsyncLogLocker lock;

for (; !msg_iterator.is_at_end(); msg_iterator++) {
AsyncLogMessage m(output, msg_iterator.decorations(), os::strdup(msg_iterator.message()));
enqueue_locked(m);
}
}

AsyncLogWriter::AsyncLogWriter()
: _initialized(false),
_stats(17 /*table_size*/) {
if (os::create_thread(this, os::asynclog_thread)) {
_initialized = true;
} else {
log_warning(logging, thread)("AsyncLogging failed to create thread. Falling back to synchronous logging.");
}

log_info(logging)("The maximum entries of AsyncLogBuffer: " SIZE_FORMAT ", estimated memory use: " SIZE_FORMAT " bytes",
_buffer_max_size, AsyncLogBufferSize);
}

class AsyncLogMapIterator {
AsyncLogBuffer& _logs;

public:
AsyncLogMapIterator(AsyncLogBuffer& logs) :_logs(logs) {}
bool do_entry(LogFileOutput* output, uint32_t* counter) {
using none = LogTagSetMapping<LogTag::__NO_TAG>;

if (*counter > 0) {
LogDecorations decorations(LogLevel::Warning, none::tagset(), output->decorators());
stringStream ss;
ss.print(UINT32_FORMAT_W(6) " messages dropped due to async logging", *counter);
AsyncLogMessage msg(*output, decorations, ss.as_string(true /*c_heap*/));
_logs.push_back(msg);
*counter = 0;
}

return true;
}
};

void AsyncLogWriter::write() {
// Use kind of copy-and-swap idiom here.
// Empty 'logs' swaps the content with _buffer.
// Along with logs destruction, all processed messages are deleted.
//
// The operation 'pop_all()' is done in O(1). All I/O jobs are then performed without
// lock protection. This guarantees I/O jobs don't block logsites.
AsyncLogBuffer logs;
bool own_io = false;

{ // critical region
AsyncLogLocker lock;

_buffer.pop_all(&logs);
// append meta-messages of dropped counters
AsyncLogMapIterator dropped_counters_iter(logs);
_stats.iterate(&dropped_counters_iter);
own_io = _io_sem.trywait();
}

LinkedListIterator<AsyncLogMessage> it(logs.head());
if (!own_io) {
_io_sem.wait();
}

while (!it.is_empty()) {
AsyncLogMessage* e = it.next();
char* msg = e->message();

if (msg != nullptr) {
e->output()->write_blocking(e->decorations(), msg);
os::free(msg);
}
}
_io_sem.signal();
}

void AsyncLogWriter::run() {
while (true) {
// The value of a semphore cannot be negative. Therefore, the current thread falls asleep
// when its value is zero. It will be waken up when new messages are enqueued.
_sem.wait();
write();
}
}

AsyncLogWriter* AsyncLogWriter::_instance = nullptr;

void AsyncLogWriter::initialize() {
if (!LogConfiguration::is_async_mode()) return;

assert(_instance == nullptr, "initialize() should only be invoked once.");

AsyncLogWriter* self = new AsyncLogWriter();
if (self->_initialized) {
Atomic::release_store_fence(&AsyncLogWriter::_instance, self);
// All readers of _instance after the fence see non-NULL.
// We use LogOutputList's RCU counters to ensure all synchronous logsites have completed.
// After that, we start AsyncLog Thread and it exclusively takes over all logging I/O.
for (LogTagSet* ts = LogTagSet::first(); ts != NULL; ts = ts->next()) {
ts->wait_until_no_readers();
}
os::start_thread(self);
log_debug(logging, thread)("Async logging thread started.");
}
}

AsyncLogWriter* AsyncLogWriter::instance() {
return _instance;
}

// write() acquires and releases _io_sem even _buffer is empty.
// This guarantees all logging I/O of dequeued messages are done when it returns.
void AsyncLogWriter::flush() {
if (_instance != nullptr) {
_instance->write();
}
}
176 changes: 176 additions & 0 deletions src/hotspot/share/logging/logAsyncWriter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*
*/
#ifndef SHARE_LOGGING_LOGASYNCWRITER_HPP
#define SHARE_LOGGING_LOGASYNCWRITER_HPP
#include "logging/log.hpp"
#include "logging/logDecorations.hpp"
#include "logging/logFileOutput.hpp"
#include "logging/logMessageBuffer.hpp"
#include "memory/resourceArea.hpp"
#include "runtime/nonJavaThread.hpp"
#include "utilities/hashtable.hpp"
#include "utilities/linkedlist.hpp"

template <typename E, MEMFLAGS F>
class LinkedListDeque : private LinkedListImpl<E, ResourceObj::C_HEAP, F> {
private:
LinkedListNode<E>* _tail;
size_t _size;

public:
LinkedListDeque() : _tail(NULL), _size(0) {}
void push_back(const E& e) {
if (!_tail) {
_tail = this->add(e);
} else {
_tail = this->insert_after(e, _tail);
}

++_size;
}

// pop all elements to logs.
void pop_all(LinkedList<E>* logs) {
logs->move(static_cast<LinkedList<E>* >(this));
_tail = NULL;
_size = 0;
}

void pop_all(LinkedListDeque<E, F>* logs) {
logs->_size = _size;
logs->_tail = _tail;
pop_all(static_cast<LinkedList<E>* >(logs));
}

void pop_front() {
LinkedListNode<E>* h = this->unlink_head();
if (h == _tail) {
_tail = NULL;
}

if (h != NULL) {
--_size;
this->delete_node(h);
}
}

size_t size() const { return _size; }

const E* front() const {
return this->_head == NULL ? NULL : this->_head->peek();
}

const E* back() const {
return _tail == NULL ? NULL : _tail->peek();
}

LinkedListNode<E>* head() const {
return this->_head;
}
};

class AsyncLogMessage {
LogFileOutput& _output;
const LogDecorations _decorations;
char* _message;

public:
AsyncLogMessage(LogFileOutput& output, const LogDecorations& decorations, char* msg)
: _output(output), _decorations(decorations), _message(msg) {}

// placeholder for LinkedListImpl.
bool equals(const AsyncLogMessage& o) const { return false; }

LogFileOutput* output() const { return &_output; }
const LogDecorations& decorations() const { return _decorations; }
char* message() const { return _message; }
};

typedef LinkedListDeque<AsyncLogMessage, mtLogging> AsyncLogBuffer;
typedef KVHashtable<LogFileOutput*, uint32_t, mtLogging> AsyncLogMap;

//
// ASYNC LOGGING SUPPORT
//
// Summary:
// Async Logging is working on the basis of singleton AsyncLogWriter, which manages an intermediate buffer and a flushing thread.
//
// Interface:
//
// initialize() is called once when JVM is initialized. It creates and initializes the singleton instance of AsyncLogWriter.
// Once async logging is established, there's no way to turn it off.
//
// instance() is MT-safe and returns the pointer of the singleton instance if and only if async logging is enabled and has well
// initialized. Clients can use its return value to determine async logging is established or not.
//
// The basic operation of AsyncLogWriter is enqueue(). 2 overloading versions of it are provided to match LogOutput::write().
// They are both MT-safe and non-blocking. Derived classes of LogOutput can invoke the corresponding enqueue() in write() and
// return 0. AsyncLogWriter is responsible of copying neccessary data.
//
// The static member function flush() is designated to flush out all pending messages when JVM is terminating.
// In normal JVM termination, flush() is invoked in LogConfiguration::finalize(). flush() is MT-safe and can be invoked arbitrary
// times. It is no-op if async logging is not established.
//
class AsyncLogWriter : public NonJavaThread {
static AsyncLogWriter* _instance;
// _sem is a semaphore whose value denotes how many messages have been enqueued.
// It decreases in AsyncLogWriter::run()
static Semaphore _sem;
// A lock of IO
static Semaphore _io_sem;

volatile bool _initialized;
AsyncLogMap _stats; // statistics for dropped messages
AsyncLogBuffer _buffer;

// The memory use of each AsyncLogMessage (payload) consists of itself and a variable-length c-str message.
// A regular logging message is smaller than vwrite_buffer_size, which is defined in logtagset.cpp
const size_t _buffer_max_size = {AsyncLogBufferSize / (sizeof(AsyncLogMessage) + vwrite_buffer_size)};

AsyncLogWriter();
void enqueue_locked(const AsyncLogMessage& msg);
void write();
void run() override;
void pre_run() override {
NonJavaThread::pre_run();
log_debug(logging, thread)("starting AsyncLog Thread tid = " INTX_FORMAT, os::current_thread_id());
}
char* name() const override { return (char*)"AsyncLog Thread"; }
bool is_Named_thread() const override { return true; }
void print_on(outputStream* st) const override {
st->print("\"%s\" ", name());
Thread::print_on(st);
st->cr();
}

public:
void enqueue(LogFileOutput& output, const LogDecorations& decorations, const char* msg);
void enqueue(LogFileOutput& output, LogMessageBuffer::Iterator msg_iterator);

static AsyncLogWriter* instance();
static void initialize();
static void flush();
};

#endif // SHARE_LOGGING_LOGASYNCWRITER_HPP
Loading

1 comment on commit 41185d3

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.