diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index 0c529b27df..8d21eca46a 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -60,6 +60,69 @@ class CVEDB: nvd_source.NVD_Source, # last to avoid data overwrites ] + TABLE_SCHEMAS = { + "cve_severity": """ + CREATE TABLE IF NOT EXISTS cve_severity ( + cve_number TEXT, + severity TEXT, + description TEXT, + score INTEGER, + cvss_version INTEGER, + cvss_vector TEXT, + data_source TEXT, + last_modified TIMESTAMP, + PRIMARY KEY(cve_number, data_source) + ) + """, + "cve_range": """ + CREATE TABLE IF NOT EXISTS cve_range ( + cve_number TEXT, + vendor TEXT, + product TEXT, + version TEXT, + versionStartIncluding TEXT, + versionStartExcluding TEXT, + versionEndIncluding TEXT, + versionEndExcluding TEXT, + data_source TEXT, + FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source) + ) + """, + "cve_exploited": """ + CREATE TABLE IF NOT EXISTS cve_exploited ( + cve_number TEXT, + product TEXT, + description TEXT, + PRIMARY KEY(cve_number) + ) + """, + "cve_metrics": """ + CREATE TABLE IF NOT EXISTS cve_metrics ( + cve_number TEXT, + metric_id INTEGER, + metric_score REAL, + metric_field TEXT, + FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number), + FOREIGN KEY(metric_id) REFERENCES metrics(metric_id) + ) + """, + "metrics": """ + CREATE TABLE IF NOT EXISTS metrics ( + metrics_id INTEGER, + metrics_name TEXT, + PRIMARY KEY(metrics_id) + ) + """, + } + + EMPTY_SELECT_QUERIES = { + "cve_severity": "SELECT * FROM cve_severity WHERE 1=0", + "cve_range": "SELECT * FROM cve_range WHERE 1=0", + "cve_exploited": "SELECT * FROM cve_exploited WHERE 1=0", + "cve_metrics": "SELECT * FROM cve_metrics WHERE 1=0", + "metrics": "SELECT * FROM metrics WHERE 1=0", + } + INSERT_QUERIES = { "insert_severity": """ INSERT or REPLACE INTO cve_severity( @@ -226,9 +289,15 @@ def get_cvelist_if_stale(self) -> None: severity_schema, range_schema, exploit_schema, - cve_metrics_schema, - metrics_schema, - ) = self.table_schemas() + # cve_metrics_schema, + # metrics_schema, + ) = ( + self.TABLE_SCHEMAS["cve_severity"], + self.TABLE_SCHEMAS["cve_range"], + self.TABLE_SCHEMAS["cve_exploited"], + # self.TABLE_SCHEMAS["cve_metrics"], + # self.TABLE_SCHEMAS["metrics"], + ) if ( not self.latest_schema("cve_severity", severity_schema) or not self.latest_schema("cve_range", range_schema) @@ -248,7 +317,7 @@ def latest_schema( """Check database is using latest schema""" if table_name == "": # If no table specified, check cve_range (the last one changed) - _, range_schema, __, _, _ = self.table_schemas() + range_schema = self.TABLE_SCHEMAS["cve_range"] return self.latest_schema("cve_range", range_schema) self.LOGGER.debug("Check database is using latest schema") @@ -310,69 +379,6 @@ async def get_data(self): for r in await asyncio.gather(*tasks): self.data.append(r) - def table_schemas(self): - """Returns sql commands for creating cve_severity, cve_range and cve_exploited tables.""" - cve_data_create = """ - CREATE TABLE IF NOT EXISTS cve_severity ( - cve_number TEXT, - severity TEXT, - description TEXT, - score INTEGER, - cvss_version INTEGER, - cvss_vector TEXT, - data_source TEXT, - last_modified TIMESTAMP, - PRIMARY KEY(cve_number, data_source) - ) - """ - version_range_create = """ - CREATE TABLE IF NOT EXISTS cve_range ( - cve_number TEXT, - vendor TEXT, - product TEXT, - version TEXT, - versionStartIncluding TEXT, - versionStartExcluding TEXT, - versionEndIncluding TEXT, - versionEndExcluding TEXT, - data_source TEXT, - FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source) - ) - """ - exploit_table_create = """ - CREATE TABLE IF NOT EXISTS cve_exploited ( - cve_number TEXT, - product TEXT, - description TEXT, - PRIMARY KEY(cve_number) - ) - """ - cve_metrics_table = """ - CREATE TABLE IF NOT EXISTS cve_metrics ( - cve_number TEXT, - metric_id INTEGER, - metric_score REAL, - metric_field TEXT, - FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number), - FOREIGN KEY(metric_id) REFERENCES metrics(metric_id) - ) - """ - metrics_table = """ - CREATE TABLE IF NOT EXISTS metrics ( - metrics_id INTEGER, - metrics_name TEXT, - PRIMARY KEY(metrics_id) - ) - """ - - return ( - cve_data_create, - version_range_create, - exploit_table_create, - cve_metrics_table, - metrics_table, - ) - def init_database(self) -> None: """Initialize db tables used for storing cve/version data.""" @@ -383,7 +389,13 @@ def init_database(self) -> None: exploit_table_create, cve_metrics_table_create, metrics_table_create, - ) = self.table_schemas() + ) = ( + self.TABLE_SCHEMAS["cve_severity"], + self.TABLE_SCHEMAS["cve_range"], + self.TABLE_SCHEMAS["cve_exploited"], + self.TABLE_SCHEMAS["cve_metrics"], + self.TABLE_SCHEMAS["metrics"], + ) index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)" cursor.execute(cve_data_create) cursor.execute(version_range_create) @@ -398,7 +410,13 @@ def init_database(self) -> None: exploit_schema, cve_metrics_schema, metrics_schema, - ) = self.table_schemas() + ) = ( + self.TABLE_SCHEMAS["cve_severity"], + self.TABLE_SCHEMAS["cve_range"], + self.TABLE_SCHEMAS["cve_exploited"], + self.TABLE_SCHEMAS["cve_metrics"], + self.TABLE_SCHEMAS["metrics"], + ) # Check schema on cve_severity if not self.latest_schema("cve_severity", severity_schema, cursor): # Recreate table using latest schema @@ -831,7 +849,7 @@ def get_exploits_count(self) -> int: def create_exploit_db(self): """Create table of exploits in database if it does not already exist.""" cursor = self.db_open_and_get_cursor() - (_, _, create_exploit_table, _, _) = self.table_schemas() + create_exploit_table = self.TABLE_SCHEMAS["cve_exploited"] cursor = self.db_open_and_get_cursor() cursor.execute(create_exploit_table) self.connection.commit() @@ -1063,7 +1081,13 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error exploit_table_create, cve_metrics_create, metrics_create, - ) = self.table_schemas() + ) = ( + self.TABLE_SCHEMAS["cve_severity"], + self.TABLE_SCHEMAS["cve_range"], + self.TABLE_SCHEMAS["cve_exploited"], + self.TABLE_SCHEMAS["cve_metrics"], + self.TABLE_SCHEMAS["metrics"], + ) index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)" cursor.execute(cve_data_create) cursor.execute(version_range_create)