diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index a5053dff02..6c3db7dd0e 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -133,6 +133,23 @@ class CVEDB: """, } + # This is mostly to make bandit happier because we won't be + # executing compound strings. + TABLE_DROP = { + "cve_severity": "DROP TABLE cve_severity", + "cve_range": "DROP TABLE cve_range", + "cve_exploited": "DROP TABLE cve_exploited", + "cve_metrics": "DROP TABLE cve_metrics", + "metrics": "DROP TABLE metrics", + "mismatch": "DROP TABLE mismatch", + "purl2cpe": "DROP TABLE purl2cpe", + } + + INDEXES = { + "range": "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)", + "purl": "CREATE INDEX IF NOT EXISTS purl_index ON mismatch (purl)", + } + EMPTY_SELECT_QUERIES = { "cve_severity": "SELECT * FROM cve_severity WHERE 1=0", "cve_range": "SELECT * FROM cve_range WHERE 1=0", @@ -301,25 +318,14 @@ def get_cvelist_if_stale(self) -> None: self.LOGGER.info( "Using cached CVE data (<24h old). Use -u now to update immediately." ) - ( - severity_schema, - range_schema, - exploit_schema, - # cve_metrics_schema, - # metrics_schema, - ) = ( - self.TABLE_SCHEMAS["cve_severity"], - self.TABLE_SCHEMAS["cve_range"], - self.TABLE_SCHEMAS["cve_exploited"], - # self.TABLE_SCHEMAS["cve_metrics"], - # self.TABLE_SCHEMAS["metrics"], - ) if ( - not self.latest_schema("cve_severity", severity_schema) - or not self.latest_schema("cve_range", range_schema) - or not self.latest_schema("cve_exploited", exploit_schema) - # or not self.latest_schema("cve_metrics",cve_metrics_schema) - # or not self.latest_schema("metrics",metrics_schema) + not self.latest_schema( + "cve_severity", self.TABLE_SCHEMAS["cve_severity"] + ) + or not self.latest_schema("cve_range", self.TABLE_SCHEMAS["cve_range"]) + or not self.latest_schema( + "cve_exploited", self.TABLE_SCHEMAS["cve_exploited"] + ) ): self.refresh_cache_and_update_db() self.time_of_last_update = datetime.datetime.today() @@ -399,93 +405,24 @@ def init_database(self) -> None: """Initialize db tables used for storing cve/version data.""" cursor = self.db_open_and_get_cursor() - ( - cve_data_create, - version_range_create, - exploit_table_create, - cve_metrics_table_create, - metrics_table_create, - mismatch, - purl2cpe, - ) = ( - self.TABLE_SCHEMAS["cve_severity"], - self.TABLE_SCHEMAS["cve_range"], - self.TABLE_SCHEMAS["cve_exploited"], - self.TABLE_SCHEMAS["cve_metrics"], - self.TABLE_SCHEMAS["metrics"], - self.TABLE_SCHEMAS["mismatch"], - self.TABLE_SCHEMAS["purl2cpe"], - ) - index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)" - index_purl = "CREATE INDEX IF NOT EXISTS purl_index ON mismatch (purl)" - cursor.execute(cve_data_create) - cursor.execute(version_range_create) - cursor.execute(exploit_table_create) - cursor.execute(cve_metrics_table_create) - cursor.execute(metrics_table_create) - cursor.execute(mismatch) - cursor.execute(purl2cpe) - cursor.execute(index_range) - cursor.execute(index_purl) - - ( - severity_schema, - range_schema, - exploit_schema, - cve_metrics_schema, - metrics_schema, - ) = ( - self.TABLE_SCHEMAS["cve_severity"], - self.TABLE_SCHEMAS["cve_range"], - self.TABLE_SCHEMAS["cve_exploited"], - self.TABLE_SCHEMAS["cve_metrics"], - self.TABLE_SCHEMAS["metrics"], - ) - # Check schema on cve_severity - if not self.latest_schema("cve_severity", severity_schema, cursor): - # Recreate table using latest schema - self.LOGGER.info("Upgrading cve_severity data. This may take some time.") - self.LOGGER.info( - "If this step hangs, try using `-u now` to get a fresh db." - ) - cursor.execute("DROP TABLE cve_severity") - cursor.execute(cve_data_create) - # Check schema on cve_range - if not self.latest_schema("cve_range", range_schema, cursor): - self.LOGGER.info("Upgrading cve_range data. This may take some time.") - self.LOGGER.info( - "If this step hangs, try using `-u now` to get a fresh db." - ) - cursor.execute("DROP TABLE cve_range") - cursor.execute(version_range_create) - - # Check schema on cve_exploits - if not self.latest_schema("cve_exploited", exploit_schema, cursor): - self.LOGGER.info("Upgrading cve_exploited data. This may take some time.") - self.LOGGER.info( - "If this step hangs, try using `-u now` to get a fresh db." - ) - cursor.execute("DROP TABLE cve_exploited") - cursor.execute(exploit_table_create) + # Create all tables from latest schemas + for table in self.TABLE_SCHEMAS: + cursor.execute(self.TABLE_SCHEMAS[table]) - # Check schema on cve_metrics - if not self.latest_schema("cve_metrics", cve_metrics_schema, cursor): - self.LOGGER.info("Upgrading cve_metrics data. This may take some time.") - self.LOGGER.info( - "If this step hangs, try using `-u now` to get a fresh db." - ) - cursor.execute("DROP TABLE cve_metrics") - cursor.execute(cve_metrics_table_create) + # add indexes + for index in self.INDEXES: + cursor.execute(self.INDEXES[index]) - # Check schema on metrics - if not self.latest_schema("metrics", metrics_schema, cursor): - self.LOGGER.info("Upgrading metrics data. This may take some time.") - self.LOGGER.info( - "If this step hangs, try using `-u now` to get a fresh db." - ) - cursor.execute("DROP TABLE metrics") - cursor.execute(metrics_table_create) + # Check schemas + for table in self.TABLE_SCHEMAS: + if not self.latest_schema(table, self.TABLE_SCHEMAS[table], cursor): + self.LOGGER.info(f"Upgrading {table} data. This may take some time.") + self.LOGGER.info( + "If this step hangs, try using `-u now` to get a fresh db." + ) + cursor.execute(self.TABLE_DROP[table]) + cursor.execute(self.TABLE_SCHEMAS[table]) if self.connection is not None: self.connection.commit() @@ -501,6 +438,11 @@ def populate_purl2cpe(self): cve_conn = sqlite3.connect(self.dbpath) cve_cursor = cve_conn.cursor() + # we are occasionally seeing an error where the cache doesn't have + # purl2cpe and thus we get an error, so attempt to initalize here + cve_cursor.execute(self.TABLE_SCHEMAS["purl2cpe"]) + cve_cursor.execute(self.INDEXES["purl"]) + purl2cpe_cursor.execute("SELECT purl, cpe FROM purl2cpe") insert_query = """ @@ -1120,25 +1062,12 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error self.cachedir.mkdir(parents=True) cursor = self.db_open_and_get_cursor() - ( - cve_data_create, - version_range_create, - exploit_table_create, - cve_metrics_create, - metrics_create, - ) = ( - self.TABLE_SCHEMAS["cve_severity"], - self.TABLE_SCHEMAS["cve_range"], - self.TABLE_SCHEMAS["cve_exploited"], - self.TABLE_SCHEMAS["cve_metrics"], - self.TABLE_SCHEMAS["metrics"], - ) + cursor.execute(self.TABLE_SCHEMAS["cve_severity"]) + cursor.execute(self.TABLE_SCHEMAS["cve_range"]) + cursor.execute(self.TABLE_SCHEMAS["cve_exploited"]) + cursor.execute(self.TABLE_SCHEMAS["cve_metrics"]) + cursor.execute(self.TABLE_SCHEMAS["metrics"]) index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)" - cursor.execute(cve_data_create) - cursor.execute(version_range_create) - cursor.execute(exploit_table_create) - cursor.execute(cve_metrics_create) - cursor.execute(metrics_create) cursor.execute(index_range) metadata_fd = open(path / "metadata.json") metadata = json.loads(metadata_fd.read())