Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for auth_log iterator #156

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 107 additions & 70 deletions duo_client/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,18 @@
VALID_AUTHLOG_REQUEST_PARAMS = [
'mintime',
'maxtime',
'limit',
'sort',
'next_offset',
'event_types',
'reasons',
'results',
'users',
'applications',
'phone_numbers',
'registration_id',
'token_id',
'webauthnkey',
'groups',
'factors',
'api_version'
]


Expand Down Expand Up @@ -320,41 +321,48 @@ def get_administrator_log(self,
row['host'] = self.host
return response

def get_authentication_log(self, api_version=1, **kwargs):
def get_authentication_log_iterator(self, params={}):
"""
Returns authentication log events.

api_version - The api version of the handler to use. Currently, the
default api version is v1, but the v1 api will be
deprecated in a future version of the Duo Admin API.
Please migrate to the v2 api at your earliest convenience.
For details on the differences between v1 and v2,
please see Duo's Admin API documentation. (Optional)

API Version v1:

mintime - Fetch events only >= mintime (to avoid duplicate
records that have already been fetched)
Provides a generator which produces authentication logs. Under the
hood, this generator uses pagination, so it will only store one page of
administrative_units at a time in memory.
Returns: A generator which produces authentication logs.
Raises RuntimeError on error.
"""

multi_auth_log_parameters = [
'applications',
'users',
'event_types',
'factors',
'groups',
'phone_numbers',
'reasons',
'results',
'registration_id',
'token_id',
'webauthnkey'
]

# This will take a python list for params that support multiple
# parameters and make a comma seperated string
for multi_auth_log_parameter in multi_auth_log_parameters:
if multi_auth_log_parameter in params.keys():
params[multi_auth_log_parameter] = ",".join(
params[multi_auth_log_parameter]
)

Returns:
[
{'timestamp': <int:unix timestamp>,
'eventtype': "authentication",
'host': <str:host>,
'username': <str:username>,
'factor': <str:factor>,
'result': <str:result>,
'ip': <str:ip address>,
'new_enrollment': <bool:if event corresponds to enrollment>,
'integration': <str:integration>,
'location': {
'state': '<str:state>',
'city': '<str:city>',
'country': '<str:country>'
}
}]
return self.json_paging_api_call(
'GET',
"/admin/v2/logs/authentication",
params
)

Raises RuntimeError on error.
def get_authentication_log(self,
sort="ts:asc",
**kwargs):
"""
Returns a list of authentication log events.

API Version v2:

Expand All @@ -370,6 +378,12 @@ def get_authentication_log(self, api_version=1, **kwargs):
reasons - List of reasons to filter to filter on
factors - List of factors to filter on
event_types - List of event_types to filter on
phone_numbers - List of phone_numbers to filter on
registration_id - List of registration ids to filter on
token_id - List of token ids to filter on
webauthnkey - List of webauthn keys to filter on
factors - List of factors to filter on


Returns:
{
Expand Down Expand Up @@ -419,51 +433,74 @@ def get_authentication_log(self, api_version=1, **kwargs):
Raises RuntimeError on error.
"""

if api_version not in [1,2]:
raise ValueError("Invalid API Version")

params = {}
for k in kwargs:
if kwargs[k] is not None and k in VALID_AUTHLOG_REQUEST_PARAMS:
params[k] = kwargs[k]

if api_version == 1: #v1
params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0;
# Sanity check mintime as unix timestamp, then transform to string
params['mintime'] = '{:d}'.format(int(params['mintime']))
warnings.warn(
'The v1 Admin API for retrieving authentication log events '
'will be deprecated in a future release of the Duo Admin API. '
'Please migrate to the v2 API.',
DeprecationWarning)
else: #v2
for k in kwargs:
if kwargs[k] is not None and k in VALID_AUTHLOG_REQUEST_PARAMS:
params[k] = kwargs[k]
if 'mintime' not in params:
params['mintime'] = (int(time.time()) - 86400) * 1000
# Sanity check mintime as unix timestamp, then transform to string
params['mintime'] = '{:d}'.format(int(params['mintime']))

# Querying for results more recent than two mins will return as empty.
if 'maxtime' not in params:
params['maxtime'] = int(time.time() - 120) * 1000
# Sanity check maxtime as unix timestamp, then transform to string
params['maxtime'] = '{:d}'.format(int(params['maxtime']))

if 'mintime' not in params:
params['mintime'] = (int(time.time()) - 86400) * 1000
# Sanity check mintime as unix timestamp, then transform to string
params['mintime'] = '{:d}'.format(int(params['mintime']))
# Set the default limit to 1000, the max
if 'limit' not in params:
params['limit'] = "1000"


if 'maxtime' not in params:
params['maxtime'] = int(time.time()) * 1000
# Sanity check maxtime as unix timestamp, then transform to string
params['maxtime'] = '{:d}'.format(int(params['maxtime']))
return list(self.get_authentication_log_iterator(params))

def get_authentication_log_v1(self, **kwargs):
"""
Returns a list of authentication log events..

mintime - Fetch events only >= mintime (to avoid duplicate
records that have already been fetched)

Returns:
[
{'timestamp': <int:unix timestamp>,
'eventtype': "authentication",
'host': <str:host>,
'username': <str:username>,
'factor': <str:factor>,
'result': <str:result>,
'ip': <str:ip address>,
'new_enrollment': <bool:if event corresponds to enrollment>,
'integration': <str:integration>,
'location': {
'state': '<str:state>',
'city': '<str:city>',
'country': '<str:country>'
}
}]

Raises RuntimeError on error.
"""
params = {}
params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0
# Sanity check mintime as unix timestamp, then transform to string
params['mintime'] = '{:d}'.format(int(params['mintime']))
warnings.warn(
'The v1 Admin API for retrieving authentication log events '
'will be deprecated in a future release of the Duo Admin API. '
'Please migrate to the v2 API.', DeprecationWarning)
response = self.json_api_call(
'GET',
'/admin/v{}/logs/authentication'.format(api_version),
params,
'GET',
'/admin/v1/logs/authentication',
params
)

if api_version == 1:
for row in response:
row['eventtype'] = 'authentication'
row['host'] = self.host
else:
for row in response['authlogs']:
row['eventtype'] = 'authentication'
row['host'] = self.host
for row in response:
row['eventtype'] = 'authentication'
row['host'] = self.host

return response

def get_telephony_log(self,
Expand Down
Loading