refactor stig spider to use new class
Matthew Kersting committed May 1, 2024
1 parent 873dfae commit 2dbbb94
Showing 2 changed files with 63 additions and 108 deletions.
8 changes: 6 additions & 2 deletions dataPipelines/gc_scrapy/gc_scrapy/doc_item_fields.py
@@ -20,12 +20,16 @@ def __init__(
downloadable_items: dict,
download_url: str,
file_ext: str,
display_doc_type: str = None,
):
self.doc_name = doc_name
self.doc_title = doc_title
self.doc_num = doc_num
self.doc_type = doc_type
self.display_doc_type = doc_type
if display_doc_type is None:
self.display_doc_type = doc_type
else:
self.display_doc_type = display_doc_type
self.publication_date = publication_date.strftime("%Y-%m-%dT%H:%M:%S")
self.cac_login_required = cac_login_required
self.source_page_url = source_page_url
@@ -41,7 +45,7 @@ def get_version_hash_fields(self) -> dict:
"doc_num": self.doc_num,
"publication_date": self.publication_date,
"download_url": self.download_url,
"display_title": self.doc_title,
"display_title": self.display_title,
}

def set_display_name(self, name: str) -> None:
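For context, a minimal sketch of constructing DocItemFields with the new optional display_doc_type argument. The field values below are illustrative placeholders rather than real crawl data; only the keyword names and the module path come from the diff above.

from datetime import datetime
from dataPipelines.gc_scrapy.gc_scrapy.doc_item_fields import DocItemFields

# Placeholder values for illustration; only the keyword names are taken from the diff.
fields = DocItemFields(
    doc_name="STIG V2R3 Example Router STIG",
    doc_title="Example Router STIG",
    doc_num="V2R3",
    doc_type="STIG",
    publication_date=datetime(2024, 4, 30),
    cac_login_required=False,
    source_page_url="https://public.cyber.mil/stigs/downloads/",
    downloadable_items=[
        {
            "doc_type": "pdf",
            "download_url": "https://public.cyber.mil/example.pdf",
            "compression_type": None,
        }
    ],
    download_url="https://public.cyber.mil/example.pdf",
    file_ext="pdf",
)
# display_doc_type was omitted, so the constructor falls back to doc_type ("STIG");
# passing display_doc_type="Guide" would override the display value without changing doc_type.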
163 changes: 57 additions & 106 deletions dataPipelines/gc_scrapy/gc_scrapy/spiders/stig_spider.py
@@ -1,23 +1,25 @@
from typing import Any, Generator, Tuple
import scrapy
import re

from dataPipelines.gc_scrapy.gc_scrapy.items import DocItem
from dataPipelines.gc_scrapy.gc_scrapy.GCSpider import GCSpider
import re
from dataPipelines.gc_scrapy.gc_scrapy.utils import parse_timestamp

from dataPipelines.gc_scrapy.gc_scrapy.doc_item_fields import DocItemFields

from urllib.parse import urlparse
from dataPipelines.gc_scrapy.gc_scrapy.utils import dict_to_sha256_hex_digest, get_pub_date

class StigSpider(GCSpider):
"""
As of 04/30/2024
crawls https://public.cyber.mil/stigs/downloads/ for 47 pdfs (doc_type = stig)
"""
name = "stig_pubs" # Crawler name

name = "stig_pubs" # Crawler name

start_urls = [
"https://public.cyber.mil/stigs/downloads/"
]
start_urls = ["https://public.cyber.mil/stigs/downloads/"]

download_base_url = 'https://public.cyber.mil/'
download_base_url = "https://public.cyber.mil/"
rotate_user_agent = True

doc_type = "STIG"
@@ -28,35 +30,41 @@ class StigSpider(GCSpider):
}

@staticmethod
def extract_doc_number(doc_title):
def extract_doc_number(doc_title: str) -> Tuple[str, str]:
"""Accepts doc title and returns a tuple of the doc_title and doc_num"""
if doc_title.find(" Ver ") != -1:
ver_num = (re.findall(r' Ver (\w+)', doc_title))[0]
ver_num = (re.findall(r" Ver (\w+)", doc_title))[0]
else:
if " Version " in doc_title:
ver_num = (re.findall(r' Version (\w+)', doc_title))[0]
ver_num = (re.findall(r" Version (\w+)", doc_title))[0]
else:
ver_num = 0

if doc_title.find(" Rel ") != -1:
ref_num = (re.findall(r' Rel (\w+)', doc_title))[0]
ref_num = (re.findall(r" Rel (\w+)", doc_title))[0]
else:
if "Release Memo" in doc_title:
ref_num = 1
else:
ref_num = 0

doc_num = "V{}R{}".format(ver_num, ref_num)
doc_num = f"V{ver_num}R{ref_num}"
return doc_title, doc_num

def parse(self, response):
rows = response.css('table tbody tr')
rows = [a for a in rows if a.css('a::attr(href)').get()]
rows = [a for a in rows if a.css('a::attr(href)').get().endswith("pdf")]
def parse(self, response: scrapy.http.Response) -> Generator[DocItem, Any, None]:
"""Parses doc items out of STIG downloads site"""
rows = response.css("table tbody tr")
rows = [a for a in rows if a.css("a::attr(href)").get()]
rows = [a for a in rows if a.css("a::attr(href)").get().endswith("pdf")]

for row in rows:
href_raw = row.css('a::attr(href)').get()
doc_title_text, publication_date_raw = row.css('span[style="display:none;"] ::text').getall()
doc_title = self.ascii_clean(doc_title_text).replace("/ ", " ").replace("/", " ")
href_raw = row.css("a::attr(href)").get()
doc_title_text, publication_date_raw = row.css(
'span[style="display:none;"] ::text'
).getall()
doc_title = (
self.ascii_clean(doc_title_text).replace("/ ", " ").replace("/", " ")
)
publication_date = self.ascii_clean(publication_date_raw)
doc_title, doc_num = StigSpider.extract_doc_number(doc_title)
doc_name = f"{self.doc_type} {doc_num} {doc_title}"
@@ -67,89 +75,32 @@ def parse(self, response):
display_doc_type = "STIG"

file_type = self.get_href_file_extension(href_raw)
web_url = self.ensure_full_href_url(
href_raw, self.download_base_url)

fields = {
'doc_name': doc_name,
'doc_num': doc_num,
'doc_title': doc_title,
'doc_type': self.doc_type,
'display_doc_type':display_doc_type,
'file_type':file_type,
'cac_login_required': False,
'download_url': web_url,
'source_page_url':response.url,
'publication_date': publication_date
}
## Instantiate DocItem class and assign document's metadata values
doc_item = self.populate_doc_item(fields)

yield doc_item



def populate_doc_item(self, fields):
'''
This functions provides both hardcoded and computed values for the variables
in the imported DocItem object and returns the populated metadata object
'''
display_org = "Security Technical Implementation Guides" # Level 1: GC app 'Source' filter for docs from this crawler
data_source = "Security Technical Implementation Guides" # Level 2: GC app 'Source' metadata field for docs from this crawler
source_title = "Unlisted Source" # Level 3 filter

doc_name = fields['doc_name']
doc_num = fields['doc_num']
doc_title = fields['doc_title']
doc_type = fields['doc_type']
cac_login_required = fields['cac_login_required']
download_url = fields['download_url']
publication_date = get_pub_date(fields['publication_date'])

display_doc_type = fields['display_doc_type'] # Doc type for display on app
display_source = data_source + " - " + source_title
display_title = doc_type + " " + doc_num + ": " + doc_title
is_revoked = False
source_page_url = fields['source_page_url']
source_fqdn = urlparse(source_page_url).netloc

downloadable_items = [{
"doc_type": fields['file_type'],
"download_url": download_url.replace(' ', '%20'),
"compression_type": None,
}]

## Assign fields that will be used for versioning
version_hash_fields = {
"doc_name":doc_name,
"doc_num": doc_num,
"publication_date": publication_date,
"download_url": download_url,
"display_title": display_title
}

version_hash = dict_to_sha256_hex_digest(version_hash_fields)

return DocItem(
doc_name = doc_name,
doc_title = doc_title,
doc_num = doc_num,
doc_type = doc_type,
display_doc_type = display_doc_type, #
publication_date = publication_date,
cac_login_required = cac_login_required,
crawler_used = self.name,
downloadable_items = downloadable_items,
source_page_url = source_page_url, #
source_fqdn = source_fqdn, #
download_url = download_url, #
version_hash_raw_data = version_hash_fields, #
version_hash = version_hash,
display_org = display_org, #
data_source = data_source, #
source_title = source_title, #
display_source = display_source, #
display_title = display_title, #
file_ext = fields['file_type'], #
is_revoked = is_revoked, #
)
web_url = self.ensure_full_href_url(href_raw, self.download_base_url)

downloadable_items = [
{
"doc_type": file_type,
"download_url": web_url.replace(" ", "%20"),
"compression_type": None,
}
]

doc_item_fields = DocItemFields(
doc_name=doc_name,
doc_num=doc_num,
doc_title=doc_title,
doc_type=self.doc_type,
display_doc_type=display_doc_type,
cac_login_required=False,
source_page_url=response.url,
downloadable_items=downloadable_items,
download_url=web_url,
publication_date=parse_timestamp(publication_date),
file_ext=file_type,
)
yield doc_item_fields.populate_doc_item(
display_org="Security Technical Implementation Guides",
data_source="Security Technical Implementation Guides",
source_title="Unlisted Source",
crawler_used=self.name,
)
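For reference, a small sketch of how the retained extract_doc_number helper behaves on representative titles. The title strings are made up purely to exercise the " Ver "/" Rel " regexes shown above; the import path matches the diff.

from dataPipelines.gc_scrapy.gc_scrapy.spiders.stig_spider import StigSpider

# Hypothetical titles, chosen only to illustrate the version/release parsing.
title, num = StigSpider.extract_doc_number("Example Router STIG Ver 2 Rel 3")
print(num)  # -> "V2R3"

title, num = StigSpider.extract_doc_number("Example Product STIG Release Memo")
print(num)  # -> "V0R1" (no version marker; "Release Memo" implies release 1)

title, num = StigSpider.extract_doc_number("Example Product Overview")
print(num)  # -> "V0R0" (neither marker present)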
