|
|
|
import json
|
|
|
|
import os
|
|
|
|
import shutil
|
|
|
|
|
|
|
|
from log import logger, log_info
|
|
|
|
from data_dict import ext_name_list, know_zip_ext_name, know_ext_name, handle_zip_ext_name, clear_list
|
|
|
|
|
|
|
|
|
|
|
|
# 定义通用方法
|
|
|
|
# 目标目录, 获取此目录下的全部文件,未经整理
|
|
|
|
def get_all_files(_path):
|
|
|
|
file_list = []
|
|
|
|
for item in os.walk(_path):
|
|
|
|
if len(item[2]) > 0:
|
|
|
|
for file_name in item[2]:
|
|
|
|
file_list.append(os.path.join(item[0], file_name))
|
|
|
|
return file_list
|
|
|
|
|
|
|
|
|
|
|
|
# 目标文件, 重命名文件
|
|
|
|
def file_rename(_org, _target):
|
|
|
|
os.rename(_org, _target)
|
|
|
|
|
|
|
|
|
|
|
|
# 用于整理解压文件的对象
|
|
|
|
class FilesUnzip:
|
|
|
|
|
|
|
|
def __init__(self, root_path):
|
|
|
|
self.root_path = root_path
|
|
|
|
|
|
|
|
# 目标根目录,返回根目录下全部文件夹的列表
|
|
|
|
def get_root_folder_list(self):
|
|
|
|
return sorted(list(os.walk(self.root_path))[0][1])
|
|
|
|
|
|
|
|
# 目标文件, 获取文件扩展名, 返回 文件路径+文件名, 扩展名
|
|
|
|
@staticmethod
|
|
|
|
def get_ext_name(_file):
|
|
|
|
base_name = os.path.splitext(_file)[0]
|
|
|
|
ext_name = os.path.splitext(_file)[1].lower()
|
|
|
|
return base_name, ext_name
|
|
|
|
|
|
|
|
# 目标文件, 将文件分类整理返回字典格式文件列表
|
|
|
|
def get_cate_files(self, _path):
|
|
|
|
new_file_list = dict(handle_zip=[], zip=[], others=[], unknown=[])
|
|
|
|
for _f in get_all_files(_path):
|
|
|
|
new = self.change_to_know_name(_f)
|
|
|
|
_, ext_new = self.get_ext_name(new)
|
|
|
|
if ext_new in handle_zip_ext_name:
|
|
|
|
if self.del_small_zip_file(new):
|
|
|
|
continue
|
|
|
|
new_file_list['handle_zip'].append(new)
|
|
|
|
new_file_list['zip'].append(new)
|
|
|
|
elif ext_new in know_zip_ext_name:
|
|
|
|
new_file_list['zip'].append(new)
|
|
|
|
elif ext_new in know_ext_name:
|
|
|
|
new_file_list['others'].append(new)
|
|
|
|
else:
|
|
|
|
new_file_list['unknown'].append(new)
|
|
|
|
return new_file_list
|
|
|
|
|
|
|
|
# 目标文件, 将错误的扩展名换成正确的,执行重命名操作,返回新的文件名列表
|
|
|
|
def change_to_know_name(self, _file):
|
|
|
|
base_file_name, ext_file_name = self.get_ext_name(_file)
|
|
|
|
if ext_file_name in know_zip_ext_name:
|
|
|
|
return _file
|
|
|
|
for unknown_ext, know_ext in ext_name_list.items():
|
|
|
|
if ext_file_name == unknown_ext:
|
|
|
|
new_file_name = base_file_name + know_ext
|
|
|
|
log_info(f'修改未知扩展名: {_file} -> {new_file_name}')
|
|
|
|
file_rename(_file, new_file_name)
|
|
|
|
return new_file_name
|
|
|
|
return _file
|
|
|
|
|
|
|
|
# 删除文件
|
|
|
|
@staticmethod
|
|
|
|
def del_all_files(path_list):
|
|
|
|
for path in path_list:
|
|
|
|
result = os.system(f'del "{path}"')
|
|
|
|
log_info(f"删除文件{'成功' if result == '0' else '失败'}: {path}")
|
|
|
|
|
|
|
|
# 删除小文件
|
|
|
|
@staticmethod
|
|
|
|
def del_small_zip_file(_path):
|
|
|
|
if os.path.getsize(_path) < 1024000:
|
|
|
|
log_info(f"无效文件小于1MB,将被删除 {_path}")
|
|
|
|
result = os.system(f'del "{_path}"')
|
|
|
|
log_info(f"删除文件{'成功' if result == '0' else '失败'}: {_path}")
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
# 获取可删除文件列表
|
|
|
|
# def get_del_files(self, _path):
|
|
|
|
# del_list = []
|
|
|
|
# for _f in self.get_all_files(_path):
|
|
|
|
# if os.path.basename(_f) in clear_list['file_name']:
|
|
|
|
# del_list.append(_f)
|
|
|
|
# if os.path.splitext(_f)[1] in clear_list['ext_name']:
|
|
|
|
# del_list.append(_f)
|
|
|
|
# return del_list
|
|
|
|
|
|
|
|
# 根据clear_list清除无效文件
|
|
|
|
|
|
|
|
|
|
|
|
# 用于整理文件夹的对象
|
|
|
|
class FilesCollection:
|
|
|
|
def __init__(self, path):
|
|
|
|
self.path = path
|
|
|
|
|
|
|
|
# 清理无效文件
|
|
|
|
def clear_files(self):
|
|
|
|
for file in get_all_files(self.path):
|
|
|
|
for file_name in clear_list['file_name']:
|
|
|
|
if os.path.basename(file).lower() == file_name.lower():
|
|
|
|
os.system(f'del "{file}"')
|
|
|
|
# logger.info(f"删除文件成功: {path}")
|
|
|
|
log_info(f"删除文件成功: {file}")
|
|
|
|
break
|
|
|
|
for ext_name in clear_list['ext_name']:
|
|
|
|
if os.path.splitext(file)[1].lower() == ext_name.lower():
|
|
|
|
os.system(f'del "{file}"')
|
|
|
|
# logger.info(f"删除文件成功: {path}")
|
|
|
|
log_info(f"删除文件成功: {file}")
|
|
|
|
|
|
|
|
# 用来整理文件夹,返回原路径和去掉无效后的路径
|
|
|
|
def get_move_files(self):
|
|
|
|
todos = [[self.path, self.path]]
|
|
|
|
move_files = []
|
|
|
|
tree = list(os.walk(self.path))
|
|
|
|
# print(json.dumps(list(tree)))
|
|
|
|
# print(todos)
|
|
|
|
while len(todos) > 0:
|
|
|
|
# print('todos')
|
|
|
|
# print(todos)
|
|
|
|
temp = []
|
|
|
|
for todo in todos:
|
|
|
|
for leaf in tree:
|
|
|
|
if todo[0] == leaf[0]:
|
|
|
|
# print('todo[0] - leaf[0]')
|
|
|
|
# print(todo[0])
|
|
|
|
# print(leaf[0])
|
|
|
|
if len(leaf[1]) == 0: # 文件夹为0个, 文件有0个或多个
|
|
|
|
# print(1)
|
|
|
|
for file in leaf[2]:
|
|
|
|
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
|
|
|
|
elif len(leaf[1]) == 1 and len(leaf[2]) == 0: # 1个文件夹 和 0个文件
|
|
|
|
# print(2)
|
|
|
|
temp.append([os.path.join(leaf[0], leaf[1][0]), todo[1]])
|
|
|
|
elif len(leaf[1]) == 1 and len(leaf[2]) > 0: # 1个文件夹 和 多个文件
|
|
|
|
# print(3)
|
|
|
|
temp.append([os.path.join(leaf[0], leaf[1][0]), os.path.join(todo[1], leaf[1][0])])
|
|
|
|
for file in leaf[2]:
|
|
|
|
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
|
|
|
|
elif len(leaf[1]) > 1: # 有多个文件夹
|
|
|
|
# print(4)
|
|
|
|
for folder in leaf[1]:
|
|
|
|
temp.append([os.path.join(leaf[0], folder), os.path.join(todo[1], folder)])
|
|
|
|
for file in leaf[2]:
|
|
|
|
move_files.append([os.path.join(leaf[0], file), os.path.join(todo[1], file)])
|
|
|
|
else:
|
|
|
|
print('看看啥情况')
|
|
|
|
# print('temp')
|
|
|
|
# print(temp)
|
|
|
|
todos = temp
|
|
|
|
return move_files
|
|
|
|
|
|
|
|
# 移动文件
|
|
|
|
def move_files(self):
|
|
|
|
for move in self.get_move_files():
|
|
|
|
# print(move)
|
|
|
|
if not os.path.exists(os.path.dirname(move[1])):
|
|
|
|
log_info(f'创建文件夹: {os.path.dirname(move[1])}')
|
|
|
|
os.makedirs(os.path.dirname(move[1]))
|
|
|
|
log_info(f'移动文件:{move[0]} -> {move[1]}')
|
|
|
|
shutil.move(move[0], move[1])
|
|
|
|
|
|
|
|
# 获取一个空文件夹, 如果是空文件夹则返回路径,否则返回False
|
|
|
|
@staticmethod
|
|
|
|
def is_empty(_path):
|
|
|
|
tree = list(os.walk(_path))
|
|
|
|
empty_list = []
|
|
|
|
for leaf in tree:
|
|
|
|
if not leaf[1] and not leaf[2]:
|
|
|
|
return leaf[0]
|
|
|
|
return False
|
|
|
|
|
|
|
|
# 清除全部空文件夹
|
|
|
|
def remove_empty(self):
|
|
|
|
while _empty := self.is_empty(self.path):
|
|
|
|
# print(_empty)
|
|
|
|
log_info(f'移除空文件夹:{_empty}')
|
|
|
|
os.system(f"attrib -r {_empty}")
|
|
|
|
os.removedirs(_empty)
|
|
|
|
|
|
|
|
# 如果根目录是空的话就移除根目录并返回True
|
|
|
|
def remove_empty_root_folder(self):
|
|
|
|
if _empty := self.is_empty(self.path):
|
|
|
|
log_info(f'移除空的根目录:{_empty}')
|
|
|
|
os.system(f"attrib -r {_empty}")
|
|
|
|
os.removedirs(_empty)
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
# 重命名根路径的文件夹, 如果只有一个文件就把文件重命名
|
|
|
|
def rename_root_folder(self, _root, name, _org, _target):
|
|
|
|
if len(file := get_all_files(_org)) == 1:
|
|
|
|
ext = os.path.splitext(file[0])[1]
|
|
|
|
log_info(f'重命名:{file[0]} -> {os.path.join(_root, name + ext)}')
|
|
|
|
file_rename(file[0], os.path.join(_root, name + ext))
|
|
|
|
self.remove_empty()
|
|
|
|
else:
|
|
|
|
log_info(f'重命名:{_org} -> {_target}')
|
|
|
|
file_rename(_org, _target)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
root = r"F:\Temp\sjry\hj"
|
|
|
|
files = FilesUnzip(root)
|
|
|
|
# print(files.get_root_folder_list())
|
|
|
|
# print(files.get_all_files(r"F:\Temp\sjry\hj\35316"))
|
|
|
|
# files.file_rename('F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.123', 'F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.222')
|
|
|
|
# print(files.get_ext_name('F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.222'))
|
|
|
|
# print(files.change_to_know_name('F:\\Temp\\sjry\\hj\\35316\\22\\新建 文本文档.win'))
|
|
|
|
# for f in files.get_root_folder_list():
|
|
|
|
# all_file = files.get_all_files(os.path.join(root, f))
|
|
|
|
# print(files.clear_files(all_file))
|
|
|
|
# print(files.collection_files(r"F:\Temp\sjry\hj\35316"))
|
|
|
|
# print(json.dumps(files.get_folder_dict(r'F:\Temp\test\12345')))
|
|
|
|
col_obj = FilesCollection(r'F:\Temp\leshe\leshe_20230810\35480')
|
|
|
|
print(col_obj.remove_empty_root_folder())
|