diff --git a/desktop/core/src/desktop/settings.py b/desktop/core/src/desktop/settings.py index aa7f41e89aa..9be548188d9 100644 --- a/desktop/core/src/desktop/settings.py +++ b/desktop/core/src/desktop/settings.py @@ -452,6 +452,12 @@ 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'LOCATION': CACHES_HIVE_DISCOVERY_KEY } +CACHES_WEBHDFS_DELEGATION_TOKEN_KEY = 'webhdfs_delegation_token' +CACHES[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] = { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY, + 'TIMEOUT': desktop.conf.KERBEROS.REINIT_FREQUENCY +} CACHES_CELERY_KEY = 'celery' CACHES_CELERY_QUERY_RESULT_KEY = 'celery_query_results' diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 537775f5402..3434c5b9fd9 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -23,6 +23,7 @@ standard_library.install_aliases() from builtins import oct from builtins import object +from datetime import datetime, timedelta import errno import logging import posixpath @@ -32,12 +33,13 @@ import time import urllib.request, urllib.error +from django.core.cache import caches from django.utils.encoding import smart_str import hadoop.conf -import desktop.conf from desktop.lib.rest import http_client, resource +from desktop.settings import CACHES_WEBHDFS_DELEGATION_TOKEN_KEY from past.builtins import long from hadoop.fs import normpath as fs_normpath, SEEK_SET, SEEK_CUR, SEEK_END from hadoop.fs.hadoopfs import Hdfs @@ -61,6 +63,8 @@ LOG = logging.getLogger() +cache = caches[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] + class WebHdfs(Hdfs): """ @@ -212,11 +216,26 @@ def current_trash_path(self, trash_path): return self.join(trash_path, self.TRASH_CURRENT) def _getparams(self): + if self._security_enabled: + token = cache.get(self.user, None) + if not token: + token = self.get_delegation_token(self.user) + cache.set(self.user, token) + return {'delegation': token} return { "user.name": WebHdfs.DEFAULT_USER, "doas": self.user } + def get_delegation_token(self, user): + params = {} + params['op'] = 'GETDELEGATIONTOKEN' + params['doas'] = user + params['renewer'] = user + headers = self._getheaders() + res = self._root.get(params=params, headers=headers) + return res['Token'] and res['Token']['urlString'] + def _getheaders(self): return None @@ -561,16 +580,6 @@ def read_url(self, path, offset=0, length=None, bufsize=None): params['length'] = long(length) if bufsize is not None: params['bufsize'] = bufsize - if self._security_enabled: - token = self.get_delegation_token(self.user) - if token: - params['delegation'] = token - # doas should not be present with delegation token as the token includes the username - # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html - if 'doas' in params: - del params['doas'] - if 'user.name' in params: - del params['user.name'] unquoted_path = urllib_unquote(smart_str(path)) return self._client._make_url(unquoted_path, params) @@ -896,19 +905,6 @@ def _get_redirect_url(self, webhdfs_ex): LOG.exception("Failed to read redirect from response: %s (%s)" % (webhdfs_ex, ex)) raise webhdfs_ex - def get_delegation_token(self, renewer): - """get_delegation_token(user) -> Delegation token""" - # Workaround for HDFS-3988 - if self._security_enabled: - self.get_home_dir() - - params = self._getparams() - params['op'] = 'GETDELEGATIONTOKEN' - params['renewer'] = renewer - headers = self._getheaders() - res = self._root.get(params=params, headers=headers) - return res['Token'] and res['Token']['urlString'] - def do_as_user(self, username, fn, *args, **kwargs): prev_user = self.user