Skip to content

Commit

Permalink
Merge branch 'release/v1.4.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
i-vovk committed Feb 17, 2022
2 parents 15969c7 + f6927a6 commit ddbb3da
Show file tree
Hide file tree
Showing 11 changed files with 639 additions and 7 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 1.4.0 (2022-02-18)

### Features

* Implement logger facility and support user-defined overriding - (#19)


## 1.3.0 (2022-02-16)

### Features
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)

set(STREAMCLIENT_VERSION_MAJOR "1")
set(STREAMCLIENT_VERSION_MINOR "3")
set(STREAMCLIENT_VERSION_MINOR "4")
set(STREAMCLIENT_VERSION_RELEASE "0")
set(STREAMCLIENT_SUMMARY "C++ library")
set(STREAMCLIENT_REPOSITORY_URL "https://github.com/TinkoffCreditSystems/stream-client")
Expand Down
50 changes: 47 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ client->receive(boost::asio::buffer(&recv_data[0], send_data.size()));
Represents container occupied with opened sockets. Uses [connector](#connector) to open new sockets in the background thread which is triggered once there are vacant places in the pool. User can call *get_session()* to obtain a socket from the pool and *return_session()* to give it back.
There are two strategies to refill the pool:
- greedy (`stream_client::connector::greedy_strategy`). If there are vacant places it will try to fill them with new sessions simultaneously.
- conservative (`stream_client::connector::conservative_strategy`). Will try to fill up to 2/3 of vacant places in the poll. If failed will back of for some time and retry later. Also, after failures it will create only one new session.
- **greedy** (`stream_client::connector::greedy_strategy`). If there are vacant places it will try to fill them with new sessions simultaneously. This is the default one.
- **conservative** (`stream_client::connector::conservative_strategy`). Will try to fill up to 2/3 of vacant places in the poll. If failed will back of for some time and retry later. Also, after failures it will create only one new session.
Both of them are defined in terms of `stream_client::connector::pool_strategy` interface, so you are free to implement new one.
Limitations:
Expand All @@ -143,7 +144,12 @@ Connection pools:
* `stream_client::connector::http_pool` - pool of `stream_client::http::http_client` sockets.
* `stream_client::connector::https_pool` - pool of `stream_client::http::https_client` sockets.
*All these pools are using `stream_client::connector::greedy_strategy`.*
There are also aliases for the same pools but using conservative reconnection strategy:
* `stream_client::connector::tcp_conservative_pool`
* `stream_client::connector::udp_conservative_pool`
* `stream_client::connector::ssl_conservative_pool`
* `stream_client::connector::http_conservative_pool`
* `stream_client::connector::https_conservative_pool`
#### Example
```c++
Expand Down Expand Up @@ -181,6 +187,44 @@ for (auto& t : threads) {
}
```

### Logging

The library uses basic logger interface implemented internally. You can modify logger level using `stream_client::set_log_level()` or `stream_client::get_log_level()` functions.

These levels are supported:
```cpp
enum class log_level : int
{
trace = 0,
debug,
info,
warning,
error,
};
```

By default library prints messages to stdout with decent formatting. If you want to overwrite this behavior you can set you own logger via:
```cpp
void stream_client::set_logger(std::shared_ptr<stream_client::log_interface> logger);

void stream_client::set_logger(stream_client::log_level level, stream_client::log_func_type log_func);
```
Which allows to either overwrite logger instance of use a callback of with proper signature. These types are defines as:
```cpp
class log_interface
{
public:
virtual void set_level(log_level level) noexcept = 0;
virtual log_level get_level() const noexcept = 0;
virtual void message(log_level level, const std::string& location, const std::string& message) const = 0;
};
using log_func_type = std::function<void(log_level level, const std::string& location, const std::string& message)>;
```

For more information please look inside [logger.hpp](./include/stream-client/logger.hpp).

## How to build

This library supposed to be somewhat multi-platform, however, it was tested and mainly used on ubuntu and macOS. </br>
Expand Down
8 changes: 7 additions & 1 deletion include/stream-client/connector/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ class base_connection_pool
/// Background routine used to maintain the pool.
void watch_pool_routine();

Strategy reconnection_;
std::string name_; ///< Name pf the pool.
Strategy reconnection_; ///< Instance of reconnection strategy used to fill the pool.
connector_type connector_; ///< Underlying connector used to establish sockets.

std::size_t pool_max_size_; ///< Number of stream to keep in the @p sesson_pool_.
Expand All @@ -330,16 +331,21 @@ class base_connection_pool

//! Connections pool with sockets over plain TCP protocol.
using tcp_pool = base_connection_pool<tcp_connector>;
using tcp_conservative_pool = base_connection_pool<tcp_connector, conservative_strategy<tcp_connector>>;
//! Connections pool with sockets over plain UDP protocol.
using udp_pool = base_connection_pool<udp_connector>;
using udp_conservative_pool = base_connection_pool<udp_connector, conservative_strategy<udp_connector>>;

//! Connections pool with sockets over encrypted TCP protocol.
using ssl_pool = base_connection_pool<ssl_connector>;
using ssl_conservative_pool = base_connection_pool<ssl_connector, conservative_strategy<ssl_connector>>;

//! Connections pool with sockets over HTTP protocol.
using http_pool = base_connection_pool<http_connector>;
using http_conservative_pool = base_connection_pool<http_connector, conservative_strategy<http_connector>>;
//! Connections pool with sockets over HTTPS protocol.
using https_pool = base_connection_pool<https_connector>;
using https_conservative_pool = base_connection_pool<https_connector, conservative_strategy<https_connector>>;

} // namespace connector
} // namespace stream_client
Expand Down
4 changes: 4 additions & 0 deletions include/stream-client/connector/impl/connection_pool.ipp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "stream-client/logger.hpp"

