Skip to content

Commit

Permalink
Store ids of all running actors in the registry
Browse files Browse the repository at this point in the history
This is useful for debugging shutdown hangs, when an actor that
does not terminate blocks the destructor of the actor system itself.
  • Loading branch information
tobim committed Jun 3, 2024
1 parent 4ff66bf commit bb7dfa6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
9 changes: 7 additions & 2 deletions libcaf_core/caf/actor_registry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include "caf/abstract_actor.hpp"
#include "caf/actor.hpp"
Expand Down Expand Up @@ -53,15 +54,18 @@ class CAF_CORE_EXPORT actor_registry {

/// Increases running-actors-count by one.
/// @returns the increased count.
size_t inc_running();
size_t inc_running(actor_id key);

/// Decreases running-actors-count by one.
/// @returns the decreased count.
size_t dec_running();
size_t dec_running(actor_id key);

/// Returns the number of currently running actors.
size_t running() const;

/// Returns the actor ids of all currently running actors.
const std::unordered_set<actor_id>& running_ids() const;

/// Blocks the caller until running-actors-count becomes `expected`
/// (must be either 0 or 1).
void await_running_count_equal(size_t expected) const;
Expand Down Expand Up @@ -112,6 +116,7 @@ class CAF_CORE_EXPORT actor_registry {

mutable std::mutex running_mtx_;
mutable std::condition_variable running_cv_;
std::unordered_set<actor_id> running_;

mutable detail::shared_spinlock instances_mtx_;
entries entries_;
Expand Down
4 changes: 2 additions & 2 deletions libcaf_core/src/abstract_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ void abstract_actor::register_at_system() {
if (getf(is_registered_flag))
return;
setf(is_registered_flag);
[[maybe_unused]] auto count = home_system().registry().inc_running();
[[maybe_unused]] auto count = home_system().registry().inc_running(id());
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
}

void abstract_actor::unregister_from_system() {
if (!getf(is_registered_flag))
return;
unsetf(is_registered_flag);
[[maybe_unused]] auto count = home_system().registry().dec_running();
[[maybe_unused]] auto count = home_system().registry().dec_running(id());
CAF_LOG_DEBUG("actor" << id() << "decreased running count to" << count);
}

Expand Down
25 changes: 17 additions & 8 deletions libcaf_core/src/actor_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,27 @@ void actor_registry::erase(actor_id key) {
}
}

size_t actor_registry::inc_running() {
return ++*system_.base_metrics().running_actors;
/// Adds `key` to the set of running actors while holding `running_mtx_`.
///
/// @param key ID of the actor that starts running.
/// @returns the number of IDs stored in the set after the insertion.
///
/// If `key` is already stored, the set is left unchanged and the returned
/// count is the same as before the call.
///
/// NOTE(review): this body does not touch `base_metrics().running_actors`;
/// confirm that nothing else relies on that metric being kept up to date here.
size_t actor_registry::inc_running(actor_id key) {
  std::lock_guard<std::mutex> lock{running_mtx_};
  running_.insert(key);
  return running_.size();
}

size_t actor_registry::running() const {
return static_cast<size_t>(system_.base_metrics().running_actors->value());
std::unique_lock<std::mutex> guard(running_mtx_);
return running_.size();
}

size_t actor_registry::dec_running() {
size_t new_val = --*system_.base_metrics().running_actors;
const std::unordered_set<actor_id>& actor_registry::running_ids() const {
std::unique_lock<std::mutex> guard(running_mtx_);
return running_;
}

size_t actor_registry::dec_running(actor_id key) {
std::unique_lock<std::mutex> guard(running_mtx_);
running_.erase(key);
size_t new_val = running_.size();
if (new_val <= 1) {
std::unique_lock<std::mutex> guard(running_mtx_);
running_cv_.notify_all();
}
return new_val;
Expand All @@ -101,8 +110,8 @@ void actor_registry::await_running_count_equal(size_t expected) const {
CAF_ASSERT(expected == 0 || expected == 1);
CAF_LOG_TRACE(CAF_ARG(expected));
std::unique_lock<std::mutex> guard{running_mtx_};
while (running() != expected) {
CAF_LOG_DEBUG(CAF_ARG(running()));
while (running_.size() != expected) {
CAF_LOG_DEBUG(CAF_ARG(running_.size()));
running_cv_.wait(guard);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libcaf_core/src/blocking_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class blocking_actor_runner : public resumable {
auto& sys = ctx->system();
sys.release_private_thread(thread_);
if (!hidden_) {
[[maybe_unused]] auto count = sys.registry().dec_running();
[[maybe_unused]] auto count = sys.registry().dec_running(self_->id());
CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to"
<< count);
}
Expand Down Expand Up @@ -166,7 +166,7 @@ void blocking_actor::launch(execution_unit*, bool, bool hide) {
// Note: must *not* call register_at_system() to stop actor cleanup from
// decrementing the count before releasing the thread.
if (!hide) {
[[maybe_unused]] auto count = sys.registry().inc_running();
[[maybe_unused]] auto count = sys.registry().inc_running(id());
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
}
thread->resume(new blocking_actor_runner(this, thread, hide));
Expand Down

0 comments on commit bb7dfa6

Please sign in to comment.