Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修改def extract_zip(file, password, extract_full_path)解决文件夹乱码以及多层压缩无法解压的问题 #6

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"version": "0.2.0",
"configurations": [

{
"name": "Python: Debug convert.py",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/corpus_processing/extract.py",
"args": ["--folder_path","/Volumes/Elements/AITest/test/11/123", "--passwords_files","/Volumes/Elements/AITest/test/README_1.txt"],
// "args": ["--src_dir", "/Volumes/Elements/AITest/testlog.zip", "--dst_dir", "/Volumes/Elements/AITest/AITest", "--n_process", "4", "--threshold", "0.7"],
"console": "integratedTerminal"
}
]
}
213 changes: 142 additions & 71 deletions corpus_processing/extract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os, hashlib
import argparse
import shutil, io
import shutil
import tarfile
import zipfile
import bz2
Expand All @@ -9,7 +9,6 @@
import py7zr
import os, sys
from charset_mnbvc import api
from better_zipfile import fixcharset_zipfile

def get_extension(file_path):
filename, extension = os.path.splitext(file_path)
Expand All @@ -23,8 +22,20 @@ def get_extension(file_path):
filename = filename_1
return filename, ''.join(extensions)

def test_encode(file_path : bytes):
try:
bytes.decode(file_path, encoding='utf-8')
return 'utf-8'
except:
pass
try:
bytes.decode(file_path, encoding='gb18030')
return 'gb18030'
except:
pass
return None

def check_long_name(extract_full_path, zip_file_name):# longname返回true
def check_long_name(extract_full_path, zip_file_name):
paths = zip_file_name.split('/')
file_name = paths[-1]
if len(file_name.encode()) > 255 and len(os.path.join(extract_full_path, zip_file_name).encode()) < 4095:
Expand All @@ -33,49 +44,19 @@ def check_long_name(extract_full_path, zip_file_name):# longname返回true
length = (255-len(extensions.encode())-8)//2
basename = basename.encode()[:length].decode('utf-8', errors='ignore')+hashlib.md5(file_name.encode()).hexdigest()[:8]+basename.encode()[-length:].decode('utf-8', errors='ignore')
new_name = basename + extensions
return os.path.join(extract_full_path, '/'.join(paths[:-1]), new_name), True
return os.path.join(extract_full_path, '/'.join(paths[:-1]), new_name)
elif any(len(path.encode()) > 255 for path in paths) or len(os.path.join(extract_full_path, zip_file_name).encode()) > 4095:
print(f"File name too long: \n {os.path.join(extract_full_path, zip_file_name)} \n")

length = min(255, 4096-len(os.path.join(extract_full_path, 'long_name').encode()))-8

