Skip to content

Commit

Permalink
refactor: database schema check (#3968)
Browse files Browse the repository at this point in the history
added dictionary to be used for better table name validation. This
will help resolve bandit issues in #3965.

---------

Signed-off-by: Meet Soni <[email protected]>
  • Loading branch information
inosmeet committed Apr 10, 2024
1 parent 83e30ee commit 1794aac
Showing 1 changed file with 95 additions and 71 deletions.
166 changes: 95 additions & 71 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,69 @@ class CVEDB:
nvd_source.NVD_Source, # last to avoid data overwrites
]

TABLE_SCHEMAS = {
"cve_severity": """
CREATE TABLE IF NOT EXISTS cve_severity (
cve_number TEXT,
severity TEXT,
description TEXT,
score INTEGER,
cvss_version INTEGER,
cvss_vector TEXT,
data_source TEXT,
last_modified TIMESTAMP,
PRIMARY KEY(cve_number, data_source)
)
""",
"cve_range": """
CREATE TABLE IF NOT EXISTS cve_range (
cve_number TEXT,
vendor TEXT,
product TEXT,
version TEXT,
versionStartIncluding TEXT,
versionStartExcluding TEXT,
versionEndIncluding TEXT,
versionEndExcluding TEXT,
data_source TEXT,
FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source)
)
""",
"cve_exploited": """
CREATE TABLE IF NOT EXISTS cve_exploited (
cve_number TEXT,
product TEXT,
description TEXT,
PRIMARY KEY(cve_number)
)
""",
"cve_metrics": """
CREATE TABLE IF NOT EXISTS cve_metrics (
cve_number TEXT,
metric_id INTEGER,
metric_score REAL,
metric_field TEXT,
FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number),
FOREIGN KEY(metric_id) REFERENCES metrics(metric_id)
)
""",
"metrics": """
CREATE TABLE IF NOT EXISTS metrics (
metrics_id INTEGER,
metrics_name TEXT,
PRIMARY KEY(metrics_id)
)
""",
}

EMPTY_SELECT_QUERIES = {
"cve_severity": "SELECT * FROM cve_severity WHERE 1=0",
"cve_range": "SELECT * FROM cve_range WHERE 1=0",
"cve_exploited": "SELECT * FROM cve_exploited WHERE 1=0",
"cve_metrics": "SELECT * FROM cve_metrics WHERE 1=0",
"metrics": "SELECT * FROM metrics WHERE 1=0",
}

INSERT_QUERIES = {
"insert_severity": """
INSERT or REPLACE INTO cve_severity(
Expand Down Expand Up @@ -226,9 +289,15 @@ def get_cvelist_if_stale(self) -> None:
severity_schema,
range_schema,
exploit_schema,
cve_metrics_schema,
metrics_schema,
) = self.table_schemas()
# cve_metrics_schema,
# metrics_schema,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
# self.TABLE_SCHEMAS["cve_metrics"],
# self.TABLE_SCHEMAS["metrics"],
)
if (
not self.latest_schema("cve_severity", severity_schema)
or not self.latest_schema("cve_range", range_schema)
Expand All @@ -248,7 +317,7 @@ def latest_schema(
"""Check database is using latest schema"""
if table_name == "":
# If no table specified, check cve_range (the last one changed)
_, range_schema, __, _, _ = self.table_schemas()
range_schema = self.TABLE_SCHEMAS["cve_range"]
return self.latest_schema("cve_range", range_schema)

self.LOGGER.debug("Check database is using latest schema")
Expand Down Expand Up @@ -310,69 +379,6 @@ async def get_data(self):
for r in await asyncio.gather(*tasks):
self.data.append(r)

def table_schemas(self):
"""Returns sql commands for creating cve_severity, cve_range and cve_exploited tables."""
cve_data_create = """
CREATE TABLE IF NOT EXISTS cve_severity (
cve_number TEXT,
severity TEXT,
description TEXT,
score INTEGER,
cvss_version INTEGER,
cvss_vector TEXT,
data_source TEXT,
last_modified TIMESTAMP,
PRIMARY KEY(cve_number, data_source)
)
"""
version_range_create = """
CREATE TABLE IF NOT EXISTS cve_range (
cve_number TEXT,
vendor TEXT,
product TEXT,
version TEXT,
versionStartIncluding TEXT,
versionStartExcluding TEXT,
versionEndIncluding TEXT,
versionEndExcluding TEXT,
data_source TEXT,
FOREIGN KEY(cve_number, data_source) REFERENCES cve_severity(cve_number, data_source)
)
"""
exploit_table_create = """
CREATE TABLE IF NOT EXISTS cve_exploited (
cve_number TEXT,
product TEXT,
description TEXT,
PRIMARY KEY(cve_number)
)
"""
cve_metrics_table = """
CREATE TABLE IF NOT EXISTS cve_metrics (
cve_number TEXT,
metric_id INTEGER,
metric_score REAL,
metric_field TEXT,
FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number),
FOREIGN KEY(metric_id) REFERENCES metrics(metric_id)
)
"""
metrics_table = """
CREATE TABLE IF NOT EXISTS metrics (
metrics_id INTEGER,
metrics_name TEXT,
PRIMARY KEY(metrics_id)
)
"""

return (
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_table,
metrics_table,
)

def init_database(self) -> None:
"""Initialize db tables used for storing cve/version data."""

Expand All @@ -383,7 +389,13 @@ def init_database(self) -> None:
exploit_table_create,
cve_metrics_table_create,
metrics_table_create,
) = self.table_schemas()
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
)
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
Expand All @@ -398,7 +410,13 @@ def init_database(self) -> None:
exploit_schema,
cve_metrics_schema,
metrics_schema,
) = self.table_schemas()
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
)
# Check schema on cve_severity
if not self.latest_schema("cve_severity", severity_schema, cursor):
# Recreate table using latest schema
Expand Down Expand Up @@ -831,7 +849,7 @@ def get_exploits_count(self) -> int:
def create_exploit_db(self):
"""Create table of exploits in database if it does not already exist."""
cursor = self.db_open_and_get_cursor()
(_, _, create_exploit_table, _, _) = self.table_schemas()
create_exploit_table = self.TABLE_SCHEMAS["cve_exploited"]
cursor = self.db_open_and_get_cursor()
cursor.execute(create_exploit_table)
self.connection.commit()
Expand Down Expand Up @@ -1063,7 +1081,13 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error
exploit_table_create,
cve_metrics_create,
metrics_create,
) = self.table_schemas()
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
)
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
Expand Down

0 comments on commit 1794aac

Please sign in to comment.