You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

156 lines
5.6 KiB

import os.path
from time import sleep
from log import logger
import unzip
import file
import db
# Directory whose sub-folders are the tasks processed by this script.
root_path = r'F:\Temp\leshe_20240101'

# Set up the database helper, the folder scanner and the extractor.
db_obj = db.DbAction()
file_obj = file.FilesUnzip(root_path)
unzip_obj = unzip.UnzipFile()

# Bookkeeping lists of folder names, filled in while the tasks run:
# succeeded, failed, containing unknown files, and missing a password.
unzip_succeed = []
unzip_failed = []
unknown = []
none_unzip_pwd = []
def _remember(bucket, folder):
    """Append *folder* to the list *bucket* unless it is already in it."""
    if folder not in bucket:
        bucket.append(folder)


def _retry_with_history_passwords(pending):
    """Retry the archives in *pending* with every stored historical password.

    Returns a ``(extracted, still_pending)`` pair of lists.
    """
    extracted = []
    for pwd in db_obj.get_available_pwd():
        if not pending:
            # Nothing left to retry, so the remaining passwords are not needed.
            break
        logger.info(f'使用历史密码{pwd}重新解压')
        # Build a fresh list instead of popping from the one being iterated,
        # which would skip the element that follows every removed entry.
        remaining = []
        for archive in pending:
            if unzip_obj.unzip(archive, pwd):
                db_obj.update_pwd(pwd)
                extracted.append(archive)
            else:
                remaining.append(archive)
        pending = remaining
    return extracted, pending


def unzip_task(folder):
    """Run one extraction round for a task folder and record the outcome.

    The folder is scanned with ``file_obj.get_cate_files``, its record is
    loaded from the database, and every archive in the scan's
    ``'handle_zip'`` list is extracted with the record's ``'unzip_pwd'``.
    Archives that fail are retried with the stored historical passwords.
    The folder is then scanned again to decide the result.

    Args:
        folder: Folder name relative to ``root_path``; it is also the key
            passed to ``db_obj.get_data_by_id``.

    Returns:
        bool: ``True`` when the second scan reports no remaining archives
        and no unknown files. ``False`` when the folder is empty, has no
        database record, has no password, still contains archives, or
        contains unknown files.

    Side effects:
        Updates the module-level lists ``unzip_succeed``, ``unzip_failed``,
        ``unknown`` and ``none_unzip_pwd``, and may delete archive files
        through ``file_obj.del_all_files``.
    """
    folder_path = os.path.join(root_path, folder)
    all_file = file_obj.get_cate_files(folder_path)
    logger.info(all_file)

    # Nothing of any category was found: clean up the folder and give up.
    if not any(all_file[key] for key in ('handle_zip', 'zip', 'others', 'unknown')):
        file.FilesCollection(folder_path).remove_empty_root_folder()
        _remember(unzip_failed, folder)
        return False

    # Load the record that carries the extraction password.
    data = db_obj.get_data_by_id(folder)
    print(data)  # debug output kept from the original
    if not data:
        logger.info(f'{folder} 在数据库中不存在或以处理完成,请检查')
        _remember(unzip_failed, folder)
        return False
    if not data['unzip_pwd']:
        logger.info(f'解压密码缺失:{folder}')
        _remember(none_unzip_pwd, folder)
        _remember(unzip_failed, folder)
        return False

    if all_file['handle_zip']:
        # First pass: the password stored for this folder.
        result_pass, result_fail = [], []
        for archive in all_file['handle_zip']:
            logger.info(f"使用密码{data['unzip_pwd']}解压{archive}")
            if unzip_obj.unzip(archive, data['unzip_pwd']):
                result_pass.append(archive)
            else:
                result_fail.append(archive)

        if not result_fail:
            logger.info("全部解压成功,删除全部压缩文件")
            file_obj.del_all_files(all_file['zip'])
            # Fall through to the verification below so the success is
            # recorded and the function returns a real boolean.
        else:
            # Second pass: historical passwords for whatever failed.
            recovered, result_fail = _retry_with_history_passwords(result_fail)
            result_pass.extend(recovered)
            # NOTE(review): the nesting of this cleanup step could not be
            # recovered from the source. It is placed after the retry loop
            # and limited to runs where at least one archive was extracted,
            # so a completely failed run keeps its archives - confirm.
            if result_pass:
                logger.info("部分解压成功,删除全部压缩文件")
                file_obj.del_all_files(all_file['zip'])

    # Scan again and judge the result from what is left on disk.
    all_file = file_obj.get_cate_files(folder_path)
    result = True

    # Archives that are still present mean the folder is not done.
    if all_file['handle_zip'] or all_file['zip']:
        logger.info(f'{folder} 中依然存在没有解压的文件,请检查')
        _remember(unzip_failed, folder)
        result = False

    # No archives and no unknown files: record the success.
    if not all_file['handle_zip'] and not all_file['zip'] and not all_file['unknown']:
        db_obj.insert_pwd(data['unzip_pwd'])
        _remember(unzip_succeed, folder)
        if folder in unzip_failed:
            unzip_failed.remove(folder)
        logger.info('全部文件已解压')

    # Unknown files are reported and count as a failure.
    if all_file['unknown']:
        logger.info("打印没有处理的文件扩展名:")
        logger.info(', '.join(all_file['unknown']))
        _remember(unzip_failed, folder)
        _remember(unknown, folder)
        result = False

    return result
