调整逻辑,待测试

main
roger_home_pc 1 year ago
parent 96043538df
commit b020f65fb3
  1. 115
      file.py
  2. 73
      main.py
  3. 1
      unzip.py

@ -6,7 +6,24 @@ from log import logger, log_info
from data_dict import ext_name_list, know_zip_ext_name, know_ext_name, handle_zip_ext_name, clear_list
class Files:
# 定义通用方法
# 目标目录, 获取此目录下的全部文件,未经整理
def get_all_files(_path):
file_list = []
for item in os.walk(_path):
if len(item[2]) > 0:
for file_name in item[2]:
file_list.append(os.path.join(item[0], file_name))
return file_list
# 目标文件, 重命名文件
def file_rename(_org, _target):
os.rename(_org, _target)
# 用于整理解压文件的对象
class FilesUnzip:
def __init__(self, root_path):
self.root_path = root_path
@ -15,21 +32,6 @@ class Files:
def get_root_folder_list(self):
return list(os.walk(self.root_path))[0][1]
# 目标目录, 获取此目录下的全部文件,未经整理
@staticmethod
def get_all_files(_path):
file_list = []
for item in os.walk(_path):
if len(item[2]) > 0:
for file_name in item[2]:
file_list.append(os.path.join(item[0], file_name))
return file_list
# 目标文件, 重命名文件
@staticmethod
def file_rename(_org, _target):
os.rename(_org, _target)
# 目标文件, 获取文件扩展名, 返回 文件路径+文件名, 扩展名
@staticmethod
def get_ext_name(_file):
@ -37,22 +39,10 @@ class Files:
ext_name = os.path.splitext(_file)[1].lower()
return base_name, ext_name
# 目标文件, 将错误的扩展名换成正确的,执行重命名操作,返回新的文件名列表
def change_to_know_name(self, _file):
base_file_name, ext_file_name = self.get_ext_name(_file)
if ext_file_name in know_zip_ext_name:
return _file
for unknown_ext, know_ext in ext_name_list.items():
if ext_file_name == unknown_ext:
new_file_name = base_file_name + know_ext
self.file_rename(_file, new_file_name)
return new_file_name
return _file
# 目标文件, 将文件分类整理返回字典格式文件列表
def cate_files(self, files_list):
def get_cate_files(self, _path):
new_file_list = dict(handle_zip=[], zip=[], others=[], unknown=[])
for _f in files_list:
for _f in get_all_files(_path):
new = self.change_to_know_name(_f)
_, ext_new = self.get_ext_name(new)
if ext_new in handle_zip_ext_name:
@ -66,6 +56,26 @@ class Files:
new_file_list['unknown'].append(new)
return new_file_list
# 目标文件, 将错误的扩展名换成正确的,执行重命名操作,返回新的文件名列表
def change_to_know_name(self, _file):
base_file_name, ext_file_name = self.get_ext_name(_file)
if ext_file_name in know_zip_ext_name:
return _file
for unknown_ext, know_ext in ext_name_list.items():
if ext_file_name == unknown_ext:
new_file_name = base_file_name + know_ext
file_rename(_file, new_file_name)
return new_file_name
return _file
# 删除文件
@staticmethod
def del_all_files(path_list):
for path in path_list:
result = os.system(f'del "{path}"')
# logger.info(f"删除文件成功: {path}")
log_info(f"删除文件{'成功' if result else '失败'}{path}")
# 获取可删除文件列表
# def get_del_files(self, _path):
# del_list = []
@ -78,9 +88,15 @@ class Files:
# 根据clear_list清除无效文件
# 用于整理文件夹的对象
class FilesCollection:
def __init__(self, path):
self.path = path
# 清理无效文件
def clear_files(self, _path):
for file in self.get_all_files(_path):
def clear_files(self):
for file in get_all_files(self.path):
for file_name in clear_list['file_name']:
if os.path.basename(file).lower() == file_name.lower():
os.system(f'del "{file}"')
@ -94,11 +110,10 @@ class Files:
log_info(f"删除文件成功: {file}")
# 用来整理文件夹,返回原路径和去掉无效后的路径
@staticmethod
def get_move_files(_path):
todos = [[_path, _path]]
def get_move_files(self):
todos = [[self.path, self.path]]
move_files = []
tree = list(os.walk(_path))
tree = list(os.walk(self.path))
# print(json.dumps(list(tree)))
# print(todos)
while len(todos) > 0:
@ -137,8 +152,8 @@ class Files:
return move_files
# 移动文件
def move_files(self, _path):
for move in self.get_move_files(_path):
def move_files(self):
for move in self.get_move_files():
if not os.path.exists(os.path.dirname(move[1])):
os.makedirs(os.path.dirname(move[1]))
shutil.move(move[0], move[1])
@ -154,33 +169,25 @@ class Files:
return False
# 清除全部空文件夹
def remove_empty(self, _path):
while _empty := self.get_empty(_path):
def remove_empty(self):
while _empty := self.get_empty(self.path):
# print(_empty)
os.system(f"attrib -r {_empty}")
os.removedirs(_empty)
# 重命名根路径的文件夹, 如果只有一个文件就把文件重命名
def rename_root_folder(self, root, name, _org, _target):
if len(file := self.get_all_files(_org)) == 1:
def rename_root_folder(self, _root, name, _org, _target):
if len(file := get_all_files(_org)) == 1:
ext = os.path.splitext(file[0])[1]
self.file_rename(file[0], os.path.join(root, name + ext))
self.remove_empty(_org)
file_rename(file[0], os.path.join(_root, name + ext))
self.remove_empty()
else:
self.file_rename(_org, _target)
# 删除文件
@staticmethod
def del_all_files(path_list):
for path in path_list:
os.system(f'del "{path}"')
# logger.info(f"删除文件成功: {path}")
log_info(f"删除文件成功: {path}")
file_rename(_org, _target)
if __name__ == '__main__':
root = r"F:\Temp\sjry\hj"
files = Files(root)
files = FilesUnzip(root)
# print(files.get_root_folder_list())
# print(files.get_all_files(r"F:\Temp\sjry\hj\35316"))
# files.file_rename('F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.123', 'F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.222')

