import hashlib
import os
import shutil

import pymysql
from git import Repo

from dingxin_toolbox_drf import settings
from update.models import Release


class GitUtil:
    """Helpers around a local clone of the dingxin repository.

    Covers cloning / checking out a release, recording remote refs as
    ``Release`` rows, keeping a per-release copy of ``install/cine.sql`` and
    replaying part of that copy against the configured MySQL server.
    """

    # Statements in install/cine.sql that name the database. They are rewritten
    # so each release gets its own database instead of the shared `cine`.
    _CREATE_DB_STMT = (
        'CREATE DATABASE /*!32312 IF NOT EXISTS*/`cine` '
        '/*!40100 DEFAULT CHARACTER SET utf8 */;'
    )
    _USE_DB_STMT = 'USE `cine`;'

    def __init__(self):
        self.local_path = os.path.join(settings.BASE_DIR, r'dx\code')
        # NOTE(review): credentials are hard-coded here and in clone(); they
        # should come from settings or the environment.
        self.db_config = {
            'host': 'home.rogersun.cn',
            'user': 'dingxin',
            'password': 'cine123456',
            'connect_timeout': 5,
        }

    def clone(self) -> None:
        """Clone the ``master`` branch into ``self.local_path`` (run once)."""
        # NOTE(review): the URL embeds a read-only account's password.
        git_url = 'http://dingxin_readonly:cine123456@172.16.3.3:8081/root/dingxin.git'
        Repo.clone_from(git_url, to_path=self.local_path, branch='master')

    def checkout_release(self, _release) -> None:
        """Check out ``_release`` in the local clone and pull its latest code."""
        local_repo = Repo(self.local_path)
        local_repo.git.checkout(_release)
        local_repo.git.pull()

    def set_release_to_db(self) -> None:
        """Create a ``Release`` row for every remote ref not stored yet."""
        local_repo = Repo(self.local_path)
        ref_names = [
            ref.name
            for remote in local_repo.remotes
            for ref in remote.refs
        ]
        # Model instances are not subscriptable, so fetch the column directly;
        # a set keeps the membership test O(1).
        known_versions = set(Release.objects.values_list('version', flat=True))
        for ref_name in ref_names:
            if ref_name in known_versions:
                continue
            # Presumably drops a 7-char remote prefix ("origin/") and the
            # 8-char "_Release" suffix -- confirm against real ref names.
            short_ver = ref_name[7:-8] if '_Release' in ref_name else ref_name
            Release.objects.create(
                version=ref_name,
                short_ver=short_ver,
                status='0',
            )
            known_versions.add(ref_name)

    def copy_cine_sql(self, _release) -> None:
        """Check out ``_release`` and save its cine.sql under ``sql/``.

        The saved copy has the database name in the ``CREATE DATABASE`` and
        ``USE`` statements replaced by the release-specific name.

        Raises:
            Release.DoesNotExist: if ``_release`` has no ``Release`` row.
        """
        self.checkout_release(_release)
        rendered_lines = self._render_sql(_release)
        target_path = self._saved_sql_path(_release)
        # The sql/ directory (and any sub-directory implied by a "/" in the
        # release name) may not exist yet.
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(target_path, 'w', encoding='utf-8') as target_file:
            target_file.writelines(rendered_lines)

    def check_cine_sql(self, _release) -> bool:
        """Return True when the saved SQL copy matches the current cine.sql.

        The saved copy is compared with what ``copy_cine_sql`` would write
        for the currently checked-out ``install/cine.sql``. Returns False
        when there is no saved copy or the release is unknown.
        """
        sql_path = self._saved_sql_path(_release)
        if not os.path.exists(sql_path):
            return False
        try:
            expected = ''.join(self._render_sql(_release))
        except Release.DoesNotExist:
            # Without a Release row the expected database name is unknown, so
            # the saved copy cannot be confirmed as current.
            return False
        with open(sql_path, 'r', encoding='utf-8') as saved_file:
            return saved_file.read() == expected

    def write_sql(self, _release) -> None:
        """Recreate the release database from the saved SQL copy.

        Drops the release database, then executes every statement of the
        saved file that starts with ``CREATE``, ``DROP`` or ``USE``. Does
        nothing when no saved copy exists.
        """
        sql_path = self._saved_sql_path(_release)
        if not os.path.exists(sql_path):
            return

        db_name = self.get_db_name(_release)
        # Backtick-quote the name, as the dump does, so names containing
        # characters such as dots stay valid identifiers.
        execute_sql_list = [f'DROP DATABASE IF EXISTS `{db_name}`;']
        with open(sql_path, 'r', encoding='utf-8') as sql_file:
            content = sql_file.read()
        for statement in content.split(';'):
            statement = statement.strip()
            if statement.startswith(('CREATE', 'DROP', 'USE')):
                execute_sql_list.append(statement + ';')

        db_conn = pymysql.Connect(**self.db_config)
        try:
            with db_conn.cursor() as db_cursor:
                for ext_sql in execute_sql_list:
                    result = db_cursor.execute(ext_sql)
                    print(result, ext_sql)
        finally:
            # Always release the connection, even if a statement fails.
            db_conn.close()

    def get_db_name(self, _release) -> str:
        """Return the database name used for ``_release``."""
        return 'cine_' + self.get_short_version_by_release(_release)

    @staticmethod
    def get_short_version_by_release(_release):
        """Return the ``short_ver`` stored for the release ``_release``.

        Raises:
            Release.DoesNotExist: if no ``Release`` row has that version.
        """
        data = Release.objects.filter(version=_release).first()
        if data is None:
            raise Release.DoesNotExist(f'Unknown release: {_release!r}')
        return data.short_ver

    @staticmethod
    def get_short_version() -> list:
        """List releases whose status is not ``'0'``, newest short_ver first."""
        data = Release.objects.exclude(status='0').order_by('-short_ver').all()
        return [
            {'version': r.version, 'short_ver': r.short_ver, 'status': r.status}
            for r in data
        ]

    @staticmethod
    def get_md5(path) -> str:
        """Return the hex MD5 digest of the file at ``path``."""
        digest = hashlib.md5()
        with open(path, 'rb') as fobj:
            # Read in chunks so large dumps are never fully held in memory.
            while chunk := fobj.read(4096):
                digest.update(chunk)
        return digest.hexdigest()

    def _source_sql_path(self) -> str:
        """Path of the dump shipped inside the checked-out repository."""
        return os.path.join(self.local_path, 'install/cine.sql')

    def _saved_sql_path(self, _release) -> str:
        """Path of the per-release copy of the dump."""
        return os.path.join(self.local_path, 'sql/' + _release + '.sql')

    def _render_sql(self, _release) -> list:
        """Return the lines of cine.sql with the release database name applied."""
        db_name = self.get_db_name(_release)
        replacements = {
            self._CREATE_DB_STMT: (
                f'CREATE DATABASE /*!32312 IF NOT EXISTS*/`{db_name}` '
                f'/*!40100 DEFAULT CHARACTER SET utf8 */;'
            ),
            self._USE_DB_STMT: f'USE `{db_name}`;',
        }
        with open(self._source_sql_path(), 'r', encoding='utf-8') as source_file:
            lines = source_file.readlines()
        for index, line in enumerate(lines):
            # readlines() keeps the line ending, so compare without it and
            # put it back after the replacement.
            statement = line.rstrip('\r\n')
            if statement in replacements:
                lines[index] = replacements[statement] + line[len(statement):]
        return lines


if __name__ == '__main__':
    git_util = GitUtil()
    # Uncomment for the one-off initial clone:
    # git_util.clone()