Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for new ISAPI urls #49

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ target/
bin/
local/
share/
.idea/*
178 changes: 178 additions & 0 deletions pyhik/hikvision.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def initialize(self):
device_info = self.get_device_info()

if device_info is None:
print('Hello this is it')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this unwanted print statement.

self.name = None
self.cam_id = None
self.event_states = None
Expand Down Expand Up @@ -401,6 +402,7 @@ def get_device_info(self):
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch deviceInfo, error: %s', err)
print('This is test')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this unwanted print statement

return None

if response.status_code == requests.codes.unauthorized:
Expand Down Expand Up @@ -430,6 +432,182 @@ def get_device_info(self):
_LOGGING.error('There was a problem: %s', err)
return None

"""
This function is added by Ayush Pratap Singh ([email protected])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attribution comments aren't needed. You'll be the owner on the merged PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed attribution comment

This function is used to get the status of the device
"""

def get_device_status(self):
"""Parse deviceInfo into dictionary."""
device_status = {}
url = '%s/ISAPI/System/status' % self.root_url
using_digest = False
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resetting the digest flag isn't needed. It will be set once the initialize function is called.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed resetting of digest flag


try:
response = self.hik_request.get(url, timeout=CONNECT_TIMEOUT)
if response.status_code == requests.codes.unauthorized:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the using_digest variable to determine what auth type to use.

_LOGGING.debug('Basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)

if response.status_code == requests.codes.not_found:
# Try alternate URL for deviceInfo
_LOGGING.debug('Using alternate deviceInfo URL.')
url = '%s/System/status' % self.root_url
response = self.hik_request.get(url)
# Seems to be difference between camera and nvr, they can't seem to
# agree if they should 404 or 401 first
if not using_digest and response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest.')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, use the variable to decide.

self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)

except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch status of device, error: %s', err)
return None

if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
return None

if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.debug('Unable to fetch status of device.')
return None

try:
tree = ET.fromstring(response.text)

device_status['currentdevicetime'] = tree[0].text.strip()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the element_query function to find the fields you want. This allows the XML ordering to change and you can still return the correct result.

device_status['deviceuptime'] = tree[1].text.strip()
device_status['cpudescription'] = tree[2][0][0].text.strip()
device_status['cpuutilization'] = tree[2][0][1].text.strip()
device_status['memorydescription'] = tree[3][0][0].text.strip()
device_status['memoryusage'] = tree[3][0][1].text.strip()
device_status['memoryavailable'] = tree[3][0][2].text.strip()

return device_status

except AttributeError as err:
_LOGGING.error('Entire response: %s', response.text)
_LOGGING.error('There was a problem: %s', err)
return None

def get_device_time(self):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement same changes on this function as the previous one.

"""
Parse device time into dictionary
"""
device_time = {}
url = "%s/ISAPI/System/time" % self.root_url
using_digest = False

try:
response = self.hik_request.get(url, timeout=CONNECT_TIMEOUT)
if response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)

if response.status_code == requests.codes.not_found:
"""
Tyr alternate URL for time from device
"""
_LOGGING.debug('Using alternate URL for device from time')
url = '%s/System/time' % self.root_url
response = self.hik_request.get(url)

if not using_digest and response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)

except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch time of device, error: %s', err)
return None

if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
return None

if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.debug('Unable to fetch time of device')
return None
try:
tree = ET.fromstring(response.text)

device_time['timemode'] = tree[0].text.strip()
device_time['localtime'] = tree[1].text.strip()
device_time['timezone'] = tree[2].text.strip()

return device_time

except AttributeError as err:
_LOGGING.error('Entire response: %s', response.text)
_LOGGING.error('There was a problem: %s', err)

"""
Get the Ports using the endpoint : /ISAPI/System/Network/UPnP/ports/status
"""
#TODO : This needs to be implemented
def get_upnp_ports_status(self):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement same changes on this function as the previous one.

"""
Parse the upnp port status into dictionary
"""
upnp_port = {}
url = "%s/ISAPI/System/Network/UPnP/ports/status" % self.root_url
using_digest = False

try:
response = self.hik_request.get(url, timeout=CONNECT_TIMEOUT)
if response.status_code == requests.codes.unauthorized:
_LOGGING.debug('basic authentication failed. Using digest.')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)

if response.status_code == requests.codes.not_found:
"""
Try alternate URL
"""
_LOGGING.debug('Using alternate URL for Upnp ports')
url = '%s/System/time' % self.root_url
response = self.hik_request.get(url)

if not using_digest and response.status_code == requests.codes.unauthorized:
_LOGGING.debug('Basic authentication failed. Using digest')
self.hik_request.auth = HTTPDigestAuth(self.usr, self.pwd)
using_digest = True
response = self.hik_request.get(url)
except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as err:
_LOGGING.error('Unable to fetch upnp ports, error: %s', err)
return None

if response.status_code == requests.codes.unauthorized:
_LOGGING.error('Authentication failed')
return None

if response.status_code != requests.codes.ok:
# If we didn't receive 200, abort
_LOGGING.debug('Unable to fetch upnp ports')
return None
try:
tree = ET.fromstring(response.text)
# Parse portstatus
upnp_port['httpPort'] = tree[3][0][3].text.strip()
upnp_port['rtspPort'] = tree[3][2][3].text.strip()
upnp_port['httpsPort'] = tree[3][3][3].text.strip()

return upnp_port
except AttributeError as err:
_LOGGING.error('Entire response: %s', response.text)
_LOGGING.error('There was a problem: %s', err)

def watchdog_handler(self):
"""Take care of threads if wachdog expires."""
_LOGGING.debug('%s Watchdog expired. Resetting connection.', self.name)
Expand Down