From 6ae82637182f3b307ef7e5a628bfb50962ffd211 Mon Sep 17 00:00:00 2001 From: Fabrice Fontaine Date: Fri, 6 Jan 2023 17:18:57 +0100 Subject: [PATCH] feat: add hyperscan support Hyperscan runs all version checkers on a file simultaneously, which reduces processing time. The pyperscan package is used instead of the better-known hyperscan package because pyperscan allows a tag to be attached to each pattern. This makes it easy to retrieve the checker associated with the matched pattern. Fix #2485 Signed-off-by: Fabrice Fontaine --- cve_bin_tool/checkers/__init__.py | 36 +++++++++++--------- cve_bin_tool/version_scanner.py | 56 ++++++++++++++++++++++++++++--- requirements.csv | 1 + requirements.txt | 1 + test/test_checkers.py | 6 ++-- 5 files changed, 78 insertions(+), 22 deletions(-) diff --git a/cve_bin_tool/checkers/__init__.py b/cve_bin_tool/checkers/__init__.py index 352f28841b..c8631d177c 100644 --- a/cve_bin_tool/checkers/__init__.py +++ b/cve_bin_tool/checkers/__init__.py @@ -393,10 +393,11 @@ def __new__(cls, name, bases, props): else: cls.IGNORE_PATTERNS = list(map(re.compile, cls.IGNORE_PATTERNS)) # Compile regex - cls.CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) - cls.VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) - cls.FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) - cls.CONTAINS_PATTERNS.extend(cls.VERSION_PATTERNS) + cls.REGEX_CONTAINS_PATTERNS = list(map(re.compile, cls.CONTAINS_PATTERNS)) + cls.REGEX_VERSION_PATTERNS = list(map(re.compile, cls.VERSION_PATTERNS)) + cls.REGEX_FILENAME_PATTERNS = list(map(re.compile, cls.FILENAME_PATTERNS)) + cls.REGEX_CONTAINS_PATTERNS.extend(cls.REGEX_VERSION_PATTERNS) + cls.version_info = dict() # Return the new checker class return cls @@ -405,26 +406,31 @@ class Checker(metaclass=CheckerMetaClass): CONTAINS_PATTERNS: list[str] = [] VERSION_PATTERNS: list[str] = [] FILENAME_PATTERNS: list[str] = [] + REGEX_CONTAINS_PATTERNS: list[str] = [] + REGEX_VERSION_PATTERNS: 
list[str] = [] + REGEX_FILENAME_PATTERNS: list[str] = [] VENDOR_PRODUCT: list[tuple[str, str]] = [] IGNORE_PATTERNS: list[str] = [] def guess_contains(self, lines): - if any(pattern.search(lines) for pattern in self.CONTAINS_PATTERNS): + if any(pattern.search(lines) for pattern in self.REGEX_CONTAINS_PATTERNS): return True return False def get_version(self, lines, filename): - version_info = dict() + if any(pattern.match(filename) for pattern in self.REGEX_FILENAME_PATTERNS): + self.version_info["is_or_contains"] = "is" - if any(pattern.match(filename) for pattern in self.FILENAME_PATTERNS): - version_info["is_or_contains"] = "is" + if "is_or_contains" not in self.version_info and self.guess_contains(lines): + self.version_info["is_or_contains"] = "contains" - if "is_or_contains" not in version_info and self.guess_contains(lines): - version_info["is_or_contains"] = "contains" - - if "is_or_contains" in version_info: - version_info["version"] = regex_find( - lines, self.VERSION_PATTERNS, self.IGNORE_PATTERNS + if "is_or_contains" in self.version_info: + version = regex_find( + lines, self.REGEX_VERSION_PATTERNS, self.IGNORE_PATTERNS ) - return version_info + # Don't override a "correct" version with UNKNOWN + if "version" not in self.version_info or version != "UNKNOWN": + self.version_info["version"] = version + + return self.version_info diff --git a/cve_bin_tool/version_scanner.py b/cve_bin_tool/version_scanner.py index 2b6e01661b..d8fbaa6ad6 100644 --- a/cve_bin_tool/version_scanner.py +++ b/cve_bin_tool/version_scanner.py @@ -8,6 +8,9 @@ from pathlib import Path, PurePath from typing import Iterator +import attr +from pyperscan import Pattern, Scan, StreamDatabase + from cve_bin_tool.checkers import Checker from cve_bin_tool.cvedb import CVEDB from cve_bin_tool.egg_updater import IS_DEVELOP, update_egg @@ -29,6 +32,14 @@ import importlib_resources as resources +@attr.define +class HyperscanMatchContext: + version_scanner: VersionScanner + filename: str + lines: 
str + task_result: dict + + class InvalidFileError(Exception): """Filepath is invalid for scanning.""" @@ -73,6 +84,7 @@ def __init__( self.validate = validate # self.logger.info("Checkers loaded: %s" % (", ".join(self.checkers.keys()))) self.language_checkers = self.available_language_checkers() + self.hyperscan_db = None @classmethod def load_checkers(cls) -> dict[str, type[Checker]]: @@ -239,15 +251,51 @@ def scan_file(self, filename: str) -> Iterator[ScanInfo]: yield from self.run_checkers(filename, lines) + def build_hyperscan_database(self, checkers: Checker) -> None: + # The database must be built only once to improve performance + if self.hyperscan_db is None: + patterns = [] + for dummy_checker_name, checker in self.checkers.items(): + checker = checker() + checker.dummy_checker_name = dummy_checker_name + for pattern in checker.VERSION_PATTERNS + checker.CONTAINS_PATTERNS: + patterns.append(Pattern(pattern.encode(), tag=checker)) + if patterns: + self.hyperscan_db = StreamDatabase(*patterns) + + @staticmethod + def hyperscan_match( + context: HyperscanMatchContext, checker: Checker, offset: int, end: int + ) -> Scan: + # Confirm hyperscan match with get_version as hyperscan doesn't support + # group capture. SOM_LEFTMOST is not enabled (offset is always 0) + result = checker.get_version(context.lines[offset:end], context.filename) + + context.task_result[checker] = result + + return Scan.Continue + def run_checkers(self, filename: str, lines: str) -> Iterator[ScanInfo]: """process a Set of checker objects, run them on file lines, and yield information about detected products and versions. 
It uses logging to provide debug and error information along the way.""" + self.build_hyperscan_database(self.checkers) + + task_result = dict() + hyperscan_context = HyperscanMatchContext( + version_scanner=self, + filename=filename, + lines=lines, + task_result=task_result, + ) + + if self.hyperscan_db is not None: + scanner = self.hyperscan_db.build(hyperscan_context, self.hyperscan_match) + scanner.scan(lines.encode()) - # tko - for dummy_checker_name, checker in self.checkers.items(): - checker = checker() - result = checker.get_version(lines, filename) + for checker in task_result: + result = task_result[checker] + dummy_checker_name = checker.dummy_checker_name # do some magic so we can iterate over all results, even the ones that just return 1 hit if "is_or_contains" in result: results = [dict()] diff --git a/requirements.csv b/requirements.csv index 6bb20b3008..95be2797d9 100644 --- a/requirements.csv +++ b/requirements.csv @@ -22,3 +22,4 @@ python_not_in_db,importlib_resources vsajip_not_in_db,python-gnupg anthonyharrison_not_in_db,lib4sbom the_purl_authors_not_in_db,packageurl-python +vlaci_not_in_db,pyperscan diff --git a/requirements.txt b/requirements.txt index bf7c07c84e..9836bb1985 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ python-gnupg packageurl-python packaging plotly +pyperscan pyyaml>=5.4 requests rich diff --git a/test/test_checkers.py b/test/test_checkers.py index 10ffb136ce..2a0bbb0130 100644 --- a/test/test_checkers.py +++ b/test/test_checkers.py @@ -29,9 +29,9 @@ class MyChecker(Checker): VENDOR_PRODUCT = [("myvendor", "myproduct")] IGNORE_PATTERNS = [r"ignore"] - assert type(MyChecker.CONTAINS_PATTERNS[0]) is Pattern - assert type(MyChecker.VERSION_PATTERNS[0]) is Pattern - assert type(MyChecker.FILENAME_PATTERNS[0]) is Pattern + assert type(MyChecker.REGEX_CONTAINS_PATTERNS[0]) is Pattern + assert type(MyChecker.REGEX_VERSION_PATTERNS[0]) is Pattern + assert type(MyChecker.REGEX_FILENAME_PATTERNS[0]) is 
Pattern assert type(MyChecker.VENDOR_PRODUCT[0]) is VendorProductPair assert type(MyChecker.IGNORE_PATTERNS[0]) is Pattern