diff --git a/libcaf_core/caf/actor_registry.hpp b/libcaf_core/caf/actor_registry.hpp index 31388259c1..ad63a18dd0 100644 --- a/libcaf_core/caf/actor_registry.hpp +++ b/libcaf_core/caf/actor_registry.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "caf/abstract_actor.hpp" #include "caf/actor.hpp" @@ -53,15 +54,18 @@ class CAF_CORE_EXPORT actor_registry { /// Increases running-actors-count by one. /// @returns the increased count. - size_t inc_running(); + size_t inc_running(actor_id key); /// Decreases running-actors-count by one. /// @returns the decreased count. - size_t dec_running(); + size_t dec_running(actor_id key); /// Returns the number of currently running actors. size_t running() const; + /// Returns the the actor ids of all currently running actors. + const std::unordered_set& running_ids() const; + /// Blocks the caller until running-actors-count becomes `expected` /// (must be either 0 or 1). void await_running_count_equal(size_t expected) const; @@ -112,6 +116,7 @@ class CAF_CORE_EXPORT actor_registry { mutable std::mutex running_mtx_; mutable std::condition_variable running_cv_; + std::unordered_set running_; mutable detail::shared_spinlock instances_mtx_; entries entries_; diff --git a/libcaf_core/src/abstract_actor.cpp b/libcaf_core/src/abstract_actor.cpp index d02ede40b1..5c71e6739c 100644 --- a/libcaf_core/src/abstract_actor.cpp +++ b/libcaf_core/src/abstract_actor.cpp @@ -80,7 +80,7 @@ void abstract_actor::register_at_system() { if (getf(is_registered_flag)) return; setf(is_registered_flag); - [[maybe_unused]] auto count = home_system().registry().inc_running(); + [[maybe_unused]] auto count = home_system().registry().inc_running(id()); CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count); } @@ -88,7 +88,7 @@ void abstract_actor::unregister_from_system() { if (!getf(is_registered_flag)) return; unsetf(is_registered_flag); - [[maybe_unused]] auto count = home_system().registry().dec_running(); + [[maybe_unused]] auto count = home_system().registry().dec_running(id()); CAF_LOG_DEBUG("actor" << id() << "decreased running count to" << count); } diff --git a/libcaf_core/src/actor_registry.cpp b/libcaf_core/src/actor_registry.cpp index 24326b95e6..b6b0fdfd88 100644 --- a/libcaf_core/src/actor_registry.cpp +++ b/libcaf_core/src/actor_registry.cpp @@ -80,18 +80,27 @@ void actor_registry::erase(actor_id key) { } } -size_t actor_registry::inc_running() { - return ++*system_.base_metrics().running_actors; +size_t actor_registry::inc_running(actor_id key) { + std::unique_lock guard(running_mtx_); + running_.emplace(key); + return running_.size(); } size_t actor_registry::running() const { - return static_cast(system_.base_metrics().running_actors->value()); + std::unique_lock guard(running_mtx_); + return running_.size(); } -size_t actor_registry::dec_running() { - size_t new_val = --*system_.base_metrics().running_actors; +const std::unordered_set& actor_registry::running_ids() const { + std::unique_lock guard(running_mtx_); + return running_; +} + +size_t actor_registry::dec_running(actor_id key) { + std::unique_lock guard(running_mtx_); + running_.erase(key); + size_t new_val = running_.size(); if (new_val <= 1) { - std::unique_lock guard(running_mtx_); running_cv_.notify_all(); } return new_val; @@ -101,8 +110,8 @@ void actor_registry::await_running_count_equal(size_t expected) const { CAF_ASSERT(expected == 0 || expected == 1); CAF_LOG_TRACE(CAF_ARG(expected)); std::unique_lock guard{running_mtx_}; - while (running() != expected) { - CAF_LOG_DEBUG(CAF_ARG(running())); + while (running_.size() != expected) { + CAF_LOG_DEBUG(CAF_ARG(running_.size())); running_cv_.wait(guard); } } diff --git a/libcaf_core/src/blocking_actor.cpp b/libcaf_core/src/blocking_actor.cpp index 5abc0b2169..f9d21bbe5e 100644 --- a/libcaf_core/src/blocking_actor.cpp +++ b/libcaf_core/src/blocking_actor.cpp @@ -132,7 +132,7 @@ class blocking_actor_runner : public resumable { auto& sys = ctx->system(); sys.release_private_thread(thread_); if (!hidden_) { - [[maybe_unused]] auto count = sys.registry().dec_running(); + [[maybe_unused]] auto count = sys.registry().dec_running(self_->id()); CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to" << count); } @@ -166,7 +166,7 @@ void blocking_actor::launch(execution_unit*, bool, bool hide) { // Note: must *not* call register_at_system() to stop actor cleanup from // decrementing the count before releasing the thread. if (!hide) { - [[maybe_unused]] auto count = sys.registry().inc_running(); + [[maybe_unused]] auto count = sys.registry().inc_running(id()); CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count); } thread->resume(new blocking_actor_runner(this, thread, hide));