new_name = zip_file_name.encode()[:length//2-1].decode('utf-8', errors='ignore') +hashlib.md5(zip_file_name.encode()).hexdigest()[:8]+ zip_file_name.encode()[1-length//2:].decode('utf-8', errors='ignore')
new_name = '_'.join(new_name.split('/'))
return os.path.join(extract_full_path, 'long_name', new_name), True

return os.path.join(extract_full_path, zip_file_name), False


def extract_zip(file, password, extract_full_path):

with fixcharset_zipfile.ZipFile(file, 'r') as zip:
zip.setpassword(password)

auto_filelists = []

for file in zip.namelist():
problem = False
if file.endswith('/'):
continue

new_file_path, if_long_name = check_long_name(extract_full_path, file)
if if_long_name:
problem = True

if problem:
basename = os.path.dirname(new_file_path)
os.makedirs(basename, exist_ok=True)
with zip.open(file, 'r') as f_in:
data = f_in.read()
with open(new_file_path, 'wb') as f_out:
f_out.write(data)
else:
auto_filelists.append(file)

zip.extractall(extract_full_path, auto_filelists)



return os.path.join(extract_full_path, 'long_name', new_name)

zipfileName = remove_between_first_second_slash( zip_file_name)
fullPath = os.path.join(extract_full_path, zipfileName)
return fullPath


def extract_archive(file_path, extract_full_path, file, password=None):
Expand All @@ -85,51 +66,33 @@ def extract_archive(file_path, extract_full_path, file, password=None):
try:
if extension == '.tar':
with tarfile.open(file_path, 'r') as tar:
tar.extractall(extract_full_path)
extract_and_convert_zip(tar,extract_full_path)
os.remove(file_path)
elif extension == '.tbz2' or extension == '.tar.bz2':
with tarfile.open(file_path, 'r:bz2') as tar:
tar.extractall(extract_full_path)
os.remove(file_path)
elif extension == '.tgz' or extension == '.tar.gz' or extension == '.tar.Z':
with tarfile.open(file_path, 'r:gz') as tar:
tar.extractall(extract_full_path)
os.remove(file_path)
elif extension == '.tar.xz':
with tarfile.open(file_path, 'r:xz') as tar:
tar.extractall(extract_full_path)
os.remove(file_path)
elif extension == '.bz2':
if not os.path.exists(extract_full_path):
os.mkdir(extract_full_path)
with bz2.open(file_path, 'rb') as f_in:
with open(os.path.join(extract_full_path, filename), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(file_path)
elif extension == '.rar':
with rarfile.RarFile(file_path, 'r') as rar:
rar.setpassword(password)

problem = False

for file in rar.namelist():
if file.endswith('/'):
continue
new_file_path, if_long_name = check_long_name(extract_full_path, file)
if if_long_name:
problem = True
break

if problem:
for file in rar.namelist():
if file.endswith('/'):
continue
new_file_path, _ = check_long_name(extract_full_path, file)
basename = os.path.dirname(new_file_path)

os.makedirs(basename, exist_ok=True)
with rar.open(file, 'r') as f_in:
data = f_in.read()
with open(new_file_path, 'wb') as f_out:
f_out.write(data)
# print(f"File extract to: {new_file_path}")
else:
rar.extractall(extract_full_path)
extract_and_convert_zip(rar,extract_full_path)
os.remove(file_path)


elif extension == '.gz':
if not os.path.exists(extract_full_path):
Expand All @@ -138,9 +101,55 @@ def extract_archive(file_path, extract_full_path, file, password=None):
with gzip.open(file_path, 'rb') as f_in:
with open(os.path.join(extract_full_path, filename), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(file_path)
elif extension in ('.zip', '.exe'):
extract_zip(file_path, password, extract_full_path)

try:
with zipfile.ZipFile(file_path, 'r') as zip:
zip.setpassword(password)
extract_and_convert_zip(zip,extract_full_path)
# for file_info in zip.infolist():
# file = file_info.filename
# if file.endswith('/') or file.startswith('__MACOSX/') or is_macos_metadata(file):
# continue

# try:
# file_bytes = file.encode('cp437')
# except:
# file_bytes = file.encode('utf-8')

# coding_name = test_encode(file_bytes)

# if coding_name is None:
# coding_name = api.from_data(file_bytes, mode=2)

# utf8_name = api.convert_encoding(
# source_data=file_bytes,
# source_encoding=coding_name,
# target_encoding="utf-8",
# )

# new_file_path = check_long_name(extract_full_path, utf8_name)
# basename = os.path.dirname(new_file_path)
# os.makedirs(basename, exist_ok=True)
# # 复制文件
# source = zip.open(file_info)
# with open(new_file_path, "wb") as target:
# shutil.copyfileobj(source, target)

os.remove(file_path)
print("解压缩完成")
except zipfile.BadZipFile:
print("错误:无效的ZIP文件。")
extract_succcessful = False
except FileNotFoundError:
print("错误:文件未找到。")
extract_succcessful = False
except IOError as e:
print(f"输入/输出错误:{e}")
extract_succcessful = False
except Exception as e:
print(f"发生了一个未知错误:{e}")
extract_succcessful = False
elif extension == '.7z':
with py7zr.SevenZipFile(file_path, mode='r', password=password) as seven_zip:
seven_zip.extractall(extract_full_path)
Expand Down Expand Up @@ -174,7 +183,8 @@ def traverse_directory(folder_path, passwords=None):

for root, dirs, files in os.walk(folder_path):
extract_path_set = set(dirs)

if root != folder_path:
continue
for file in files:
# 判断文件是否为压缩包类型
if file.endswith(('.tar', '.tbz2', '.tgz', '.tar.bz2', '.tar.gz', '.tar.xz', '.tar.Z', '.bz2', '.rar', '.gz', '.zip', '.xz', '.7z', '.exe')):
Expand Down Expand Up @@ -203,12 +213,73 @@ def traverse_directory(folder_path, passwords=None):
if extract_succcessful:
break

if extract_succcessful:
traverse_directory(extract_full_path)
# if extract_succcessful:
# traverse_directory(extract_full_path)

extract_path_set.add(extract_path)


def remove_between_first_second_slash(path):
"""
Remove the substring from the first '/' to the second '/' in the path, inclusive.

:param path: The original path as a string.
:return: The path after removing the substring between the first and second '/'.
"""
first_slash_index = path.find('/')
if first_slash_index == -1:
return path # No '/' found


return path[first_slash_index + 1:]

def is_macos_metadata(path):
"""
Check if the given path is a macOS metadata file.

:param path: The path to be checked.
:return: True if the path is a macOS metadata file, False otherwise.
"""
return path.split('/')[-1].startswith('._')


def extract_and_convert_zip(zip_file, extract_full_path):
"""
Extract files from the zip file, convert filenames to UTF-8, and save them to a target directory.

:param zip_file: ZipFile object to be extracted.
"""
for file_info in zip_file.infolist():
file = file_info.filename

if file.endswith('/') or file.startswith('__MACOSX/') or is_macos_metadata(file):
continue

try:
file_bytes = file.encode('cp437')
except:
file_bytes = file.encode('utf-8')

coding_name = test_encode(file_bytes)

if coding_name is None:
coding_name = api.from_data(file_bytes, mode=2)

utf8_name = api.convert_encoding(
source_data=file_bytes,
source_encoding=coding_name,
target_encoding="utf-8",
)

new_file_path = check_long_name(extract_full_path, utf8_name)
basename = os.path.dirname(new_file_path)
os.makedirs(basename, exist_ok=True)

# 复制文件
with zip_file.open(file_info) as source:
with open(new_file_path, "wb") as target:
shutil.copyfileobj(source, target)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--folder_path', type=str, required=True, help="压缩包路径")
Expand Down
Loading