Skip to content

Commit

Permalink
Use a background thread to write profile data to the database
Browse files Browse the repository at this point in the history
  • Loading branch information
LourensVeen committed Jun 5, 2023
1 parent 2946e91 commit 3855303
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 22 deletions.
1 change: 1 addition & 0 deletions libmuscle/python/libmuscle/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def stop(self) -> None:
self._server.stop()
self._snapshot_registry.shutdown()
self._snapshot_registry.join()
self._profile_store.close()
self._logger.close()

def wait(self) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion libmuscle/python/libmuscle/manager/mmp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ def close(self) -> None:
On shutdown of the server, this will be called by each server
thread before it shuts down.
"""
self._profile_store.close()
# no longer used, but kept in case we need it again
pass

def _register_instance(
self, instance_id: str, locations: List[str],
Expand Down
86 changes: 65 additions & 21 deletions libmuscle/python/libmuscle/manager/profile_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Iterable, Optional, Tuple

from libmuscle.profiling import ProfileEvent, ProfileEventType
Expand All @@ -10,6 +12,11 @@
_logger = logging.getLogger(__name__)


# When True causes add_events to return after the data is written.
# Used for testing only.
_SYNCHED = False


class ProfileStore(ProfileDatabase):
"""Creates and fills a profiling database.
Expand All @@ -36,6 +43,22 @@ def __init__(self, db_dir: Path) -> None:
super().__init__(db_file)
self._init_database()

# 500 batches is about 250MB
Item = Optional[Tuple[Reference, Iterable[ProfileEvent]]]
self._queue: Queue[Item] = Queue(500)
self._confirmation_queue: Queue[None] = Queue()
self._thread = Thread(target=self._storage_thread, daemon=True)
self._thread.start()

def close(self) -> None:
"""Shut down the profile store.
This closes the database connection cleanly.
"""
self._queue.put(None)
self._thread.join()
super().close()

def add_events(
self, instance_id: Reference, events: Iterable[ProfileEvent]
) -> None:
Expand All @@ -44,20 +67,18 @@ def add_events(
Args:
events: The events to add.
"""
cur = self._get_cursor()
cur.execute("BEGIN IMMEDIATE TRANSACTION")
cur.execute(
"SELECT oid FROM instances WHERE name = ?",
(str(instance_id),))
oids = cur.fetchall()
if oids:
instance_oid = oids[0][0]
else:
cur.execute(
"INSERT INTO instances (name) VALUES (?)",
(str(instance_id),))
instance_oid = cur.lastrowid
self._queue.put((instance_id, events))
if _SYNCHED:
self._confirmation_queue.get()

def _storage_thread(self) -> None:
"""Background thread that stores the data.
We're getting issues with the database being locked occasionally
on slow file systems when we try to access it from multiple
threads. So we use a single background thread now to do the
writing.
"""
Record = Tuple[
int, int, float, float, Optional[str], Optional[int],
Optional[int], Optional[int], Optional[int], Optional[int],
Expand All @@ -77,14 +98,37 @@ def to_tuple(e: ProfileEvent) -> Record:
e.port_length, e.slot, e.message_number, e.message_size,
e.message_timestamp)

cur.executemany(
"INSERT INTO events"
" (instance, event_type, start_time, stop_time, port_name,"
" port_operator, port_length, slot, message_number,"
" message_size, message_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
map(to_tuple, events))
cur.execute("COMMIT")
cur = self._get_cursor()
batch = self._queue.get()
while batch is not None:
instance_id, events = batch

cur = self._get_cursor()
cur.execute("BEGIN IMMEDIATE TRANSACTION")
cur.execute(
"SELECT oid FROM instances WHERE name = ?",
(str(instance_id),))
oids = cur.fetchall()
if oids:
instance_oid = oids[0][0]
else:
cur.execute(
"INSERT INTO instances (name) VALUES (?)",
(str(instance_id),))
instance_oid = cur.lastrowid

cur.executemany(
"INSERT INTO events"
" (instance, event_type, start_time, stop_time, port_name,"
" port_operator, port_length, slot, message_number,"
" message_size, message_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
map(to_tuple, events))
cur.execute("COMMIT")
if _SYNCHED:
self._confirmation_queue.put(None)
batch = self._queue.get()

cur.close()

def _init_database(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions libmuscle/python/libmuscle/manager/test/test_profile_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ymmsl import Operator, Port, Reference

import sqlite3
from unittest.mock import patch


def test_create_profile_store(tmp_path):
Expand Down Expand Up @@ -43,6 +44,7 @@ def test_create_profile_store(tmp_path):
conn.close()


@patch('libmuscle.manager.profile_store._SYNCHED', True)
def test_add_events(tmp_path):
db = ProfileStore(tmp_path)

Expand Down

0 comments on commit 3855303

Please sign in to comment.