Skip to content

Commit

Permalink
0.3.4: Merge pull request #7 from CODIANZ/development
Browse files Browse the repository at this point in the history
0.3.4
  • Loading branch information
terukazu-inoue authored May 2, 2023
2 parents 388c65c + 5d7ef6f commit 6b235be
Show file tree
Hide file tree
Showing 25 changed files with 109 additions and 107 deletions.
11 changes: 7 additions & 4 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"command": "valgrind",
"args": [
"--leak-check=full",
// "--num-callers=500",
// "--max-threads=5000",
"${workspaceFolder}/build/another-rxcpp"
],
"problemMatcher": [
Expand All @@ -18,10 +20,11 @@
{
"label": "build",
"type": "shell",
"command": "make",
"options": {
"cwd": "${workspaceFolder}/build"
},
"command": "cmake",
"args": [
"--build",
"./build"
],
"problemMatcher": [
"$gcc"
]
Expand Down
2 changes: 2 additions & 0 deletions include/another-rxcpp/observables/blocking.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <mutex>
#include <numeric>
#include <thread>
#include <condition_variable>
#include <atomic>

namespace another_rxcpp {
namespace observables {
Expand Down
1 change: 1 addition & 0 deletions include/another-rxcpp/observables/connectable.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <unordered_map>
#include <mutex>
#include <vector>
#include <algorithm>

namespace another_rxcpp {
namespace observables {
Expand Down
2 changes: 1 addition & 1 deletion include/another-rxcpp/observables/iterate.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ template <typename T>
using TT = typename T::value_type;
return observable<>::create<TT>([arr](subscriber<TT> s){
for(auto it = std::cbegin(arr); it != std::cend(arr); it++) {
if(!s.is_subscribed()) break;
if(!s.is_subscribed()) return;
s.on_next(*it);
}
s.on_completed();
Expand Down
2 changes: 1 addition & 1 deletion include/another-rxcpp/observables/range.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ template <typename T>
using TT = typename internal::strip_const_reference<T>::type;
return observable<>::create<TT>([start, end](subscriber<TT> s){
for(TT i = start; i <= end; i++) {
if(!s.is_subscribed()) break;
if(!s.is_subscribed()) return;
s.on_next(i);
}
s.on_completed();
Expand Down
2 changes: 2 additions & 0 deletions include/another-rxcpp/operators/take.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include "../internal/tools/stream_controller.h"
#include "../observable.h"
#include <atomic>
#include <condition_variable>

namespace another_rxcpp {
namespace operators {
Expand Down
22 changes: 6 additions & 16 deletions include/another-rxcpp/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>

namespace another_rxcpp {

Expand All @@ -26,8 +27,8 @@ class scheduler_interface {
schedule_type get_schedule_type() const noexcept { return schedule_type_; }

virtual void run(call_in_context_fn_t call_in_context) noexcept = 0;
virtual void detach() noexcept = 0;
virtual void schedule(const function_type& f) noexcept = 0;
virtual void detach() noexcept = 0;
};

class scheduler {
Expand Down Expand Up @@ -56,7 +57,7 @@ class scheduler {

if(m_->interface_->get_schedule_type() == scheduler_interface::schedule_type::queuing){
auto m = m_;
m->interface_->run([m](){
m_->interface_->run([m](){
while(true){
auto q = [m]() -> std::queue<function_type> {
std::unique_lock<std::mutex> lock(m->mtx_);
Expand Down Expand Up @@ -88,27 +89,16 @@ class scheduler {
virtual ~scheduler() noexcept {
}

void schedule(const function_type& f) const noexcept {
if(m_->interface_->get_schedule_type() == scheduler_interface::schedule_type::queuing){
auto cpf = f;
schedule(std::move(f));
}
else{
/* m_->interface_->get_schedule_type() == scheduler_interface::schedule_type::direct */
m_->interface_->schedule(f);
}
}

void schedule(function_type&& f) const noexcept {
template <typename F> void schedule(F&& f) const noexcept {
if(m_->interface_->get_schedule_type() == scheduler_interface::schedule_type::queuing){
std::unique_lock<std::mutex> lock(m_->mtx_);
m_->queue_.push(std::move(f));
m_->queue_.push(std::forward<F>(f));
lock.unlock();
m_->cond_.notify_one();
}
else{
/* m_->interface_->get_schedule_type() == scheduler_interface::schedule_type::direct */
m_->interface_->schedule(std::move(f));
m_->interface_->schedule(std::forward<F>(f));
}
}

Expand Down
1 change: 1 addition & 0 deletions include/another-rxcpp/schedulers/async_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class async_scheduler_interface : public scheduler_interface {
}

virtual void detach() noexcept override {
while(future_.valid()) {}
}

virtual void schedule(const function_type& f) noexcept override {
Expand Down
31 changes: 23 additions & 8 deletions include/another-rxcpp/subjects/subject.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "../observables/error.h"
#include "../operators/on_error_resume_next.h"
#include "../operators/publish.h"
#include <algorithm>
#include <mutex>

namespace another_rxcpp {
namespace subjects {
Expand All @@ -23,8 +25,14 @@ template <typename T> class subject {
subscriber_type subscriber_;
std::exception_ptr error_ = nullptr;
subscription subscription_;
std::recursive_mutex mtx_;
std::vector<internal::stream_controller<value_type>> sctls_;
~member() {
subscription_.unsubscribe();
std::lock_guard<std::recursive_mutex> lock(mtx_);
std::for_each(sctls_.begin(), sctls_.end(), [](auto& sctl){
sctl.finalize();
});
}
};
std::shared_ptr<member> m_;
Expand All @@ -37,12 +45,14 @@ template <typename T> class subject {
subject() noexcept :
m_(std::make_shared<member>())
{
auto m = m_;
std::weak_ptr<member> m = m_;
m_->source_ = observable<>::create<value_type>([m](subscriber_type s){
m->subscriber_ = s;
auto mm = m.lock();
if(mm) mm->subscriber_ = s;
})
| operators::on_error_resume_next([m](std::exception_ptr err){
m->error_ = err;
auto mm = m.lock();
if(mm) mm->error_ = err;
return observables::error<value_type>(err);
})
| operators::publish();
Expand All @@ -59,12 +69,11 @@ template <typename T> class subject {
auto m = m_;
return observable<>::create<value_type>([m](subscriber_type s) {
auto sctl = internal::stream_controller<value_type>(s);
if(m->error_){
sctl.sink_error(m->error_);
}
else if(!m->subscription_.is_subscribed()){
sctl.sink_completed_force();
{
std::lock_guard<std::recursive_mutex> lock(m->mtx_);
m->sctls_.push_back(sctl);
}
sctl.set_on_finalize([m]{}); /** hold on `m` until on finalize */
m->source_.subscribe(sctl.template new_observer<value_type>(
[sctl](auto, const value_type& x){
sctl.sink_next(x);
Expand All @@ -76,6 +85,12 @@ template <typename T> class subject {
sctl.sink_completed(serial);
}
));
if(m->error_){
sctl.sink_error(m->error_);
}
else if(!m->subscription_.is_subscribed()){
sctl.sink_completed_force();
}
});
}

Expand Down
2 changes: 0 additions & 2 deletions test/async_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,5 @@ void test_async_scheduler() {

while(x.is_subscribed()) {}

wait(1000);

log() << "test_async_scheduler -- end" << std::endl << std::endl;
}
2 changes: 0 additions & 2 deletions test/blocking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,5 @@ void test_blocking() {
}
}

wait(1000);

log() << "test_blocking -- end" << std::endl << std::endl;
}
5 changes: 2 additions & 3 deletions test/case_4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,10 @@ void test_case_4() {

while(sbsc.is_subscribed()) {}

log() << "final wait" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

log() << "emitters.run = " << *emitters.run << std::endl;
}

wait(2000);

log() << "test_case_4 -- end" << std::endl << std::endl;
}
1 change: 0 additions & 1 deletion test/case_5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ void test_case_5() {
);

while(sbsc.is_subscribed()) {}
wait(1000);

log() << "test_case_5 -- end" << std::endl << std::endl;
}
51 changes: 35 additions & 16 deletions test/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
#include <thread>
#include <iostream>
#include <another-rxcpp/observable.h>
#include <another-rxcpp/schedulers/new_thread_scheduler.h>
#include <another-rxcpp/operators/subscribe.h>

using namespace another_rxcpp;
using namespace another_rxcpp::operators;

inline void setTimeout(std::function<void()> f, int x) {
auto t = std::thread([f, x]{
std::this_thread::sleep_for(std::chrono::milliseconds(x));
observables::just(0)
.observe_on(schedulers::new_thread_scheduler())
.delay(std::chrono::milliseconds(x))
.subscribe([f](auto){
f();
});
t.detach();
}

inline std::ostream& log() {
Expand All @@ -29,20 +31,37 @@ inline void wait(int ms) {
}


template <typename T, typename TT = typename std::remove_const<typename std::remove_reference<T>::type>::type>
auto ovalue(T&& value, int delay = 0) -> observable<TT> {
auto v = std::make_shared<TT>(std::forward<T>(value));
return observable<>::create<TT>([v, delay](subscriber<TT> s) {
if(delay == 0){
s.on_next(*v);
template <typename T>
auto ovalue(T&& value, int delay = 0) {
if(delay == 0){
return observables::just(std::forward<T>(value));
}
else{
return observables::just(std::forward<T>(value))
.observe_on(schedulers::new_thread_scheduler())
.delay(std::chrono::milliseconds(delay));
}
}

inline auto interval_range(int from, int to, int msec) {
return observable<>::create<int>([from, to, msec](subscriber<int> s){
std::thread([from, to, msec, s]{
for(int i = from; i <= to; i++){
if(!s.is_subscribed()){
log() << "interval_range break" << std::endl;
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(msec));
if(!s.is_subscribed()){
log() << "interval_range break" << std::endl;
return;
}
log() << "interval_range emit " << i << std::endl;
s.on_next(i);
}
log() << "interval_range complete" << std::endl;
s.on_completed();
}
else{
setTimeout([s, v](){
s.on_next(*v);
s.on_completed();
}, delay);
}
}).detach();
});
}

Expand Down
2 changes: 0 additions & 2 deletions test/delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,5 @@ void test_delay() {

while(x.is_subscribed()) {}

wait(1000);

log() << "test_delay -- end" << std::endl << std::endl;
}
9 changes: 2 additions & 7 deletions test/first.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@ using namespace another_rxcpp::operators;
void test_first() {
log() << "test_first -- begin" << std::endl;

auto o = observables::range(1, 100);

doSubscribe(o | first());
doSubscribe(observables::range(1, 100) | first());

auto x = doSubscribe(
o
| flat_map([](int x){
return ovalue(x, 100);
})
interval_range(1, 100, 100)
| first()
);
while(x.is_subscribed()) {}
Expand Down
7 changes: 2 additions & 5 deletions test/last.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ void test_last() {

auto o = observables::range(1, 100);

doSubscribe(o | last());
doSubscribe(observables::range(1, 100) | last());

auto x = doSubscribe(
o
| flat_map([](int x){
return ovalue(x, 100);
})
interval_range(1, 10, 100)
| last()
);
while(x.is_subscribed()) {}
Expand Down
4 changes: 1 addition & 3 deletions test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ int main() {
DO(test_empty)
DO(test_on_error_resume_next)
DO(test_retry)
DO(test_observe_on)
DO(test_subscribe_on)
DO(test_new_thread_scheduler)
DO(test_take_until)
Expand Down Expand Up @@ -66,8 +67,5 @@ int main() {
#endif /* defined(SUPPORTS_RXCPP_COMPATIBLE) */

log() << "**** finish ****" << std::endl;

std::cout << "wait for some worker threads destruct objects (eg: sources in subscription)" << std::endl;
wait(100);
}

7 changes: 0 additions & 7 deletions test/observable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ void test_observable() {
log() << x << std::endl;
return ovalue(x + 1);
});
// {
// log() << "#2 wait with notify_on_unsubscribe()" << std::endl;
// auto x = doSubscribe(ob);
// std::mutex mtx;
// std::unique_lock<std::mutex> lock(mtx);
// x.unsubscribe_notice()->wait(lock, [x](){ return x.is_subscribed(); });
// }
{
log() << "#3 wait until is_subscribed() == true" << std::endl;
auto x = doSubscribe(ob);
Expand Down
Loading

0 comments on commit 6b235be

Please sign in to comment.