"""Batch-extract downloaded archive folders and tidy up the results.

For every folder under ``root_path`` the script looks up the extraction
password in the database, extracts the archives (falling back to passwords
that worked in the past), and finally cleans up and renames the folders
that were extracted successfully.
"""
import os.path
from time import sleep

from log import logger
import unzip
import file
import db

root_path = r'F:\Temp\leshe_20240101'

# Maximum number of extraction rounds attempted per folder.
MAX_UNZIP_ROUNDS = 5

# Initialise the database and file helpers.
db_obj = db.DbAction()
file_obj = file.FilesUnzip(root_path)
unzip_obj = unzip.UnzipFile()

# Per-run bookkeeping: folder names grouped by outcome.
unzip_succeed, unzip_failed, unknown, none_unzip_pwd = [], [], [], []


def _add_unique(items, value):
    """Append *value* to *items* unless it is already present."""
    if value not in items:
        items.append(value)


def _retry_with_history_passwords(failed_archives):
    """Retry *failed_archives* with each password from ``db_obj.get_available_pwd()``.

    An archive is dropped from the pending list as soon as one password
    works. The pending list is rebuilt on every pass instead of being
    popped while it is iterated, so no archive is skipped for a password.
    """
    pending = list(failed_archives)
    for pwd in db_obj.get_available_pwd():
        if not pending:
            # Nothing left to retry; no point in walking the other passwords.
            break
        logger.info(f'使用历史密码{pwd}重新解压')
        still_failed = []
        for archive in pending:
            if unzip_obj.unzip(archive, pwd):
                db_obj.update_pwd(pwd)
            else:
                still_failed.append(archive)
        pending = still_failed


def unzip_task(folder: str) -> bool:
    """Run one extraction round for *folder* and record the outcome.

    The outcome is recorded in the module-level lists ``unzip_succeed``,
    ``unzip_failed``, ``unknown`` and ``none_unzip_pwd``.

    Args:
        folder: Name of a directory directly under ``root_path``.

    Returns:
        True when a re-scan of the folder finds no archives and no unknown
        files. False in every other case, including the round in which all
        archives were just extracted with the database password: that round
        returns early, and the caller's next round performs the re-scan and
        records the result.
    """
    folder_path = os.path.join(root_path, folder)

    # Categorise the files in the folder.
    all_file = file_obj.get_cate_files(folder_path)
    logger.info(all_file)

    # Nothing at all in the folder: drop it and count it as failed.
    if not any(all_file[key] for key in ('handle_zip', 'zip', 'others', 'unknown')):
        file_col_obj = file.FilesCollection(folder_path)
        file_col_obj.remove_empty_root_folder()
        _add_unique(unzip_failed, folder)
        return False

    # Fetch the record for this folder from the database.
    data = db_obj.get_data_by_id(folder)
    logger.info(data)
    if not data:
        logger.info(f'{folder} 在数据库中不存在或以处理完成,请检查')
        _add_unique(unzip_failed, folder)
        return False

    if not data['unzip_pwd']:
        logger.info(f'解压密码缺失:{folder}')
        _add_unique(none_unzip_pwd, folder)
        _add_unique(unzip_failed, folder)
        return False

    if all_file['handle_zip']:
        # First pass: extract with the password stored for this folder.
        result_fail = []
        for archive in all_file['handle_zip']:
            logger.info(f"使用密码{data['unzip_pwd']}解压{archive}")
            if not unzip_obj.unzip(archive, data['unzip_pwd']):
                result_fail.append(archive)

        if not result_fail:
            logger.info("全部解压成功,删除全部压缩文件")
            file_obj.del_all_files(all_file['zip'])
            # The original returned None here; the caller only tests
            # truthiness, so False keeps the behaviour (another round
            # re-scans and records the result) with a consistent bool.
            return False

        # Second pass: retry the failures with passwords that worked before.
        _retry_with_history_passwords(result_fail)

        # NOTE(review): the archives are deleted even if some of them still
        # failed to extract. Kept as in the original - confirm this is the
        # intended behaviour.
        logger.info("部分解压成功,删除全部压缩文件")
        file_obj.del_all_files(all_file['zip'])

    # Re-scan the folder to verify the extraction result.
    all_file = file_obj.get_cate_files(folder_path)
    result = True

    # Archives that are still present.
    if all_file['handle_zip'] or all_file['zip']:
        logger.info(f'{folder} 中依然存在没有解压的文件,请检查')
        _add_unique(unzip_failed, folder)
        result = False

    # Everything extracted and nothing unknown left: record the success.
    if not all_file['handle_zip'] and not all_file['zip'] and not all_file['unknown']:
        db_obj.insert_pwd(data['unzip_pwd'])
        _add_unique(unzip_succeed, folder)
        if folder in unzip_failed:
            unzip_failed.remove(folder)
        logger.info('全部文件已解压')

    # Files the categoriser reported under 'unknown'.
    if all_file['unknown']:
        logger.info("打印没有处理的文件扩展名:")
        logger.info(', '.join(all_file['unknown']))
        _add_unique(unzip_failed, folder)
        _add_unique(unknown, folder)
        result = False

    return result


def start_unzip_task() -> None:
    """Run up to ``MAX_UNZIP_ROUNDS`` extraction rounds for each pending folder.

    Folders already listed in ``unzip_succeed`` are skipped. The rounds for
    a folder stop as soon as ``unzip_task`` returns a truthy value.
    """
    pending = sorted(set(file_obj.get_root_folder_list()) - set(unzip_succeed))
    for folder in pending:
        logger.info(f'开始解压 {folder}')
        for round_no in range(1, MAX_UNZIP_ROUNDS + 1):
            logger.info(f'第{round_no}轮解压任务')
            if unzip_task(folder):
                break


def start_collation_task() -> None:
    """Clean up and rename every folder that has a database record and did not fail."""
    logger.info('开始整理文件夹')
    for folder in file_obj.get_root_folder_list():
        folder_path = os.path.join(root_path, folder)
        file_col_obj = file.FilesCollection(folder_path)

        # Query the record once and reuse it for the rename below.
        data = db_obj.get_data_by_id(folder)
        if not data or folder in unzip_failed:
            continue

        logger.info(f'开始整理 {folder}')
        if file_col_obj.remove_empty_root_folder():
            continue

        # Remove superfluous files.
        file_col_obj.clear_files()
        # Tidy up invalid folders.
        file_col_obj.move_files()
        # Delete empty folders.
        file_col_obj.remove_empty()

        # Rename the folder to "id<id>_<name>" using the database record.
        name = 'id' + str(data['id']) + '_' + data['name']
        file_col_obj.rename_root_folder(root_path, name, folder_path, os.path.join(root_path, name))


def pc_sleep(delay):
    """Wait *delay* seconds, then run the Windows ``SetSuspendState`` command."""
    sleep(delay)
    os.system("rundll32.exe powrprof.dll,SetSuspendState Sleep")


def main() -> None:
    """Extract all folders, tidy them up, and log a summary of the problems."""
    start_unzip_task()
    start_collation_task()

    logger.info('解压失败的任务:')
    logger.info(', '.join(unzip_failed))
    db_obj.get_failed(unzip_failed)

    logger.info('包含未知文件:')
    logger.info(', '.join(unknown))

    logger.info('解压密码缺失:')
    logger.info(', '.join(none_unzip_pwd))
    # Optional: call pc_sleep(10) here to suspend the machine after the run.


if __name__ == '__main__':
    main()