parent
240e8e56bf
commit
729a3a741d
6 changed files with 403 additions and 0 deletions
@ -0,0 +1,73 @@ |
||||
# Extensions of ordinary, non-archive files.
know_ext_name = [
    '.jpg', '.mp4', '.png', '.mov', '.txt', '.jpeg', '.m4v', '.flv',
    '.url', '.db', '.avi', '.mkv', '.bmp', '.ini', '.doc', '.docx',
    '.ogg', '.wmv', '.gif', '.ts', '.mts', '.iso', '.mpg', '.webp',
    '.heic', '.livp', '.ppt', '.mp3', '.htm', '.jfif', '.webm', '.3gp',
    '.m4a', '.rmvb', '.rm', '.asf', '.f4v', '.mpeg', '.torrent', '.tiff',
]

# Archive extensions that mark the entry point of an archive
# (for split sets, the first volume).
handle_zip_ext_name = ['.zip', '.7z', '.001', '.rar', '.tar', '.wim']

# Every archive-related extension, including the follow-up volumes of split
# archives: '.z01'-'.z20' and '.001'-'.020'.
know_zip_ext_name = (
    ['.zip']
    + ['.z%02d' % volume for volume in range(1, 21)]
    + ['.7z']
    + ['.%03d' % volume for volume in range(1, 21)]
    + ['.rar', '.tar', '.wim']
)
||||
|
||||
# Pattern for a numbered 7z volume suffix: ".7z." followed by 2-3 digits.
re_ext_list_7z = r'\.7z\.\d{2,3}'

# Per-format patterns that capture the archive extension together with
# whatever follows it up to the end of the string.
re_ext_list = {
    '.7z': r'(\.7z.+?$)',
    '.rar': r'(\.rar.+?$)',
    '.zip': r'(\.zip.+?$)',
}

# File names and extensions considered junk that may be deleted.
clear_list = dict(
    file_name=['ds_store'],
    ext_name=['.ds_store', '.ini', '.html', '.co', '.torrent', '.js',
              '.downloading', '.lnk'],
)

# Lookup keyed by a hint string; each value lists candidate strings
# (presumably archive passwords -- confirm against the code that reads it).
pwd_dict = {
    '福利大吧': ['福利大吧@fulidaba00(1.2.3.N).com'],
    '2048jp.com': ['https://2048jp.com/', 'https://2048jp.com'],
    'fulibl.net': ['fulibl.com', 'fulibl.cc', 'fulibl.net'],
    'vrzydh.com或vrpyf.com或vrkefu@outlook.com': [
        'vrzydh.com', 'vrpyf.com', 'vrkefu@outlook.com',
    ],
    'fulizifanji.club 或者fulizifanji.xyz': [
        'fulizifanji.club', 'fulizifanji.xyz',
    ],
    'fulizifanji.club或者fulizifanji.xyz': [
        'fulizifanji.club', 'fulizifanji.xyz',
    ],
}

# Wrong or disguised extension -> the real extension it should be renamed to.
# The empty key covers files that have no extension at all.
ext_name_list = {
    '.7': '.7z', '.7z111': '.7z', '.7z12': '.7z', '.77': '.7z',
    '.pdf': '.rar', '.psd': '.rar', '.ra': '.rar', '.zi': '.zip',
    '.77z': '.7z', '.777': '.7z', '': '.zip', '.7tar': '.7z',
    '.abc1': '.7z', '.abc2': '.7z', '.abc3': '.7z', '.tex': '.7z',
    '.mp41': '.mp4', '.avi1': '.avi', '.tif': '.zip', '.lzh': '.zip',
    '.下载后改成rar': '.rar', '.c': '.zip', '.word': '.rar',
}

# Placeholder tokens '==1==' .. '==10==' and the character each stands for.
special_symbols = {
    '==%d==' % number: symbol
    for number, symbol in enumerate(
        ('<', '>', '|', '*', '?', '/', '\\', ':', "'", '"'), start=1)
}
||||
|
||||
# (HTML entity, literal character) pairs.
# NOTE(review): in the reviewed copy both halves of every pair were the same
# character, which makes the table a no-op -- the entity names appear to have
# been decoded somewhere along the way. They are restored here as
# entity -> character; confirm the direction against the code that consumes it.
escape_list = [
    ('&amp;', '&'),
    ('&quot;', '"'),
    ('&lt;', '<'),
    ('&gt;', '>'),
    ('&nbsp;', ' '),
]

