Skip to content

Commit

Permalink
Handle Celery tasks with REVOKED status (#1540)
Browse files Browse the repository at this point in the history
* Handle Celery tasks with REVOKED status

* Updates

* Update variable name

* Style updates

* Remove extra newline
  • Loading branch information
jleaniz committed Aug 30, 2024
1 parent 4831195 commit 23eb6bd
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 24 deletions.
3 changes: 2 additions & 1 deletion turbinia/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
'API_UPLOAD_CHUNK_SIZE',
'API_EVIDENCE_UPLOAD_DIR',
'API_MAX_UPLOAD_SIZE',
'WEBUI_PATH'
'WEBUI_PATH',
'CELERY_TASK_EXPIRATION_TIME'
]

# Optional config vars. Some may be mandatory depending on the configuration
Expand Down
5 changes: 5 additions & 0 deletions turbinia/config/turbinia_config_tmpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@
# Storage for task results/status
CELERY_BACKEND = f'redis://{REDIS_HOST}'

# Task expiration (in seconds). Tasks will be revoked
# after the expiration time elapses. Revoked tasks will not
# be processed by Turbinia workers
CELERY_TASK_EXPIRATION_TIME = 86400 # 24 hours

# Can be the same as CELERY_BROKER
KOMBU_BROKER = CELERY_BROKER

Expand Down
30 changes: 23 additions & 7 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def enqueue_task(self, task, evidence_, timeout_limit):
"""
raise NotImplementedError

def process_result(self, task_result):
def process_result(self, task_result: workers.TurbiniaTaskResult):
"""Runs final task results recording.
self.process_tasks handles things that have failed at the task queue layer
Expand Down Expand Up @@ -526,8 +526,9 @@ def process_result(self, task_result):
job = self.get_job(task_result.job_id)
if not job:
log.warning(
f'Received task results for unknown Job from Task ID '
f'{task_result.task_id:s}')
f'Received task results from Task ID {task_result.task_id:s} '
f'with no associated Job ID. This could indicate the task did '
f'not run (e.g. if it was revoked).')

# Reprocess new evidence and save instance for later consumption by finalize
# tasks.
Expand Down Expand Up @@ -669,24 +670,39 @@ def process_tasks(self):
list[TurbiniaTask]: all completed tasks
"""
completed_tasks = []
task: workers.TurbiniaTask = None
for task in self.tasks:
check_timeout = False
celery_task = task.stub
# ref: https://docs.celeryq.dev/en/stable/reference/celery.states.html
if not celery_task:
log.debug(f'Task {task.stub.task_id:s} not yet created')
log.debug(f'Task {task.stub.task_id:s} not yet created.')
check_timeout = True
elif celery_task.status == celery_states.STARTED:
log.debug(f'Task {celery_task.id:s} not finished')
log.debug(f'Task {celery_task.id:s} not finished.')
check_timeout = True
elif celery_task.status == celery_states.FAILURE:
log.warning(f'Task {celery_task.id:s} failed.')
completed_tasks.append(task)
elif celery_task.status == celery_states.SUCCESS:
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
completed_tasks.append(task)
elif celery_task.status == celery_states.PENDING:
task.status = 'pending'
log.debug(f'Task {celery_task.id:s} status pending.')
elif celery_task.status == celery_states.REVOKED:
message = (
f'Celery task {celery_task.id:s} associated with Turbinia '
f'task {task.id} was revoked. This could be caused if the task is '
f'not started before the CELERY_TASK_EXPIRATION_TIME or if the '
f'task is manually revoked. Task will not be processed.')
log.warning(message)
task = self.timeout_task(task, config.CELERY_TASK_EXPIRATION_TIME)
task.result.status = message
completed_tasks.append(task)
else:
check_timeout = True
log.debug(f'Task {celery_task.id:s} status unknown')
log.debug(f'Task {celery_task.id:s} status unknown.')

# For certain Task states we want to check whether the Task has timed out
# or not.
Expand Down Expand Up @@ -764,4 +780,4 @@ def enqueue_task(self, task, evidence_, timeout):
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=False,
soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
expires=celery_hard_timeout)
expires=config.CELERY_TASK_EXPIRATION_TIME)
3 changes: 2 additions & 1 deletion turbinia/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,8 @@ def run_wrapper(self, evidence):
self._evidence_config = evidence.config
self.task_config = self.get_task_recipe(evidence.config)
self.worker_start_time = datetime.now()
self.update_task_status(self, 'running')
updated_status = f'{self.id} is running on worker {self.worker_name}'
self.update_task_status(self, updated_status)
self.result = self.run(evidence, self.result)

# pylint: disable=broad-except
Expand Down
29 changes: 14 additions & 15 deletions web/src/components/TaskList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ limitations under the License.
<template>
<div>
<v-list density="compact">
<v-empty-state v-if="taskList.length === 0"
text="No Tasks are available. Try adjusting your filters.">
<v-empty-state v-if="taskList.length === 0" text="No Tasks are available. Try adjusting your filters.">
</v-empty-state>
<v-virtual-scroll :items="taskList" :item-height="40" :height="400" v-else>
<template v-slot:default="{ item }">
Expand All @@ -41,7 +40,7 @@ limitations under the License.
</v-list-item-action>
</div>
<v-list-item :max-width="800">
{{ item.task_name }} {{ $filters.truncate(item.task_status, 128, '...') }}
{{ item.task_name }}: {{ $filters.truncate(item.task_status, 384, '...') }}
</v-list-item>
</v-list-item>
<v-divider> </v-divider>
Expand Down Expand Up @@ -77,26 +76,26 @@ export default {
let taskStatusTemp = task_dict.status
// As pending status requests show as null or pending
if (taskStatusTemp === null || taskStatusTemp === "pending") {
taskStatusTemp = 'pending on server.'
taskStatusTemp = 'is pending on server.'
}
if (this.filterJobs.length > 0) {
let jobName = task_dict.job_name.toLowerCase()
if ( this.radioFilter && !this.filterJobs.includes(jobName)) {
if (this.radioFilter && !this.filterJobs.includes(jobName)) {
continue;
} else if ( !this.radioFilter && this.filterJobs.includes(jobName)) {
} else if (!this.radioFilter && this.filterJobs.includes(jobName)) {
continue
}
}
let taskListTemp = {
job_name: task_dict.job_name,
task_name: task_dict.name,
task_id: task_dict.id,
task_status: taskStatusTemp,
task_success: task_dict.successful,
evidence_name: task_dict.evidence_name,
evidence_id: task_dict.evidence_id,
evidence_size: task_dict.evidence_size,
}
job_name: task_dict.job_name,
task_name: task_dict.name,
task_id: task_dict.id,
task_status: taskStatusTemp,
task_success: task_dict.successful,
evidence_name: task_dict.evidence_name,
evidence_id: task_dict.evidence_id,
evidence_size: task_dict.evidence_size,
}
// When Failed filter chip is applied
if (task_dict.successful === false && this.filterFailed) {
taskList.push(taskListTemp)
Expand Down

0 comments on commit 23eb6bd

Please sign in to comment.