diff --git a/tests/santa/test_api_views.py b/tests/santa/test_api_views.py index d0f3e20b75..d9a3651c1f 100644 --- a/tests/santa/test_api_views.py +++ b/tests/santa/test_api_views.py @@ -18,7 +18,7 @@ from zentral.contrib.santa.models import Configuration, Rule, RuleSet, Target, Enrollment, Bundle from zentral.core.events.base import AuditEvent from zentral.utils.payloads import get_payload_identifier -from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id +from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id class APIViewsTestCase(TestCase): diff --git a/tests/santa/test_metrics_views.py b/tests/santa/test_metrics_views.py index 430a252dca..9787317155 100644 --- a/tests/santa/test_metrics_views.py +++ b/tests/santa/test_metrics_views.py @@ -1,34 +1,15 @@ -import uuid +from unittest.mock import call, patch from django.urls import reverse from django.test import TestCase -from django.utils.crypto import get_random_string from prometheus_client.parser import text_string_to_metric_families -from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit -from zentral.contrib.santa.models import Configuration, EnrolledMachine, Enrollment, Rule, Target +from zentral.contrib.santa.models import Rule, Target, translate_rule_policy from zentral.conf import settings -from .test_rule_engine import new_cdhash, new_sha256, new_team_id +from .utils import force_configuration, force_enrolled_machine, force_rule, force_target_counter class SantaMetricsViewsTestCase(TestCase): - @classmethod - def setUpTestData(cls): - cls.configuration = Configuration.objects.create(name=get_random_string(256)) - cls.meta_business_unit = MetaBusinessUnit.objects.create(name=get_random_string(64)) - cls.enrollment_secret = EnrollmentSecret.objects.create(meta_business_unit=cls.meta_business_unit) - cls.enrollment = Enrollment.objects.create(configuration=cls.configuration, - secret=cls.enrollment_secret) - cls.machine_serial_number = get_random_string(64) - cls.enrolled_machine = EnrolledMachine.objects.create(enrollment=cls.enrollment, - hardware_uuid=uuid.uuid4(), - serial_number=cls.machine_serial_number, - client_mode=Configuration.MONITOR_MODE, - santa_version="2021.7") - # utility methods - def _force_configuration(self): - return Configuration.objects.create(name=get_random_string(12)) - def _make_authenticated_request(self): return self.client.get(reverse("santa_metrics:all"), HTTP_AUTHORIZATION=f'Bearer {settings["api"]["metrics_bearer_token"]}') @@ -44,69 +25,195 @@ def test_metrics_permission_ok(self): self.assertEqual(response.status_code, 200) def test_configurations(self): + config_m = force_configuration(lockdown=False) + config_l = force_configuration(lockdown=True) response = self._make_authenticated_request() for family in text_string_to_metric_families(response.content.decode("utf-8")): - if family.name != "zentral_santa_configurations": + if family.name != "zentral_santa_configurations_info": continue else: - self.assertEqual(len(family.samples), 1) - sample = family.samples[0] - self.assertEqual(sample.value, 1) - self.assertEqual(sample.labels["mode"], "monitor") + self.assertEqual(len(family.samples), 2) + for sample in family.samples: + self.assertEqual(sample.value, 1) + if int(sample.labels["pk"]) == config_m.pk: + self.assertEqual(sample.labels["mode"], "MONITOR") + self.assertEqual(sample.labels["name"], config_m.name) + elif int(sample.labels["pk"]) == config_l.pk: + self.assertEqual(sample.labels["mode"], "LOCKDOWN") + self.assertEqual(sample.labels["name"], config_l.name) + else: + raise AssertionError("Unknown config") break else: raise AssertionError("could not find expected metric family") self.assertEqual(response.status_code, 200) + @patch("zentral.contrib.santa.metrics_views.connection") + @patch("zentral.contrib.santa.metrics_views.logger.warning") + def test_configurations_info_unknown_mode(self, warning, connection): + mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall + mocked_fetchall.side_effect = [ + [(1, "yolo", 42)], # 1st call with bad mode + [], # 2nd call for the enrolled machines gauge + [], # 3rd call for the rules gauge + [], # 4th call for the targets gauge + ] + response = self._make_authenticated_request() + family_count = 0 + sample_count = 0 + for family in text_string_to_metric_families(response.content.decode("utf-8")): + if family.name != "zentral_santa_configurations_info": + continue + family_count += 1 + sample_count += len(family.samples) + self.assertEqual(family_count, 1) + self.assertEqual(sample_count, 0) + warning.assert_called_once_with("Unknown santa configuration mode: %s", 42) + self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)]) + def test_enrolled_machines(self): + em_m = force_enrolled_machine(lockdown=False, santa_version="2024.5") + em_l = force_enrolled_machine(lockdown=True, santa_version="2024.6") response = self._make_authenticated_request() for family in text_string_to_metric_families(response.content.decode("utf-8")): - if family.name != "zentral_santa_enrolled_machines": + if family.name != "zentral_santa_enrolled_machines_total": continue else: - self.assertEqual(len(family.samples), 1) - sample = family.samples[0] - self.assertEqual(sample.value, 1) - self.assertEqual(sample.labels["configuration"], self.configuration.name) - self.assertEqual(sample.labels["mode"], "monitor") - self.assertEqual(sample.labels["santa_version"], self.enrolled_machine.santa_version) + self.assertEqual(len(family.samples), 2) + for sample in family.samples: + self.assertEqual(sample.value, 1) + cfg_pk = int(sample.labels["cfg_pk"]) + if cfg_pk == em_m.enrollment.configuration.pk: + self.assertEqual(sample.labels["mode"], "MONITOR") + self.assertEqual(sample.labels["santa_version"], "2024.5") + elif cfg_pk == em_l.enrollment.configuration.pk: + self.assertEqual(sample.labels["mode"], "LOCKDOWN") + self.assertEqual(sample.labels["santa_version"], "2024.6") + else: + raise AssertionError("Unknown enrolled machine") break else: raise AssertionError("could not find expected metric family") self.assertEqual(response.status_code, 200) + @patch("zentral.contrib.santa.metrics_views.connection") + @patch("zentral.contrib.santa.metrics_views.logger.warning") + def test_enrolled_machines_unknown_mode(self, warning, connection): + mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall + mocked_fetchall.side_effect = [ + [], # 1st call for the configurations info gauge + [(1, 42, "2024.5", 1)], # 2nd call for the enrolled machines gauge + [], # 3rd call for the rules gauge + [], # 4th call for the targets gauge + ] + response = self._make_authenticated_request() + family_count = 0 + sample_count = 0 + for family in text_string_to_metric_families(response.content.decode("utf-8")): + if family.name != "zentral_santa_enrolled_machines_total": + continue + family_count += 1 + sample_count += len(family.samples) + self.assertEqual(family_count, 1) + self.assertEqual(sample_count, 0) + warning.assert_called_once_with("Unknown santa configuration mode: %s", 42) + self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)]) + def test_rules(self): - for target_type in (Target.BINARY, - Target.BUNDLE, - Target.CDHASH, - Target.CERTIFICATE, - Target.TEAM_ID, - Target.SIGNING_ID): - if target_type == Target.CDHASH: - identifier = new_cdhash() - if target_type == Target.TEAM_ID: - identifier = new_team_id() - elif target_type == Target.SIGNING_ID: - identifier = "platform:com.apple.curl" - else: - identifier = new_sha256() - target = Target.objects.create(type=target_type, identifier=identifier) - Rule.objects.create(configuration=self.configuration, target=target, policy=Rule.BLOCKLIST) + rules = {} + for target_type, policy in ( + (Target.BINARY, Rule.ALLOWLIST), + (Target.BUNDLE, Rule.BLOCKLIST), + (Target.CDHASH, Rule.ALLOWLIST_COMPILER), + (Target.CERTIFICATE, Rule.SILENT_BLOCKLIST), + (Target.TEAM_ID, Rule.ALLOWLIST), + (Target.SIGNING_ID, Rule.BLOCKLIST), + ): + rule = force_rule(target_type=target_type, policy=policy) + rules[str(rule.configuration.pk)] = rule response = self._make_authenticated_request() for family in text_string_to_metric_families(response.content.decode("utf-8")): - if family.name != "zentral_santa_rules": + if family.name != "zentral_santa_rules_total": continue else: self.assertEqual(len(family.samples), 6) - target_type_set = set() for sample in family.samples: self.assertEqual(sample.value, 1) - self.assertEqual(sample.labels["configuration"], self.configuration.name) self.assertEqual(sample.labels["ruleset"], "_") - self.assertEqual(sample.labels["policy"], "blocklist") - target_type_set.add(sample.labels["target_type"]) - self.assertEqual(target_type_set, {"binary", "bundle", "cdhash", "certificate", "teamid", "signingid"}) + rule = rules[sample.labels["cfg_pk"]] + self.assertEqual(sample.labels["policy"], translate_rule_policy(rule.policy)) + self.assertEqual(sample.labels["target_type"], rule.target.type) break else: raise AssertionError("could not find expected metric family") self.assertEqual(response.status_code, 200) + + @patch("zentral.contrib.santa.metrics_views.connection") + @patch("zentral.contrib.santa.metrics_views.logger.error") + def test_rules_unknown_policy(self, warning, connection): + mocked_fetchall = connection.cursor.return_value.__enter__.return_value.fetchall + mocked_fetchall.side_effect = [ + [], # 1st call for the configurations info + [], # 2nd call for the enrolled machines gauge + [(1, None, "BUNDLE", 42, 1)], # 3rd call with unknown policy + [], # 4th call for the targets gauge + ] + response = self._make_authenticated_request() + family_count = 0 + sample_count = 0 + for family in text_string_to_metric_families(response.content.decode("utf-8")): + if family.name != "zentral_santa_rules_total": + continue + family_count += 1 + sample_count += len(family.samples) + self.assertEqual(family_count, 1) + self.assertEqual(sample_count, 0) + warning.assert_called_once_with("Unknown rule policy: %s", 42) + self.assertEqual(mocked_fetchall.mock_calls, [call() for _ in range(4)]) + + def test_targets(self): + target_counters = {} + for target_type, blocked_count, collected_count, executed_count, is_rule in ( + (Target.BINARY, 11, 0, 0, True), + (Target.BUNDLE, 11, 22, 0, False), + (Target.CDHASH, 11, 22, 33, False), + (Target.CERTIFICATE, 1, 0, 0, False), + (Target.TEAM_ID, 1, 2, 0, False), + (Target.SIGNING_ID, 1, 2, 3, True), + ): + target_counter = force_target_counter( + target_type, + blocked_count=blocked_count, + collected_count=collected_count, + executed_count=executed_count, + is_rule=is_rule, + ) + target_counters.setdefault(str(target_counter.configuration.pk), {})[target_counter.target.type] = { + "total": 1, + "blocked_total": blocked_count, + "collected_total": collected_count, + "executed_total": executed_count, + "rules_total": 1 if is_rule else 0 + } + response = self._make_authenticated_request() + family_count = 0 + total_keys = set() + for family in text_string_to_metric_families(response.content.decode("utf-8")): + if not family.name.startswith("zentral_santa_targets_"): + continue + family_count += 1 + total_key = family.name.removeprefix("zentral_santa_targets_") + total_keys.add(total_key) + sample_count = 0 + for sample in family.samples: + sample_count += 1 + self.assertEqual( + sample.value, + target_counters[sample.labels["cfg_pk"]][sample.labels["type"]][total_key] + ) + self.assertEqual(sample_count, 6) + self.assertEqual(family_count, 5) + self.assertEqual( + total_keys, + {"total", "blocked_total", "collected_total", "executed_total", "rules_total"} + ) diff --git a/tests/santa/test_rule_engine.py b/tests/santa/test_rule_engine.py index a5bba924d9..1399b5f640 100644 --- a/tests/santa/test_rule_engine.py +++ b/tests/santa/test_rule_engine.py @@ -6,22 +6,7 @@ from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment, MachineRule, Rule, Target, translate_rule_policy) from zentral.contrib.santa.forms import test_cdhash, test_signing_id_identifier - - -def new_cdhash(): - return get_random_string(length=40, allowed_chars='abcdef0123456789') - - -def new_sha256(): - return get_random_string(length=64, allowed_chars='abcdef0123456789') - - -def new_team_id(): - return get_random_string(10, allowed_chars="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ") - - -def new_signing_id_identifier(): - return ":".join((new_team_id(), get_random_string(10, allowed_chars="abcdefghij"))) +from .utils import new_cdhash, new_sha256, new_team_id, new_signing_id_identifier class SantaRuleEngineTestCase(TestCase): diff --git a/tests/santa/test_santa_api_views.py b/tests/santa/test_santa_api_views.py index 1f412c7424..bbcb58a6d8 100644 --- a/tests/santa/test_santa_api_views.py +++ b/tests/santa/test_santa_api_views.py @@ -14,7 +14,7 @@ from zentral.contrib.santa.models import (Bundle, Configuration, EnrolledMachine, Enrollment, MachineRule, Rule, Target, TargetCounter) from zentral.core.incidents.models import Severity -from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id +from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id @override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage') diff --git a/tests/santa/test_santa_events.py b/tests/santa/test_santa_events.py index e740c84dcb..73297de5d4 100644 --- a/tests/santa/test_santa_events.py +++ b/tests/santa/test_santa_events.py @@ -10,7 +10,7 @@ SantaEnrollmentEvent, SantaEventEvent, SantaRuleSetUpdateEvent, SantaRuleUpdateEvent) from zentral.contrib.santa.models import Bundle, Configuration, Target -from .test_rule_engine import new_sha256 +from .utils import new_sha256 class SantaEventTestCase(TestCase): diff --git a/tests/santa/test_setup_views.py b/tests/santa/test_setup_views.py index 9041ca09d5..af401991ab 100644 --- a/tests/santa/test_setup_views.py +++ b/tests/santa/test_setup_views.py @@ -14,7 +14,7 @@ from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit, File, Tag from zentral.contrib.santa.models import Bundle, Configuration, Enrollment, Rule, Target from zentral.core.events.base import AuditEvent -from .test_rule_engine import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id +from .utils import new_cdhash, new_sha256, new_signing_id_identifier, new_team_id @override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage') diff --git a/tests/santa/test_targets_views.py b/tests/santa/test_targets_views.py index 6879b9f61e..5e37ebd544 100644 --- a/tests/santa/test_targets_views.py +++ b/tests/santa/test_targets_views.py @@ -11,7 +11,7 @@ from accounts.models import User from zentral.contrib.santa.models import Bundle, Configuration, Target from zentral.core.stores.conf import frontend_store -from .test_rule_engine import new_cdhash, new_sha256, new_team_id +from .utils import new_cdhash, new_sha256, new_team_id @override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage') diff --git a/tests/santa/utils.py b/tests/santa/utils.py new file mode 100644 index 0000000000..cebf45f13e --- /dev/null +++ b/tests/santa/utils.py @@ -0,0 +1,109 @@ +import uuid +from django.utils.crypto import get_random_string +from zentral.contrib.inventory.models import EnrollmentSecret, MetaBusinessUnit +from zentral.contrib.santa.models import Configuration, EnrolledMachine, Enrollment, Rule, Target, TargetCounter + + +# rule identifiers + + +def new_cdhash(): + return get_random_string(length=40, allowed_chars='abcdef0123456789') + + +def new_sha256(): + return get_random_string(length=64, allowed_chars='abcdef0123456789') + + +def new_team_id(): + return get_random_string(10, allowed_chars="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ") + + +def new_signing_id_identifier(): + return ":".join((new_team_id(), get_random_string(10, allowed_chars="abcdefghij"))) + + +# configuration + + +def force_configuration(lockdown=False): + if lockdown: + client_mode = Configuration.LOCKDOWN_MODE + else: + client_mode = Configuration.MONITOR_MODE + return Configuration.objects.create(name=get_random_string(12), client_mode=client_mode) + + +# enrolled machine + + +def force_enrolled_machine( + mbu=None, configuration=None, + lockdown=False, + santa_version="2024.5", +): + if mbu is None: + mbu = MetaBusinessUnit.objects.create(name=get_random_string(64)) + if configuration is None: + configuration = force_configuration() + enrollment_secret = EnrollmentSecret.objects.create(meta_business_unit=mbu) + enrollment = Enrollment.objects.create(configuration=configuration, secret=enrollment_secret) + return EnrolledMachine.objects.create( + enrollment=enrollment, + hardware_uuid=uuid.uuid4(), + serial_number=get_random_string(10), + client_mode=Configuration.LOCKDOWN_MODE if lockdown else Configuration.MONITOR_MODE, + santa_version=santa_version, + ) + + +# target + + +def force_target(type=Target.SIGNING_ID, identifier=None): + if identifier is None: + if type == Target.CDHASH: + identifier = new_cdhash() + if type == Target.TEAM_ID: + identifier = new_team_id() + elif type == Target.SIGNING_ID: + identifier = new_signing_id_identifier() + else: + identifier = new_sha256() + return Target.objects.create(type=type, identifier=identifier) + + +# target counter + + +def force_target_counter(target_type, blocked_count=0, collected_count=0, executed_count=0, is_rule=False): + configuration = force_configuration() + target = force_target(target_type) + if is_rule: + Rule.objects.create( + configuration=configuration, + target=target, + policy=Rule.BLOCKLIST, + ) + return TargetCounter.objects.create( + configuration=configuration, + target=target, + blocked_count=blocked_count, + collected_count=collected_count, + executed_count=executed_count, + ) + + +# rule + + +def force_rule( + target_type=Target.SIGNING_ID, + target_identifier=None, + configuration=None, + policy=Rule.BLOCKLIST, +): + target = force_target(target_type, target_identifier) + if configuration is None: + configuration = force_configuration() + return Rule.objects.create(configuration=configuration, target=target, policy=policy) diff --git a/zentral/contrib/santa/metrics_views.py b/zentral/contrib/santa/metrics_views.py index 399e71a103..9a881a3c2c 100644 --- a/zentral/contrib/santa/metrics_views.py +++ b/zentral/contrib/santa/metrics_views.py @@ -2,7 +2,7 @@ from django.db import connection from prometheus_client import Gauge from zentral.utils.prometheus import BasePrometheusMetricsView -from .models import Configuration, Rule, Target +from .models import Configuration, translate_rule_policy logger = logging.getLogger("zentral.contrib.santa.metrics_views") @@ -12,96 +12,107 @@ class MetricsView(BasePrometheusMetricsView): @staticmethod def _add_mode_to_labels(mode, labels): if mode == Configuration.MONITOR_MODE: - labels["mode"] = "monitor" + labels["mode"] = "MONITOR" elif mode == Configuration.LOCKDOWN_MODE: - labels["mode"] = "lockdown" + labels["mode"] = "LOCKDOWN" else: logger.warning("Unknown santa configuration mode: %s", mode) return False return True - def add_configurations_gauge(self): - g = Gauge('zentral_santa_configurations', 'Zentral Santa Configurations', - ['mode'], registry=self.registry) - query = ( - "select client_mode, count(*) " - "from santa_configuration " - "group by client_mode" - ) - cursor = connection.cursor() - cursor.execute(query) - for mode, count in cursor.fetchall(): - labels = {} - if not self._add_mode_to_labels(mode, labels): - continue - g.labels(**labels).set(count) + def add_configurations_info(self): + g = Gauge('zentral_santa_configurations_info', 'Zentral Santa configuration info', + ['pk', 'name', 'mode'], registry=self.registry) + query = "select id, name, client_mode from santa_configuration;" + with connection.cursor() as cursor: + cursor.execute(query) + for cfg_pk, name, mode in cursor.fetchall(): + labels = { + "pk": cfg_pk, + "name": name, + } + if not self._add_mode_to_labels(mode, labels): + continue + g.labels(**labels).set(1) def add_enrolled_machines_gauge(self): - g = Gauge('zentral_santa_enrolled_machines', 'Zentral Santa Enrolled Machines', - ['configuration', 'mode', 'santa_version'], registry=self.registry) + g = Gauge('zentral_santa_enrolled_machines_total', 'Zentral Santa Enrolled Machines', + ['cfg_pk', 'mode', 'santa_version'], registry=self.registry) query = ( - "select c.name, m.client_mode, m.santa_version, count(*) " + "select e.configuration_id, m.client_mode, m.santa_version, count(*) " "from santa_enrolledmachine as m " "join santa_enrollment as e on (m.enrollment_id = e.id) " - "join santa_configuration as c on (e.configuration_id = c.id) " - "group by c.name, m.client_mode, m.santa_version" + "group by e.configuration_id, m.client_mode, m.santa_version" ) - cursor = connection.cursor() - cursor.execute(query) - for configuration, mode, santa_version, count in cursor.fetchall(): - labels = {"configuration": configuration, - "santa_version": santa_version} - if not self._add_mode_to_labels(mode, labels): - continue - g.labels(**labels).set(count) + with connection.cursor() as cursor: + cursor.execute(query) + for cfg_pk, mode, santa_version, count in cursor.fetchall(): + labels = {"cfg_pk": cfg_pk, + "santa_version": santa_version} + if not self._add_mode_to_labels(mode, labels): + continue + g.labels(**labels).set(count) def add_rules_gauge(self): - g = Gauge('zentral_santa_rules', 'Zentral Santa Rules', - ['configuration', 'ruleset', 'target_type', 'policy'], registry=self.registry) + g = Gauge('zentral_santa_rules_total', 'Zentral Santa Rules', + ['cfg_pk', 'ruleset', 'target_type', 'policy'], registry=self.registry) query = ( - "select c.name, s.name, t.type, r.policy, count(*) " + "select r.configuration_id, s.name, t.type, r.policy, count(*) " "from santa_rule as r " - "join santa_configuration as c on (r.configuration_id = c.id) " "left join santa_ruleset as s on (r.ruleset_id = s.id) " "join santa_target as t on (r.target_id = t.id) " - "group by c.name, s.name, t.type, r.policy" + "group by r.configuration_id, s.name, t.type, r.policy" + ) + with connection.cursor() as cursor: + cursor.execute(query) + for cfg_pk, ruleset, target_type, policy, count in cursor.fetchall(): + try: + policy_label = translate_rule_policy(policy) + except ValueError: + logger.error("Unknown rule policy: %s", policy) + continue + g.labels( + cfg_pk=cfg_pk, + ruleset=ruleset if ruleset else "_", + target_type=target_type, + policy=policy_label, + ).set(count) + + def add_targets_gauges(self): + totals = ("total", "blocked_total", "executed_total", "collected_total", "rules_total") + gauges = {} + for total in totals: + total_for_display = " ".join(w.title() for w in total.split("_") if w != "total") + gauges[total] = Gauge( + f'zentral_santa_targets_{total}', + f'Zentral Santa Targets {total_for_display}'.strip(), + ["cfg_pk", "type"], + registry=self.registry, + ) + query = ( + 'select tc.configuration_id, t.type, count(*) total,' + 'sum(tc.blocked_count) blocked_total,' + 'sum(tc.executed_count) executed_total,' + 'sum(tc.collected_count) collected_total,' + 'sum(case when r.id is null then 0 else 1 end) rules_total ' + 'from santa_targetcounter tc ' + 'join santa_target t on (t.id = tc.target_id) ' + 'left join santa_rule r on (r.target_id = tc.target_id and r.configuration_id = tc.configuration_id) ' + 'group by tc.configuration_id, t.type;' ) - cursor = connection.cursor() - cursor.execute(query) - for configuration, ruleset, target_type, policy, count in cursor.fetchall(): - labels = {"configuration": configuration, - "ruleset": ruleset if ruleset else "_"} - # target type - if target_type == Target.BINARY: - labels["target_type"] = "binary" - elif target_type == Target.BUNDLE: - labels["target_type"] = "bundle" - elif target_type == Target.CDHASH: - labels["target_type"] = "cdhash" - elif target_type == Target.CERTIFICATE: - labels["target_type"] = "certificate" - elif target_type == Target.TEAM_ID: - labels["target_type"] = "teamid" - elif target_type == Target.SIGNING_ID: - labels["target_type"] = "signingid" - else: - logging.warning("Unknown target type: %s", target_type) - continue - # policy - if policy == Rule.ALLOWLIST: - labels["policy"] = "allowlist" - elif policy == Rule.BLOCKLIST: - labels["policy"] = "blocklist" - elif policy == Rule.SILENT_BLOCKLIST: - labels["policy"] = "silent blocklist" - elif policy == Rule.ALLOWLIST_COMPILER: - labels["policy"] = "allowlist compiler" - else: - logging.warning("Unknown rule policy: %s", policy) - continue - g.labels(**labels).set(count) + with connection.cursor() as cursor: + cursor.execute(query) + columns = [c.name for c in cursor.description] + for result in cursor.fetchall(): + result_d = dict(zip(columns, result)) + for total in totals: + gauges[total].labels( + cfg_pk=result_d["configuration_id"], + type=result_d["type"] + ).set(result_d[total]) def populate_registry(self): - self.add_configurations_gauge() + self.add_configurations_info() self.add_enrolled_machines_gauge() self.add_rules_gauge() + self.add_targets_gauges()