# Extension pair; the consumer is not visible from this module
# (presumably '.mp3' files are size-checked as disguised '.zip' -- TODO confirm).
check_size = {'.mp3': '.zip'}
@ -0,0 +1,177 @@ |
||||
import json |
||||
import os |
||||
from log import logger, log_info |
||||
from data_dict import ext_name_list, know_zip_ext_name, know_ext_name, handle_zip_ext_name, clear_list |
||||
|
||||
|
||||
class Files:
    """Helpers for listing, classifying, renaming and deleting files.

    Args:
        root_path: Directory whose immediate sub-folders are returned by
            ``get_root_folder_list``.
    """

    def __init__(self, root_path):
        self.root_path = root_path

    def get_root_folder_list(self):
        """Return the names of the folders directly inside ``root_path``.

        Only the first ``os.walk`` entry is consumed, so the rest of the tree
        is not traversed. A missing or unreadable root yields an empty list.
        """
        _, folders, _ = next(os.walk(self.root_path), (self.root_path, [], []))
        return folders

    @staticmethod
    def get_folder_dict(_path):
        """Map every directory below ``_path`` (inclusive) to its content.

        Returns:
            ``{directory: {'folders': [...], 'files': [...]}}`` with the bare
            names reported by ``os.walk``.
        """
        return {
            current: dict(folders=folders, files=files)
            for current, folders, files in os.walk(_path)
        }

    @staticmethod
    def get_all_files(_path):
        """Return the joined path of every file found recursively under ``_path``."""
        return [
            os.path.join(current, file_name)
            for current, _, file_names in os.walk(_path)
            for file_name in file_names
        ]

    @staticmethod
    def file_rename(_org, _target):
        """Rename ``_org`` to ``_target``; ``OSError`` from ``os.rename`` propagates."""
        os.rename(_org, _target)

    @staticmethod
    def get_ext_name(_file):
        """Split ``_file`` into ``(path without extension, lower-cased extension)``."""
        base_name, ext_name = os.path.splitext(_file)
        return base_name, ext_name.lower()

    def change_to_know_name(self, _file):
        """Rename ``_file`` when its extension is a known wrong spelling.

        Extensions listed in ``know_zip_ext_name`` are left alone. Otherwise
        the extension is looked up in ``ext_name_list`` (whose empty key
        covers files without any extension) and, on a hit, the file is
        renamed on disk.

        Returns:
            The resulting file path (unchanged when no rename happened).
        """
        base_file_name, ext_file_name = self.get_ext_name(_file)
        if ext_file_name in know_zip_ext_name:
            return _file
        know_ext = ext_name_list.get(ext_file_name)
        if know_ext is None:
            return _file
        new_file_name = base_file_name + know_ext
        self.file_rename(_file, new_file_name)
        return new_file_name

    def clear_files(self, files_list):
        """Normalise extensions and sort the files into categories.

        Each file first goes through ``change_to_know_name`` (which may rename
        it on disk) and is then classified by its extension.

        Returns:
            A dict with the lists ``handle_zip``, ``zip``, ``others`` and
            ``unknown``. Files in ``handle_zip`` are also listed in ``zip``.
        """
        new_file_list = dict(handle_zip=[], zip=[], others=[], unknown=[])
        for _f in files_list:
            new = self.change_to_know_name(_f)
            _, ext_new = self.get_ext_name(new)
            if ext_new in handle_zip_ext_name:
                new_file_list['handle_zip'].append(new)
                new_file_list['zip'].append(new)
            elif ext_new in know_zip_ext_name:
                new_file_list['zip'].append(new)
            elif ext_new in know_ext_name:
                new_file_list['others'].append(new)
            else:
                new_file_list['unknown'].append(new)
        return new_file_list

    def get_del_files(self, _path):
        """Return the files under ``_path`` that match ``clear_list``.

        Matching is case-insensitive and every file is reported at most once.
        """
        file_names = clear_list['file_name']
        ext_names = clear_list['ext_name']
        del_list = []
        for _f in self.get_all_files(_path):
            name = os.path.basename(_f).lower()
            ext = os.path.splitext(name)[1]
            # splitext() reports no extension for a dot-file such as
            # ".DS_Store", so the whole name is tried against the extension
            # list as well.
            if name in file_names or ext in ext_names or name in ext_names:
                del_list.append(_f)
        return del_list

    def collection_files(self, _path):
        """Group all files under ``_path`` by their containing directory.

        Returns:
            ``{'path_files': {directory: [files]}, 'path': [directories in
            first-seen order], 'files': [all files]}``.
        """
        _all_files = self.get_all_files(_path)
        files_dict = dict(path_files={}, path=[], files=_all_files)
        for _f in _all_files:
            file_path = os.path.dirname(_f)
            if file_path not in files_dict['path_files']:
                files_dict['path'].append(file_path)
                files_dict['path_files'][file_path] = []
            files_dict['path_files'][file_path].append(_f)
        return files_dict

    def get_file_dict(self, _path):
        """Unfinished placeholder; currently does nothing and returns ``None``."""
        return None

    def move_files(self, _root_path):
        """Plan how the files below ``_root_path`` would be flattened into it.

        Nothing is moved by this method. A plan is produced only when there
        is exactly one file, or when all files live in a single directory.

        Returns:
            A list of ``(source, target)`` tuples; empty when the folder holds
            no files or when files are spread over several directories.
        """
        _files = self.collection_files(_root_path)
        if not _files['files']:
            log_info('文件夹为空')
            return []
        if len(_files['files']) == 1 or len(_files['path']) == 1:
            return [
                (source, os.path.join(_root_path, os.path.basename(source)))
                for source in _files['files']
            ]
        # TODO: files spread over several directories are not handled yet. The
        # abandoned draft tried to find, per file, the shortest trailing path
        # that is unique across all files ("base dir") so that names would not
        # collide after flattening.
        return []

    def get_base_dir(self, _path):
        """Unfinished placeholder; currently does nothing and returns ``None``.

        TODO: intended to pick a base directory depending on how many
        folders/files each level of the tree contains.
        """
        return None

    @staticmethod
    def del_file(path_list):
        """Delete every path in ``path_list`` and log the outcome of each.

        A failure to delete one file is logged and does not stop the
        remaining deletions.
        """
        for path in path_list:
            try:
                os.remove(path)
            except OSError as err:
                log_info(f"删除文件失败: {path} ({err})")
            else:
                log_info(f"删除文件成功: {path}")
