Skip to content

Commit

Permalink
Merge pull request #7 from zkarpinski/create_project_feature
Browse files Browse the repository at this point in the history
Added create project api support
  • Loading branch information
zkarpinski committed Jan 9, 2024
2 parents a449deb + fb0b9db commit d66adb9
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 93 deletions.
2 changes: 1 addition & 1 deletion codeinsight_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .client import CodeInsightClient
from .client import CodeInsightClient
9 changes: 7 additions & 2 deletions codeinsight_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class CodeInsightClient:
def __init__(self,
base_url: str,
api_token: str,
timeout: int = 60,
verify_ssl: bool = True
):
self.base_url = base_url
self.api_url = f"{base_url}/codeinsight/api"
Expand All @@ -20,6 +22,8 @@ def __init__(self,
"Authorization": "Bearer %s" % self.__api_token,
"User-Agent": "codeinsight_sdk_python",
}
self.__timeout = timeout
self.__verify_ssl = verify_ssl

def request(self, method, url_part: str, params: dict = None, body: any = None ):
url = f"{self.api_url}/{url_part}"
Expand All @@ -31,10 +35,11 @@ def request(self, method, url_part: str, params: dict = None, body: any = None )
del params[k]

response = requests.request(method, url,
headers=self.__api_headers, params=params, json=body)
headers=self.__api_headers, params=params, json=body,
timeout=self.__timeout, verify=self.__verify_ssl)

if not response.ok:
logger.error(f"Error: {response.status_code} - {response.reason}")
logger.error(f"Error: {response.status_code} - {response.reason}", exc_info=True)
logger.error(response.text)
raise CodeInsightError(response)

Expand Down
244 changes: 155 additions & 89 deletions codeinsight_sdk/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,100 +27,135 @@ def get(self):
class ProjectHandler(Handler):
def __init__(self, client):
super().__init__(client)
self.cls = Project

self.cls = Project

def create(self, name:str, description:str = None, folder:str = None,
scanProfileName:str = None,
owner:str = None,
risk:str = None,
folderId:int = None,
customFields:List[dict] = None,
) -> int:
"""
Creates a project.
Args:
name (str): The name of the project.
description (str, optional): The description of the project. Defaults to None.
folder (str, optional): The folder of the project. Defaults to None.
Returns:
Project: The created project id.
"""
path = "projects"
data = {"name": name,
"description": description,
"folderName": folder,
"scanProfileName": scanProfileName,
"owner": owner,
"risk": risk,
"folderId": folderId,
"customFields": customFields}
resp = self.client.request("POST", url_part=path, body=data)
try:
project_id = resp.json()['data']['id']
except KeyError:
raise CodeInsightError(resp)
return project_id


#Note API endpoints switch between projects and project...
def all(self) -> List[Project]:
"""
Retrieves all projects from the server.
"""
Retrieves all projects from the server.
Returns:
A list of Project objects representing all the projects.
"""
path = "projects"
resp = self.client.request("GET", url_part=path)
projects = []
for project_data in resp.json()['data']:
projects.append(self.cls.from_dict(project_data))
return projects
Returns:
A list of Project objects representing all the projects.
"""

path = "projects"
resp = self.client.request("GET", url_part=path)
projects = []
for project_data in resp.json()['data']:
projects.append(self.cls.from_dict(project_data))
return projects

def get(self, id:int) -> Project:
"""
Retrieves a project by its ID.
Args:
id (int): The ID of the project requested.
Returns:
Project: The retrieved project.
"""
path = f"projects/{id}"
resp = self.client.request("GET", url_part=path)
project_data = resp.json()['data']
return self.cls.from_dict(project_data)
"""
Retrieves a project by its ID.
Args:
id (int): The ID of the project requested.
Returns:
Project: The retrieved project.
"""
path = f"projects/{id}"
resp = self.client.request("GET", url_part=path)
project_data = resp.json()['data']
return self.cls.from_dict(project_data)

def get_id(self, project_name:str) -> int:
"""
Retrieves the ID of a project based on its name.
Args:
projectName (str): The name of the project.
Returns:
int: The ID of the project.
"""
path = "project/id"
params = {"projectName": project_name}
resp = self.client.request("GET", url_part=path, params=params)
try:
project_id = resp.json()['Content: '] # Yes, the key is called 'Content: ' ...
except KeyError:
raise CodeInsightError(resp)
return project_id
"""
Retrieves the ID of a project based on its name.
Args:
projectName (str): The name of the project.
Returns:
int: The ID of the project.
"""
path = "project/id"
params = {"projectName": project_name}
resp = self.client.request("GET", url_part=path, params=params)
try:
project_id = resp.json()['Content: '] # Yes, the key is called 'Content: ' ...
except KeyError:
raise CodeInsightError(resp)
return project_id

