
Commit

refactor and clean up implementation
Matthew Kersting committed May 16, 2024
1 parent d15ee73 commit b8ac6f2
Showing 1 changed file with 65 additions and 60 deletions.
125 changes: 65 additions & 60 deletions dataPipelines/gc_scrapy/gc_scrapy/spiders/ic_policies_spider.py
@@ -2,17 +2,10 @@
import bs4
import time
from selenium.webdriver import Chrome
from urllib.parse import urljoin, urlparse
from datetime import datetime

from dataPipelines.gc_scrapy.gc_scrapy.doc_item_fields import DocItemFields
from dataPipelines.gc_scrapy.gc_scrapy.GCSeleniumSpider import GCSeleniumSpider
from dataPipelines.gc_scrapy.gc_scrapy.items import DocItem
from dataPipelines.gc_scrapy.gc_scrapy.utils import abs_url
from dataPipelines.gc_scrapy.gc_scrapy.utils import (
dict_to_sha256_hex_digest,
parse_timestamp,
)
from dataPipelines.gc_scrapy.gc_scrapy.utils import abs_url, parse_timestamp


class IcPoliciesSpider(GCSeleniumSpider):
@@ -49,7 +42,7 @@ class IcPoliciesSpider(GCSeleniumSpider):
headers = {
"authority": "www.dni.gov",
"referer": "https://www.dni.gov/",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept": "text/html,application/xhtml+xml;q=0.9,image/webp,*/*;q=0.8;v=b3;q=0.7",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"pragma": "no-cache",
@@ -70,95 +63,107 @@ class IcPoliciesSpider(GCSeleniumSpider):
}

@staticmethod
def get_display_doc_type(doc_type):
def get_display_doc_type(doc_type: str) -> str:
"""Returns value for display_doc_type based on doc_type -> display_doc_type mapping"""
display_type_dict = {"icd": "Directive", "icpg": "Guide", "icpm": "Manual"}
if doc_type.lower() in display_type_dict.keys():
if doc_type.lower() in display_type_dict:
return display_type_dict[doc_type.lower()]
else:
return "Document"
return "Document"

@staticmethod
def get_doc_type(url: str) -> str:
"""Set doc type from URL"""
if url.endswith("directives"):
return "ICD"
if url.endswith("guidance"):
return "ICPG"
if url.endswith("memorandums"):
return "ICPM"
return "ICLR"

@staticmethod
def get_policy_doc_info(data: str) -> tuple:
"""Get doc info for ICD, ICPG, and ICPM docs"""
name_pattern = re.compile(r"^[A-Z]*\s\d*.\d*.\d*.\d*\s")
names = re.findall(name_pattern, data)
parsed_text = names[0]
parsed_name = parsed_text.split(" ")

doc_name = " ".join(parsed_name[:2])
doc_num = parsed_name[1]
doc_title = re.sub(parsed_text, "", data)
return doc_name, doc_num, doc_title

@staticmethod
def get_legal_doc_info(data: str) -> tuple:
"""Get doc info for legal reference"""
split_data = data.split(" ")
doc_name = " ".join(split_data[:-1])
doc_num = split_data[-1]
doc_title = doc_name
return doc_name, doc_num, doc_title

@staticmethod
def is_cac_required(pdf_url: str, doc_title: str) -> bool:
"""Return true if cac is required for access"""
cac_required = ["CAC", "PKI certificate required", "placeholder", "FOUO"]
return (
True
if any(x in pdf_url for x in cac_required)
or any(x in doc_title for x in cac_required)
else False
)

def parse(self, response):
"""Parses doc items out of IC Policies and Directives site"""

# Assign Chrome as the WebDriver instance to perform "user" actions
driver: Chrome = response.meta["driver"]

for page_url in self.start_urls:
driver.get(page_url)
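# allow the JavaScript-rendered page time to load before parsing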
time.sleep(5)

# parse html response
soup = bs4.BeautifulSoup(driver.page_source, features="html.parser")
div = soup.find("div", attrs={"itemprop": "articleBody"})
pub_list = div.find_all("p")
div = bs4.BeautifulSoup(driver.page_source, features="html.parser").find(
"div", attrs={"itemprop": "articleBody"}
)

# set policy type
if page_url.endswith("directives"):
doc_type = "ICD"
elif page_url.endswith("guidance"):
doc_type = "ICPG"
elif page_url.endswith("memorandums"):
doc_type = "ICPM"
else:
doc_type = "ICLR"
doc_type = self.get_doc_type(page_url)

# iterate through each publication
cac_required = ["CAC", "PKI certificate required", "placeholder", "FOUO"]
for row in pub_list:

for row in div.find_all("p"):
# skip empty rows
if row.a is None:
continue

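# normalize non-breaking spaces in the row text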
data = re.sub(r"\u00a0", " ", row.text)
link = row.a["href"]
pdf_url = abs_url(self.base_url, row.a["href"])

# patterns to match
name_pattern = re.compile(r"^[A-Z]*\s\d*.\d*.\d*.\d*\s")

names = re.findall(name_pattern, data)
try:
parsed_text = names[0]
parsed_name = parsed_text.split(" ")
doc_name = " ".join(parsed_name[:2])
doc_num = parsed_name[1]
doc_title = re.sub(parsed_text, "", data)
doc_name, doc_num, doc_title = self.get_policy_doc_info(data)
except IndexError:
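# rows that do not match the policy name pattern are treated as legal references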
split_data = data.split(" ")
doc_name = " ".join(split_data[:-1])
doc_num = split_data[-1]
doc_title = doc_name

pdf_url = abs_url(self.base_url, link)
doc_name, doc_num, doc_title = self.get_legal_doc_info(data)

# extract publication date from the pdf url
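# the date is the last parenthesized group in the URL, e.g. "(25%20June%202012)" -> "25-June-2012" (hypothetical example)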
matches = re.findall(r"\((.+)\)", pdf_url.replace("%20", "-"))
publication_date = matches[-1] if len(matches) > 0 else None

# set boolean if CAC is required to view document
cac_login_required = (
True
if any(x in pdf_url for x in cac_required)
or any(x in doc_title for x in cac_required)
else False
)
downloadable_items = [
{
"doc_type": "pdf",
"download_url": pdf_url,
"compression_type": None,
}
]
fields = DocItemFields(
doc_name=doc_name.strip(),
doc_title=doc_title,
doc_num=doc_num,
doc_type=doc_type,
publication_date=parse_timestamp(publication_date),
cac_login_required=cac_login_required,
cac_login_required=self.is_cac_required(pdf_url, doc_title),
source_page_url=page_url.strip(),
downloadable_items=downloadable_items,
downloadable_items=[
{
"doc_type": "pdf",
"download_url": pdf_url,
"compression_type": None,
}
],
download_url=pdf_url,
file_ext="pdf",
display_doc_type=self.get_display_doc_type(doc_type),
