Skip to content

Commit

Permalink
Fix DeprecationWarning about fork() and multi-threaded processes in…
Browse files Browse the repository at this point in the history
… unit tests

Ensure all threads are properly shutting down in the unit tests. This prevents the following warning:
```
DeprecationWarning: This process (pid=208847) is multi-threaded, use of fork() may lead to deadlocks in the child.
```
  • Loading branch information
maarten-ic committed Aug 20, 2024
1 parent 7f518aa commit 543939b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 11 deletions.
10 changes: 10 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import threading

import pytest


@pytest.fixture(autouse=True)
def assert_no_threads_left():
assert len(threading.enumerate()) == 1
yield
assert len(threading.enumerate()) == 1
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/manager/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def mmp_configuration():
def profile_store(tmp_path):
test_profile_store = ProfileStore(tmp_path)
yield test_profile_store
test_profile_store.close()
test_profile_store.shutdown()


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/python/libmuscle/manager/test/test_profile_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

def test_create_profile_store(tmp_path):
db = ProfileStore(tmp_path)
db.close()
db.shutdown()

db_path = tmp_path / 'performance.sqlite'
conn = sqlite3.connect(db_path, isolation_level=None)
Expand Down Expand Up @@ -114,4 +114,4 @@ def check_register_event(typ, start, stop):

cur.close()
conn.close()
db.close()
db.shutdown()
25 changes: 17 additions & 8 deletions libmuscle/python/libmuscle/test/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,26 @@ def instance_dont_apply_overlay(
def test_create_instance_manager_location_default(
instance_argv, MMPClient, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)
instance.error_shutdown("") # ensure all threads and resources are cleaned up

MMPClient.assert_called_once_with(Ref('component'), 'tcp:localhost:9000')


def test_create_instance_manager_location_argv(
manager_location_argv, instance_argv, MMPClient, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)
instance.error_shutdown("") # ensure all threads and resources are cleaned up

MMPClient.assert_called_once_with(Ref('component'), 'tcp:localhost:9001')


def test_create_instance_manager_location_envvar(
manager_location_envvar, instance_envvar, MMPClient, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)
instance.error_shutdown("") # ensure all threads and resources are cleaned up

MMPClient.assert_called_once_with(Ref('component[13]'), 'tcp:localhost:9002')

Expand All @@ -167,7 +170,8 @@ def test_create_instance_registration(
manager_location_argv, instance_argv, mmp_client, communicator, port_manager,
profiler, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)
instance.error_shutdown("") # ensure all threads and resources are cleaned up

locations = communicator.get_locations.return_value

Expand Down Expand Up @@ -195,9 +199,10 @@ def test_create_instance_registration(
def test_create_instance_profiling(
manager_location_argv, instance_argv, profiler, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)

assert len(profiler.record_event.mock_calls) == 2
instance.error_shutdown("Ensure all threads and resources are cleaned up")


def test_create_instance_connecting(
Expand All @@ -214,7 +219,7 @@ def test_create_instance_connecting(
settings = MagicMock()
mmp_client.get_settings.return_value = settings

Instance(declared_ports)
instance = Instance(declared_ports)

port_manager.connect_ports.assert_called_once()
peer_info = port_manager.connect_ports.call_args[0][0]
Expand All @@ -227,13 +232,14 @@ def test_create_instance_connecting(
assert communicator.set_peer_info.call_args[0][0] == peer_info

assert settings_manager.base == settings
instance.error_shutdown("Ensure all threads and resources are cleaned up")


def test_create_instance_set_up_checkpointing(
manager_location_argv, instance_argv, mmp_client, trigger_manager,
no_resume_snapshot_manager, settings_manager, declared_ports):

Instance(declared_ports)
instance = Instance(declared_ports)

elapsed_time, checkpoints, resume_path, snapshot_path = (
mmp_client.get_checkpoint_info.return_value)
Expand All @@ -242,6 +248,7 @@ def test_create_instance_set_up_checkpointing(
no_resume_snapshot_manager.prepare_resume.assert_called_with(
resume_path, snapshot_path)
assert settings_manager.overlay != no_resume_snapshot_manager.resume_overlay
instance.error_shutdown("Ensure all threads and resources are cleaned up")


def test_create_instance_set_up_logging(
Expand Down Expand Up @@ -272,6 +279,7 @@ def get_logger(name=''):
root_logger.setLevel.assert_called_with(logging.ERROR)
libmuscle_logger.setLevel.assert_called_with(logging.DEBUG)
ymmsl_logger.setLevel.assert_called_with(logging.DEBUG)
instance.error_shutdown("Ensure all threads and resources are cleaned up")


def test_shutdown_instance(
Expand Down Expand Up @@ -540,4 +548,5 @@ def test_checkpoint_support(
mmp_client.get_checkpoint_info.return_value = checkpoint_info

with expectation:
Instance(flags=flags)
instance = Instance(flags=flags)
instance.error_shutdown("Ensure all threads and resources are cleaned up")

0 comments on commit 543939b

Please sign in to comment.