||||
|
||||
|
||||
if __name__ == '__main__':
    # Manual smoke check against hard-coded local Windows directories:
    # dump the folder tree of a test directory as JSON.
    root = r"F:\Temp\sjry\hj"
    files = Files(root)
    folder_tree = files.get_folder_dict(r'F:\Temp\test\12345')
    print(json.dumps(folder_tree))
@ -0,0 +1,26 @@ |
||||
import logging |
||||
# import chardet |
||||
from logging import handlers |
||||
from logging import exception |
||||
|
||||
# Shared application logger; records of level DEBUG and above are handled.
logger = logging.getLogger('file_unzip')
logger.setLevel(logging.DEBUG)

# One format for both outputs: timestamp, source file and line, level, message.
format_str = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'
f = logging.Formatter(format_str)

# hf writes to "log_out.log", rotating at midnight and keeping 7 backups;
# cf is a plain stream handler.
hf = handlers.TimedRotatingFileHandler("log_out.log", when='midnight', interval=1, backupCount=7)
cf = logging.StreamHandler()
for _handler in (hf, cf):
    _handler.setFormatter(f)
    logger.addHandler(_handler)
||||
|
||||
|
||||
def log_info(msg):
    """Log ``msg`` at INFO level on the module's ``logger``.

    If emitting the message fails with a Unicode error, the message is logged
    again with every character that GBK cannot represent replaced by ``?``.

    Args:
        msg: The message; ``bytes`` are treated as UTF-8 in the fallback path.
    """
    try:
        logger.info(msg)
    except UnicodeError:
        # The previous handler was `except exception`, which names the
        # function logging.exception and therefore raised TypeError itself,
        # and it called str.decode(), which does not exist on Python 3.
        if isinstance(msg, bytes):
            text = msg.decode('utf-8', errors='replace')
        else:
            text = str(msg)
        logger.info(text.encode('gbk', errors='replace').decode('gbk'))
@ -0,0 +1,60 @@ |
||||
import os.path |
||||
from log import logger |
||||
import unzip |
||||
import file |
||||
import db |
||||
|
||||
# Root directory handed to file.Files and joined with each folder name below.
root_path = r'F:\Temp\sjry\hj'

# Module-level collaborators shared by the task functions:
# database access, file helpers and the archive extractor.
db_obj = db.DbAction()
file_obj = file.Files(root_path)
unzip_obj = unzip.UnzipFile()
||||
|
||||
|
||||
# Start the extraction task
def start_unzip_task():
    """Run one extraction pass over every folder directly below ``root_path``.

    For each folder the files are classified, the first entry of
    ``handle_zip`` is extracted with the password stored for that folder, and
    on success every file listed under ``zip`` is deleted.

    NOTE(review): only the first archive of a folder is extracted, yet all
    archive files of that folder are deleted afterwards. This is safe only if
    a folder holds a single archive (or one multi-volume set) -- confirm.

    Returns:
        ``False`` if any folder contained files with an unknown extension,
        otherwise ``True``.
    """
    result = True
    for folder in file_obj.get_root_folder_list():
        # Raw list of every file in the folder, then the classified result.
        all_file = file_obj.get_all_files(os.path.join(root_path, folder))
        all_file = file_obj.clear_files(all_file)
        print(all_file)

        # Look up the stored data for this folder.
        data = db_obj.get_data_by_id(folder)

        # Extract, then remove the archive files on success.
        if all_file['handle_zip']:
            # Assumes get_data_by_id returns a mapping, or a falsy value when
            # nothing is stored for the folder -- TODO confirm against db.py.
            password = data['unzip_pwd'] if data else ''
            if unzip_obj.unzip(all_file['handle_zip'][0], password):
                file_obj.del_file(all_file['zip'])

        # Report files whose extension could not be classified.
        if all_file['unknown']:
            logger.info("打印没有处理的文件扩展名:")
            logger.info(', '.join(all_file['unknown']))
            result = False

        # "Everything extracted" only holds when no archive and no unknown
        # file is left; the earlier `and` chain fired as soon as any one of
        # the three lists was empty.
        if not (all_file['handle_zip'] or all_file['zip'] or all_file['unknown']):
            logger.info('全部文件已解压')
    return result
