# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

# This module is cve_bin_tool/parsers/rpm.py, reconstructed from a
# whitespace-collapsed patch.  The same patch carried two companion hunks,
# reproduced here so that nothing from the patch is lost:
#
#   cve_bin_tool/parsers/__init__.py  (list ending "swift", "php", "perl"):
#       + "rpm"
#
#   cve_bin_tool/parsers/parse.py:
#       + from cve_bin_tool.parsers.rpm import RpmParser
#         valid_files = {
#             ...
#             "cpanfile": PerlParser,
#       +     ".rpm:": RpmParser
#         }
#
# NOTE(review): the ".rpm:" key ends with a colon, unlike the other visible
# keys ("pom.xml", "Package.resolved", "composer.lock", "cpanfile").  This
# looks like a typo for ".rpm" - confirm against how parse.py matches keys.
# NOTE(review): both added entries lack the trailing comma their neighbours
# have.

import io
import struct
from enum import Enum, IntEnum

from cve_bin_tool.parsers import Parser
from cve_bin_tool.util import ProductInfo, ScanInfo


class RpmParser(Parser):
    """Extract the package name and version from an RPM file.

    Only the lead, the signature section and the header index are read; the
    payload is never touched.  More details about the rpm structure can be
    found here:
    https://rpm-software-management.github.io/rpm/manual/format.html
    """

    class Type(IntEnum):
        """Type codes used by header index entries."""

        NULL = 0
        CHAR = 1
        INT8 = 2
        INT16 = 3
        INT32 = 4
        INT64 = 5
        STRING = 6
        BIN = 7
        STRING_ARRAY = 8
        I18NSTRING_TYPE = 9

    class Tag(IntEnum):
        """Header tags this parser knows how to look up."""

        RPMTAG_NAME = 1000
        RPMTAG_VERSION = 1001

    TAGS_TO_PARSE = [Tag.RPMTAG_NAME, Tag.RPMTAG_VERSION]

    RPM_LEAD_MAGIC = b"\xed\xab\xee\xdb"
    RPM_HEADER_MAGIC = b"\x8e\xad\xe8"
    RPM_LEAD_LEN = 96
    RPM_LEAD_NAME_OFFSET = 10
    RPM_LEAD_NAME_LEN = 66
    RPM_HEADER_LEN = 16
    RPM_HEADER_INDEX_LEN = 16

    # Section intro: 3 bytes magic, 1 byte version, 4 bytes reserved,
    # 4 bytes number of index entries, 4 bytes data size (big-endian).
    _SECTION_INTRO = struct.Struct(">3sB4sII")
    # Index entry: 4 bytes tag, 4 bytes type, 4 bytes offset, 4 bytes count.
    _INDEX_ENTRY = struct.Struct(">IIII")
    # Read size used while looking for the NUL that ends a string entry.
    _STRING_CHUNK_LEN = 256

    def __init__(self, cve_db, logger, validate=True):
        """Store the validation switch on top of the base parser state.

        Args:
            cve_db: passed through to ``Parser.__init__``.
            logger: passed through to ``Parser.__init__``.
            validate: when true, ``run_checker`` checks the lead magic before
                parsing the file.
        """
        super().__init__(cve_db, logger)
        self.validate = validate

    def validate_rpm(self, filename):
        """Return True when *filename* starts with the RPM lead magic."""
        with open(filename, "rb") as rpm:
            return rpm.read(len(self.RPM_LEAD_MAGIC)) == self.RPM_LEAD_MAGIC

    @staticmethod
    def _align8(offset):
        """Round *offset* up to the next multiple of 8."""
        return (offset + 7) & ~7

    def get_rpm_entry(self, rpm, rpm_size, base_offset, entry_type, offset, count):
        """Read one entry from the header data store.

        Args:
            rpm: binary file object for the RPM; its position is moved.
            rpm_size: total size of the file in bytes.
            base_offset: file offset where the data store begins.
            entry_type: type code of the entry (``Type`` member or plain int).
            offset: offset of the entry inside the data store.
            count: item count taken from the index entry.

        Returns:
            The decoded string for ``Type.STRING`` entries.  ``None`` for
            every other type (not supported yet) and for entries that are out
            of bounds, unterminated or not ASCII.
        """
        start = base_offset + offset
        if rpm_size < start + count:
            self.logger.error(f"{self.filename} - entry corrupted")
            return None
        rpm.seek(start)

        if entry_type != self.Type.STRING:
            # unsupported - if more info is needed feel free to add parsing
            # here; at the moment all the data that is extracted is string
            return None

        # A string has count 1 and runs up to the first NUL.  Read in chunks
        # and stop at EOF: read() returns b"" there, so waiting for a NUL
        # byte alone would never finish on a truncated file.
        pieces = []
        while True:
            chunk = rpm.read(self._STRING_CHUNK_LEN)
            if not chunk:
                self.logger.error(
                    f"{self.filename} - unterminated string in rpm at offset 0x{start:X}"
                )
                return None
            text, terminator, _ = chunk.partition(b"\x00")
            pieces.append(text)
            if terminator:
                break

        data = b"".join(pieces)
        try:
            return data.decode("ascii")
        except UnicodeError:
            self.logger.error(
                f"{self.filename} - {data} - invalid string in rpm with nonascii characters at offset 0x{start:X}"
            )
            return None

    def _read_section_intro(self, rpm, section):
        """Read the 16-byte intro shared by the signature and the header.

        Returns ``(index_entry_count, data_size)``, or ``None`` after logging
        when the intro is truncated or does not carry the header magic.
        """
        raw = rpm.read(self.RPM_HEADER_LEN)
        if len(raw) != self.RPM_HEADER_LEN:
            self.logger.error(
                f"{self.filename} - file is too short, possibly corrupted"
            )
            return None
        magic, _version, _reserved, entries, data_size = self._SECTION_INTRO.unpack(raw)
        if magic != self.RPM_HEADER_MAGIC:
            self.logger.error(f"{self.filename} - corrupted {section} - {raw}")
            return None
        return entries, data_size

    def _read_tags(self, rpm, rpm_size, entries, data_offset):
        """Walk the header index and collect the values of ``TAGS_TO_PARSE``.

        The file position must be at the first index entry.  Returns a dict
        mapping tag number to the value from ``get_rpm_entry`` (which may be
        ``None``).
        """
        pending = set(self.TAGS_TO_PARSE)
        rpm_info = {}
        for _ in range(entries):
            raw = rpm.read(self.RPM_HEADER_INDEX_LEN)
            if len(raw) != self.RPM_HEADER_INDEX_LEN:
                self.logger.error(f"{self.filename} - truncated header index")
                break
            tag, type_code, offset, count = self._INDEX_ENTRY.unpack(raw)
            if tag not in pending:
                # The type code of an unwanted entry is never interpreted,
                # so an unknown code cannot abort the scan.
                continue
            pending.discard(tag)

            # get_rpm_entry moves the file position; return to the index.
            next_entry = rpm.tell()
            value = self.get_rpm_entry(
                rpm, rpm_size, data_offset, type_code, offset, count
            )
            rpm.seek(next_entry)
            self.logger.debug(
                f"{tag} - {type_code} - {offset} - {count} - data: {value}"
            )
            rpm_info[tag] = value
            if not pending:
                # we got all the info we need
                break
        return rpm_info

    def extract_info(self):
        """Parse ``self.filename`` and return the wanted header tags.

        File structure is as follows: Lead, Signature, Header, Payload.
        The name stored in the lead is kept in ``self.name``.

        Returns:
            A dict mapping tag number to string value (possibly empty), or
            ``None`` when the file is too short or structurally invalid.
        """
        with open(self.filename, "rb") as rpm:
            rpm_size = rpm.seek(0, io.SEEK_END)
            rpm.seek(0)

            # Lead
            lead = rpm.read(self.RPM_LEAD_LEN)
            if len(lead) != self.RPM_LEAD_LEN:
                self.logger.error(
                    f"{self.filename} - file is too short, possibly corrupted"
                )
                return None
            # The name field is RPM_LEAD_NAME_LEN bytes starting at
            # RPM_LEAD_NAME_OFFSET; the slice end is offset + length.
            name_end = self.RPM_LEAD_NAME_OFFSET + self.RPM_LEAD_NAME_LEN
            name_bytes = lead[self.RPM_LEAD_NAME_OFFSET:name_end]
            try:
                self.name = name_bytes.split(b"\x00", 1)[0].decode("ascii")
            except UnicodeError:
                self.logger.error(
                    f"{self.filename} - invalid name in rpm with nonascii characters"
                )
                return None

            self.logger.debug(f"{self.filename} - RPM Lead OK")
            self.logger.debug(f"{self.filename} - {self.name}")

            # Signature: same structure as the header; skipped entirely.
            signature = self._read_section_intro(rpm, "RPM signature header")
            if signature is None:
                return None
            entries, data_size = signature
            self.logger.debug(f"signature index entries: {entries}")

            # Header is aligned to 8-byte boundary
            header_offset = self._align8(
                rpm.tell() + entries * self.RPM_HEADER_INDEX_LEN + data_size
            )
            if header_offset > rpm_size:
                self.logger.error(f"{self.filename} - corrupted RPM")
                return None
            rpm.seek(header_offset)

            # Header
            header = self._read_section_intro(rpm, "RPM header")
            if header is None:
                return None
            entries, data_size = header
            self.logger.debug(f"header index entries: {entries}")

            # The data store follows the index entries.  Only the signature
            # is padded, so the bound is checked without rounding.
            data_offset = rpm.tell() + entries * self.RPM_HEADER_INDEX_LEN
            if data_offset + data_size > rpm_size:
                self.logger.error(f"{self.filename} - corrupted RPM")
                return None

            rpm_info = self._read_tags(rpm, rpm_size, entries, data_offset)

        self.logger.debug(f"{rpm_info}")
        return rpm_info

    def run_checker(self, filename):
        """Process RPM file and extract product.

        Yields whatever ``find_vendor`` returns for the package name and
        version found in the header.  Yields nothing when validation fails,
        the file cannot be parsed, or either value is missing.
        """
        self.filename = filename
        continue_processing = True
        if self.validate:
            continue_processing = self.validate_rpm(self.filename)
            self.logger.debug(f"Validation of {filename} - {continue_processing}")
        if continue_processing:
            rpm_info = self.extract_info()
            if rpm_info:
                name = rpm_info.get(self.Tag.RPMTAG_NAME)
                version = rpm_info.get(self.Tag.RPMTAG_VERSION)
                if name and version:
                    product_info = self.find_vendor(name, version)
                    if product_info is not None:
                        yield from product_info
                else:
                    # Do not query the database with a missing value.
                    self.logger.debug(
                        f"{filename} - name or version missing in RPM header"
                    )
        self.logger.debug(f"Done scanning file: {filename}")