Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix attribute error and task scheduling error #25

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions kubeluigi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
class KubernetesJobTask:

volumes: List[AttachableVolume] = []

def __init__(self):
self.tolerations: List[V1Toleration] = []
tolerations: List[V1Toleration] = []

def _init_task_metadata(self):
self.uu_name = self.name
Expand Down Expand Up @@ -98,7 +96,7 @@ def build_job_definition(self) -> V1Job:
def onpodstarted(self, pods):
for pod in pods:
logger.info(
f"Tail the Pod logs using: kubectl logs -f -n {pod.namespace} {pod.name}"
f"Tail the Pod logs using: kubectl logs -f -n {pod.metadata.namespace} {pod.metadata.name}"
)

def as_yaml(self):
Expand Down
20 changes: 16 additions & 4 deletions kubeluigi/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FailedJob(Exception):
def __init__(self, job, pods, message):
self.job = job
self.pods = pods
self.message = message
self.message = job.metadata.name + ": " + message
super().__init__(self.message)


Expand Down Expand Up @@ -91,10 +91,11 @@ def get_container_with_volume_mounts(container):
"""
volumes_spec = container["volume_mounts"]
mount_volumes = []
keys_to_omit = {"host_path"}
for volume in volumes_spec:
mount_path = volume["mountPath"]
name = volume["name"]
mount_volumes.append(V1VolumeMount(mount_path=mount_path, name=name))
# we need things like read_only, sub_path, etc:
volume_std_spec = {k: v for k, v in volume.items() if k not in keys_to_omit}
mount_volumes.append(V1VolumeMount(**volume_std_spec))
container["volume_mounts"] = mount_volumes
return container

Expand Down Expand Up @@ -134,6 +135,17 @@ def kick_off_job(k8s_client: ApiClient, job: V1Job) -> V1Job:
job = k8s_client.read_namespaced_job(
job.metadata.name, job.metadata.namespace
)
# TODO: improve design of this
# Problem: of a job failed, it's currently tracked and keeps
# the Luigi task failing. This is a quick patch to avoid that.
if not job.status.active:
condition = job.status.conditions[0]
if condition.type == "Failed" and condition.reason == "BackoffLimitExceeded":
logger.warning(
"The job you tried to start was in BackoffLimitExceeded state, deleting it"
)
clean_job_resources(k8s_client, job)
raise RuntimeError("Found orphan failed job with the same spec, deleted it.")
else:
raise e

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ def run_tests(self):
license="Apache License 2.0",
packages=find_packages(exclude=["tests"]),
cmdclass={"test": PyTest},
install_requires=["kubernetes>=17.17.0", "luigi", "PyYaml==5.4.1"],
install_requires=["kubernetes>=17.17.0", "luigi", "PyYaml"],
entry_points={},
)
4 changes: 2 additions & 2 deletions test/kubernetes_helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_pod_spec_from_dict():
"imagePullPolicy": "Always",
"env": [{"name": "my_env", "value": "env"}],
"volume_mounts": [
{"name": "Vname", "mountPath": "VmountPath", "host_path": "VhostPath"}
{"name": "Vname", "mount_path": "VmountPath", "host_path": "VhostPath"}
],
}
],
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_pod_spec_with_volume_from_dict():
"imagePullPolicy": "Always",
"env": [{"name": "my_env", "value": "env"}],
"volume_mounts": [
{"name": "Vname", "mountPath": "VmountPath", "host_path": "VhostPath"}
{"name": "Vname", "mount_path": "VmountPath", "host_path": "VhostPath"}
],
}

Expand Down
5 changes: 0 additions & 5 deletions test/test_kubernetes_job_task.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import yaml


from mock import patch, MagicMock
import pytest

from kubeluigi import KubernetesJobTask

from kubernetes.client import BatchV1Api


class DummyTask(KubernetesJobTask):
@property
Expand Down
21 changes: 21 additions & 0 deletions test/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Any, Dict

import luigi

from kubeluigi import KubernetesJobTask


class DummyK8sTask(KubernetesJobTask, luigi.Task):
@property
def name(self) -> str:
return "dummy-tsk"

def spec_schema(self) -> Dict[str, Any]:
return {}

def run(self):
pass


def test_task_can_be_scheduled_by_luigi():
luigi.build([DummyK8sTask()], local_scheduler=True, log_level="WARNING")