||||
|
||||
|
||||
def start_collation_task():
    """Classify the files of every folder below ``root_path`` and print the result.

    The classification step may rename files with a known wrong extension.
    Returns ``None``.
    """
    for folder in file_obj.get_root_folder_list():
        # Raw list of every file in the folder, then the classified result.
        all_file = file_obj.get_all_files(os.path.join(root_path, folder))
        all_file = file_obj.clear_files(all_file)
        print(all_file)
||||
|
||||
|
||||
def main():
    """Run the extraction task up to five times, stopping at the first pass that returns a truthy result."""
    for _ in range(5):
        if start_unzip_task():
            break


if __name__ == '__main__':
    main()
Binary file not shown.
@ -0,0 +1,67 @@ |
||||
import os
import subprocess

from log import logger, log_info
||||
|
||||
|
||||
# Requires WinRAR and 7-Zip to be installed and available on PATH.
||||
|
||||
class UnzipFile:
    """Extract archives by invoking the WinRAR and 7-Zip command-line tools.

    ``WinRAR.exe`` and ``7z.exe`` are started by name, so both must be
    resolvable through PATH. Commands are passed as argument lists (no shell),
    so passwords and paths containing spaces or quotes reach the tool intact.
    """

    def __init__(self):
        pass

    @staticmethod
    def _run(cmd_list, target):
        """Log and run ``cmd_list``; succeed on exit code 0 with ``target`` present."""
        log_info(' '.join(cmd_list))
        try:
            r = subprocess.run(cmd_list).returncode
        except OSError as err:
            # E.g. the executable is not installed or not on PATH.
            log_info(f'无法启动解压程序: {err}')
            return False
        return r == 0 and os.path.exists(target)

    # Handles zip and rar
    @staticmethod
    def unzip_zip(source, password=""):
        """Extract ``source`` with WinRAR into a folder named after the archive.

        Args:
            source: Path of the archive.
            password: Optional archive password; surrounding whitespace is
                stripped.

        Returns:
            ``True`` when WinRAR exits with 0 and the target folder exists.
        """
        # The target is the archive path without its extension, plus a
        # trailing backslash.
        target = os.path.splitext(source)[0] + '\\'
        cmd_list = ['WinRAR.exe', 'e', '-y', '-ibck', '-ilog.\\winrar.log',
                    source, '*.*', target]
        if password:
            cmd_list.append('-hp' + str(password).strip())
        cmd_list.append('-or')
        return UnzipFile._run(cmd_list, target)

    # Handles 7z and related formats
    @staticmethod
    def unzip_7z(source, password=""):
        """Extract ``source`` with 7-Zip into a folder named after the archive.

        Args:
            source: Path of the archive (or of its first volume).
            password: Optional archive password.

        Returns:
            ``True`` when 7z exits with 0 and the target folder exists.
        """
        out_dir = os.path.splitext(source)[0]
        target = out_dir + '\\'
        # The output switch is passed without the trailing backslash so that
        # quoting a path with spaces cannot end in an escaped quote.
        cmd_list = ['7z.exe', 'x', source, '-o' + out_dir]
        if password:
            cmd_list.append('-p' + str(password))
        return UnzipFile._run(cmd_list, target)

    # Common entry point
    def unzip(self, source, password=""):
        """Extract ``source`` with the tool that matches its extension.

        ``.7z``, ``.001`` and ``.wim`` (compared case-insensitively) go to
        7-Zip; everything else goes to WinRAR.

        Returns:
            ``True`` on success, ``False`` otherwise.
        """
        ext = os.path.splitext(source)[1].lower()
        if ext in ('.7z', '.001', '.wim'):
            return self.unzip_7z(source, password)
        return self.unzip_zip(source, password)
||||
|
||||
|
||||
# Running this module directly does nothing.
if __name__ == '__main__':
    pass
Loading…
Reference in new issue