Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create default Santa target counters #994

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/santa/test_api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from zentral.contrib.santa.models import Configuration, Rule, RuleSet, Target, Enrollment, Bundle
from zentral.core.events.base import AuditEvent
from zentral.utils.payloads import get_payload_identifier
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


class APIViewsTestCase(TestCase):
Expand Down
219 changes: 163 additions & 56 deletions tests/santa/test_metrics_views.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
import uuid
from unittest.mock import call, patch
from django.urls import reverse
from django.test import TestCase
from django.utils.crypto import get_random_string
from prometheus_client.parser import text_string_to_metric_families
from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit
from zentral.contrib.santa.models import Configuration, EnrolledMachine, Enrollment, Rule, Target
from zentral.contrib.santa.models import Rule, Target, translate_rule_policy
from zentral.conf import settings
from .test_rule_engine import new_cdhash, new_sha256, new_team_id
from .utils import force_configuration, force_enrolled_machine, force_rule, force_target_counter


class SantaMetricsViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.configuration = Configuration.objects.create(name=get_random_string(256))
cls.meta_business_unit = MetaBusinessUnit.objects.create(name=get_random_string(64))
cls.enrollment_secret = EnrollmentSecret.objects.create(meta_business_unit=cls.meta_business_unit)
cls.enrollment = Enrollment.objects.create(configuration=cls.configuration,
secret=cls.enrollment_secret)
cls.machine_serial_number = get_random_string(64)
cls.enrolled_machine = EnrolledMachine.objects.create(enrollment=cls.enrollment,
hardware_uuid=uuid.uuid4(),
serial_number=cls.machine_serial_number,
client_mode=Configuration.MONITOR_MODE,
santa_version="2021.7")

# utility methods

def _force_configuration(self):
return Configuration.objects.create(name=get_random_string(12))

def _make_authenticated_request(self):
return self.client.get(reverse("santa_metrics:all"),
HTTP_AUTHORIZATION=f'Bearer {settings["api"]["metrics_bearer_token"]}')
Expand All @@ -44,69 +25,195 @@ def test_metrics_permission_ok(self):
self.assertEqual(response.status_code, 200)

def test_configurations(self):
config_m = force_configuration(lockdown=False)
config_l = force_configuration(lockdown=True)
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_configurations":
if family.name != "zentral_santa_configurations_info":
continue
else:
self.assertEqual(len(family.samples), 1)
sample = family.samples[0]
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["mode"], "monitor")
self.assertEqual(len(family.samples), 2)
for sample in family.samples:
self.assertEqual(sample.value, 1)
if int(sample.labels["pk"]) == config_m.pk:
self.assertEqual(sample.labels["mode"], "MONITOR")
self.assertEqual(sample.labels["name"], config_m.name)
elif int(sample.labels["pk"]) == config_l.pk:
self.assertEqual(sample.labels["mode"], "LOCKDOWN")
self.assertEqual(sample.labels["name"], config_l.name)
else:
raise AssertionError("Unknown config")
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.warning")
def test_configurations_info_unknown_mode(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[(1, "yolo", 42)], # 1st call with bad mode
[], # 2nd call for the enrolled machines gauge
[], # 3rd call for the rules gauge
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_configurations_info":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown santa configuration mode: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_enrolled_machines(self):
em_m = force_enrolled_machine(lockdown=False, santa_version="2024.5")
em_l = force_enrolled_machine(lockdown=True, santa_version="2024.6")
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_enrolled_machines":
if family.name != "zentral_santa_enrolled_machines_total":
continue
else:
self.assertEqual(len(family.samples), 1)
sample = family.samples[0]
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["configuration"], self.configuration.name)
self.assertEqual(sample.labels["mode"], "monitor")
self.assertEqual(sample.labels["santa_version"], self.enrolled_machine.santa_version)
self.assertEqual(len(family.samples), 2)
for sample in family.samples:
self.assertEqual(sample.value, 1)
cfg_pk = int(sample.labels["cfg_pk"])
if cfg_pk == em_m.enrollment.configuration.pk:
self.assertEqual(sample.labels["mode"], "MONITOR")
self.assertEqual(sample.labels["santa_version"], "2024.5")
elif cfg_pk == em_l.enrollment.configuration.pk:
self.assertEqual(sample.labels["mode"], "LOCKDOWN")
self.assertEqual(sample.labels["santa_version"], "2024.6")
else:
raise AssertionError("Unknown enrolled machine")
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.warning")
def test_enrolled_machines_unknown_mode(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[], # 1st call for the configurations info gauge
[(1, 42, "2024.5", 1)], # 2nd call for the enrolled machines gauge
[], # 3rd call for the rules gauge
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_enrolled_machines_total":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown santa configuration mode: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_rules(self):
for target_type in (Target.BINARY,
Target.BUNDLE,
Target.CDHASH,
Target.CERTIFICATE,
Target.TEAM_ID,
Target.SIGNING_ID):
if target_type == Target.CDHASH:
identifier = new_cdhash()
if target_type == Target.TEAM_ID:
identifier = new_team_id()
elif target_type == Target.SIGNING_ID:
identifier = "platform:com.apple.curl"
else:
identifier = new_sha256()
target = Target.objects.create(type=target_type, identifier=identifier)
Rule.objects.create(configuration=self.configuration, target=target, policy=Rule.BLOCKLIST)
rules = {}
for target_type, policy in (
(Target.BINARY, Rule.ALLOWLIST),
(Target.BUNDLE, Rule.BLOCKLIST),
(Target.CDHASH, Rule.ALLOWLIST_COMPILER),
(Target.CERTIFICATE, Rule.SILENT_BLOCKLIST),
(Target.TEAM_ID, Rule.ALLOWLIST),
(Target.SIGNING_ID, Rule.BLOCKLIST),
):
rule = force_rule(target_type=target_type, policy=policy)
rules[str(rule.configuration.pk)] = rule
response = self._make_authenticated_request()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_rules":
if family.name != "zentral_santa_rules_total":
continue
else:
self.assertEqual(len(family.samples), 6)
target_type_set = set()
for sample in family.samples:
self.assertEqual(sample.value, 1)
self.assertEqual(sample.labels["configuration"], self.configuration.name)
self.assertEqual(sample.labels["ruleset"], "_")
self.assertEqual(sample.labels["policy"], "blocklist")
target_type_set.add(sample.labels["target_type"])
self.assertEqual(target_type_set, {"binary", "bundle", "cdhash", "certificate", "teamid", "signingid"})
rule = rules[sample.labels["cfg_pk"]]
self.assertEqual(sample.labels["policy"], translate_rule_policy(rule.policy))
self.assertEqual(sample.labels["target_type"], rule.target.type)
break
else:
raise AssertionError("could not find expected metric family")
self.assertEqual(response.status_code, 200)

@patch("zentral.contrib.santa.metrics_views.connection")
@patch("zentral.contrib.santa.metrics_views.logger.error")
def test_rules_unknown_policy(self, warning, connection):
mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall
mocked_fetchall.side_effect = [
[], # 1st call for the configurations info
[], # 2nd call for the enrolled machines gauge
[(1, None, "BUNDLE", 42, 1)], # 3rd call with unknown policy
[], # 4th call for the targets gauge
]
response = self._make_authenticated_request()
family_count = 0
sample_count = 0
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if family.name != "zentral_santa_rules_total":
continue
family_count += 1
sample_count += len(family.samples)
self.assertEqual(family_count, 1)
self.assertEqual(sample_count, 0)
warning.assert_called_once_with("Unknown rule policy: %s", 42)
self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)])

