Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: table init + add bonus purl2cpe init #4241

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 49 additions & 120 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ class CVEDB:
""",
}

# This is mostly to make bandit happier because we won't be
# executing compound strings.
TABLE_DROP = {
"cve_severity": "DROP TABLE cve_severity",
"cve_range": "DROP TABLE cve_range",
"cve_exploited": "DROP TABLE cve_exploited",
"cve_metrics": "DROP TABLE cve_metrics",
"metrics": "DROP TABLE metrics",
"mismatch": "DROP TABLE mismatch",
"purl2cpe": "DROP TABLE purl2cpe",
}

INDEXES = {
"range": "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)",
"purl": "CREATE INDEX IF NOT EXISTS purl_index ON mismatch (purl)",
}

EMPTY_SELECT_QUERIES = {
"cve_severity": "SELECT * FROM cve_severity WHERE 1=0",
"cve_range": "SELECT * FROM cve_range WHERE 1=0",
Expand Down Expand Up @@ -301,25 +318,14 @@ def get_cvelist_if_stale(self) -> None:
self.LOGGER.info(
"Using cached CVE data (<24h old). Use -u now to update immediately."
)
(
severity_schema,
range_schema,
exploit_schema,
# cve_metrics_schema,
# metrics_schema,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
# self.TABLE_SCHEMAS["cve_metrics"],
# self.TABLE_SCHEMAS["metrics"],
)
if (
not self.latest_schema("cve_severity", severity_schema)
or not self.latest_schema("cve_range", range_schema)
or not self.latest_schema("cve_exploited", exploit_schema)
# or not self.latest_schema("cve_metrics",cve_metrics_schema)
# or not self.latest_schema("metrics",metrics_schema)
not self.latest_schema(
"cve_severity", self.TABLE_SCHEMAS["cve_severity"]
)
or not self.latest_schema("cve_range", self.TABLE_SCHEMAS["cve_range"])
or not self.latest_schema(
"cve_exploited", self.TABLE_SCHEMAS["cve_exploited"]
)
):
self.refresh_cache_and_update_db()
self.time_of_last_update = datetime.datetime.today()
Expand Down Expand Up @@ -399,93 +405,24 @@ def init_database(self) -> None:
"""Initialize db tables used for storing cve/version data."""

cursor = self.db_open_and_get_cursor()
(
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_table_create,
metrics_table_create,
mismatch,
purl2cpe,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
self.TABLE_SCHEMAS["mismatch"],
self.TABLE_SCHEMAS["purl2cpe"],
)
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
index_purl = "CREATE INDEX IF NOT EXISTS purl_index ON mismatch (purl)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_table_create)
cursor.execute(metrics_table_create)
cursor.execute(mismatch)
cursor.execute(purl2cpe)
cursor.execute(index_range)
cursor.execute(index_purl)

(
severity_schema,
range_schema,
exploit_schema,
cve_metrics_schema,
metrics_schema,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
)
# Check schema on cve_severity
if not self.latest_schema("cve_severity", severity_schema, cursor):
# Recreate table using latest schema
self.LOGGER.info("Upgrading cve_severity data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE cve_severity")
cursor.execute(cve_data_create)

# Check schema on cve_range
if not self.latest_schema("cve_range", range_schema, cursor):
self.LOGGER.info("Upgrading cve_range data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE cve_range")
cursor.execute(version_range_create)

# Check schema on cve_exploits
if not self.latest_schema("cve_exploited", exploit_schema, cursor):
self.LOGGER.info("Upgrading cve_exploited data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE cve_exploited")
cursor.execute(exploit_table_create)
# Create all tables from latest schemas
for table in self.TABLE_SCHEMAS:
cursor.execute(self.TABLE_SCHEMAS[table])

# Check schema on cve_metrics
if not self.latest_schema("cve_metrics", cve_metrics_schema, cursor):
self.LOGGER.info("Upgrading cve_metrics data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE cve_metrics")
cursor.execute(cve_metrics_table_create)
# add indexes
for index in self.INDEXES:
cursor.execute(self.INDEXES[index])

# Check schema on metrics
if not self.latest_schema("metrics", metrics_schema, cursor):
self.LOGGER.info("Upgrading metrics data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE metrics")
cursor.execute(metrics_table_create)
# Check schemas
for table in self.TABLE_SCHEMAS:
if not self.latest_schema(table, self.TABLE_SCHEMAS[table], cursor):
self.LOGGER.info(f"Upgrading {table} data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute(self.TABLE_DROP[table])
cursor.execute(self.TABLE_SCHEMAS[table])

if self.connection is not None:
self.connection.commit()
Expand All @@ -501,6 +438,11 @@ def populate_purl2cpe(self):
cve_conn = sqlite3.connect(self.dbpath)
cve_cursor = cve_conn.cursor()

# we are occasionally seeing an error where the cache doesn't have
# purl2cpe and thus we get an error, so attempt to initalize here
cve_cursor.execute(self.TABLE_SCHEMAS["purl2cpe"])
cve_cursor.execute(self.INDEXES["purl"])

Comment on lines +441 to +445
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I proposed a change in commit 622efe8. With this change, we should no longer face this issue. So, it's not necessary, but I don't know we can have it as a added security?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try your change out first and see how it goes! As in, I'll plan to merge your script before this PR gets completed.

I think I still want the rest of the refactoring in here, though, so I'll continue to work to get that behaving rather than closing this PR. I'll decide later if we need to take out these two lines or (since I don't think they'll add much overhead) if they should just say in as a backup safety check.

purl2cpe_cursor.execute("SELECT purl, cpe FROM purl2cpe")

insert_query = """
Expand Down Expand Up @@ -1120,25 +1062,12 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error
self.cachedir.mkdir(parents=True)

cursor = self.db_open_and_get_cursor()
(
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_create,
metrics_create,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
)
cursor.execute(self.TABLE_SCHEMAS["cve_severity"])
cursor.execute(self.TABLE_SCHEMAS["cve_range"])
cursor.execute(self.TABLE_SCHEMAS["cve_exploited"])
cursor.execute(self.TABLE_SCHEMAS["cve_metrics"])
cursor.execute(self.TABLE_SCHEMAS["metrics"])
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_create)
cursor.execute(metrics_create)
cursor.execute(index_range)
metadata_fd = open(path / "metadata.json")
metadata = json.loads(metadata_fd.read())
Expand Down
Loading