dingxin_toolbox
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
6.6 KiB

10 months ago
from git import Repo
from update.models import Release
import os
import shutil
import hashlib
import pymysql
10 months ago
from dingxin_toolbox_drf import settings
10 months ago
10 months ago
"""
功能
1. clone代码仅内部使用
2. 拉去指定版本代码
3. 获取所有版本并写入数据库
4. 将指定版本的cine.sql拉取到本地
5. 对本执行版本的cine.sql和本地同版本的文件是否一致
6. 将本地的cine.sql写入数据库
"""
10 months ago
class GitUtil:
10 months ago
def __init__(self, short_release):
self.short_release = short_release
self.local_code_path = os.path.join(settings.BASE_DIR, r'dx\code')
self.cine_sql_path = os.path.join(settings.BASE_DIR, r'dx\sql')
self.db_name = self.get_db_name(self.short_release)
10 months ago
self.db_config = {
10 months ago
'host': '172.16.3.112',
'user': 'test',
10 months ago
'password': 'cine123456',
'connect_timeout': 5,
}
# 执行一次 clone代码到本地
def clone(self):
git_url = 'http://dingxin_readonly:cine123456@172.16.3.3:8081/root/dingxin.git'
10 months ago
Repo.clone_from(git_url, to_path=self.local_code_path, branch='master')
10 months ago
# 获取指定版本的最新代码
10 months ago
def checkout_release(self):
local_repo = Repo(self.local_code_path)
r = local_repo.git.checkout(self.short_release)
print(r)
r = local_repo.git.pull()
print(r)
10 months ago
# 获取版本并写入数据库
def set_release_to_db(self):
10 months ago
local_repo = Repo(self.local_code_path)
10 months ago
release = []
for remote in local_repo.remotes:
for ref in remote.refs:
10 months ago
release.append(ref.name)
10 months ago
saved_release = Release.objects.all()
10 months ago
saved_release_version_list = [rel.short_release for rel in saved_release]
10 months ago
10 months ago
for rel in release:
10 months ago
if rel in saved_release_version_list:
10 months ago
pass
else:
data = {
10 months ago
'release': rel,
'short_release': rel[7:-8] if '_Release' in rel else rel,
'main_ver': rel[7:18],
'ver_id': int(rel[14:18]),
10 months ago
'status': '0'
}
10 months ago
Release.objects.create(**data)
10 months ago
# 复制文cine.sql到本地路径
10 months ago
def copy_cine_sql(self):
self.checkout_release()
org_path = os.path.join(self.local_code_path, 'install', 'cine.sql')
target_path = os.path.join(self.cine_sql_path, f'{self.db_name}.sql')
10 months ago
# if os.path.exists(target_path):
# os.remove(target_path)
with open(target_path, 'w', encoding='utf-8') as target_file:
10 months ago
with open(org_path, 'r', encoding='utf') as f:
lines = f.readlines()
10 months ago
target_lines = []
for line in lines:
if line.startswith('CREATE DATABASE'):
print('CREATE DATABASE')
target_lines.append(f'DROP DATABASE IF EXISTS `{self.db_name}`;\r\n')
10 months ago
target_lines.append(
f'CREATE DATABASE /*!32312 IF NOT EXISTS*/`{self.db_name}` /*!40100 DEFAULT CHARACTER SET utf8 */;\r\n')
10 months ago
elif line.startswith('USE `cine`'):
print('USE `cine`')
target_lines.append(f'USE `{self.db_name}`;\r\n')
10 months ago
else:
target_lines.append(line)
10 months ago
f.close()
10 months ago
target_file.writelines(target_lines)
10 months ago
target_file.close()
# 检查cine.sql是否是最新的
10 months ago
def check_cine_sql(self):
if os.path.exists(sql_path := os.path.join(self.cine_sql_path, f'{self.db_name}.sql')):
10 months ago
saved_md5 = self.get_md5(sql_path)
10 months ago
new_md5 = self.get_md5(os.path.join(self.local_code_path, 'install', 'cine.sql'))
10 months ago
if saved_md5 == new_md5:
10 months ago
print('md5相同')
10 months ago
return True
10 months ago
else:
print('md5不同')
os.remove(sql_path)
return False
else:
print('文件不存在')
return False
def write_sql(self):
if os.path.exists(sql_path := os.path.join(self.cine_sql_path, f'{self.db_name}.sql')):
10 months ago
execute_sql_list = []
10 months ago
with open(sql_path, 'r', encoding='utf-8') as sql_file:
content = sql_file.read()
10 months ago
sql_list = content.split('\n\n')
10 months ago
for sql in sql_list:
sql = sql.strip()
if sql.startswith('CREATE'):
10 months ago
print('CREATE')
10 months ago
execute_sql_list.append(sql + ';')
if sql.startswith('DROP'):
10 months ago
print('DROP')
10 months ago
execute_sql_list.append(sql + ';')
if sql.startswith('USE'):
10 months ago
print('USE')
10 months ago
execute_sql_list.append(sql + ';')
10 months ago
db_conn = pymysql.Connect(**self.db_config)
db_cursor = db_conn.cursor()
10 months ago
# for ext_sql in execute_sql_list:
# result = db_cursor.execute(ext_sql)
# print(result, ext_sql)
def write_cine_sql_by_mysql(self):
if os.path.exists(sql_path := os.path.join(self.cine_sql_path, f'{self.db_name}.sql')):
10 months ago
cmd = f'mysql -h172.16.3.112 -P3306 -utest -pcine123456 < {sql_path}'
r = os.system(cmd)
print(type(r))
print(r)
10 months ago
# 获取数据库名称
def get_db_name(self, _release=''):
if _release == '':
release = self.short_release
else:
release = _release
print(release)
rel = Release.objects.filter(short_release=release).first()
print(rel)
return 'cine_' + str(rel.ver_id)
10 months ago
# 写入cine.sql
def handle_create_cine(self):
if not self.check_cine_sql():
self.copy_cine_sql()
self.write_cine_sql_by_mysql()
10 months ago
# 获取端短版本号
10 months ago
# @staticmethod
# def get_short_version_by_release(_release):
# data = Release.objects.filter(short_release=_release).first()
# return data.short_release
10 months ago
10 months ago
@staticmethod
def get_short_version():
10 months ago
return Release.objects.exclude(status='0').order_by('-ver_id').all()
10 months ago
10 months ago
@staticmethod
def get_md5(path):
with open(path, "rb") as f:
m = hashlib.md5() # 创建md5对象
with open(path, 'rb') as fobj:
while True:
data = fobj.read(4096)
if not data:
break
m.update(data) # 更新md5对象
return m.hexdigest() # 返回md5对象
if __name__ == '__main__':
git_util = GitUtil()
10 months ago
# git_util.clone()