Skip to content

Commit

Permalink
ready
Browse files Browse the repository at this point in the history
  • Loading branch information
alnkapa committed Oct 12, 2024
1 parent c3fbd9d commit 1f1fe87
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 26 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

add_subdirectory(async)
#find_library(ASYNC_LIB NAMES async PATHS /usr)
#add_subdirectory(async)
find_library(ASYNC_LIB NAMES async PATHS /usr)

add_executable(bulk main.cpp)

Expand Down
19 changes: 4 additions & 15 deletions async/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ file(std::weak_ptr<BlockingQueueValue> queue, std::string prefix)
}
auto ts = val.time_stamp();
std::ofstream file("bulk" + ts.String() + +"." + prefix + ".log");
std::cout << "open file "
<< "bulk" + ts.String() + +"." + prefix + ".log"
<< "\n";
if (file.is_open())
{
file << "bulk: ";
Expand All @@ -105,19 +102,12 @@ file(std::weak_ptr<BlockingQueueValue> queue, std::string prefix)
}
file << std::endl;
}
else
{
std::cout << "error open file "
<< "bulk" + ts.String() + +"." + prefix + ".log"
<< "\n";
}
}
else
{
break;
}
}
std::cout << "exit file " << prefix << "\n";
}

class Worker
Expand All @@ -136,7 +126,6 @@ class Worker
std::shared_ptr<BlockingQueueValue> log_queue;
std::shared_ptr<BlockingQueueValue> file_queue;
std::shared_ptr<PublisherValue> pub;
std::shared_ptr<sub_type> pub_block_plus;
std::shared_ptr<Sub> sub;
std::atomic<bool> m_running{false};
BlockingQueue<std::variant<Done, std::string>> m_queue;
Expand Down Expand Up @@ -215,9 +204,9 @@ class Worker
proccess()
{
Status no_block(n, pub);
auto block = std::make_shared<StatusBlock>(n, pub);
StatusBlockPlus block_plus(n, pub_block_plus);
pub_block_plus->subscribe(block);
auto block_plus = std::make_shared<StatusBlockPlus>(n);
auto block = std::make_shared<StatusBlock>(n, pub);
block_plus->subscribe(block);
while (m_running)
{
auto val = m_queue.take(); // block here
Expand All @@ -241,7 +230,7 @@ class Worker
m_status_block = block->add(std::move(message));
break;
case block_plus_status: // block ++
m_status_block = block_plus.add(std::move(message));
m_status_block = block_plus->add(std::move(message));
break;
}
}
Expand Down
5 changes: 1 addition & 4 deletions async/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ StatusBlockPlus::add(std::string &&line)
{
if (!m_store.empty())
{
if (auto ptr = m_pub.lock())
{
ptr->notify(std::move(m_store));
}
notify(std::move(m_store));
}
m_store.clear();
m_counter = 0;
Expand Down
9 changes: 4 additions & 5 deletions async/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const int block_plus_status = 2;
using PublisherValue = pubsub::Publisher<Value>;
using sub_type = pubsub::Publisher<std::vector<std::string>>;

class StatusBlockPlus
class StatusBlockPlus : public sub_type
{
private:
using stack_t = std::pair<std::vector<std::string>, std::size_t>;
Expand All @@ -104,11 +104,10 @@ class StatusBlockPlus
bool m_stop{false};
std::size_t m_stop_level{0};
std::size_t m_counter{0};
std::vector<stack_t> m_stack;
std::weak_ptr<sub_type> m_pub;
std::vector<stack_t> m_stack;

public:
StatusBlockPlus(std::size_t N, std::weak_ptr<sub_type> pub)
StatusBlockPlus(std::size_t N)
: N(N) {}

int
Expand Down Expand Up @@ -137,7 +136,7 @@ class StatusBlock : public pubsub::Subscriber<std::vector<std::string>>
int
add(std::string &&);
void
callback(std::vector<std::string>) override;
callback(std::vector<std::string>) override;
};

class Status
Expand Down

0 comments on commit 1f1fe87

Please sign in to comment.