Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extension: User space event #14712

Merged
merged 17 commits into from
Feb 1, 2021
Merged
2 changes: 2 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,5 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp
/*/extensions/filters/http/kill_request @qqustc @htuch
# Rate limit expression descriptor
/*/extensions/rate_limit_descriptors/expr @kyessenov @lizan
# user space socket pair and event
/*/extensions/io_socket/user_space @lambdai @antoniovicente
6 changes: 6 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ EXTENSIONS = {
#

"envoy.rate_limit_descriptors.expr": "//source/extensions/rate_limit_descriptors/expr:config",

#
# IO socket
#

"envoy.io_socket.user_space": "//source/extensions/io_socket/user_space:config",
}

# These can be changed to ["//visibility:public"], for downstream builds which
Expand Down
8 changes: 8 additions & 0 deletions source/extensions/io_socket/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()
44 changes: 44 additions & 0 deletions source/extensions/io_socket/user_space/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_extension(
name = "config",
srcs = ["config.h"],
security_posture = "unknown",
status = "wip",
undocumented = True,
deps = [
],
)

envoy_cc_library(
name = "io_handle_lib",
hdrs = ["io_handle.h"],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:empty_string",
],
)

envoy_cc_library(
name = "file_event_lib",
srcs = [
"file_event_impl.cc",
],
hdrs = [
"file_event_impl.h",
],
deps = [
":io_handle_lib",
"//source/common/event:dispatcher_includes",
],
)
12 changes: 12 additions & 0 deletions source/extensions/io_socket/user_space/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Envoy {
namespace Extensions {
namespace IoSocket {
namespace UserSpace {

// TODO(lambdai): This file is to follow Envoy extension convention.
// Will add config and factory in #13865.
lambdai marked this conversation as resolved.
Show resolved Hide resolved

}
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
76 changes: 76 additions & 0 deletions source/extensions/io_socket/user_space/file_event_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "extensions/io_socket/user_space/file_event_impl.h"

#include <cstdint>

#include "common/common/assert.h"

#include "extensions/io_socket/user_space/io_handle.h"

namespace Envoy {
namespace Extensions {
namespace IoSocket {
namespace UserSpace {

FileEventImpl::FileEventImpl(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, uint32_t events,
IoHandle& io_source)
: schedulable_(dispatcher.createSchedulableCallback([this, cb]() {
auto ephemeral_events = event_listener_.getAndClearEphemeralEvents();
ENVOY_LOG(trace, "User space event {} invokes callbacks on events = {}",
static_cast<void*>(this), ephemeral_events);
cb(ephemeral_events);
})),
io_source_(io_source) {
setEnabled(events);
}

void FileEventImpl::activate(uint32_t events) {
// Only supported event types are set.
ASSERT((events & (Event::FileReadyType::Read | Event::FileReadyType::Write |
Event::FileReadyType::Closed)) == events);
event_listener_.onEventActivated(events);
schedulable_->scheduleCallbackNextIteration();
}

void FileEventImpl::setEnabled(uint32_t events) {
// Only supported event types are set.
ASSERT((events & (Event::FileReadyType::Read | Event::FileReadyType::Write |
Event::FileReadyType::Closed)) == events);
event_listener_.clearEphemeralEvents();
lambdai marked this conversation as resolved.
Show resolved Hide resolved
event_listener_.setEnabledEvents(events);
bool was_enabled = schedulable_->enabled();
// Recalculate activated events.
uint32_t events_to_notify = 0;
if ((events & Event::FileReadyType::Read) && io_source_.isReadable()) {
lambdai marked this conversation as resolved.
Show resolved Hide resolved
events_to_notify |= Event::FileReadyType::Read;
}
if ((events & Event::FileReadyType::Write) && io_source_.isPeerWritable()) {
events_to_notify |= Event::FileReadyType::Write;
}
if ((events & Event::FileReadyType::Closed) && io_source_.isPeerShutDownWrite()) {
events_to_notify |= Event::FileReadyType::Closed;
}
if (events_to_notify != 0) {
activate(events_to_notify);
} else {
schedulable_->cancel();
}
ENVOY_LOG(
trace,
"User space file event {} set enabled events {} and events {} is active. Will {} reschedule.",
static_cast<void*>(this), events, was_enabled ? "not " : "");
}

void FileEventImpl::activateIfEnabled(uint32_t events) {
ASSERT((events & (Event::FileReadyType::Read | Event::FileReadyType::Write |
Event::FileReadyType::Closed)) == events);
// filtered out disabled events.
lambdai marked this conversation as resolved.
Show resolved Hide resolved
uint32_t filter_enabled = events & event_listener_.getEnabledEvents();
lambdai marked this conversation as resolved.
Show resolved Hide resolved
if (filter_enabled == 0) {
return;
}
activate(filter_enabled);
}
} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
79 changes: 79 additions & 0 deletions source/extensions/io_socket/user_space/file_event_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include <cstdint>

#include "envoy/event/file_event.h"

#include "common/event/dispatcher_impl.h"
lambdai marked this conversation as resolved.
Show resolved Hide resolved
#include "common/event/event_impl_base.h"
lambdai marked this conversation as resolved.
Show resolved Hide resolved

#include "extensions/io_socket/user_space/io_handle.h"

namespace Envoy {

namespace Extensions {
namespace IoSocket {
namespace UserSpace {

// A FileEvent implementation which is used to drive UserSpaceHandle.
// Declare the class final to safely call virtual function setEnabled in constructor.
class FileEventImpl final : public Event::FileEvent, Logger::Loggable<Logger::Id::io> {
public:
FileEventImpl(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, uint32_t events,
IoHandle& io_source);

// Event::FileEvent
void activate(uint32_t events) override;
void setEnabled(uint32_t events) override;

// This event always acts as edge triggered regardless the underlying OS is level or
// edge triggered. The event owner on windows platform should not emulate edge events.
void unregisterEventIfEmulatedEdge(uint32_t) override {}
void registerEventIfEmulatedEdge(uint32_t) override {}

// Notify events. Unlike activate() method, this method activates the given events only if the
// events are enabled.
void activateIfEnabled(uint32_t events);

private:
// This class maintains the ephemeral events and enabled events.
// getAndClearEphemeralEvents
class EventListener {
public:
~EventListener() = default;

// Reset the enabled events. The caller must refresh the triggered events.
void setEnabledEvents(uint32_t enabled_events) { enabled_events_ = enabled_events; }

// Return the enabled events.
uint32_t getEnabledEvents() { return enabled_events_; }

void clearEphemeralEvents() {
// Clear ephemeral events to align with FileEventImpl::setEnabled().
ephemeral_events_ = 0;
}

void onEventActivated(uint32_t activated_events) { ephemeral_events_ |= activated_events; }

uint32_t getAndClearEphemeralEvents() { return std::exchange(ephemeral_events_, 0); }

private:
// The events set by activate() and will be cleared after the io callback.
uint32_t ephemeral_events_{};
// The events set by setEnabled(). The new value replaces the old value.
uint32_t enabled_events_{};
};

// Used to populate the event operations of enable and activate.
EventListener event_listener_;

// The handle to registered async callback from dispatcher.
Event::SchedulableCallbackPtr schedulable_;

// Supplies readable and writable status.
IoHandle& io_source_;
};
} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
67 changes: 67 additions & 0 deletions source/extensions/io_socket/user_space/io_handle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"

namespace Envoy {
namespace Extensions {
namespace IoSocket {
namespace UserSpace {

/**
* The interface for the peer as a writer and supplied read status query.
*/
class IoHandle {
public:
virtual ~IoHandle() = default;

/**
* Set the flag to indicate no further write from peer.
*/
virtual void setWriteEnd() PURE;

/**
* @return true if the peer promise no more write.
*/
virtual bool isPeerShutDownWrite() const PURE;

/**
* Raised when peer is destroyed. No further write to peer is allowed.
*/
virtual void onPeerDestroy() PURE;

/**
* Notify that consumable data arrived. The consumable data can be either data to read, or the end
* of stream event.
*/
virtual void setNewDataAvailable() PURE;

/**
* @return the buffer to be written.
*/
virtual Buffer::Instance* getWriteBuffer() PURE;

/**
* @return true if more data is acceptable at the destination buffer.
*/
virtual bool isWritable() const PURE;

/**
* @return true if peer is valid and writable.
*/
virtual bool isPeerWritable() const PURE;

/**
* Raised by the peer when the peer switch from high water mark to low.
*/
virtual void onPeerBufferLowWatermark() PURE;

/**
* @return true if the pending receive buffer is not empty or read_end is set.
*/
virtual bool isReadable() const PURE;
};
} // namespace UserSpace
} // namespace IoSocket
} // namespace Extensions
} // namespace Envoy
29 changes: 29 additions & 0 deletions test/extensions/io_socket/user_space/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_package",
)
load(
"//test/extensions:extensions_build_system.bzl",
"envoy_extension_cc_test",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_extension_cc_test(
name = "file_event_impl_test",
srcs = ["file_event_impl_test.cc"],
extension_name = "envoy.io_socket.user_space",
deps = [
"//include/envoy/event:file_event_interface",
"//source/common/event:dispatcher_includes",
"//source/common/event:dispatcher_lib",
"//source/extensions/io_socket/user_space:file_event_lib",
"//source/extensions/io_socket/user_space:io_handle_lib",
"//test/mocks:common_lib",
"//test/test_common:environment_lib",
"//test/test_common:test_runtime_lib",
"//test/test_common:utility_lib",
],
)
Loading