def test_targets(self):
target_counters = {}
for target_type, blocked_count, collected_count, executed_count, is_rule in (
(Target.BINARY, 11, 0, 0, True),
(Target.BUNDLE, 11, 22, 0, False),
(Target.CDHASH, 11, 22, 33, False),
(Target.CERTIFICATE, 1, 0, 0, False),
(Target.TEAM_ID, 1, 2, 0, False),
(Target.SIGNING_ID, 1, 2, 3, True),
):
target_counter = force_target_counter(
target_type,
blocked_count=blocked_count,
collected_count=collected_count,
executed_count=executed_count,
is_rule=is_rule,
)
target_counters.setdefault(str(target_counter.configuration.pk), {})[target_counter.target.type] = {
"total": 1,
"blocked_total": blocked_count,
"collected_total": collected_count,
"executed_total": executed_count,
"rules_total": 1 if is_rule else 0
}
response = self._make_authenticated_request()
family_count = 0
total_keys = set()
for family in text_string_to_metric_families(response.content.decode("utf-8")):
if not family.name.startswith("zentral_santa_targets_"):
continue
family_count += 1
total_key = family.name.removeprefix("zentral_santa_targets_")
total_keys.add(total_key)
sample_count = 0
for sample in family.samples:
sample_count += 1
self.assertEqual(
sample.value,
target_counters[sample.labels["cfg_pk"]][sample.labels["type"]][total_key]
)
self.assertEqual(sample_count, 6)
self.assertEqual(family_count, 5)
self.assertEqual(
total_keys,
{"total", "blocked_total", "collected_total", "executed_total", "rules_total"}
)
17 changes: 1 addition & 16 deletions tests/santa/test_rule_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,7 @@
from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment,
MachineRule, Rule, Target, translate_rule_policy)
from zentral.contrib.santa.forms import test_cdhash, test_signing_id_identifier


def new_cdhash():
return get_random_string(length=40, allowed_chars='abcdef0123456789')


def new_sha256():
return get_random_string(length=64, allowed_chars='abcdef0123456789')


def new_team_id():
return get_random_string(10, allowed_chars="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")


def new_signing_id_identifier():
return ":".join((new_team_id(), get_random_string(10, allowed_chars="abcdefghij")))
from .utils import new_cdhash, new_sha256, new_team_id, new_signing_id_identifier


class SantaRuleEngineTestCase(TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_santa_api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment,
MachineRule, Rule, Target, TargetCounter)
from zentral.core.incidents.models import Severity
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_santa_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
SantaEnrollmentEvent, SantaEventEvent,
SantaRuleSetUpdateEvent, SantaRuleUpdateEvent)
from zentral.contrib.santa.models import Bundle, Configuration, Target
from .test_rule_engine import new_sha256
from .utils import new_sha256


class SantaEventTestCase(TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_setup_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit, File, Tag
from zentral.contrib.santa.models import Bundle, Configuration, Enrollment, Rule, Target
from zentral.core.events.base import AuditEvent
from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id
from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
2 changes: 1 addition & 1 deletion tests/santa/test_targets_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from accounts.models import User
from zentral.contrib.santa.models import Bundle, Configuration, Target
from zentral.core.stores.conf import frontend_store
from .test_rule_engine import new_cdhash, new_sha256, new_team_id
from .utils import new_cdhash, new_sha256, new_team_id


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down
Loading
Loading