def get_inventory_summary(self, project_id:int,
vulnerabilitySummary : bool = False,
cvssVersion: str = 'ANY',
published: str = 'ALL',
offset:int = 1,
limit:int = 25) -> List[ProjectInventoryItem]:
"""
Retrieves the inventory summary for a specific project.
Args:
project_id (int): The ID of the project.
vulnerabilitySummary (bool, optional): Flag to include vulnerability summary. Defaults to False.
cvssVersion (str, optional): The CVSS version to filter vulnerabilities. Defaults to 'ANY'.
published (str, optional): The publication status. Defaults to 'ALL'.
offset (int, optional): The offset for pagination. Defaults to 1.
limit (int, optional): The maximum number of items to return. Defaults to 25.
Returns:
List[ProjectInventoryItem]: A list of ProjectInventoryItem objects representing the inventory summary.
"""
path = f"projects/{project_id}/inventorySummary"
params = {"vulnerabilitySummary": vulnerabilitySummary,
"cvssVersion": cvssVersion,
"published": published,
"offset": offset,
"limit": limit
}
resp = self.client.request("GET", url_part=path, params=params)
current_page = int(resp.headers['current-page'])
number_of_pages = int(resp.headers['number-of-pages'])
total_records = int(resp.headers['total-records'])
inventory = []
for inv_item in resp.json()['data']:
inventory.append(ProjectInventoryItem.from_dict(inv_item))
# Iterate through all the pages
if number_of_pages > offset:
params.update({"offset": offset+1})
chunk = self.get_inventory_summary(project_id, **params)
# Only append the inventory records
inventory.extend(chunk)
return inventory
vulnerabilitySummary : bool = False,
cvssVersion: str = 'ANY',
published: str = 'ALL',
offset:int = 1,
limit:int = 25) -> List[ProjectInventoryItem]:
"""
Retrieves the inventory summary for a specific project.
Args:
project_id (int): The ID of the project.
vulnerabilitySummary (bool, optional): Flag to include vulnerability summary. Defaults to False.
cvssVersion (str, optional): The CVSS version to filter vulnerabilities. Defaults to 'ANY'.
published (str, optional): The publication status. Defaults to 'ALL'.
offset (int, optional): The offset for pagination. Defaults to 1.
limit (int, optional): The maximum number of items to return. Defaults to 25.
Returns:
List[ProjectInventoryItem]: A list of ProjectInventoryItem objects representing the inventory summary.
"""
path = f"projects/{project_id}/inventorySummary"
params = {"vulnerabilitySummary": vulnerabilitySummary,
"cvssVersion": cvssVersion,
"published": published,
"offset": offset,
"limit": limit
}
resp = self.client.request("GET", url_part=path, params=params)
current_page = int(resp.headers['current-page'])
number_of_pages = int(resp.headers['number-of-pages'])
total_records = int(resp.headers['total-records'])
inventory = []
for inv_item in resp.json()['data']:
inventory.append(ProjectInventoryItem.from_dict(inv_item))

# Iterate through all the pages
if number_of_pages > offset:
params.update({"offset": offset+1})
chunk = self.get_inventory_summary(project_id, **params)
# Only append the inventory records
inventory.extend(chunk)
return inventory

def get_inventory(self,project_id:int,
skip_vulnerabilities: bool = False,
Expand Down Expand Up @@ -158,22 +193,53 @@ def get_inventory(self,project_id:int,


class ReportHandler(Handler):
"""
A class that handles operations related to reports.
Args:
client (Client): The client object used for making API requests.
Attributes:
cls (Report): The class representing a report.
Methods:
get(id): Retrieves a report by its ID.
all(): Retrieves all reports.
"""

def __init__(self, client):
super().__init__(client)
self.cls = Report

def get(self, id:int):
"""
Retrieves a report by its ID.
Args:
id (int): The ID of the report to retrieve.
Returns:
Report: The report object.
"""
path = f"reports/{id}"
resp = self.client.request("GET", url_part=path)
report_data = resp.json()['data']
report = self.cls.from_dict(report_data)
return report

def all(self):
"""
Retrieves all reports.
Returns:
list: A list of report objects.
"""
path = "reports"
resp = self.client.request("GET", url_part=path)
reports = []
for report_data in resp.json()['data']:
reports.append(self.cls.from_dict(report_data))
return reports

2 changes: 2 additions & 0 deletions pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[master]
ignore-patterns=test_.*?py
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "codeinsight_sdk"
version = "0.0.4"
version = "0.0.5"
description = "A Python client for the Revenera Code Insight"
authors = ["Zachary Karpinski <[email protected]>"]
readme = "README.md"
Expand Down
8 changes: 8 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class TestProjectEndpoints:
@pytest.fixture
def client(self):
return CodeInsightClient(TEST_URL, TEST_API_TOKEN)

def test_create_project(self, client):
project_name = "Test"
with requests_mock.Mocker() as m:
m.post(f"{TEST_URL}/codeinsight/api/projects", text='{"data": {"id":1}}')
project_id = client.projects.create(project_name)
assert project_id == 1

def test_get_all_projects(self, client):
with requests_mock.Mocker() as m:
Expand Down Expand Up @@ -163,6 +170,7 @@ def test_get_project_inventory_summary(self,client):
assert len(project_inventory_summary) == 8
assert project_inventory_summary[1].id == 12346


class TestReportsEndpoints:
@pytest.fixture
def client(self):
Expand Down

0 comments on commit d66adb9

Please sign in to comment.