Skip to content

Commit

Permalink
job_queue: improve state reporting
Browse files Browse the repository at this point in the history
Only report ready when an item is in the queue and automatic transition is enabled.

Signed-off-by:  Eric Callahan <[email protected]>
  • Loading branch information
Arksine committed Jul 5, 2024
1 parent 8f3b30a commit dc00d38
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions moonraker/components/job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
from ..common import WebRequest, UserInfo
from .klippy_apis import KlippyAPI
from .file_manager.file_manager import FileManager
from .job_state import JobState

class JobQueue:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.queued_jobs: Dict[str, QueuedJob] = {}
self.lock = asyncio.Lock()
self.pause_requested: bool = False
self.load_on_start = config.getboolean("load_on_startup", False)
self.automatic = config.getboolean("automatic_transition", False)
self.queue_state: str = "ready" if self.automatic else "paused"
self.queue_state: str = "paused"
self.job_delay = config.getfloat("job_transition_delay", 0.01)
if self.job_delay <= 0.:
raise config.error(
Expand All @@ -52,8 +54,7 @@ def __init__(self, config: ConfigHelper) -> None:

self.server.register_notification("job_queue:job_queue_changed")
self.server.register_remote_method("pause_job_queue", self.pause_queue)
self.server.register_remote_method("start_job_queue",
self.start_queue)
self.server.register_remote_method("start_job_queue", self.start_queue)

self.server.register_endpoint(
"/server/job_queue/job", RequestType.POST | RequestType.DELETE,
Expand Down Expand Up @@ -84,9 +85,9 @@ async def _handle_ready(self) -> None:
1., self._pop_job, False)

async def _handle_shutdown(self) -> None:
has_requested_pause = self.pause_requested
await self.pause_queue()
if not self.queued_jobs and self.automatic:
self._set_queue_state("ready")
self.pause_requested = has_requested_pause

async def _on_job_state_changed(self, job_event: JobEvent, *args) -> None:
if job_event == JobEvent.COMPLETE:
Expand Down Expand Up @@ -116,8 +117,7 @@ async def _pop_job(self, need_transition: bool = True) -> None:
if self.queue_state == "paused":
return
if not self.queued_jobs:
qs = "ready" if self.automatic else "paused"
self._set_queue_state(qs)
self._set_queue_state("paused")
return
kapis: KlippyAPI = self.server.lookup_component('klippy_apis')
uid, job = list(self.queued_jobs.items())[0]
Expand All @@ -132,6 +132,7 @@ async def _pop_job(self, need_transition: bool = True) -> None:
# Check to see if the queue was paused while running
# the job transition gcode
if self.queue_state != "loading":
self._set_queue_state("paused")
raise self.server.error(
"Queue State Changed during Transition Gcode")
self._set_queue_state("starting")
Expand All @@ -144,8 +145,9 @@ async def _pop_job(self, need_transition: bool = True) -> None:
else:
self.queued_jobs.pop(uid, None)
if self.queue_state == "starting":
# If the queue was not paused while starting the print,
set_ready = not self.queued_jobs or self.automatic
# Set the queue to ready if items are in the queue
# and auto transition is enabled
set_ready = len(self.queued_jobs) > 0 and self.automatic
self.queue_state = "ready" if set_ready else "paused"
self._send_queue_event(action="job_loaded")

Expand Down Expand Up @@ -184,6 +186,11 @@ async def queue_job(self,
queued_job = QueuedJob(fname, user)
self.queued_jobs[queued_job.job_id] = queued_job
self._send_queue_event(action="jobs_added")
if self.automatic and not self.pause_requested:
jstate: JobState = self.server.lookup_component("job_state")
last_evt = jstate.get_last_job_event()
if last_evt.is_printing or last_evt == JobEvent.PAUSED:
self._set_queue_state("ready")

async def delete_job(self,
job_ids: Union[str, List[str]],
Expand All @@ -210,23 +217,24 @@ async def pause_queue(self) -> None:
if self.pop_queue_handle is not None:
self.pop_queue_handle.cancel()
self.pop_queue_handle = None
# Acquire the lock to wait for any pending operations to
# complete
await self.lock.acquire()
self.lock.release()
# Acquire the lock to wait for any pending operations to complete
async with self.lock:
self.pause_requested = True

async def start_queue(self) -> None:
async with self.lock:
if self.queue_state != "loading":
if self.queued_jobs and await self._check_can_print():
self.pause_requested = False
if self.queue_state in ("ready", "paused"):
if not self.queued_jobs:
self._set_queue_state("paused")
elif await self._check_can_print():
self._set_queue_state("loading")
event_loop = self.server.get_event_loop()
self.pop_queue_handle = event_loop.delay_callback(
0.01, self._pop_job, False
)
else:
qs = "ready" if self.automatic else "paused"
self._set_queue_state(qs)
self._set_queue_state("ready" if self.automatic else "paused")

def _job_map_to_list(self) -> List[Dict[str, Any]]:
cur_time = time.time()
Expand Down

0 comments on commit dc00d38

Please sign in to comment.