Skip to content

Commit

Permalink
Merge branch 'main' into update-e
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Nino <jnino@lyft.com>
  • Loading branch information
Jose Nino committed Oct 21, 2020
2 parents 1fd195a + 3007ebd commit 802ad41
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ typedef void (*envoy_filter_resume_f)(const void* context);
*/
typedef struct {
envoy_filter_resume_f resume_iteration;
envoy_filter_release_f release_callbacks;
const void* callback_context;
} envoy_http_filter_callbacks;

typedef void (*envoy_filter_set_callbacks_f)(envoy_http_filter_callbacks callbacks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ namespace PlatformBridge {

Http::FilterFactoryCb PlatformBridgeFilterFactory::createFilterFactoryFromProtoTyped(
const envoymobile::extensions::filters::http::platform_bridge::PlatformBridge& proto_config,
const std::string&, Server::Configuration::FactoryContext&) {
const std::string&, Server::Configuration::FactoryContext& context) {

PlatformBridgeFilterConfigSharedPtr filter_config =
std::make_shared<PlatformBridgeFilterConfig>(proto_config);
return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<PlatformBridgeFilter>(filter_config));
return [filter_config, &context](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(
std::make_shared<PlatformBridgeFilter>(filter_config, context.dispatcher()));
};
}

Expand Down
84 changes: 82 additions & 2 deletions library/common/extensions/filters/http/platform_bridge/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,38 @@ namespace Extensions {
namespace HttpFilters {
namespace PlatformBridge {

static void envoy_filter_release_callbacks(const void* context) {
PlatformBridgeFilterWeakPtr* weak_filter =
static_cast<PlatformBridgeFilterWeakPtr*>(const_cast<void*>(context));
delete weak_filter;
}

static void envoy_filter_callback_resume_decoding(const void* context) {
PlatformBridgeFilterWeakPtr* weak_filter =
static_cast<PlatformBridgeFilterWeakPtr*>(const_cast<void*>(context));
if (auto filter = weak_filter->lock()) {
filter->resumeDecoding();
}
}

static void envoy_filter_callback_resume_encoding(const void* context) {
PlatformBridgeFilterWeakPtr* weak_filter =
static_cast<PlatformBridgeFilterWeakPtr*>(const_cast<void*>(context));
if (auto filter = weak_filter->lock()) {
filter->resumeEncoding();
}
}

PlatformBridgeFilterConfig::PlatformBridgeFilterConfig(
const envoymobile::extensions::filters::http::platform_bridge::PlatformBridge& proto_config)
: filter_name_(proto_config.platform_filter_name()),
platform_filter_(static_cast<envoy_http_filter*>(
Api::External::retrieveApi(proto_config.platform_filter_name()))) {}

PlatformBridgeFilter::PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr config)
: filter_name_(config->filter_name()), platform_filter_(*config->platform_filter()) {
PlatformBridgeFilter::PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr config,
Event::Dispatcher& dispatcher)
: dispatcher_(dispatcher), filter_name_(config->filter_name()),
platform_filter_(*config->platform_filter()) {
// The initialization above sets platform_filter_ to a copy of the struct stored on the config.
// In the typical case, this will represent a filter implementation that needs to be intantiated.
// static_context will contain the necessary platform-specific mechanism to produce a filter
Expand All @@ -44,6 +68,25 @@ PlatformBridgeFilter::PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr c
ASSERT(platform_filter_.instance_context,
fmt::format("init_filter unsuccessful for {}", filter_name_));
iteration_state_ = IterationState::Ongoing;

if (platform_filter_.set_request_callbacks) {
platform_request_callbacks_.resume_iteration = envoy_filter_callback_resume_decoding;
platform_request_callbacks_.release_callbacks = envoy_filter_release_callbacks;
// We use a weak_ptr wrapper for the filter to ensure presence before dispatching callbacks.
platform_request_callbacks_.callback_context =
new PlatformBridgeFilterWeakPtr{weak_from_this()};
platform_filter_.set_request_callbacks(platform_request_callbacks_,
platform_filter_.instance_context);
}

if (platform_filter_.set_response_callbacks) {
platform_response_callbacks_.resume_iteration = envoy_filter_callback_resume_encoding;
platform_response_callbacks_.release_callbacks = envoy_filter_release_callbacks;
platform_response_callbacks_.callback_context =
new PlatformBridgeFilterWeakPtr{weak_from_this()};
platform_filter_.set_response_callbacks(platform_response_callbacks_,
platform_filter_.instance_context);
}
}

void PlatformBridgeFilter::onDestroy() {
Expand Down Expand Up @@ -172,6 +215,7 @@ Http::FilterDataStatus PlatformBridgeFilter::onData(Buffer::Instance& data, bool
data.drain(data.length());
data.addBufferFragment(*Buffer::BridgeFragment::createBridgeFragment(result.data));
}
iteration_state_ = IterationState::Ongoing;
return Http::FilterDataStatus::Continue;

default:
Expand Down Expand Up @@ -227,6 +271,7 @@ PlatformBridgeFilter::onTrailers(Http::HeaderMap& trailers, Buffer::Instance* in
free(result.pending_data);
}
PlatformBridgeFilter::replaceHeaders(trailers, result.trailers);
iteration_state_ = IterationState::Ongoing;
return Http::FilterTrailersStatus::Continue;

default:
Expand Down Expand Up @@ -325,7 +370,27 @@ PlatformBridgeFilter::encodeTrailers(Http::ResponseTrailerMap& trailers) {
return status;
}

void PlatformBridgeFilter::resumeDecoding() {
auto weak_self = weak_from_this();
// TODO(goaway): There's a potential shutdown race here, due to the fact that the shared
// reference that now holds the filter does not retain the dispatcher. In the future we should
// make this safer by, e.g.:
// 1) adding support to Envoy for (optionally) retaining the dispatcher, or
// 2) retaining the engine to transitively retain the dispatcher via Envoy's ownership graph, or
// 3) dispatching via a safe intermediary
// Relevant: https://github.com/lyft/envoy-mobile/issues/332
dispatcher_.post([weak_self]() -> void {
if (auto self = weak_self.lock()) {
self->onResumeDecoding();
}
});
}

void PlatformBridgeFilter::onResumeDecoding() {
if (iteration_state_ == IterationState::Ongoing) {
return;
}

Buffer::Instance* internal_buffer = nullptr;
if (decoder_callbacks_->decodingBuffer()) {
decoder_callbacks_->modifyDecodingBuffer([&internal_buffer](Buffer::Instance& mutable_buffer) {
Expand Down Expand Up @@ -388,10 +453,24 @@ void PlatformBridgeFilter::onResumeDecoding() {
pending_request_trailers_ = nullptr;
free(result.pending_trailers);
}
iteration_state_ = IterationState::Ongoing;
decoder_callbacks_->continueDecoding();
}

void PlatformBridgeFilter::resumeEncoding() {
auto weak_self = weak_from_this();
dispatcher_.post([weak_self]() -> void {
if (auto self = weak_self.lock()) {
self->onResumeEncoding();
}
});
}

void PlatformBridgeFilter::onResumeEncoding() {
if (iteration_state_ == IterationState::Ongoing) {
return;
}

Buffer::Instance* internal_buffer = nullptr;
if (encoder_callbacks_->encodingBuffer()) {
encoder_callbacks_->modifyEncodingBuffer([&internal_buffer](Buffer::Instance& mutable_buffer) {
Expand Down Expand Up @@ -451,6 +530,7 @@ void PlatformBridgeFilter::onResumeEncoding() {
pending_response_trailers_ = nullptr;
free(result.pending_trailers);
}
iteration_state_ = IterationState::Ongoing;
encoder_callbacks_->continueEncoding();
}

Expand Down
49 changes: 34 additions & 15 deletions library/common/extensions/filters/http/platform_bridge/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,18 @@ enum class IterationState { Ongoing, Stopped };
* For more information on implementing platform filters, see the docs.
*/
class PlatformBridgeFilter final : public Http::PassThroughFilter,
Logger::Loggable<Logger::Id::filter> {
public Logger::Loggable<Logger::Id::filter>,
public std::enable_shared_from_this<PlatformBridgeFilter> {
public:
PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr config);
PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr config, Event::Dispatcher& dispatcher);

// Scheduled on the dispatcher when resumeRequest is called from platform
// filter callbacks. Provides a snapshot of pending request state to the
// platform filter, and consumes invocation results to modify pending HTTP
// entities before resuming decoding.
void onResumeDecoding();
// Asynchronously trigger resumption of filter iteration, if applicable.
// This is a no-op if filter iteration is already ongoing.
void resumeDecoding();

// Scheduled on the dispatcher when resumeResponse is called from platform
// filter callbacks. Provides a snapshot of pending response state to the
// platform filter, and consumes invocation results to modify pending HTTP
// entities before resuming encoding.
void onResumeEncoding();
// Asynchronously trigger resumption of filter iteration, if applicable.
// This is a no-op if filter iteration is already ongoing.
void resumeEncoding();

// StreamFilterBase
void onDestroy() override;
Expand All @@ -79,26 +76,48 @@ class PlatformBridgeFilter final : public Http::PassThroughFilter,

private:
static void replaceHeaders(Http::HeaderMap& headers, envoy_headers c_headers);

Http::FilterHeadersStatus onHeaders(Http::HeaderMap& headers, bool end_stream,
envoy_filter_on_headers_f on_headers);

Http::FilterDataStatus onData(Buffer::Instance& data, bool end_stream,
Buffer::Instance* internal_buffer,
Http::HeaderMap** pending_headers, envoy_filter_on_data_f on_data);

Http::FilterTrailersStatus onTrailers(Http::HeaderMap& trailers,
Buffer::Instance* internal_buffer,
Http::HeaderMap** pending_headers,
envoy_filter_on_trailers_f on_trailers);
const std::string filter_name_;
IterationState iteration_state_;
envoy_http_filter platform_filter_;

// Scheduled on the dispatcher when resumeRequest is called from platform
// filter callbacks. Provides a snapshot of pending request state to the
// platform filter, and consumes invocation results to modify pending HTTP
// entities before resuming decoding.
void onResumeDecoding();

// Scheduled on the dispatcher when resumeResponse is called from platform
// filter callbacks. Provides a snapshot of pending response state to the
// platform filter, and consumes invocation results to modify pending HTTP
// entities before resuming encoding.
void onResumeEncoding();

Event::Dispatcher& dispatcher_;
Http::HeaderMap* pending_request_headers_{};
Http::HeaderMap* pending_response_headers_{};
Http::HeaderMap* pending_request_trailers_{};
Http::HeaderMap* pending_response_trailers_{};
IterationState iteration_state_;
const std::string filter_name_;
envoy_http_filter platform_filter_;
envoy_http_filter_callbacks platform_request_callbacks_{};
envoy_http_filter_callbacks platform_response_callbacks_{};
bool request_complete_{};
bool response_complete_{};
};

using PlatformBridgeFilterSharedPtr = std::shared_ptr<PlatformBridgeFilter>;
using PlatformBridgeFilterWeakPtr = std::weak_ptr<PlatformBridgeFilter>;

} // namespace PlatformBridge
} // namespace HttpFilters
} // namespace Extensions
Expand Down
2 changes: 2 additions & 0 deletions library/objective-c/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ objc_library(
"EnvoyEngineImpl.m",
"EnvoyHTTPCallbacks.m",
"EnvoyHTTPFilter.m",
"EnvoyHTTPFilterCallbacksImpl.h",
"EnvoyHTTPFilterCallbacksImpl.m",
"EnvoyHTTPFilterFactory.m",
"EnvoyHTTPStreamImpl.m",
"EnvoyNetworkMonitor.m",
Expand Down
7 changes: 0 additions & 7 deletions library/objective-c/EnvoyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@ extern const int kEnvoyFilterResumeStatusResumeIteration;

@end

#pragma mark - EnvoyHTTPFilterCallbacksImpl

// Concrete implementation of the `EnvoyHTTPFilterCallbacks` protocol.
@interface EnvoyHTTPFilterCallbacksImpl : NSObject <EnvoyHTTPFilterCallbacks>

@end

#pragma mark - EnvoyHTTPStream

@protocol EnvoyHTTPStream
Expand Down
20 changes: 17 additions & 3 deletions library/objective-c/EnvoyEngineImpl.m
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#import "library/objective-c/EnvoyEngine.h"
#import "library/objective-c/EnvoyBridgeUtility.h"
#import "library/objective-c/EnvoyHTTPFilterCallbacksImpl.h"

#import "library/common/main_interface.h"
#import "library/common/types/c_types.h"
#import "library/common/extensions/filters/http/platform_bridge/c_types.h"

#import <UIKit/UIKit.h>

Expand Down Expand Up @@ -192,12 +192,26 @@ static envoy_filter_trailers_status ios_http_filter_on_response_trailers(envoy_h

static void ios_http_filter_set_request_callbacks(envoy_http_filter_callbacks callbacks,
const void *context) {
// TODO(goaway): implement me
EnvoyHTTPFilter *filter = (__bridge EnvoyHTTPFilter *)context;
if (filter.setRequestFilterCallbacks == nil) {
return;
}

EnvoyHTTPFilterCallbacksImpl *requestFilterCallbacks =
[[EnvoyHTTPFilterCallbacksImpl alloc] initWithCallbacks:callbacks];
filter.setRequestFilterCallbacks(requestFilterCallbacks);
}

static void ios_http_filter_set_response_callbacks(envoy_http_filter_callbacks callbacks,
const void *context) {
// TODO(goaway): implement me
EnvoyHTTPFilter *filter = (__bridge EnvoyHTTPFilter *)context;
if (filter.setResponseFilterCallbacks == nil) {
return;
}

EnvoyHTTPFilterCallbacksImpl *responseFilterCallbacks =
[[EnvoyHTTPFilterCallbacksImpl alloc] initWithCallbacks:callbacks];
filter.setResponseFilterCallbacks(responseFilterCallbacks);
}

static void ios_http_filter_release(const void *context) {
Expand Down
19 changes: 0 additions & 19 deletions library/objective-c/EnvoyFilterCallbacksImpl.m

This file was deleted.

12 changes: 12 additions & 0 deletions library/objective-c/EnvoyHTTPFilterCallbacksImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#import <Foundation/Foundation.h>

#import "library/objective-c/EnvoyEngine.h"

#import "library/common/extensions/filters/http/platform_bridge/c_types.h"

// Concrete implementation of the `EnvoyHTTPFilterCallbacks` protocol.
@interface EnvoyHTTPFilterCallbacksImpl : NSObject <EnvoyHTTPFilterCallbacks>

- (instancetype)initWithCallbacks:(envoy_http_filter_callbacks)callbacks;

@end
27 changes: 27 additions & 0 deletions library/objective-c/EnvoyHTTPFilterCallbacksImpl.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#import "library/objective-c/EnvoyHTTPFilterCallbacksImpl.h"

#pragma mark - EnvoyHTTPFilterCallbacksImpl

@implementation EnvoyHTTPFilterCallbacksImpl {
envoy_http_filter_callbacks _callbacks;
}

- (instancetype)initWithCallbacks:(envoy_http_filter_callbacks)callbacks {
self = [super init];
if (!self) {
return nil;
}

_callbacks = callbacks;
return self;
}

- (void)resumeIteration {
_callbacks.resume_iteration(_callbacks.callback_context);
}

- (void)dealloc {
_callbacks.release_callbacks(_callbacks.callback_context);
}

@end
1 change: 1 addition & 0 deletions test/common/extensions/filters/http/platform_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ envoy_extension_cc_test(
"//library/common/api:external_api_lib",
"//library/common/extensions/filters/http/platform_bridge:config",
"//library/common/extensions/filters/http/platform_bridge:pkg_cc_proto",
"@envoy//test/mocks/event:event_mocks",
"@envoy//test/mocks/http:http_mocks",
"@envoy//test/test_common:utility_lib",
],
Expand Down
Loading

0 comments on commit 802ad41

Please sign in to comment.