From 093e915ebdca74e2e64952cb461baab3ca13c5ae Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Mon, 29 May 2023 13:39:38 +0900 Subject: [PATCH 01/16] [Filebrowser] Add get delegation token logic for secure hadoop (#3301) --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 67 +++++++++++++++++--- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 8da3a90425f..ef53232c707 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -195,6 +195,8 @@ def trash_path(self, path=None): if not path: path = home_dir params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETTRASHROOT' headers = self._getheaders() @@ -268,6 +270,8 @@ def listdir_stats(self, path, glob=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) if glob is not None: params['filter'] = glob params['op'] = 'LISTSTATUS' @@ -291,6 +295,8 @@ def get_content_summary(self, path): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETCONTENTSUMMARY' headers = self._getheaders() json = self._root.get(path, params, headers) @@ -301,6 +307,8 @@ def _stats(self, path): """This version of stats returns None if the entry is not found""" path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETFILESTATUS' headers = self._getheaders() try: @@ -383,6 +391,8 @@ def _delete(self, path, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'DELETE' params['recursive'] = recursive and 'true' or 'false' headers = self._getheaders() @@ -453,6 +463,8 @@ def mkdir(self, path, mode=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'MKDIRS' headers = self._getheaders() if mode is None: @@ -470,6 +482,8 @@ def rename(self, old, new): new = Hdfs.join(Hdfs.dirname(old), new) new = self.strip_normpath(new) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'RENAME' # Encode `new' because it's in the params params['destination'] = smart_str(new) @@ -493,6 +507,8 @@ def rename_star(self, old_dir, new_dir): def set_replication(self, filename, repl_factor): """set replication factor(filename, repl_factor)""" params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETREPLICATION' params['replication'] = repl_factor headers = self._getheaders() @@ -508,6 +524,8 @@ def chown(self, path, user=None, group=None, recursive=False): params['owner'] = user if group is not None: params['group'] = group + if self._security_enabled: + params = self._set_params_with_delegation_token(params) headers = self._getheaders() if recursive: for xpath in self.listdir_recursive(path): @@ -524,6 +542,8 @@ def chmod(self, path, mode, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETPERMISSION' params['permission'] = safe_octal(mode) headers = self._getheaders() @@ -540,6 +560,7 @@ def get_home_dir(self): params['op'] = 'GETHOMEDIRECTORY' headers = self._getheaders() res = self._root.get(params=params, headers=headers) + print(res) for key, value in res.items(): if key.lower() == "path": return self.normpath(value) @@ -555,22 +576,14 @@ def read_url(self, path, offset=0, length=None, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) if length is not None: params['length'] = long(length) if bufsize is not None: params['bufsize'] = bufsize - if self._security_enabled: - token = self.get_delegation_token(self.user) - if token: - params['delegation'] = token - # doas should not be present with delegation token as the token includes the username - # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html - if 'doas' in params: - del params['doas'] - if 'user.name' in params: - del params['user.name'] unquoted_path = urllib_unquote(smart_str(path)) return self._client._make_url(unquoted_path, params) @@ -582,6 +595,8 @@ def read(self, path, offset, length, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) params['length'] = long(length) @@ -624,6 +639,8 @@ def create(self, path, overwrite=False, blocksize=None, replication=None, permis """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'CREATE' params['overwrite'] = overwrite and 'true' or 'false' if blocksize is not None: @@ -645,6 +662,8 @@ def append(self, path, data): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'APPEND' headers = self._getheaders() self._invoke_with_redirect('POST', path, params, data, headers) @@ -654,6 +673,8 @@ def append(self, path, data): def modify_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'MODIFYACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -663,6 +684,8 @@ def modify_acl_entries(self, path, aclspec): def remove_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -672,6 +695,8 @@ def remove_acl_entries(self, path, aclspec): def remove_default_acl(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEDEFAULTACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -680,6 +705,8 @@ def remove_default_acl(self, path): def remove_acl(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -688,6 +715,8 @@ def remove_acl(self, path): def set_acl(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETACL' params['aclspec'] = aclspec headers = self._getheaders() @@ -697,6 +726,8 @@ def set_acl(self, path, aclspec): def get_acl_status(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETACLSTATUS' headers = self._getheaders() return self._root.get(path, params, headers=headers) @@ -705,6 +736,8 @@ def get_acl_status(self, path): def check_access(self, path, aclspec='rw-'): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'CHECKACCESS' params['fsaction'] = aclspec headers = self._getheaders() @@ -896,6 +929,20 @@ def _get_redirect_url(self, webhdfs_ex): LOG.exception("Failed to read redirect from response: %s (%s)" % (webhdfs_ex, ex)) raise webhdfs_ex + + def _set_params_with_delegation_token(self, params): + token = self.get_delegation_token(self.user) + if token: + params['delegation'] = token + # doas should not be present with delegation token as the token includes the username + # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html + if 'doas' in params: + del params['doas'] + if 'user.name' in params: + del params['user.name'] + return params + + def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" # Workaround for HDFS-3988 From c364644977e60698411bb8d4db6ade423a0b1544 Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Mon, 29 May 2023 14:00:21 +0900 Subject: [PATCH 02/16] [Filebrowser] Add get delegation token caching logic for reducing NameNode traffic (#3301) --- desktop/core/base_requirements.txt | 1 + desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/desktop/core/base_requirements.txt b/desktop/core/base_requirements.txt index 5e36925a1ee..25557c932e5 100644 --- a/desktop/core/base_requirements.txt +++ b/desktop/core/base_requirements.txt @@ -3,6 +3,7 @@ requests-gssapi==1.2.3 asn1crypto==0.24.0 avro-python3==1.8.2 Babel==2.9.1 +cachetools==5.3.1 celery[redis]==5.2.7 cffi==1.15.0 channels==3.0.3 diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index ef53232c707..8fbe6051645 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -23,6 +23,8 @@ standard_library.install_aliases() from builtins import oct from builtins import object +from cachetools.func import ttl_cache +from datetime import datetime, timedelta import errno import logging import posixpath @@ -942,7 +944,7 @@ def _set_params_with_delegation_token(self, params): del params['user.name'] return params - + @ttl_cache(maxsize=128, ttl=timedelta(hours=8), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" # Workaround for HDFS-3988 From c1b513facec66ca169c852312eda2893c148ba1a Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Tue, 30 May 2023 16:05:40 +0900 Subject: [PATCH 03/16] [Filebrowser] Recache the delegation token for each user (#3301) --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 8fbe6051645..0b563840b62 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -933,7 +933,12 @@ def _get_redirect_url(self, webhdfs_ex): def _set_params_with_delegation_token(self, params): - token = self.get_delegation_token(self.user) + try: + token = self.get_delegation_token(self.user) + except: + with self.get_delegation_token.cache_lock: + self.get_delegation_token.cache.pop(self.get_delegation_token.cache_key(self.user), None) + token = self.get_delegation_token(self.user) if token: params['delegation'] = token # doas should not be present with delegation token as the token includes the username @@ -944,7 +949,9 @@ def _set_params_with_delegation_token(self, params): del params['user.name'] return params - @ttl_cache(maxsize=128, ttl=timedelta(hours=8), timer=datetime.now, typed=False) + # a renew-interval (default is 24 hours) + # https://blog.cloudera.com/hadoop-delegation-tokens-explained/ + @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" # Workaround for HDFS-3988 From 13cee888afeec37adabb66fb4a1cc3c45384e801 Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Wed, 31 May 2023 09:15:44 +0900 Subject: [PATCH 04/16] [Filebrowser] Merge _set_params_with_delegation_token into _getparams (#3301) --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 112 ++++++------------- 1 file changed, 32 insertions(+), 80 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 0b563840b62..0da24b9335d 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -197,8 +197,6 @@ def trash_path(self, path=None): if not path: path = home_dir params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETTRASHROOT' headers = self._getheaders() @@ -216,10 +214,38 @@ def current_trash_path(self, trash_path): return self.join(trash_path, self.TRASH_CURRENT) def _getparams(self): - return { - "user.name": WebHdfs.DEFAULT_USER, - "doas": self.user - } + if self._security_enabled: + try: + token = self.get_delegation_token(self.user) + except: + with self.get_delegation_token.cache_lock: + self.get_delegation_token.cache.pop(self.get_delegation_token.cache_key(self.user), None) + token = self.get_delegation_token(self.user) + return { + "delegation": token + } + + else: + return { + "user.name": WebHdfs.DEFAULT_USER, + "doas": self.user + } + + # a renew-interval (default is 24 hours) + # https://blog.cloudera.com/hadoop-delegation-tokens-explained/ + @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) + def get_delegation_token(self, renewer): + """get_delegation_token(user) -> Delegation token""" + # Workaround for HDFS-3988 + if self._security_enabled: + self.get_home_dir() + + params = self._getparams() + params['op'] = 'GETDELEGATIONTOKEN' + params['renewer'] = renewer + headers = self._getheaders() + res = self._root.get(params=params, headers=headers) + return res['Token'] and res['Token']['urlString'] def _getheaders(self): return None @@ -272,8 +298,6 @@ def listdir_stats(self, path, glob=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) if glob is not None: params['filter'] = glob params['op'] = 'LISTSTATUS' @@ -297,8 +321,6 @@ def get_content_summary(self, path): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETCONTENTSUMMARY' headers = self._getheaders() json = self._root.get(path, params, headers) @@ -309,8 +331,6 @@ def _stats(self, path): """This version of stats returns None if the entry is not found""" path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETFILESTATUS' headers = self._getheaders() try: @@ -393,8 +413,6 @@ def _delete(self, path, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'DELETE' params['recursive'] = recursive and 'true' or 'false' headers = self._getheaders() @@ -465,8 +483,6 @@ def mkdir(self, path, mode=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'MKDIRS' headers = self._getheaders() if mode is None: @@ -484,8 +500,6 @@ def rename(self, old, new): new = Hdfs.join(Hdfs.dirname(old), new) new = self.strip_normpath(new) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'RENAME' # Encode `new' because it's in the params params['destination'] = smart_str(new) @@ -509,8 +523,6 @@ def rename_star(self, old_dir, new_dir): def set_replication(self, filename, repl_factor): """set replication factor(filename, repl_factor)""" params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETREPLICATION' params['replication'] = repl_factor headers = self._getheaders() @@ -526,8 +538,6 @@ def chown(self, path, user=None, group=None, recursive=False): params['owner'] = user if group is not None: params['group'] = group - if self._security_enabled: - params = self._set_params_with_delegation_token(params) headers = self._getheaders() if recursive: for xpath in self.listdir_recursive(path): @@ -544,8 +554,6 @@ def chmod(self, path, mode, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETPERMISSION' params['permission'] = safe_octal(mode) headers = self._getheaders() @@ -578,8 +586,6 @@ def read_url(self, path, offset=0, length=None, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) if length is not None: @@ -597,8 +603,6 @@ def read(self, path, offset, length, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) params['length'] = long(length) @@ -641,8 +645,6 @@ def create(self, path, overwrite=False, blocksize=None, replication=None, permis """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'CREATE' params['overwrite'] = overwrite and 'true' or 'false' if blocksize is not None: @@ -664,8 +666,6 @@ def append(self, path, data): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'APPEND' headers = self._getheaders() self._invoke_with_redirect('POST', path, params, data, headers) @@ -675,8 +675,6 @@ def append(self, path, data): def modify_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'MODIFYACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -686,8 +684,6 @@ def modify_acl_entries(self, path, aclspec): def remove_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -697,8 +693,6 @@ def remove_acl_entries(self, path, aclspec): def remove_default_acl(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEDEFAULTACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -707,8 +701,6 @@ def remove_default_acl(self, path): def remove_acl(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -717,8 +709,6 @@ def remove_acl(self, path): def set_acl(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETACL' params['aclspec'] = aclspec headers = self._getheaders() @@ -728,8 +718,6 @@ def set_acl(self, path, aclspec): def get_acl_status(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETACLSTATUS' headers = self._getheaders() return self._root.get(path, params, headers=headers) @@ -738,8 +726,6 @@ def get_acl_status(self, path): def check_access(self, path, aclspec='rw-'): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'CHECKACCESS' params['fsaction'] = aclspec headers = self._getheaders() @@ -932,40 +918,6 @@ def _get_redirect_url(self, webhdfs_ex): raise webhdfs_ex - def _set_params_with_delegation_token(self, params): - try: - token = self.get_delegation_token(self.user) - except: - with self.get_delegation_token.cache_lock: - self.get_delegation_token.cache.pop(self.get_delegation_token.cache_key(self.user), None) - token = self.get_delegation_token(self.user) - if token: - params['delegation'] = token - # doas should not be present with delegation token as the token includes the username - # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html - if 'doas' in params: - del params['doas'] - if 'user.name' in params: - del params['user.name'] - return params - - # a renew-interval (default is 24 hours) - # https://blog.cloudera.com/hadoop-delegation-tokens-explained/ - @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) - def get_delegation_token(self, renewer): - """get_delegation_token(user) -> Delegation token""" - # Workaround for HDFS-3988 - if self._security_enabled: - self.get_home_dir() - - params = self._getparams() - params['op'] = 'GETDELEGATIONTOKEN' - params['renewer'] = renewer - headers = self._getheaders() - res = self._root.get(params=params, headers=headers) - return res['Token'] and res['Token']['urlString'] - - def do_as_user(self, username, fn, *args, **kwargs): prev_user = self.user try: From 9f8fa22d7cd60dce766641f3bc6f753647c2eb84 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Sat, 17 Jun 2023 13:48:05 +0900 Subject: [PATCH 05/16] Update webhdfs.py (fix bug) fix recursive call of delegation token --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 0da24b9335d..68dcf3a4a94 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -236,11 +236,9 @@ def _getparams(self): @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" - # Workaround for HDFS-3988 - if self._security_enabled: - self.get_home_dir() - - params = self._getparams() + params = { + "doas": self.user + } params['op'] = 'GETDELEGATIONTOKEN' params['renewer'] = renewer headers = self._getheaders() From 15c35c59bcc2572daa86fd64ca129440d66473da Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Sun, 18 Jun 2023 23:12:06 +0900 Subject: [PATCH 06/16] delete print(res) and fix get_params logic --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 68dcf3a4a94..0a9f40c96b7 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -236,9 +236,7 @@ def _getparams(self): @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" - params = { - "doas": self.user - } + params = self._getparams() params['op'] = 'GETDELEGATIONTOKEN' params['renewer'] = renewer headers = self._getheaders() @@ -568,7 +566,6 @@ def get_home_dir(self): params['op'] = 'GETHOMEDIRECTORY' headers = self._getheaders() res = self._root.get(params=params, headers=headers) - print(res) for key, value in res.items(): if key.lower() == "path": return self.normpath(value) @@ -642,7 +639,8 @@ def create(self, path, overwrite=False, blocksize=None, replication=None, permis `permission' should be an octal integer or string. """ path = self.strip_normpath(path) - params = self._getparams() + + params['op'] = 'CREATE' params['overwrite'] = overwrite and 'true' or 'false' if blocksize is not None: From 85f064bbc93a1836d18029919fb3d602f1709371 Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Wed, 30 Aug 2023 17:14:49 +0900 Subject: [PATCH 07/16] fix bug --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 0a9f40c96b7..5b40fcfe73d 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -236,7 +236,13 @@ def _getparams(self): @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" - params = self._getparams() + params = {} + # Workaround for HDFS-3988 + if self._security_enabled: + self.get_home_dir() + params["doas"] = self.user + else: + params = self._getparams() params['op'] = 'GETDELEGATIONTOKEN' params['renewer'] = renewer headers = self._getheaders() From b423dd886a824bd3bdb771c1f27d098373a17ecf Mon Sep 17 00:00:00 2001 From: "sonia.comp" Date: Mon, 18 Dec 2023 02:40:11 +0900 Subject: [PATCH 08/16] add _security_enabled option to each functions --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 113 ++++++++++++------- 1 file changed, 75 insertions(+), 38 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 5b40fcfe73d..a20c6c22d25 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -197,6 +197,8 @@ def trash_path(self, path=None): if not path: path = home_dir params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETTRASHROOT' headers = self._getheaders() @@ -214,40 +216,10 @@ def current_trash_path(self, trash_path): return self.join(trash_path, self.TRASH_CURRENT) def _getparams(self): - if self._security_enabled: - try: - token = self.get_delegation_token(self.user) - except: - with self.get_delegation_token.cache_lock: - self.get_delegation_token.cache.pop(self.get_delegation_token.cache_key(self.user), None) - token = self.get_delegation_token(self.user) - return { - "delegation": token - } - - else: - return { - "user.name": WebHdfs.DEFAULT_USER, - "doas": self.user - } - - # a renew-interval (default is 24 hours) - # https://blog.cloudera.com/hadoop-delegation-tokens-explained/ - @ttl_cache(maxsize=128, ttl=timedelta(hours=24), timer=datetime.now, typed=False) - def get_delegation_token(self, renewer): - """get_delegation_token(user) -> Delegation token""" - params = {} - # Workaround for HDFS-3988 - if self._security_enabled: - self.get_home_dir() - params["doas"] = self.user - else: - params = self._getparams() - params['op'] = 'GETDELEGATIONTOKEN' - params['renewer'] = renewer - headers = self._getheaders() - res = self._root.get(params=params, headers=headers) - return res['Token'] and res['Token']['urlString'] + return { + "user.name": WebHdfs.DEFAULT_USER, + "doas": self.user + } def _getheaders(self): return None @@ -300,6 +272,8 @@ def listdir_stats(self, path, glob=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) if glob is not None: params['filter'] = glob params['op'] = 'LISTSTATUS' @@ -323,6 +297,8 @@ def get_content_summary(self, path): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETCONTENTSUMMARY' headers = self._getheaders() json = self._root.get(path, params, headers) @@ -333,6 +309,8 @@ def _stats(self, path): """This version of stats returns None if the entry is not found""" path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETFILESTATUS' headers = self._getheaders() try: @@ -415,6 +393,8 @@ def _delete(self, path, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'DELETE' params['recursive'] = recursive and 'true' or 'false' headers = self._getheaders() @@ -485,6 +465,8 @@ def mkdir(self, path, mode=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'MKDIRS' headers = self._getheaders() if mode is None: @@ -502,6 +484,8 @@ def rename(self, old, new): new = Hdfs.join(Hdfs.dirname(old), new) new = self.strip_normpath(new) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'RENAME' # Encode `new' because it's in the params params['destination'] = smart_str(new) @@ -525,6 +509,8 @@ def rename_star(self, old_dir, new_dir): def set_replication(self, filename, repl_factor): """set replication factor(filename, repl_factor)""" params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETREPLICATION' params['replication'] = repl_factor headers = self._getheaders() @@ -540,6 +526,8 @@ def chown(self, path, user=None, group=None, recursive=False): params['owner'] = user if group is not None: params['group'] = group + if self._security_enabled: + params = self._set_params_with_delegation_token(params) headers = self._getheaders() if recursive: for xpath in self.listdir_recursive(path): @@ -556,6 +544,8 @@ def chmod(self, path, mode, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETPERMISSION' params['permission'] = safe_octal(mode) headers = self._getheaders() @@ -587,6 +577,8 @@ def read_url(self, path, offset=0, length=None, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) if length is not None: @@ -604,6 +596,8 @@ def read(self, path, offset, length, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) params['length'] = long(length) @@ -645,8 +639,9 @@ def create(self, path, overwrite=False, blocksize=None, replication=None, permis `permission' should be an octal integer or string. """ path = self.strip_normpath(path) - - + params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'CREATE' params['overwrite'] = overwrite and 'true' or 'false' if blocksize is not None: @@ -668,6 +663,8 @@ def append(self, path, data): """ path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'APPEND' headers = self._getheaders() self._invoke_with_redirect('POST', path, params, data, headers) @@ -677,6 +674,8 @@ def append(self, path, data): def modify_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'MODIFYACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -686,6 +685,8 @@ def modify_acl_entries(self, path, aclspec): def remove_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -695,6 +696,8 @@ def remove_acl_entries(self, path, aclspec): def remove_default_acl(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEDEFAULTACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -703,6 +706,8 @@ def remove_default_acl(self, path): def remove_acl(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -711,6 +716,8 @@ def remove_acl(self, path): def set_acl(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'SETACL' params['aclspec'] = aclspec headers = self._getheaders() @@ -720,6 +727,8 @@ def set_acl(self, path, aclspec): def get_acl_status(self, path): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'GETACLSTATUS' headers = self._getheaders() return self._root.get(path, params, headers=headers) @@ -728,6 +737,8 @@ def get_acl_status(self, path): def check_access(self, path, aclspec='rw-'): path = self.strip_normpath(path) params = self._getparams() + if self._security_enabled: + params = self._set_params_with_delegation_token(params) params['op'] = 'CHECKACCESS' params['fsaction'] = aclspec headers = self._getheaders() @@ -789,8 +800,7 @@ def copy_remote_dir(self, source, destination, dir_mode=None, owner=None): if dir_mode is None: dir_mode = self.getDefaultDirPerms() - if not self.exists(destination): - self.do_as_user(owner, self.mkdir, destination, mode=dir_mode) + self.do_as_user(owner, self.mkdir, destination, mode=dir_mode) for stat in self.listdir_stats(source): source_file = stat.path @@ -920,6 +930,33 @@ def _get_redirect_url(self, webhdfs_ex): raise webhdfs_ex + def _set_params_with_delegation_token(self, params): + token = self.get_delegation_token(self.user) + if token: + params['delegation'] = token + # doas should not be present with delegation token as the token includes the username + # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html + if 'doas' in params: + del params['doas'] + if 'user.name' in params: + del params['user.name'] + return params + + + @ttl_cache(maxsize=128, ttl=timedelta(hours=8), timer=datetime.now, typed=False) + def get_delegation_token(self, renewer): + """get_delegation_token(user) -> Delegation token""" + # Workaround for HDFS-3988 + if self._security_enabled: + self.get_home_dir() + params = self._getparams() + params['op'] = 'GETDELEGATIONTOKEN' + params['renewer'] = renewer + headers = self._getheaders() + res = self._root.get(params=params, headers=headers) + return res['Token'] and res['Token']['urlString'] + + def do_as_user(self, username, fn, *args, **kwargs): prev_user = self.user try: From 33251c75d9942d9dd5838ef50ac981892e444e4d Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Mon, 18 Dec 2023 02:47:25 +0900 Subject: [PATCH 09/16] change cachetools to django core caches (thread safe) --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index a20c6c22d25..1f69dc0b1e9 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -23,7 +23,6 @@ standard_library.install_aliases() from builtins import oct from builtins import object -from cachetools.func import ttl_cache from datetime import datetime, timedelta import errno import logging @@ -34,6 +33,7 @@ import time import urllib.request, urllib.error +from django.core.cache import caches from django.utils.encoding import smart_str import hadoop.conf @@ -63,6 +63,8 @@ LOG = logging.getLogger(__name__) +cache = caches["webhdfs_delegation_token"] + class WebHdfs(Hdfs): """ @@ -931,7 +933,10 @@ def _get_redirect_url(self, webhdfs_ex): def _set_params_with_delegation_token(self, params): - token = self.get_delegation_token(self.user) + token = cache.get(self.user, None) + if not token: + token = self.get_delegation_token(self.user) + cache.set(self.user, token, desktop.conf.KERBEROS.REINIT_FREQUENCY) if token: params['delegation'] = token # doas should not be present with delegation token as the token includes the username @@ -943,7 +948,6 @@ def _set_params_with_delegation_token(self, params): return params - @ttl_cache(maxsize=128, ttl=timedelta(hours=8), timer=datetime.now, typed=False) def get_delegation_token(self, renewer): """get_delegation_token(user) -> Delegation token""" # Workaround for HDFS-3988 From 24dcbd361d6e4e26dc435f4d0be957b6b9f26cd2 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Mon, 18 Dec 2023 03:16:43 +0900 Subject: [PATCH 10/16] fix bug --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 83 ++++---------------- 1 file changed, 14 insertions(+), 69 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 1f69dc0b1e9..4924969de4d 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -218,11 +218,25 @@ def current_trash_path(self, trash_path): return self.join(trash_path, self.TRASH_CURRENT) def _getparams(self): + if self._security_enabled: + token = cache.get(self.user, None) + if not token: + token = self.get_delegation_token(self.user) + cache.set(self.user, token, desktop.conf.KERBEROS.REINIT_FREQUENCY) + return {'delegation': token} return { "user.name": WebHdfs.DEFAULT_USER, "doas": self.user } + def get_delegation_token(self, user): + params['op'] = 'GETDELEGATIONTOKEN' + params['doas'] = user + params['renewer'] = user + headers = self._getheaders() + res = self._root.get(params=params, headers=headers) + return res['Token'] and res['Token']['urlString'] + def _getheaders(self): return None @@ -274,8 +288,6 @@ def listdir_stats(self, path, glob=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) if glob is not None: params['filter'] = glob params['op'] = 'LISTSTATUS' @@ -299,8 +311,6 @@ def get_content_summary(self, path): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETCONTENTSUMMARY' headers = self._getheaders() json = self._root.get(path, params, headers) @@ -311,8 +321,6 @@ def _stats(self, path): """This version of stats returns None if the entry is not found""" path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETFILESTATUS' headers = self._getheaders() try: @@ -395,8 +403,6 @@ def _delete(self, path, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'DELETE' params['recursive'] = recursive and 'true' or 'false' headers = self._getheaders() @@ -467,8 +473,6 @@ def mkdir(self, path, mode=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'MKDIRS' headers = self._getheaders() if mode is None: @@ -486,8 +490,6 @@ def rename(self, old, new): new = Hdfs.join(Hdfs.dirname(old), new) new = self.strip_normpath(new) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'RENAME' # Encode `new' because it's in the params params['destination'] = smart_str(new) @@ -511,8 +513,6 @@ def rename_star(self, old_dir, new_dir): def set_replication(self, filename, repl_factor): """set replication factor(filename, repl_factor)""" params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETREPLICATION' params['replication'] = repl_factor headers = self._getheaders() @@ -528,8 +528,6 @@ def chown(self, path, user=None, group=None, recursive=False): params['owner'] = user if group is not None: params['group'] = group - if self._security_enabled: - params = self._set_params_with_delegation_token(params) headers = self._getheaders() if recursive: for xpath in self.listdir_recursive(path): @@ -546,8 +544,6 @@ def chmod(self, path, mode, recursive=False): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETPERMISSION' params['permission'] = safe_octal(mode) headers = self._getheaders() @@ -579,8 +575,6 @@ def read_url(self, path, offset=0, length=None, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) if length is not None: @@ -598,8 +592,6 @@ def read(self, path, offset, length, bufsize=None): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'OPEN' params['offset'] = long(offset) params['length'] = long(length) @@ -642,8 +634,6 @@ def create(self, path, overwrite=False, blocksize=None, replication=None, permis """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'CREATE' params['overwrite'] = overwrite and 'true' or 'false' if blocksize is not None: @@ -665,8 +655,6 @@ def append(self, path, data): """ path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'APPEND' headers = self._getheaders() self._invoke_with_redirect('POST', path, params, data, headers) @@ -676,8 +664,6 @@ def append(self, path, data): def modify_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'MODIFYACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -687,8 +673,6 @@ def modify_acl_entries(self, path, aclspec): def remove_acl_entries(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACLENTRIES' params['aclspec'] = aclspec headers = self._getheaders() @@ -698,8 +682,6 @@ def remove_acl_entries(self, path, aclspec): def remove_default_acl(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEDEFAULTACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -708,8 +690,6 @@ def remove_default_acl(self, path): def remove_acl(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'REMOVEACL' headers = self._getheaders() return self._root.put(path, params, headers=headers) @@ -718,8 +698,6 @@ def remove_acl(self, path): def set_acl(self, path, aclspec): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'SETACL' params['aclspec'] = aclspec headers = self._getheaders() @@ -729,8 +707,6 @@ def set_acl(self, path, aclspec): def get_acl_status(self, path): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETACLSTATUS' headers = self._getheaders() return self._root.get(path, params, headers=headers) @@ -739,8 +715,6 @@ def get_acl_status(self, path): def check_access(self, path, aclspec='rw-'): path = self.strip_normpath(path) params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'CHECKACCESS' params['fsaction'] = aclspec headers = self._getheaders() @@ -932,35 +906,6 @@ def _get_redirect_url(self, webhdfs_ex): raise webhdfs_ex - def _set_params_with_delegation_token(self, params): - token = cache.get(self.user, None) - if not token: - token = self.get_delegation_token(self.user) - cache.set(self.user, token, desktop.conf.KERBEROS.REINIT_FREQUENCY) - if token: - params['delegation'] = token - # doas should not be present with delegation token as the token includes the username - # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html - if 'doas' in params: - del params['doas'] - if 'user.name' in params: - del params['user.name'] - return params - - - def get_delegation_token(self, renewer): - """get_delegation_token(user) -> Delegation token""" - # Workaround for HDFS-3988 - if self._security_enabled: - self.get_home_dir() - params = self._getparams() - params['op'] = 'GETDELEGATIONTOKEN' - params['renewer'] = renewer - headers = self._getheaders() - res = self._root.get(params=params, headers=headers) - return res['Token'] and res['Token']['urlString'] - - def do_as_user(self, username, fn, *args, **kwargs): prev_user = self.user try: From c54b602281e10344709e98d4e2b440bf05aa1170 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Mon, 18 Dec 2023 03:17:20 +0900 Subject: [PATCH 11/16] remove cachetools --- desktop/core/base_requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/desktop/core/base_requirements.txt b/desktop/core/base_requirements.txt index 25557c932e5..5e36925a1ee 100644 --- a/desktop/core/base_requirements.txt +++ b/desktop/core/base_requirements.txt @@ -3,7 +3,6 @@ requests-gssapi==1.2.3 asn1crypto==0.24.0 avro-python3==1.8.2 Babel==2.9.1 -cachetools==5.3.1 celery[redis]==5.2.7 cffi==1.15.0 channels==3.0.3 From aa427138c7600ecb63c5f6efc336c8bce8684341 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Mon, 18 Dec 2023 03:18:14 +0900 Subject: [PATCH 12/16] fix bug --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 4924969de4d..b35d0e8b9bc 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -199,8 +199,6 @@ def trash_path(self, path=None): if not path: path = home_dir params = self._getparams() - if self._security_enabled: - params = self._set_params_with_delegation_token(params) params['op'] = 'GETTRASHROOT' headers = self._getheaders() From a595d102641031fbcc9be7596b81c6dd3e197b64 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Mon, 18 Dec 2023 03:21:40 +0900 Subject: [PATCH 13/16] fix bug --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index b35d0e8b9bc..c3ff5012f81 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -774,7 +774,8 @@ def copy_remote_dir(self, source, destination, dir_mode=None, owner=None): if dir_mode is None: dir_mode = self.getDefaultDirPerms() - self.do_as_user(owner, self.mkdir, destination, mode=dir_mode) + if not self.exists(destination): + self.do_as_user(owner, self.mkdir, destination, mode=dir_mode) for stat in self.listdir_stats(source): source_file = stat.path From 4463134924925743718ec57e6cb24ac89449477f Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Fri, 5 Jan 2024 11:07:15 +0900 Subject: [PATCH 14/16] [webhdfs] add django cache (fix bug) --- desktop/core/src/desktop/settings.py | 5 +++++ desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/desktop/core/src/desktop/settings.py b/desktop/core/src/desktop/settings.py index 183573dcced..ffce3914f53 100644 --- a/desktop/core/src/desktop/settings.py +++ b/desktop/core/src/desktop/settings.py @@ -452,6 +452,11 @@ 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'LOCATION': CACHES_HIVE_DISCOVERY_KEY } +CACHES_WEBHDFS_DELEGATION_TOKEN_KEY = 'webhdfs_delegation_token' +CACHES[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] = { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY +} CACHES_CELERY_KEY = 'celery' CACHES_CELERY_QUERY_RESULT_KEY = 'celery_query_results' diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index c3ff5012f81..acc3c3d498a 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -40,6 +40,7 @@ import desktop.conf from desktop.lib.rest import http_client, resource +from desktop.settings import CACHES_WEBHDFS_DELEGATION_TOKEN_KEY from past.builtins import long from hadoop.fs import normpath as fs_normpath, SEEK_SET, SEEK_CUR, SEEK_END from hadoop.fs.hadoopfs import Hdfs @@ -63,7 +64,7 @@ LOG = logging.getLogger(__name__) -cache = caches["webhdfs_delegation_token"] +cache = caches[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] class WebHdfs(Hdfs): From 5bcb440194d032625c4e89957a45ef7672bee8df Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Fri, 5 Jan 2024 17:29:30 +0900 Subject: [PATCH 15/16] [webhdfs] fix bug --- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index acc3c3d498a..6f13f57b6ef 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -229,6 +229,7 @@ def _getparams(self): } def get_delegation_token(self, user): + params = {} params['op'] = 'GETDELEGATIONTOKEN' params['doas'] = user params['renewer'] = user From 6c4cc1223e61d9381f25aeead12f7b500e86ef80 Mon Sep 17 00:00:00 2001 From: Sonia Park Date: Fri, 5 Jan 2024 20:15:43 +0900 Subject: [PATCH 16/16] [webhdfs] fix bug --- desktop/core/src/desktop/settings.py | 3 ++- desktop/libs/hadoop/src/hadoop/fs/webhdfs.py | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/desktop/core/src/desktop/settings.py b/desktop/core/src/desktop/settings.py index ffce3914f53..3f838206a55 100644 --- a/desktop/core/src/desktop/settings.py +++ b/desktop/core/src/desktop/settings.py @@ -455,7 +455,8 @@ CACHES_WEBHDFS_DELEGATION_TOKEN_KEY = 'webhdfs_delegation_token' CACHES[CACHES_WEBHDFS_DELEGATION_TOKEN_KEY] = { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - 'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY + 'LOCATION': CACHES_WEBHDFS_DELEGATION_TOKEN_KEY, + 'TIMEOUT': desktop.conf.KERBEROS.REINIT_FREQUENCY } CACHES_CELERY_KEY = 'celery' diff --git a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py index 6f13f57b6ef..67ff5eed4d7 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py +++ b/desktop/libs/hadoop/src/hadoop/fs/webhdfs.py @@ -37,7 +37,6 @@ from django.utils.encoding import smart_str import hadoop.conf -import desktop.conf from desktop.lib.rest import http_client, resource from desktop.settings import CACHES_WEBHDFS_DELEGATION_TOKEN_KEY @@ -221,7 +220,7 @@ def _getparams(self): token = cache.get(self.user, None) if not token: token = self.get_delegation_token(self.user) - cache.set(self.user, token, desktop.conf.KERBEROS.REINIT_FREQUENCY) + cache.set(self.user, token) return {'delegation': token} return { "user.name": WebHdfs.DEFAULT_USER,