from git import Repo from dspt_api.util.insert_api_to_db import db_conn, db_cursor from update.models import Release import os import shutil import hashlib import pymysql from dingxin_toolbox_drf import settings from env import ENV """ 功能: 1. clone代码(仅内部使用) 2. 拉去指定版本代码 3. 获取所有版本并写入数据库 4. 将指定版本的cine.sql拉取到本地 5. 对本执行版本的cine.sql和本地同版本的文件是否一致 6. 将本地的cine.sql写入数据库 """ class GitUtil: def __init__(self, short_release): self.short_release = short_release self.local_code_path = os.path.join(settings.BASE_DIR, 'dx', 'code') self.cine_sql_path = os.path.join(settings.BASE_DIR, 'dx', 'sql') self.db_name = self.get_db_name(self.short_release) self.db_config = { 'host': settings.CONFIG[ENV]['DB']['HOST'], 'user': settings.CONFIG[ENV]['DB']['USER'], 'password': settings.CONFIG[ENV]['DB']['PASSWORD'], 'port': 3309, 'database': self.db_name, 'connect_timeout': 5, } # 执行一次 clone代码到本地 def clone(self): git_url = 'http://dingxin_readonly:cine123456@172.16.3.3:8081/root/dingxin.git' Repo.clone_from(git_url, to_path=self.local_code_path, branch='master') # 获取指定版本的最新代码 def checkout_release(self): print('操作git获取指定版本的cine.sql') local_repo = Repo(self.local_code_path) checkout_result = local_repo.git.checkout(self.short_release) print("checkout: ", checkout_result) pull_result = local_repo.git.pull() print("pull: ", pull_result) return pull_result # 复制cine.sql到本地路径cine_xxx_org.sql,然后生成新数据库名称的cine_xxx.sql # 如果cine.sql和cine_xxx_org.sql文件md5一致则跳过 # 如果cine.sql和cine_xxx_org.sql文件md5不一致,则重新拷贝cine_xxx_org.sql并生成cine_xxx.sql # 如果生成新的cine_xxx.sql则返回True,否则返回False def create_cine_sql(self): output = self.checkout_release() org_path = os.path.join(self.local_code_path, 'install', 'cine.sql') target_org_path = os.path.join(self.cine_sql_path, f'{self.db_name}_org.sql') target_path = os.path.join(self.cine_sql_path, f'{self.db_name}.sql') if os.path.exists(target_org_path): print(f'存在同名文件 {target_org_path}') if self.get_md5(org_path) == self.get_md5(target_org_path): print('新文件的md5与之前的相同,不需要处理') if os.path.exists(target_path): return False else: print('新文件的md5与之前的不同,开始处理流程') os.remove(target_org_path) if os.path.exists(target_org_path): print(f'删除失败{target_org_path}') print(f'删除成功{target_org_path}') os.remove(target_path) if os.path.exists(target_path): print(f'删除失败{target_path}') print(f'删除成功{target_path}') r = shutil.copyfile(org_path, target_org_path) print('拷贝到目标路径成功', r) else: print(f'没有同名文件 {target_org_path}') r = shutil.copyfile(org_path, target_org_path) print('拷贝到目标路径成功', r) with open(target_path, 'w', encoding='utf-8') as target_file: with open(org_path, 'r', encoding='utf') as f: lines = f.readlines() target_lines = [] for line in lines: if line.startswith('CREATE DATABASE'): print('修改 DROP DATABASE 和 CREATE DATABASE 语句为对应版本语句') target_lines.append(f'DROP DATABASE IF EXISTS `{self.db_name}`;\r\n') target_lines.append( f'CREATE DATABASE /*!32312 IF NOT EXISTS*/`{self.db_name}` /*!40100 DEFAULT CHARACTER SET utf8 */;\r\n') elif line.startswith('USE `cine`'): print('修改 USE 语句为对应版本语句') target_lines.append(f'USE `{self.db_name}`;\r\n') else: target_lines.append(line) f.close() target_file.writelines(target_lines) target_file.close() # print('create_cine_sql', output) return True # 将生成的cine.sql写入数据库 def write_cine_sql_by_mysql(self): print('开始将cine.sql写入数据库') if os.path.exists(sql_path := os.path.join(self.cine_sql_path, f'{self.db_name}.sql')): cmd = f'mysql -h{self.db_config["host"]} -P{self.db_config["port"]} -u{self.db_config["user"]} -p{self.db_config["password"]} < {sql_path}' print('执行命令', cmd) r = os.system(cmd) print('cine.sql写入结果', r) self.del_func_view() def del_func_view(self): cmd_list = [ f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetCostAverage`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetGoodsCount`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetSumIn`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetCostFifo`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetSumOut`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetCost`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetGoodsPrice`;', f'DROP FUNCTION IF EXISTS `{self.db_name}`.`GetCostMobile`;', f'DROP VIEW IF EXISTS `{self.db_name}`.`retail_inventory_list`;' ] print('self.db_config', self.db_config) _db_conn = pymysql.connect(**self.db_config) _db_cursor = db_conn.cursor() _db_cursor.execute() for cmd in cmd_list: r = _db_cursor.execute(cmd) print(cmd, r) _db_conn.commit() _db_cursor.close() _db_conn.close() # 获取数据库名称 def get_db_name(self, _release=''): if _release == '': release = self.short_release else: release = _release # print(release) rel = Release.objects.filter(short_release=release).first() # print(rel) return 'cine_' + str(rel.ver_id) # 写入cine.sql def handle_create_cine(self): # 如果生成了新的文件则写库,如果sql没有变化则跳过 if self.create_cine_sql(): self.write_cine_sql_by_mysql() return '成功' # # 获取release信息 # def get_release(self, _release=''): # if _release == '': # release = self.short_release # else: # release = _release # rel = Release.objects.filter(short_release=release).first() # return rel.release @staticmethod def get_short_version(): return Release.objects.exclude(status='0').order_by('-ver_id').all() @staticmethod def get_md5(path): with open(path, "rb") as f: m = hashlib.md5() # 创建md5对象 with open(path, 'rb') as fobj: while True: data = fobj.read(4096) if not data: break m.update(data) # 更新md5对象 return m.hexdigest() # 返回md5对象 class GitDbUtil: def __init__(self): self.local_code_path = os.path.join(settings.BASE_DIR, 'dx', 'code') # 获取可用的版本,可用状态需要手动维护数据库 @staticmethod def get_short_version(): return Release.objects.exclude(status='0').order_by('-ver_id').all() # 从git中获取版本列表后和数据库对比,添加新的版本到数据库 def set_release_to_db(self): local_repo = Repo(self.local_code_path) local_repo.git.execute(command='git remote update origin --prune', shell=True) # 更新远程列表 local_repo.git.execute(command='git fetch --all', shell=True) release = [] for remote in local_repo.remotes: for ref in remote.refs: release.append(ref.name) saved_release = Release.objects.all() saved_release_version_list = [rel.release for rel in saved_release] for rel in release: # print(rel) if rel in saved_release_version_list: pass # print('pass') else: print(rel) data = { 'release': rel, 'short_release': rel.replace('origin/', ''), 'main_ver': rel[7:18], 'ver_id': int(rel[14:18]), 'status': '1' } Release.objects.create(**data) for local_rel in saved_release_version_list: if local_rel in release: pass else: Release.objects.filter(release=local_rel).update(status='0') # if __name__ == '__main__': # git_util = GitUtil() # # git_util.clone()