namespace stream_client {
namespace connector {

Expand All @@ -12,7 +14,9 @@ base_connection_pool<Connector, Strategy>::base_connection_pool(std::size_t size
, idle_timeout_(idle_timeout)
, watch_pool_(true)
{
name_ = "connection_pool[" + connector_.get_target() + "](" + std::to_string(pool_max_size_) + ")";
pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); });
STREAM_LOG_TRACE(name_ + " has been created");
}

template <typename Connector, typename Strategy>
Expand Down
6 changes: 4 additions & 2 deletions include/stream-client/connector/impl/pool_strategy.ipp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "stream-client/logger.hpp"

#include <boost/system/system_error.hpp>

#include <atomic>
Expand Down Expand Up @@ -31,7 +33,7 @@ bool greedy_strategy<Connector>::refill(connector_type& connector, std::size_t v
auto new_session = connector.new_session();
append_func(std::move(new_session));
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
STREAM_LOG_ERROR("failed to establish new session to " + connector.get_target() + ": " + e.what());
}
};

Expand Down Expand Up @@ -75,7 +77,7 @@ bool conservative_strategy<Connector>::refill(connector_type& connector, std::si
append_func(std::move(new_session));
is_added = true;
} catch (const boost::system::system_error& e) {
// TODO: log errors ?
STREAM_LOG_ERROR("failed to establish new session to " + connector.get_target() + ": " + e.what());
}
};

Expand Down
108 changes: 108 additions & 0 deletions include/stream-client/impl/logger.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#pragma once

#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>

namespace stream_client {
namespace detail {

/// Get/set global logger instance.
inline std::shared_ptr<log_interface> logger_instance(std::shared_ptr<log_interface> new_logger)
{
// By default use cout logger with trace level
static std::shared_ptr<log_interface> glob_instance = std::make_shared<cout_logger>();

if (new_logger) {
glob_instance = new_logger;
}
return glob_instance;
}

} // namespace detail

base_logger::base_logger(log_level level)
: level_(level)
{
}

void base_logger::set_level(log_level level) noexcept
{
level_ = level;
}

log_level base_logger::get_level() const noexcept
{
return level_;
}

func_logger::func_logger(log_level level, stream_client::log_func_type log_func)
: base_logger(level)
, log_func_(log_func)
{
}

void func_logger::message(log_level level, const std::string& location, const std::string& message) const
{
if (log_func_) {
log_func_(level, location, message);
}
}

cout_logger::cout_logger(log_level level)
: base_logger(level)
{
}

void cout_logger::message(log_level level, const std::string& location, const std::string& message) const
{
static constexpr const char* kLevelPrefixes[] = {
"TRACE", "DEBUG", "INFO", "WARNING", "ERROR",
};

std::stringstream ss;
const auto now = std::chrono::system_clock::now();
const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
std::lock_guard<std::mutex> lock(mutex_);
ss << std::put_time(std::localtime(&t_c), "%Y-%m-%dT%H:%M:%SZ") << ": " << kLevelPrefixes[static_cast<int>(level)]
<< ": " << location << ": " << message << std::endl;
std::cout << ss.str();
}

inline void set_logger(std::shared_ptr<log_interface> logger)
{
detail::logger_instance(std::move(logger));
}

inline void set_logger(log_level level, stream_client::log_func_type log_func)
{
detail::logger_instance(std::make_shared<func_logger>(level, std::move(log_func)));
}

inline void set_log_level(log_level level)
{
const auto logger = detail::logger_instance();
if (logger) {
logger->set_level(level);
}
}

inline log_level get_log_level()
{
const auto logger = detail::logger_instance();
if (logger) {
return logger->get_level();
}
return log_level::mute;
}

inline void log_message(log_level level, const std::string& location, const std::string& message)
{
const auto logger = detail::logger_instance();
if (logger) {
logger->message(level, location, message);
}
}

} // namespace stream_client
Loading

0 comments on commit ddbb3da

Please sign in to comment.