# Start the extraction tasks
def start_unzip_task():
    """Run extraction rounds for every root folder not yet marked successful.

    Folders are handled in sorted order. Each folder gets at most five
    rounds of ``unzip_task`` and the rounds stop as soon as one returns a
    truthy value.
    """
    pending_folders = set(file_obj.get_root_folder_list()).difference(unzip_succeed)
    for folder in sorted(pending_folders):
        logger.info(f'开始解压 {folder}')
        for attempt in range(1, 6):
            logger.info(f'{attempt}轮解压任务')
            if unzip_task(folder):
                break
# Tidy up the folders
def start_collation_task():
    """Clean up and rename every root folder that was handled successfully.

    A folder is processed only when ``db_obj.get_data_by_id`` returns a
    record for it and it is not listed in ``unzip_failed``. If
    ``remove_empty_root_folder`` reports a truthy result the folder is
    skipped. Otherwise its files are cleaned and the folder is renamed to
    ``id<id>_<name>`` using the values of the database record.
    """
    logger.info('开始整理文件夹')
    for folder in file_obj.get_root_folder_list():
        folder_path = os.path.join(root_path, folder)
        collector = file.FilesCollection(folder_path)

        # Skip folders without a database record or with a failed extraction.
        if not db_obj.get_data_by_id(folder) or folder in unzip_failed:
            continue

        logger.info(f'开始整理 {folder}')
        if collector.remove_empty_root_folder():
            continue

        collector.clear_files()    # remove superfluous files
        collector.move_files()     # tidy up invalid folders
        collector.remove_empty()   # delete empty folders

        # Fetch the record again and build the new folder name from it.
        record = db_obj.get_data_by_id(folder)
        new_name = ''.join(['id', str(record['id']), '_', record['name']])
        renamed_path = os.path.join(root_path, new_name)
        collector.rename_root_folder(root_path, new_name, folder_path, renamed_path)
def pc_sleep(delay):
    """Wait *delay* seconds, then run the Windows ``rundll32`` suspend command."""
    suspend_command = "rundll32.exe powrprof.dll,SetSuspendState Sleep"
    sleep(delay)
    os.system(suspend_command)
def main():
    """Run the extraction phase, then the tidy-up phase, then log a summary.

    The summary lists the failed folders, the folders containing unknown
    files and the folders without a password. The failed list is also
    passed to ``db_obj.get_failed``.
    """
    start_unzip_task()
    start_collation_task()

    logger.info('解压失败的任务:')
    logger.info(', '.join(unzip_failed))
    db_obj.get_failed(unzip_failed)

    remaining_sections = (
        ('包含未知文件:', unknown),
        ('解压密码缺失:', none_unzip_pwd),
    )
    for title, folders in remaining_sections:
        logger.info(title)
        logger.info(', '.join(folders))
    # Optional: call pc_sleep(10) here to suspend the machine afterwards.


if __name__ == "__main__":
    main()