需要密码重试机制

需要解压失败时候的后续处理
需要整理打印日志
main
roger_home_pc 1 year ago
parent 729a3a741d
commit 96043538df
  1. 2
      data_dict.py
  2. 21
      db.py
  3. 196
      file.py
  4. 50
      main.py

@ -19,7 +19,7 @@ re_ext_list = {'.7z': r'(\.7z.+?$)',
clear_list = {
'file_name': ['ds_store'],
'ext_name': ['.ds_store', '.ini', '.html', '.co', '.torrent', '.js', '.downloading', '.lnk'],
'ext_name': ['.ds_store', '.ini', '.html', '.co', '.torrent', '.js', '.downloading', '.lnk', '.txt'],
}
pwd_dict = {

21
db.py

@ -10,8 +10,6 @@ db_config = {
'database': 'scrapy'
}
SELECT_SQL = "SELECT * FROM scrapyh s WHERE s.id = %s;"
class DbAction:
def __init__(self):
@ -29,7 +27,24 @@ class DbAction:
return _pwd
def get_data_by_id(self, _id):
SELECT_SQL = "SELECT * FROM scrapyh s WHERE s.id = %s;"
self.cursor.execute(SELECT_SQL, (_id,))
result = self.cursor.fetchone()
result['unzip_pwd'] = self.decode_pwd(result['unzip_pwd'])
if result:
result['unzip_pwd'] = self.decode_pwd(result['unzip_pwd'])
return result
def get_available_pwd(self):
SELECT_SQL = "SELECT * FROM scrapyh_pwd;"
self.cursor.execute(SELECT_SQL)
result = self.cursor.fetchall()
return [r['pwd'] for r in result]
def insert_pwd(self, _pwd):
SELECT_SQL = "SELECT * FROM scrapyh_pwd sp WHERE sp.pwd = %s;"
self.cursor.execute(SELECT_SQL, (_pwd,))
result = self.cursor.fetchone()
if not result:
INSERT_SQL = "INSERT INTO scrapyh_pwd sp VALUES (pwd = %s);"
self.cursor.execute(INSERT_SQL, (_pwd,))
self.conn.commit()

@ -1,5 +1,7 @@
import json
import os
import shutil
from log import logger, log_info
from data_dict import ext_name_list, know_zip_ext_name, know_ext_name, handle_zip_ext_name, clear_list
@ -13,14 +15,6 @@ class Files:
def get_root_folder_list(self):
return list(os.walk(self.root_path))[0][1]
# 目标根目录,返回根目录下全部文件夹的列表
@staticmethod
def get_folder_dict(_path):
data_dict = dict()
for d in os.walk(_path):
data_dict[d[0]] = dict(folders=d[1], files=d[2])
return data_dict
# 目标目录, 获取此目录下的全部文件,未经整理
@staticmethod
def get_all_files(_path):
@ -56,7 +50,7 @@ class Files:
return _file
# 目标文件, 将文件分类整理返回字典格式文件列表
def clear_files(self, files_list):
def cate_files(self, files_list):
new_file_list = dict(handle_zip=[], zip=[], others=[], unknown=[])
for _f in files_list:
new = self.change_to_know_name(_f)
@ -73,89 +67,111 @@ class Files:
return new_file_list
# 获取可删除文件列表
def get_del_files(self, _path):
del_list = []
for _f in self.get_all_files(_path):
if os.path.basename(_f) in clear_list['file_name']:
del_list.append(_f)
if os.path.splitext(_f)[1] in clear_list['ext_name']:
del_list.append(_f)
return del_list
# 目标文件, 将文件按照目录整理
def collection_files(self, _path):
_all_files = self.get_all_files(_path)
files_dict = dict(path_files={}, path=[], files=_all_files)
for _f in _all_files:
file_path = os.path.dirname(_f)
if file_path not in files_dict['path_files'].keys():
files_dict['path'].append(file_path)
files_dict['path_files'][file_path] = [_f]
else:
files_dict['path_files'][file_path].append(_f)
return files_dict
def get_file_dict(self, _path):
for p in os.walk(_path):
pass
def move_files(self, _root_path):
move_file_list = []
_files = self.collection_files(_root_path)
if not _files['files']:
log_info('文件夹为空')
# def get_del_files(self, _path):
# del_list = []
# for _f in self.get_all_files(_path):
# if os.path.basename(_f) in clear_list['file_name']:
# del_list.append(_f)
# if os.path.splitext(_f)[1] in clear_list['ext_name']:
# del_list.append(_f)
# return del_list
# 根据clear_list清除无效文件
# 清理无效文件
def clear_files(self, _path):
for file in self.get_all_files(_path):
for file_name in clear_list['file_name']:
if os.path.basename(file).lower() == file_name.lower():
os.system(f'del "{file}"')
# logger.info(f"删除文件成功: {path}")
log_info(f"删除文件成功: {file}")
break
for ext_name in clear_list['ext_name']:
if os.path.splitext(file)[1].lower() == ext_name.lower():
os.system(f'del "{file}"')
# logger.info(f"删除文件成功: {path}")
log_info(f"删除文件成功: {file}")
# 用来整理文件夹,返回原路径和去掉无效后的路径
@staticmethod
def get_move_files(_path):
todos = [[_path, _path]]
move_files = []
tree = list(os.walk(_path))
# print(json.dumps(list(tree)))
# print(todos)
while len(todos) > 0:
# print('todos')
# print(todos)
temp = []
for todo in todos:
for leaf in tree:
if todo[0] == leaf[0]:
# print('todo[0] - leaf[0]')
# print(todo[0])
# print(leaf[0])
if len(leaf[1]) == 0:
# print(1)
for file in leaf[2]:
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
elif len(leaf[1]) == 1 and len(leaf[2]) == 0:
# print(2)
temp.append([os.path.join(leaf[0], leaf[1][0]), todo[1]])
elif len(leaf[1]) == 1 and len(leaf[2]) > 0:
# print(3)
temp.append([os.path.join(leaf[0], leaf[1][0]), os.path.join(leaf[0], leaf[1][0])])
for file in leaf[2]:
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
elif len(leaf[1]) > 1:
# print(4)
for folder in leaf[1]:
temp.append([os.path.join(leaf[0], folder), os.path.join(todo[1], folder)])
for file in leaf[2]:
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
else:
print('看看啥情况')
# print('temp')
# print(temp)
todos = temp
return move_files
# 移动文件
def move_files(self, _path):
for move in self.get_move_files(_path):
if not os.path.exists(os.path.dirname(move[1])):
os.makedirs(os.path.dirname(move[1]))
shutil.move(move[0], move[1])
# 获取一个空文件夹
@staticmethod
def get_empty(_path):
tree = list(os.walk(_path))
empty_list = []
for leaf in tree:
if not leaf[1] and not leaf[2]:
return leaf[0]
return False
# 清除全部空文件夹
def remove_empty(self, _path):
while _empty := self.get_empty(_path):
# print(_empty)
os.system(f"attrib -r {_empty}")
os.removedirs(_empty)
# 重命名根路径的文件夹, 如果只有一个文件就把文件重命名
def rename_root_folder(self, root, name, _org, _target):
if len(file := self.get_all_files(_org)) == 1:
ext = os.path.splitext(file[0])[1]
self.file_rename(file[0], os.path.join(root, name + ext))
self.remove_empty(_org)
else:
if len(_files['files']) == 1: # 单文件处理
source = _files['files'][0]
target = os.path.join(_root_path, os.path.basename(source))
move_file_list.append((source, target))
elif len(_files['path']) == 1: # 只有一个目录下有多文件处理
for _f in _files['files']:
source = _f
target = os.path.join(_root_path, os.path.basename(source))
move_file_list.append((source, target))
else: # 有多个目录,每个目录下有一个或多个文件
temp_data = dict()
for _p in _files['path']:
for _f in _files['path_files'][_p]:
temp_data[_p] = dict(current=_p, valid='', name=os.path.basename(_f))
# 寻找不重复的基础路径
# 传入的字典格式 {文件路径:{current: 当前路径,valid: 基础路径之后的部分, name: 文件名}}
# def get_base_dir(self, file_list):
# base_dir = dict()
# temp = []
# for _f in file_list.keys():
# current = base_dir[_f]['current']
# base_dir[_f]['current'] = os.path.dirname(current)
# base_dir[_f]['valid'] = os.path.basename(current) + '/' + base_dir[_f]['valid']
# temp.append(base_dir[_f]['valid'])
# if len(set(temp)) == len(temp):
# return base_dir
# else:
# self.get_base_dir(base_dir)
def get_base_dir(self, _path):
files_dict = self.get_folder_dict(_path)
# for _p in check_list:
# if len(files_dict[_p]['folders']) > 1 and len(files_dict[_p]['files']) == 0:
# for _f in files_dict[_p]['folders']:
# os.path.join()
# elif len(files_dict[_path]['folders']) > 1 and len(files_dict[_path]['files']) > 0:
# pass
# elif len(files_dict[_path]['folders']) == 1 and len(files_dict[_path]['files']) == 0:
# pass
# elif len(files_dict[_path]['folders']) == 1 and len(files_dict[_path]['files']) > 0:
# pass
# elif len(files_dict[_path]['folders']) == 0 and len(files_dict[_path]['files']) > 0:
# pass
# else:
# pass
self.file_rename(_org, _target)
# 删除文件
@staticmethod
def del_file(path_list):
def del_all_files(path_list):
for path in path_list:
os.system(f'del "{path}"')
# logger.info(f"删除文件成功: {path}")
@ -174,4 +190,4 @@ if __name__ == '__main__':
# all_file = files.get_all_files(os.path.join(root, f))
# print(files.clear_files(all_file))
# print(files.collection_files(r"F:\Temp\sjry\hj\35316"))
print(json.dumps(files.get_folder_dict(r'F:\Temp\test\12345')))
print(json.dumps(files.get_folder_dict(r'F:\Temp\test\12345')))

@ -17,43 +17,57 @@ def start_unzip_task():
result = True
for folder in file_obj.get_root_folder_list():
all_file = file_obj.get_all_files(os.path.join(root_path, folder)) # 获取文件夹下的全部文件的原始数据
all_file = file_obj.clear_files(all_file) # 整理文件返回整理后的结果
all_file = file_obj.cate_files(all_file) # 整理文件返回整理后的结果
print(all_file)
# 从数据库中获取数据
data = db_obj.get_data_by_id(folder)
# 解压
if all_file['handle_zip'] and unzip_obj.unzip(all_file['handle_zip'][0], data['unzip_pwd']):
file_obj.del_file(all_file['zip'])
if data:
# 解压
if all_file['handle_zip'] and unzip_obj.unzip(all_file['handle_zip'][0], data['unzip_pwd']):
file_obj.del_all_files(all_file['zip'])
# 打印未知文件
if all_file['unknown']:
logger.info("打印没有处理的文件扩展名:")
logger.info(', '.join(all_file['unknown']))
result = False
# 打印未知文件
if all_file['unknown']:
logger.info("打印没有处理的文件扩展名:")
logger.info(', '.join(all_file['unknown']))
result = False
# 检查打印结果
if not (all_file['handle_zip'] and all_file['zip'] and all_file['unknown']):
logger.info('全部文件已解压')
# 检查打印结果
if not (all_file['handle_zip'] and all_file['zip'] and all_file['unknown']):
logger.info('全部文件已解压')
return result
# 整理文件夹
def start_collation_task():
result = True
for folder in file_obj.get_root_folder_list():
all_file = file_obj.get_all_files(os.path.join(root_path, folder)) # 获取文件夹下的全部文件的原始数据
all_file = file_obj.clear_files(all_file) # 整理文件返回整理后的结果
print(all_file)
if db_obj.get_data_by_id(folder):
folder_path = os.path.join(root_path, folder)
# 清除多余的文件
file_obj.clear_files(folder_path)
# 整理无效文件夹
file_obj.move_files(folder_path)
# 删除空文件夹
file_obj.remove_empty(folder_path)
# 从数据库中获取数据
data = db_obj.get_data_by_id(folder)
# 重命名文件夹
name = data['name']
file_obj.rename_root_folder(root_path, name, folder_path, os.path.join(root_path, name))
def main():
n = 5
unzip_result = False
while n > 0:
result = start_unzip_task()
if result:
unzip_result = start_unzip_task()
if unzip_result:
break
n -= 1
if unzip_result:
start_collation_task()
if __name__ == '__main__':

Loading…
Cancel
Save