Skip to content

Commit

Permalink
feat: eventsource client (#1)
Browse files Browse the repository at this point in the history
Implements an eventsource client. The client does not yet pass redirect/retry contract tests.
  • Loading branch information
cwaldren-ld authored Mar 29, 2023
1 parent c02c70c commit ab2b0fe
Show file tree
Hide file tree
Showing 35 changed files with 1,862 additions and 555 deletions.
13 changes: 11 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ project(
LANGUAGES CXX C
)

if (${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.24")
# Affects robustness of timestamp checking on FetchContent dependencies.
cmake_policy(SET CMP0135 NEW)
endif ()

# All projects in this repo should share the same version of 3rd party depends.
# It's the only way to remain sane.
set(CMAKE_FILES "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
set(CMAKE_CXX_STANDARD 17)


option(BUILD_TESTING "Enable C++ unit tests." ON)

if(BUILD_TESTING)
if (BUILD_TESTING)
include(FetchContent)
FetchContent_Declare(
googletest
Expand All @@ -26,7 +35,7 @@ if(BUILD_TESTING)
FetchContent_MakeAvailable(googletest)

enable_testing()
endif()
endif ()


add_subdirectory(libs/common)
Expand Down
2 changes: 2 additions & 0 deletions apps/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#add_subdirectory(hello-c)
add_subdirectory(sse-contract-tests)
add_subdirectory(hello-cpp)
32 changes: 23 additions & 9 deletions apps/hello-cpp/main.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#include <iostream>
#include <launchdarkly/api.hpp>
#include <launchdarkly/sse/sse.hpp>
#include <thread>
#include <launchdarkly/sse/client.hpp>

#include <boost/asio/io_context.hpp>

#include "console_backend.hpp"
#include "logger.hpp"

#include <iostream>
#include <utility>

namespace net = boost::asio; // from <boost/asio.hpp>

using launchdarkly::ConsoleBackend;
Expand All @@ -22,20 +26,30 @@ int main() {

net::io_context ioc;

// curl "https://stream-stg.launchdarkly.com/all?filter=even-flags-2" -H
// "Authorization: sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b" -v
char const* key = std::getenv("STG_SDK_KEY");
if (!key) {
std::cout << "Set environment variable STG_SDK_KEY to the sdk key\n";
return 1;
}
auto client =
launchdarkly::sse::builder(ioc,
launchdarkly::sse::Builder(ioc.get_executor(),
"https://stream-stg.launchdarkly.com/all")
.header("Authorization", "sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b")
.header("Authorization", key)
.receiver([&](launchdarkly::sse::Event ev) {
LD_LOG(logger, LogLevel::kInfo) << "event: " << ev.type();
LD_LOG(logger, LogLevel::kInfo)
<< "data: " << std::move(ev).take();
})
.logger([&](std::string msg) {
LD_LOG(logger, LogLevel::kDebug) << std::move(msg);
})
.build();

if (!client) {
LD_LOG(logger, LogLevel::kError) << "Failed to build client";
return 1;
}

std::thread t([&]() { ioc.run(); });

client->run();
ioc.run();
}
29 changes: 29 additions & 0 deletions apps/sse-contract-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Required for Apple Silicon support.
cmake_minimum_required(VERSION 3.19)

project(
LaunchDarklyCPPSSETestHarness
VERSION 0.1
DESCRIPTION "LaunchDarkly CPP SSE Test Harness"
LANGUAGES CXX
)

include(${CMAKE_FILES}/json.cmake)

add_executable(sse-tests
src/main.cpp
src/server.cpp
src/entity_manager.cpp
src/session.cpp
src/event_outbox.cpp
)

target_link_libraries(sse-tests PRIVATE
launchdarkly::sse
launchdarkly::common
nlohmann_json::nlohmann_json
)

target_include_directories(sse-tests PUBLIC include)

#add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
39 changes: 39 additions & 0 deletions apps/sse-contract-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
## SSE contract tests

Contract tests have a "test service" on one side, and the "test harness" on
the other.

This project implements the test service for the C++ EventSource client.

**session (session.hpp)**

This provides a simple REST API for creating/destroying
test entities. Examples:

`GET /` - returns the capabilities of this service.

`DELETE /` - shutdown the service.

`POST /` - create a new test entity, and return its ID.

`DELETE /entity/1` - delete the an entity identified by `1`.

**entity manager (entity_manager.hpp)**

This manages "entities" - the combination of an SSE client, and an outbox that posts events _received_ from the stream
_back to_ the test harness.

The point is to allow the test harness to assert that events were parsed and dispatched as expected.

**event outbox (event_outbox.hpp)**

The 2nd half of an "entity". It receives events from the SSE client, pushes them into a queue,
and then periodically flushes the queue out to the test harness.

**definitions (definitions.hpp)**

Contains JSON definitions that are used to communicate with the test harness.

**server (server.hpp)**

Glues everything together, mainly providing the TCP acceptor that spawns new sessions.
86 changes: 86 additions & 0 deletions apps/sse-contract-tests/include/definitions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include <launchdarkly/sse/event.hpp>
#include <optional>
#include <string>
#include <unordered_map>
#include "nlohmann/json.hpp"

namespace nlohmann {

template <typename T>
struct adl_serializer<std::optional<T>> {
static void to_json(json& j, std::optional<T> const& opt) {
if (opt == std::nullopt) {
j = nullptr;
} else {
j = *opt; // this will call adl_serializer<T>::to_json which will
// find the free function to_json in T's namespace!
}
}

static void from_json(json const& j, std::optional<T>& opt) {
if (j.is_null()) {
opt = std::nullopt;
} else {
opt = j.get<T>(); // same as above, but with
// adl_serializer<T>::from_json
}
}
};
} // namespace nlohmann

// Represents the initial JSON configuration sent by the test harness.
struct ConfigParams {
std::string streamUrl;
std::string callbackUrl;
std::string tag;
std::optional<uint32_t> initialDelayMs;
std::optional<uint32_t> readTimeoutMs;
std::optional<std::string> lastEventId;
std::optional<std::unordered_map<std::string, std::string>> headers;
std::optional<std::string> method;
std::optional<std::string> body;
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ConfigParams,
streamUrl,
callbackUrl,
tag,
initialDelayMs,
readTimeoutMs,
lastEventId,
headers,
method,
body);

// Represents an event payload that this service posts back
// to the test harness. The events are originally received by this server
// via the SSE stream; they are posted back so the test harness can verify
// that we parsed and dispatched them successfully.
struct Event {
std::string type;
std::string id;
std::string data;
Event() = default;
explicit Event(launchdarkly::sse::Event event)
: type(event.type()),
id(event.id().value_or("")),
data(std::move(event).take()) {}
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(Event, type, data, id);

struct EventMessage {
std::string kind;
Event event;
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(EventMessage, kind, event);

struct CommentMessage {
std::string kind;
std::string comment;
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment);
52 changes: 52 additions & 0 deletions apps/sse-contract-tests/include/entity_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include "definitions.hpp"
#include "logger.hpp"

#include <launchdarkly/sse/client.hpp>

#include <boost/asio/any_io_executor.hpp>

#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

class EventOutbox;

class EntityManager {
using Inbox = std::shared_ptr<launchdarkly::sse::Client>;
using Outbox = std::shared_ptr<EventOutbox>;
using Entity = std::pair<Inbox, Outbox>;

std::unordered_map<std::string, Entity> entities_;

std::size_t counter_;
boost::asio::any_io_executor executor_;

launchdarkly::Logger& logger_;

public:
/**
* Create an entity manager, which can be used to create and destroy
* entities (SSE clients + event channel back to test harness).
* @param executor Executor.
* @param logger Logger.
*/
EntityManager(boost::asio::any_io_executor executor,
launchdarkly::Logger& logger);
/**
* Create an entity with the given configuration.
* @param params Config of the entity.
* @return An ID representing the entity, or none if the entity couldn't
* be created.
*/
std::optional<std::string> create(ConfigParams params);
/**
* Destroy an entity with the given ID.
* @param id ID of the entity.
* @return True if the entity was found and destroyed.
*/
bool destroy(std::string const& id);
};
70 changes: 70 additions & 0 deletions apps/sse-contract-tests/include/event_outbox.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include "entity_manager.hpp"

#include <launchdarkly/sse/client.hpp>

#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http.hpp>
#include <boost/lockfree/spsc_queue.hpp>

#include <memory>
#include <string>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

class EventOutbox : public std::enable_shared_from_this<EventOutbox> {
using RequestType = http::request<http::string_body>;

std::string callback_url_;
std::string callback_port_;
std::string callback_host_;
size_t callback_counter_;

net::any_io_executor executor_;
tcp::resolver resolver_;
beast::tcp_stream event_stream_;

boost::lockfree::spsc_queue<RequestType> outbox_;

net::deadline_timer flush_timer_;
std::string id_;

bool shutdown_;

public:
/**
* Instantiate an outbox; events will be posted to the given URL.
* @param executor Executor.
* @param callback_url Target URL.
*/
EventOutbox(net::any_io_executor executor, std::string callback_url);
/**
* Enqueues an event, which will be posted to the server
* later.
* @param event Event to post.
*/
void post_event(launchdarkly::sse::Event event);
/**
* Begins an async operation to connect to the server.
*/
void run();
/**
* Begins an async operation to disconnect from the server.
*/
void stop();

private:
RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev);
void on_resolve(beast::error_code ec, tcp::resolver::results_type results);
void on_connect(beast::error_code ec,
tcp::resolver::results_type::endpoint_type);
void on_flush_timer(boost::system::error_code ec);
void on_write(beast::error_code ec, std::size_t);
void do_shutdown(beast::error_code ec, std::string what);
};
Loading

0 comments on commit ab2b0fe

Please sign in to comment.