@ -8,66 +8,95 @@ root_path = r'F:\Temp\sjry\hj'
# 初始化数据库
db_obj = db.DbAction()
file_obj = file.Files(root_path)
file_obj = file.FilesUnzip(root_path)
unzip_obj = unzip.UnzipFile()
# 初始化成功和失败任务列表
unzip_succeed, unzip_failed = [], []
# 开始任务
def start_unzip_task():
result = True
for folder in file_obj.get_root_folder_list():
all_file = file_obj.get_all_files(os.path.join(root_path, folder)) # 获取文件夹下的全部文件的原始数据
all_file = file_obj.cate_files(all_file) # 整理文件返回整理后的结果
for folder in (set(file_obj.get_root_folder_list()) - set(unzip_succeed)):
logger.info(f'开始解压 {folder}')
all_file = file_obj.get_cate_files(os.path.join(root_path, folder)) # 整理文件返回整理后的结果
print(all_file)
# 从数据库中获取数据
data = db_obj.get_data_by_id(folder)
if data:
# 解压
if all_file['handle_zip'] and unzip_obj.unzip(all_file['handle_zip'][0], data['unzip_pwd']):
file_obj.del_all_files(all_file['zip'])
# 解压, 增加适用历史密码重试的功能
if all_file['handle_zip']:
if unzip_obj.unzip(all_file['handle_zip'][0], data['unzip_pwd']):
file_obj.del_all_files(all_file['zip'])
else:
for pwd in db_obj.get_available_pwd():
result = unzip_obj.unzip(all_file['handle_zip'][0], pwd)
if result:
file_obj.del_all_files(all_file['zip'])
break
# 重新获取文件检查解压结果
all_file = file_obj.get_cate_files(os.path.join(root_path, folder)) # 整理文件返回整理后的结果
print(all_file)
# 检查没哟解压的文件
if all_file['handle_zip'] or all_file['zip']:
logger.info(f'{folder} 中依然存在没有解压的文件,请检查')
unzip_failed.append(folder) if folder not in unzip_failed else ''
result = False
# 打印未知文件
if all_file['unknown']:
# 检查未知文件
elif all_file['unknown']:
logger.info("打印没有处理的文件扩展名:")
logger.info(', '.join(all_file['unknown']))
unzip_failed.append(folder) if folder not in unzip_failed else ''
result = False
# 检查打印结果
if not (all_file['handle_zip'] and all_file['zip'] and all_file['unknown']):
if not all_file['handle_zip'] and not all_file['zip'] and not all_file['unknown']:
db_obj.insert_pwd(data['pwd'])
unzip_succeed.append(folder) if folder not in unzip_succeed else ''
logger.info('全部文件已解压')
return result
# 整理文件夹
def start_collation_task():
logger.info('开始整理文件夹')
for folder in file_obj.get_root_folder_list():
if db_obj.get_data_by_id(folder):
file_col_obj = file.FilesCollection(folder)
if db_obj.get_data_by_id(folder) and folder not in unzip_failed:
logger.info(f'开始整理 {folder}')
folder_path = os.path.join(root_path, folder)
# 清除多余的文件
file_obj.clear_files(folder_path)
file_col_obj.clear_files()
# 整理无效文件夹
file_obj.move_files(folder_path)
file_col_obj.move_files()
# 删除空文件夹
file_obj.remove_empty(folder_path)
file_col_obj.remove_empty()
# 从数据库中获取数据
data = db_obj.get_data_by_id(folder)
# 重命名文件夹
name = data['name']
file_obj.rename_root_folder(root_path, name, folder_path, os.path.join(root_path, name))
name = str(data['id']) + '_' + data['name']
file_col_obj.rename_root_folder(root_path, name, folder_path, os.path.join(root_path, name))
def main():
n = 5
unzip_result = False
while n > 0:
n = 1
# unzip_result = False
while n <= 3:
logger.info(f'{n}轮解压任务')
unzip_result = start_unzip_task()
if unzip_result:
break
n -= 1
if unzip_result:
start_collation_task()
n += 1
logger.info('失败的任务:')
logger.info(', '.join(unzip_failed))
# if unzip_result:
start_collation_task()
if __name__ == '__main__':

@ -60,6 +60,7 @@ class UnzipFile:
result = self.unzip_7z(source, password)
else:
result = self.unzip_zip(source, password)
log_info(f'解压结果: {result}')
return result

Loading…
Cancel
Save