dingxin_toolbox
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
6.4 KiB

import hashlib
import os
import shutil
import subprocess

import pymysql
from git import Repo

from dingxin_toolbox_drf import settings
from update.models import Release
"""
功能:
1. clone代码(仅内部使用)
2. 拉去指定版本代码
3. 获取所有版本并写入数据库
4. 将指定版本的cine.sql拉取到本地
5. 对本执行版本的cine.sql和本地同版本的文件是否一致
6. 将本地的cine.sql写入数据库
"""
class GitUtil:
    """Manage the local dingxin Git checkout and the per-release ``cine.sql`` files.

    An instance is bound to one release (``short_release``).  It can check
    that release out, copy its ``install/cine.sql`` into a local SQL
    directory with the database name rewritten to ``cine_<short_release>``,
    and load that file into MySQL.
    """

    # Statement prefixes collected by write_sql(), in the order they are tested.
    _SCHEMA_STATEMENT_PREFIXES = ('CREATE', 'DROP', 'USE')

    def __init__(self, short_release):
        """Bind the helper to one release.

        Args:
            short_release: Name of the release to operate on.  It is passed to
                ``git checkout`` and used as the suffix of the database name
                and the stem of the local SQL file name.
        """
        self.short_release = short_release
        # Path components are joined individually so the directories resolve
        # to <BASE_DIR>/dx/code and <BASE_DIR>/dx/sql on every platform (a
        # literal backslash is only a separator on Windows).
        self.local_code_path = os.path.join(settings.BASE_DIR, 'dx', 'code')
        self.cine_sql_path = os.path.join(settings.BASE_DIR, 'dx', 'sql')
        # NOTE(review): credentials are hard-coded here; consider moving them
        # to settings or the environment.
        self.db_config = {
            'host': '172.16.3.112',
            'user': 'test',
            'password': 'cine123456',
            'connect_timeout': 5,
        }

    def clone(self):
        """Clone the ``master`` branch into ``local_code_path`` (run once)."""
        # NOTE(review): the read-only account's password is embedded in the URL.
        git_url = 'http://dingxin_readonly:cine123456@172.16.3.3:8081/root/dingxin.git'
        Repo.clone_from(git_url, to_path=self.local_code_path, branch='master')

    def checkout_release(self):
        """Check out ``short_release`` in the local clone and pull its latest code."""
        local_repo = Repo(self.local_code_path)
        r = local_repo.git.checkout(self.short_release)
        print(r)
        r = local_repo.git.pull()
        print(r)

    def set_release_to_db(self):
        """Create a ``Release`` row for every remote ref that is not stored yet.

        The slice offsets below presumably assume ref names shaped like
        ``origin/2.0.33.0338_Release`` -- verify against the real repository.
        Refs whose ``[14:18]`` slice is not an integer are skipped instead of
        aborting the whole run.
        """
        local_repo = Repo(self.local_code_path)
        ref_names = [
            ref.name
            for remote in local_repo.remotes
            for ref in remote.refs
        ]
        # Compare on the full ref name (the ``release`` field).  Comparing it
        # against the stored ``short_release`` values never matches for
        # ``*_Release`` refs and re-inserts them on every call.
        known_releases = {saved.release for saved in Release.objects.all()}
        for ref_name in ref_names:
            if ref_name in known_releases:
                continue
            try:
                ver_id = int(ref_name[14:18])
            except ValueError:
                # Not a versioned ref (no numeric build id at this offset).
                continue
            Release.objects.create(
                release=ref_name,
                short_release=ref_name[7:-8] if '_Release' in ref_name else ref_name,
                main_ver=ref_name[7:18],
                ver_id=ver_id,
                status='0',
            )
            known_releases.add(ref_name)

    @staticmethod
    def _retarget_sql_lines(lines, db_name):
        """Return ``lines`` with the database statements pointed at ``db_name``.

        A line starting with ``CREATE DATABASE`` is replaced by a
        ``DROP DATABASE IF EXISTS`` line plus a ``CREATE DATABASE`` line for
        ``db_name``; a line starting with ``USE `cine``` is replaced by a
        ``USE`` line for ``db_name``.  Every other line is kept unchanged.
        """
        target_lines = []
        for line in lines:
            if line.startswith('CREATE DATABASE'):
                print('CREATE DATABASE')
                target_lines.append(f'DROP DATABASE IF EXISTS `{db_name}`;\r\n')
                target_lines.append(
                    f'CREATE DATABASE /*!32312 IF NOT EXISTS*/`{db_name}` /*!40100 DEFAULT CHARACTER SET utf8 */;\r\n')
            elif line.startswith('USE `cine`'):
                print('USE `cine`')
                target_lines.append(f'USE `{db_name}`;\r\n')
            else:
                target_lines.append(line)
        return target_lines

    def copy_cine_sql(self):
        """Copy the release's ``install/cine.sql`` to ``<cine_sql_path>/<short_release>.sql``.

        The release is checked out first.  The copy has its database
        statements rewritten to target the name from ``get_db_name()``.

        Raises:
            FileNotFoundError: If the checkout has no ``install/cine.sql``.
        """
        self.checkout_release()
        org_path = os.path.join(self.local_code_path, 'install', 'cine.sql')
        target_path = os.path.join(self.cine_sql_path, f'{self.short_release}.sql')
        # Read the source completely before opening the target, so a missing
        # or unreadable source never leaves a truncated target file behind.
        with open(org_path, 'r', encoding='utf-8') as source_file:
            source_lines = source_file.readlines()
        target_lines = self._retarget_sql_lines(source_lines, self.get_db_name())
        os.makedirs(self.cine_sql_path, exist_ok=True)
        with open(target_path, 'w', encoding='utf-8') as target_file:
            target_file.writelines(target_lines)

    def check_cine_sql(self):
        """Report whether the saved SQL file matches the checkout's ``cine.sql``.

        Returns:
            True when the saved file exists and its md5 equals that of
            ``install/cine.sql`` in the local checkout.  False when the saved
            file is missing, or when the digests differ (in which case the
            saved file is deleted).

        NOTE(review): the saved file is written by copy_cine_sql() with
        rewritten database statements, so the digests presumably differ
        whenever any line was rewritten -- confirm whether that is intended.
        """
        sql_path = os.path.join(self.cine_sql_path, f'{self.short_release}.sql')
        if not os.path.exists(sql_path):
            print('文件不存在')
            return False
        saved_md5 = self.get_md5(sql_path)
        new_md5 = self.get_md5(os.path.join(self.local_code_path, 'install', 'cine.sql'))
        if saved_md5 == new_md5:
            print('md5相同')
            return True
        print('md5不同')
        os.remove(sql_path)
        return False

    def write_sql(self):
        """Collect the CREATE/DROP/USE statements of the saved SQL file.

        The file is split on blank lines.  Each chunk starting with one of
        the schema keywords is collected with a trailing ``;``.  A database
        connection is opened (so connection errors still surface) and always
        closed again.

        NOTE: executing the collected statements is not implemented; this
        method currently only parses the file and verifies connectivity.
        """
        sql_path = os.path.join(self.cine_sql_path, f'{self.short_release}.sql')
        if not os.path.exists(sql_path):
            return
        with open(sql_path, 'r', encoding='utf-8') as sql_file:
            content = sql_file.read()
        execute_sql_list = []
        for chunk in content.split('\n\n'):
            statement = chunk.strip()
            for keyword in self._SCHEMA_STATEMENT_PREFIXES:
                if statement.startswith(keyword):
                    print(keyword)
                    execute_sql_list.append(statement + ';')
                    break
        db_conn = pymysql.Connect(**self.db_config)
        # Close the connection explicitly; it was previously left open.
        db_conn.close()

    def write_cine_sql_by_mysql(self):
        """Load the saved SQL file into MySQL with the ``mysql`` command-line client.

        Does nothing when the saved file does not exist.  The client's exit
        code is printed.

        Raises:
            FileNotFoundError: If the ``mysql`` executable is not on PATH.
        """
        sql_path = os.path.join(self.cine_sql_path, f'{self.short_release}.sql')
        if not os.path.exists(sql_path):
            return
        # An argument list plus stdin redirection avoids the shell entirely,
        # so paths containing spaces or shell metacharacters are safe.
        cmd = [
            'mysql',
            f"-h{self.db_config['host']}",
            f"-P{self.db_config.get('port', 3306)}",
            f"-u{self.db_config['user']}",
            f"-p{self.db_config['password']}",
        ]
        with open(sql_path, 'rb') as sql_file:
            completed = subprocess.run(cmd, stdin=sql_file)
        r = completed.returncode
        print(type(r))
        print(r)

    def get_db_name(self):
        """Return the database name for this release: ``cine_<short_release>``."""
        return 'cine_' + self.short_release

    def handle_create_cine(self):
        """Refresh the saved SQL file if it is missing or stale, then load it into MySQL."""
        if not self.check_cine_sql():
            self.copy_cine_sql()
        self.write_cine_sql_by_mysql()

    @staticmethod
    def get_short_version():
        """Return the releases whose status is not ``'0'``, highest ``ver_id`` first."""
        return Release.objects.exclude(status='0').order_by('-ver_id')

    @staticmethod
    def get_md5(path):
        """Return the hexadecimal md5 digest of the file at ``path``.

        The file is read in 4096-byte chunks, so large files are not loaded
        into memory at once.
        """
        digest = hashlib.md5()
        with open(path, 'rb') as fobj:
            while chunk := fobj.read(4096):
                digest.update(chunk)
        return digest.hexdigest()
if __name__ == '__main__':
    # GitUtil requires the release to operate on; calling it with no argument
    # raises TypeError.  'master' matches the branch that clone() fetches.
    git_util = GitUtil('master')
    # Uncomment to perform the one-time initial clone:
    # git_util.clone()