Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add bulk operations methods to admin client #256

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 96 additions & 8 deletions duo_client/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,18 @@
"""
from __future__ import absolute_import

import six.moves.urllib

from . import client, Accounts
from .logs.telephony import Telephony
import six
import warnings
import base64
import json
import time
import base64
import warnings
from datetime import datetime, timedelta, timezone

import six
import six.moves.urllib

from . import Accounts, client
from .logs.telephony import Telephony

USER_STATUS_ACTIVE = "active"
USER_STATUS_BYPASS = "bypass"
USER_STATUS_DISABLED = "disabled"
Expand Down Expand Up @@ -493,7 +494,7 @@ def get_authentication_log(self, api_version=1, **kwargs):
params = {}

if api_version == 1: #v1
params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0;
params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0
# Sanity check mintime as unix timestamp, then transform to string
params['mintime'] = '{:d}'.format(int(params['mintime']))
warnings.warn(
Expand Down Expand Up @@ -3514,6 +3515,93 @@ def get_policy_summary_v2(self):
response = self.json_api_call("GET", path, {})
return response

def bulk_operations(self, operations):
"""
Perform a series of bulk operations sequentially

Args:
operations (list) - list of JSON objects representing the operations to perform.

Returns (list) - response objects for each item in the operations list

Raises RuntimeError on error

Example:
operations = [
{
"method": "POST",
"path": "/admin/v1/users",
"body": {
"username": "uname1",
"alias1": "my_alias1",
"alias2": "my_alias2",
"alias3": "my_alias3",
"alias4": "my_alias4",
"email": "[email protected]",
"status": "active",
"notes": "This is a user",
},
},
{
"method": "POST",
"path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1",
"body": {
"alias2": "updated_alias2",
"email": "[email protected]",
"status": "active",
"notes": "This is another user",
},
},
{
"method": "DELETE",
"path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1",
"body": {},
},
{
"method": "POST",
"path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1/groups",
"body": {
"group_id": "DUXXXXXXXXXXXXXXXXX1",
},
},
{
"method": "DELETE",
"path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXX1/groups/DUXXXXXXXXXXXXXXXXX1",
"body": {},
},
]

"""

path = "/admin/v1/bulk"
params = {"operations": json.dumps(operations)}
response = self.json_api_call("POST", path, params)
return response

def bulk_add_users(self, users):
"""
Create users in bulk atomically

Args:
users (list) - list of JSON objects representing each user object to create.

Returns (list) - user response objects for each user in the list argument

Raises RuntimeError on error

Example:
users = [
{"username": "example_username_1", "email": "[email protected]"},
{"username": "example_username_2", "status": "disabled"},
]

"""

path = "/admin/v1/bulk_create"
params = {"users": json.dumps(users)}
response = self.json_api_call("POST", path, params)
return response


class AccountAdmin(Admin):
"""AccountAdmin manages a child account using an Accounts API integration."""
Expand Down
38 changes: 38 additions & 0 deletions tests/admin/test_bulk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import json

from .base import TestAdmin


class TestBulkOperations(TestAdmin):
def test_bulk_add_users(self):
"""Test to bulk create users
"""
response = self.client_list.bulk_add_users(
[{"username": "example_username_1", "email": "[email protected]"},
{"username": "example_username_2", "status": "disabled"}, ])

result_list = json.loads(json.loads(response[0]['body'])['users'])

self.assertEqual(result_list[0]['username'], 'example_username_1')
self.assertEqual(result_list[0]['email'], '[email protected]')

def test_bulk_operations(self):
"""Test to execute bulk operations
"""

input_data = [{"method": "POST", "path": "/admin/v1/users",
"body": {"username": "uname1", "alias1": "my_alias1", "alias2": "my_alias2",
"alias3": "my_alias3", "alias4": "my_alias4", "email": "[email protected]",
"status": "active", "notes": "This is a user", }, },
{"method": "POST", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX",
"body": {"alias2": "updated_alias2", "email": "[email protected]", "status": "active",
"notes": "This is another user", }, },
{"method": "DELETE", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX", "body": {}, },
{"method": "POST", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX/groups",
"body": {"group_id": "DUXXXXXXXXXXXXXXXXXX", }, },
{"method": "DELETE", "path": "/admin/v1/users/DUXXXXXXXXXXXXXXXXXX/groups/DUXXXXXXXXXXXXXXXXXX",
"body": {}, }, ]
response = self.client_list.bulk_operations(input_data)

result_list = json.loads(json.loads(response[0]['body'])['operations'])
self.assertEqual(result_list, input_data)
Loading