dingxin_toolbox
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
5.1 KiB

10 months ago
from git import Repo
from update.models import Release
import os
import shutil
import hashlib
import pymysql
10 months ago
from dingxin_toolbox_drf import settings
10 months ago
class GitUtil:
    """Manage the local dingxin git checkout and its per-release ``cine`` SQL dumps.

    The class clones/checks out the repository under ``self.local_path``,
    records remote refs as ``Release`` rows, produces a per-release copy of
    ``install/cine.sql`` whose schema name is rewritten to ``cine_<short_ver>``,
    and replays the DDL statements of that copy against MySQL.
    """

    # The two statements in install/cine.sql that pin the schema name.
    # ``{name}`` is ``cine`` in the repository file and ``cine_<short_ver>``
    # in the per-release copy.
    _CREATE_DB_TEMPLATE = (
        'CREATE DATABASE /*!32312 IF NOT EXISTS*/`{name}` /*!40100 DEFAULT CHARACTER SET utf8 */;'
    )
    _USE_DB_TEMPLATE = 'USE `{name}`;'

    def __init__(self):
        # NOTE(review): the backslash makes this a Windows-style path; on other
        # platforms it names a single directory literally called "dx\code".
        self.local_path = os.path.join(settings.BASE_DIR, r'dx\code')
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to settings / environment variables.
        self.db_config = {
            'host': 'home.rogersun.cn',
            'user': 'dingxin',
            'password': 'cine123456',
            'connect_timeout': 5,
        }

    def clone(self) -> None:
        """Clone the ``master`` branch into ``self.local_path`` (run once)."""
        # NOTE(review): the URL embeds a username/password; move to configuration.
        git_url = 'http://dingxin_readonly:cine123456@172.16.3.3:8081/root/dingxin.git'
        Repo.clone_from(git_url, to_path=self.local_path, branch='master')

    def checkout_release(self, _release) -> None:
        """Check out ``_release`` in the local repository and pull the latest code."""
        local_repo = Repo(self.local_path)
        local_repo.git.checkout(_release)
        local_repo.git.pull()

    def set_release_to_db(self) -> None:
        """Create a ``Release`` row (status ``'0'``) for every remote ref not stored yet."""
        local_repo = Repo(self.local_path)
        ref_names = [ref.name for remote in local_repo.remotes for ref in remote.refs]

        # Model instances are not subscriptable, so read the column values
        # directly; a set gives O(1) membership tests below.
        known_versions = set(Release.objects.values_list('version', flat=True))

        for ref_name in ref_names:
            if ref_name in known_versions:
                continue
            Release.objects.create(
                version=ref_name,
                # For "*_Release" refs drop the first 7 and last 8 characters;
                # any other ref keeps its full name.
                short_ver=ref_name[7:-8] if '_Release' in ref_name else ref_name,
                status='0',
            )

    def _source_sql_path(self) -> str:
        """Return the path of the repository's ``install/cine.sql``."""
        return os.path.join(self.local_path, 'install/cine.sql')

    def _release_sql_path(self, _release) -> str:
        """Return the path of the per-release SQL copy for ``_release``."""
        return os.path.join(self.local_path, 'sql/' + _release + '.sql')

    def _render_cine_sql(self, db_name) -> list:
        """Return the lines of ``install/cine.sql`` with the schema renamed to ``db_name``."""
        create_stmt = self._CREATE_DB_TEMPLATE.format(name='cine')
        use_stmt = self._USE_DB_TEMPLATE.format(name='cine')
        rendered = []
        with open(self._source_sql_path(), 'r', encoding='utf-8') as source_file:
            for line in source_file:
                # Lines keep their trailing newline, so compare without it and
                # re-attach the original line ending after rewriting.
                body = line.rstrip('\r\n')
                ending = line[len(body):]
                if body == create_stmt:
                    line = self._CREATE_DB_TEMPLATE.format(name=db_name) + ending
                elif body == use_stmt:
                    line = self._USE_DB_TEMPLATE.format(name=db_name) + ending
                rendered.append(line)
        return rendered

    def copy_cine_sql(self, _release) -> None:
        """Check out ``_release`` and write its ``cine.sql`` copy to ``sql/<_release>.sql``.

        The copy targets the ``cine_<short_ver>`` database instead of ``cine``.
        Any existing copy is overwritten.
        """
        self.checkout_release(_release)
        db_name = self.get_db_name(_release)
        rendered = self._render_cine_sql(db_name)

        target_path = self._release_sql_path(_release)
        # The release name may contain "/" and the sql directory may not exist yet.
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(target_path, 'w', encoding='utf-8') as target_file:
            target_file.writelines(rendered)

    def check_cine_sql(self, _release) -> bool:
        """Return True when the saved copy for ``_release`` is up to date.

        The saved file is compared with what ``copy_cine_sql`` would generate
        from the currently checked-out ``install/cine.sql`` (no checkout is
        performed here). Returns False when no copy has been saved.
        """
        sql_path = self._release_sql_path(_release)
        if not os.path.exists(sql_path):
            return False
        with open(sql_path, 'r', encoding='utf-8') as saved_file:
            saved_text = saved_file.read()
        # Compare against the rewritten text: the raw source always differs
        # from the copy because the schema name has been replaced.
        expected_text = ''.join(self._render_cine_sql(self.get_db_name(_release)))
        return saved_text == expected_text

    def write_sql(self, _release) -> None:
        """Recreate the release database from the saved SQL copy.

        Drops ``cine_<short_ver>`` and then executes every statement of the
        saved copy that starts with ``CREATE``, ``DROP`` or ``USE``. Does
        nothing when no copy has been saved for ``_release``.
        """
        sql_path = self._release_sql_path(_release)
        if not os.path.exists(sql_path):
            return

        db_name = self.get_db_name(_release)
        # Backtick-quote the name, matching the CREATE DATABASE / USE
        # statements in the dump, so names containing "." or "-" stay valid.
        statements = [f'DROP DATABASE IF EXISTS `{db_name}`;']

        with open(sql_path, 'r', encoding='utf-8') as sql_file:
            content = sql_file.read()
        # NOTE(review): naive split; a ";" inside a string literal or comment
        # would cut a statement in two.
        for chunk in content.split(';'):
            statement = chunk.strip()
            if statement.startswith(('CREATE', 'DROP', 'USE')):
                statements.append(statement + ';')

        db_conn = pymysql.Connect(**self.db_config)
        try:
            with db_conn.cursor() as db_cursor:
                for statement in statements:
                    result = db_cursor.execute(statement)
                    print(result, statement)
        finally:
            # Always release the connection, even if a statement fails.
            db_conn.close()

    def get_db_name(self, _release) -> str:
        """Return the database name for ``_release``: ``'cine_' + short_ver``."""
        return 'cine_' + self.get_short_version_by_release(_release)

    @staticmethod
    def get_short_version_by_release(_release):
        """Return the ``short_ver`` of the ``Release`` row whose version is ``_release``.

        Raises:
            Release.DoesNotExist: if no row matches ``_release``.
        """
        data = Release.objects.filter(version=_release).first()
        if data is None:
            raise Release.DoesNotExist(f'no Release row for version {_release!r}')
        return data.short_ver

    @staticmethod
    def get_short_version():
        """Return version/short_ver/status dicts for releases whose status is not ``'0'``.

        Rows are ordered by ``short_ver`` descending.
        """
        releases = Release.objects.exclude(status='0').order_by('-short_ver')
        return [
            {'version': r.version, 'short_ver': r.short_ver, 'status': r.status}
            for r in releases
        ]

    @staticmethod
    def get_md5(path) -> str:
        """Return the hexadecimal MD5 digest of the file at ``path``."""
        digest = hashlib.md5()
        with open(path, 'rb') as fobj:
            # Read in 4 KiB chunks so large files are never loaded whole.
            while chunk := fobj.read(4096):
                digest.update(chunk)
        return digest.hexdigest()
if __name__ == '__main__':
    # Manual entry point: build the helper; uncomment the clone call for the
    # one-time initial checkout.
    git_util = GitUtil()
    # git_util.clone()