Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebrowser] Add get delegation token logic for secure hadoop (#3301) (Related with: #3324 ) #3449

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': CACHES_HIVE_DISCOVERY_KEY
}
CACHES_WEBHDFS_DELEGATION_TOKEN_KEY = 'webhdfs_delegation_token'
CACHES[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] = {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY,
'TIMEOUT': desktop.conf.KERBEROS.REINIT_FREQUENCY
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kerberos tickets are renewed periodically, so I set the cache TIMEOUT option to the Kerberos re-init frequency; that way the cached tokens expire on the same schedule.

}

CACHES_CELERY_KEY = 'celery'
CACHES_CELERY_QUERY_RESULT_KEY = 'celery_query_results'
Expand Down
44 changes: 20 additions & 24 deletions desktop/libs/hadoop/src/hadoop/fs/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
standard_library.install_aliases()
from builtins import oct
from builtins import object
from datetime import datetime, timedelta
import errno
import logging
import posixpath
Expand All @@ -32,12 +33,13 @@
import time
import urllib.request, urllib.error

from django.core.cache import caches
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requesting a delegation token on every Filebrowser request can put stress on the Hadoop NameNode, so I cache the token.

from django.utils.encoding import smart_str

import hadoop.conf
import desktop.conf

from desktop.lib.rest import http_client, resource
from desktop.settings import CACHES_WEBHDFS_DELEGATION_TOKEN_KEY
from past.builtins import long
from hadoop.fs import normpath as fs_normpath, SEEK_SET, SEEK_CUR, SEEK_END
from hadoop.fs.hadoopfs import Hdfs
Expand All @@ -61,6 +63,8 @@

LOG = logging.getLogger(__name__)

cache = caches[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY]


class WebHdfs(Hdfs):
"""
Expand Down Expand Up @@ -212,11 +216,26 @@ def current_trash_path(self, trash_path):
return self.join(trash_path, self.TRASH_CURRENT)

def _getparams(self):
if self._security_enabled:
token = cache.get(self.user, None)
if not token:
token = self.get_delegation_token(self.user)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use the impersonation function to grant permission to each user, you must use a delegation token. (reference: https://blog.cloudera.com/hadoop-delegation-tokens-explained/)

cache.set(self.user, token)
return {'delegation': token}
return {
"user.name": WebHdfs.DEFAULT_USER,
"doas": self.user
}

def get_delegation_token(self, user):
  """Request a WebHDFS delegation token for ``user``.

  Sends a ``GETDELEGATIONTOKEN`` operation to the root resource, passing
  ``user`` as both the ``doas`` and the ``renewer`` parameter, with the
  headers supplied by ``self._getheaders()``.

  Returns the ``urlString`` of the response's ``Token`` entry. When that
  entry is falsy (for example ``None``), the falsy entry itself is
  returned unchanged.
  """
  query = {
    'op': 'GETDELEGATIONTOKEN',
    'doas': user,
    'renewer': user,
  }
  request_headers = self._getheaders()
  response = self._root.get(params=query, headers=request_headers)

  token_info = response['Token']
  if not token_info:
    # No token in the reply: hand the falsy value back to the caller as-is.
    return token_info
  return token_info['urlString']

def _getheaders(self):
return None

Expand Down Expand Up @@ -561,16 +580,6 @@ def read_url(self, path, offset=0, length=None, bufsize=None):
params['length'] = long(length)
if bufsize is not None:
params['bufsize'] = bufsize
if self._security_enabled:
token = self.get_delegation_token(self.user)
if token:
params['delegation'] = token
# doas should not be present with delegation token as the token includes the username
# https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
if 'doas' in params:
del params['doas']
if 'user.name' in params:
del params['user.name']
unquoted_path = urllib_unquote(smart_str(path))
return self._client._make_url(unquoted_path, params)

Expand Down Expand Up @@ -896,19 +905,6 @@ def _get_redirect_url(self, webhdfs_ex):
LOG.exception("Failed to read redirect from response: %s (%s)" % (webhdfs_ex, ex))
raise webhdfs_ex

def get_delegation_token(self, renewer):
"""get_delegation_token(user) -> Delegation token"""
# Workaround for HDFS-3988
if self._security_enabled:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This problem was fixed in Hadoop 2.6, so the workaround is no longer needed. (reference: https://issues.apache.org/jira/browse/HDFS-3988)

self.get_home_dir()

params = self._getparams()
params['op'] = 'GETDELEGATIONTOKEN'
params['renewer'] = renewer
headers = self._getheaders()
res = self._root.get(params=params, headers=headers)
return res['Token'] and res['Token']['urlString']


def do_as_user(self, username, fn, *args, **kwargs):
prev_user = self.user
Expand Down