mirror of https://gitee.com/anolis/sysom.git
568 lines
24 KiB
Python
568 lines
24 KiB
Python
import os
|
|
import re
|
|
import urllib
|
|
import subprocess
|
|
import sys
|
|
from clogger import logger
|
|
from django.conf import settings
|
|
import threading
|
|
import redis
|
|
import requests
|
|
import json
|
|
import time
|
|
|
|
from apps.hotfix.models import HotfixModel, OSTypeModel, KernelVersionModel, ReleasedHotfixListModule
|
|
from cec_base.producer import dispatch_producer, Producer
|
|
from cec_base.consumer import Consumer, dispatch_consumer
|
|
from cec_base.admin import dispatch_admin
|
|
from bs4 import BeautifulSoup
|
|
from lib.utils import human_datetime
|
|
from sysom_utils import SysomFramework
|
|
from channel_job import default_channel_job_executor
|
|
|
|
"""
|
|
Function class
|
|
This class contains the support/tool function
|
|
"""
|
|
class FunctionClass():
    """Support/tool functions used by the hotfix service.

    Groups together database lookups on the hotfix, kernel-version and
    OS-type models, publication of build requests through the CEC producer,
    deployment of hotfix packages through the channel job executor, and
    synchronisation of kernel package listings from remote repositories.
    """

    # Attributes of a kernel-version record that get_info_from_version may return.
    _VERSION_INFO_FIELDS = ("os_type", "source", "devel_link", "debuginfo_link")

    # File-name patterns accepted by get_rpm_list, keyed by package type.
    _RPM_NAME_PATTERNS = {
        "source": re.compile(r'^kernel-\d'),
        "devel": re.compile(r'^kernel-devel-\d'),
        "debuginfo": re.compile(r'^kernel-debuginfo-\d'),
    }

    # Interpreter used to run a branch-rule script, keyed by file suffix.
    _RULE_INTERPRETERS = {".sh": "sh", ".py": "python"}

    def __init__(self):
        self.producer = SysomFramework.cec_producer()

    # ------------------------------------------------------------------
    # small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _none_if_empty(value):
        """Return None when ``value`` is None or has zero length, else ``value``."""
        if value is None or len(value) == 0:
            return None
        return value

    @staticmethod
    def _version_from_debuginfo_rpm(rpm_name):
        """Strip the ``kernel-debuginfo-`` prefix and ``.rpm`` suffix.

        e.g. kernel-debuginfo-5.10.60-9.an8.x86_64.rpm -> 5.10.60-9.an8.x86_64
        """
        return rpm_name.replace("kernel-debuginfo-", '').replace(".rpm", '')

    @staticmethod
    def _strip_arch(version):
        """Drop the text after the last dot of ``version``, keeping the dot.

        e.g. 5.10.60-9.an8.x86_64 -> 5.10.60-9.an8.

        Only the trailing component is removed, even when the same text
        also occurs earlier in the version string.
        """
        arch = version.rsplit(".", 1)[-1]
        if not arch:
            return version
        return version[:-len(arch)]

    @staticmethod
    def _find_package(package_names, fragment):
        """Return the first name in ``package_names`` that contains ``fragment``.

        Raises:
            LookupError: when no name contains ``fragment``.
        """
        for name in package_names:
            if fragment in name:
                return name
        raise LookupError("no package name containing '{}' was found".format(fragment))

    def _get_os_attribute(self, os_type, attribute):
        """Return ``attribute`` of the first OS-type record matching ``os_type``.

        Returns None when no record matches or when the lookup raises.
        """
        try:
            os_object = OSTypeModel.objects.all().filter(os_type=os_type).first()
            if os_object is None:
                return None
            return getattr(os_object, attribute)
        except Exception as e:
            logger.exception(e)
            return None

    # ------------------------------------------------------------------
    # files
    # ------------------------------------------------------------------
    def delete_document(self, doc_path, doc_name):
        """Delete the file ``doc_name`` located in directory ``doc_path``.

        Best effort: a missing file is ignored and a failure to remove the
        file is logged instead of raised.

        Returns:
            None in every case.
        """
        document = os.path.join(doc_path, doc_name)
        if not os.path.exists(document):
            return None
        try:
            os.remove(document)
        except Exception as e:
            logger.error(str(e))
        return None

    # ------------------------------------------------------------------
    # database queries
    # ------------------------------------------------------------------
    def query_formal_hotfix_by_parameters(self, created_time, kernel_version, patch_file, hotfix_name):
        """Query hotfix records with ``formal=1``, narrowed by optional filters.

        Each argument that is None or empty is ignored. Otherwise:
        ``created_time`` keeps records created before it, ``kernel_version``
        and ``hotfix_name`` must match exactly, and ``patch_file`` must be
        contained in the record's patch file name.

        Returns:
            The filtered queryset.
        """
        created_time = self._none_if_empty(created_time)
        kernel_version = self._none_if_empty(kernel_version)
        patch_file = self._none_if_empty(patch_file)
        hotfix_name = self._none_if_empty(hotfix_name)

        objects = HotfixModel.objects.all().filter(formal=1)
        if created_time is not None:
            objects = objects.filter(created_at__lt=created_time)
        if kernel_version is not None:
            objects = objects.filter(kernel_version=kernel_version)
        if patch_file is not None:
            objects = objects.filter(patch_file__contains=patch_file)
        if hotfix_name is not None:
            objects = objects.filter(hotfix_name=hotfix_name)
        return objects

    def get_info_from_version(self, kernel_version, info="os_type"):
        """Return one attribute of the record stored for ``kernel_version``.

        Args:
            kernel_version: kernel version used to look up the record.
            info: one of "os_type", "source", "devel_link", "debuginfo_link".

        Returns:
            The requested attribute, or None when the record does not exist,
            ``info`` is not one of the supported names, or the lookup raises.
        """
        try:
            version_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
            if version_object is None:
                logger.error("query kernel version from record failed")
                return None
            if info in self._VERSION_INFO_FIELDS:
                return getattr(version_object, info)
            return None
        except Exception as e:
            logger.error(str(e))
            return None

    def get_sourcerepo_of_os(self, os_type):
        """Return ``source_repo`` of the OS-type record, or None if unavailable."""
        return self._get_os_attribute(os_type, "source_repo")

    def get_image_of_os(self, os_type):
        """Return ``image`` of the OS-type record, or None if unavailable."""
        return self._get_os_attribute(os_type, "image")

    def get_src_pkg_mark_of_os(self, os_type):
        """Return ``src_pkg_mark`` of the OS-type record, or None if unavailable."""
        return self._get_os_attribute(os_type, "src_pkg_mark")

    # building status and formal is set to be 0 when creating a hotfix
    def create_hotfix_object_to_database(self, os_type, kernel_version, hotfix_name, patch_file, patch_path,
                                         hotfix_necessary, hotfix_risk, log_file, arch):
        """Create a hotfix record and return it.

        NOTE(review): ``hotfix_necessary`` and ``hotfix_risk`` are accepted
        but not stored; the record is always created with
        ``hotfix_necessary=0`` and ``hotfix_risk=2``. Kept as-is because
        callers are not visible here; confirm whether this is intended.
        """
        return HotfixModel.objects.create(
            kernel_version=kernel_version,
            os_type=os_type,
            patch_file=patch_file,
            hotfix_name=hotfix_name,
            patch_path=patch_path,
            building_status=0,
            hotfix_necessary=0,
            hotfix_risk=2,
            formal=0,
            log_file=log_file,
            arch=arch,
        )

    def get_hotfix_object_by_id(self, hotfix_id):
        """Return the hotfix record with ``hotfix_id``, or None if absent or on error."""
        try:
            return HotfixModel.objects.all().filter(id=hotfix_id).first()
        except Exception as e:
            logger.error(str(e))
            return None

    # ------------------------------------------------------------------
    # messaging
    # ------------------------------------------------------------------
    def create_message_to_cec(self, **kwargs):
        """Publish a hotfix build request on a CEC topic.

        Required keyword arguments: ``customize``, ``cec_topic``,
        ``os_type``, ``hotfix_id``, ``kernel_version``, ``patch_file``,
        ``hotfix_name``, ``patch_path``, ``arch``, ``log_file``. When
        ``customize`` is truthy, ``source_repo``, ``source``,
        ``devel_link``, ``debuginfo_link``, ``image`` and
        ``is_src_package`` are required as well.

        When ``customize`` is falsy, a message is only produced if
        ``os_type`` contains "anolis".

        Returns:
            True when no exception occurred (even if nothing was produced),
            False when producing or flushing raised.

        Raises:
            KeyError: when one of the always-required keyword arguments is
                missing.
        """
        customize = kwargs['customize']
        cec_topic = kwargs['cec_topic']
        os_type = kwargs['os_type']
        message = {
            "hotfix_id": kwargs['hotfix_id'],
            "kernel_version": kwargs['kernel_version'],
            "patch_name": kwargs['patch_file'],
            "hotfix_name": kwargs['hotfix_name'],
            "patch_path": kwargs['patch_path'],
            "arch": kwargs['arch'],
            "log_file": kwargs['log_file'],
            "os_type": os_type,
        }
        try:
            if not customize:
                if re.search('anolis', os_type):
                    message["git_repo"] = "git@gitee.com:anolis/cloud-kernel.git"
                    message["customize"] = 0
                    self.producer.produce(cec_topic, message)
                else:
                    # Previously this case was completely silent.
                    logger.warning("no build message produced: os_type %s is neither anolis nor customized" % os_type)
            else:
                # this is customize kernel
                message.update({
                    "customize": 1,
                    "src_repo": kwargs['source_repo'],
                    "src_origin": kwargs['source'],
                    "devel_link": kwargs['devel_link'],
                    "debuginfo_link": kwargs['debuginfo_link'],
                    "image": kwargs['image'],
                    "is_src_package": kwargs["is_src_package"],
                })
                self.producer.produce(cec_topic, message)
            self.producer.flush()
            return True
        except Exception as e:
            logger.exception(e)
            return False

    # ------------------------------------------------------------------
    # deployment
    # ------------------------------------------------------------------
    def get_host_list(self, kernel_version):
        """Return the IPs of hosts whose reported kernel version matches.

        Fetches ``settings.HOST_URL`` and reads the ``data`` list of the JSON
        response; a host is selected when
        ``host["host_info"]["kernel_version"]`` equals ``kernel_version``.
        Hosts whose entry cannot be read are skipped.

        Returns:
            A list of IPs, or None when the response status is not 200.
        """
        res = requests.get(settings.HOST_URL)
        if res.status_code != 200:
            return None
        host_list = []
        for each_host in res.json()['data']:
            try:
                if each_host['host_info']['kernel_version'] == kernel_version:
                    host_list.append(each_host["ip"])
            except Exception as e:
                # Malformed/incomplete host entry: skip it, but leave a trace.
                logger.info("skip host entry without usable kernel info: %s" % str(e))
        return host_list

    def dispatch_hotfix_cmd(self, ip, hotfix_name):
        """Run ``rpm -ivh /root/<hotfix_name>`` on ``ip`` through the ssh channel.

        Returns:
            True when the job result code equals 1, False otherwise.
        """
        machine_hotfix_path = "/root/%s" % hotfix_name
        job = default_channel_job_executor.dispatch_job(
            channel_type="ssh",
            params={
                "instance": ip,
                "command": "rpm -ivh %s" % machine_hotfix_path,
            },
            timeout=1000,
            auto_retry=False
        )
        channel_result = job.execute()
        # the result is the output of executing the cmd
        # result = channel_result.result
        # NOTE(review): success is taken to be code == 1 here; confirm
        # against the channel job executor's result-code convention.
        return channel_result.code == 1

    def deploy_hotfix_to_machine(self, kernel_version, hotfix_path):
        """Copy a hotfix package to every matching host and install it.

        Args:
            kernel_version: hosts reporting this kernel version are targeted.
            hotfix_path: local path of the hotfix package.

        Returns:
            The number of hosts on which the install command succeeded,
            -1 when no matching host is available, or None when an
            exception occurred.
        """
        try:
            host_list = self.get_host_list(kernel_version)
            # Check before dispatching: there is nothing to copy to otherwise.
            if not host_list:
                return -1

            hotfix_name = hotfix_path.split("/")[-1]
            # dispatch the hotfix package to the machines
            res = default_channel_job_executor.dispatch_file_job(params={
                "local_path": hotfix_path,
                "remote_path": "/root/%s" % hotfix_name,
                "instances": host_list
            }).execute().result

            for each_instance in json.loads(res):
                if not each_instance["success"]:
                    logger.error("Instance ip: {} dispatch hotfix:{} failed".format(each_instance["instance"], hotfix_path))

            success_num = 0
            for each_host in host_list:
                if self.dispatch_hotfix_cmd(each_host, hotfix_name):
                    success_num += 1
            return success_num
        except Exception as e:
            logger.error(str(e))
            return None

    # ------------------------------------------------------------------
    # kernel version synchronisation
    # ------------------------------------------------------------------
    def sync_kernel(self, id):
        """Synchronise kernel versions for the OS-type record ``id``.

        Uses sync_source when the record's ``src_pkg_mark`` is truthy and
        sync_git otherwise. Errors are logged, not raised.
        """
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=id).first()
            if os_type_object is None:
                logger.error("can not find the OS type record")
                return
            if os_type_object.src_pkg_mark:
                self.sync_source(id, os_type_object)
            else:
                self.sync_git(id, os_type_object)
        except Exception as e:
            logger.error(e)

    def sync_git(self, id, os_type_model: "OSTypeModel"):
        """Record devel/debuginfo links for every listed debuginfo package.

        The sync status of record ``id`` is set to 0 when the sync starts,
        2 when it completes and 1 when any step raises.
        """
        try:
            os_type = os_type_model.os_type
            source_devel = os_type_model.source_devel
            source_debuginfo = os_type_model.source_debuginfo
            # Required by insert_kernel_version_relation_internal; without
            # them every call raised TypeError and the sync always failed.
            image = os_type_model.image
            use_src_package = os_type_model.src_pkg_mark
            devel_lists = self.get_rpm_list(source_devel, "devel")
            debuginfo_lists = self.get_rpm_list(source_debuginfo, "debuginfo")
            self.update_ostype_sync_status(id=id, status=0)
            for each_rpm in debuginfo_lists:
                version = self._version_from_debuginfo_rpm(each_rpm)
                debuginfo_url = source_debuginfo + each_rpm
                devel_url = source_devel + self._find_package(devel_lists, version)
                self.insert_kernel_version_relation_internal(
                    kernel_version=version, os_type=os_type,
                    source="", devel_link=devel_url, debuginfo_link=debuginfo_url,
                    image=image, use_src_package=use_src_package)
            self.update_ostype_sync_status(id=id, status=2)
        except Exception as e:
            logger.error(e)
            self.update_ostype_sync_status(id=id, status=1)

    def insert_kernel_version_relation_git(self, os_type, git_rule, devel_source, source_devel_list, debuginfo_source):
        """deprecated

        NOTE(review): this method calls ``self.get_debuginfo_rpm``, which is
        not defined in this class as shown, and calls
        insert_kernel_version_relation_internal without its ``image`` and
        ``use_src_package`` arguments. Left unchanged because it is marked
        deprecated.
        """
        for kernel_devel_rpm in source_devel_list:
            version = ".".join(kernel_devel_rpm.replace("kernel-devel-", '').split(".")[:-1])
            if not git_rule:
                source = ""
            else:
                branch = self.git_branch_by_git_rule(git_rule, version)
                if not branch:
                    continue
                source = branch
            debuginfo_link = self.get_debuginfo_rpm(debuginfo_source, version)
            debuginfo_link = debuginfo_source + debuginfo_link
            develinfo_link = devel_source + kernel_devel_rpm
            self.insert_kernel_version_relation_internal(kernel_version=version, os_type=os_type,
                                                         source=source, devel_link=develinfo_link, debuginfo_link=debuginfo_link)

    def git_branch_by_git_rule(self, git_rule, version):
        """deprecated

        Run the rule script ``git_rule`` (a ``.sh`` or ``.py`` file under
        ``settings.HOTFIX_FILE_BRANCH_RULE``) with ``version`` as its only
        argument and return what it prints.

        Returns:
            The script's standard output with leading/trailing CR/LF
            removed, or None when the suffix is unsupported, the script
            cannot be started, or it exits with a non-zero status.
        """
        interpreter = None
        for suffix, program in self._RULE_INTERPRETERS.items():
            if git_rule.endswith(suffix):
                interpreter = program
        if interpreter is None:
            logger.error("unsupported branch rule script: {}".format(git_rule))
            return None

        rule_path = os.path.join(settings.HOTFIX_FILE_BRANCH_RULE, git_rule)
        # An argument list (no shell) keeps ``version`` from being
        # interpreted as shell syntax.
        argv = [interpreter, rule_path, str(version)]
        logger.info("The command executed is: {}".format(" ".join(argv)))
        try:
            completed = subprocess.run(argv, stdout=subprocess.PIPE)
        except OSError as e:
            logger.error(str(e))
            return None

        branch = None
        if completed.returncode != 0:
            logger.info("执行失败")
        else:
            branch = completed.stdout.decode("utf-8").strip("\r\n")
        if not branch:
            logger.error("branch of version {} not found".format(version))
        return branch

    def sync_source(self, id, os_type_model: "OSTypeModel"):
        """Record source/devel/debuginfo links for every listed debuginfo package.

        The sync status of record ``id`` is set to 0 when the sync starts,
        2 when it completes and 1 when any step raises.
        """
        try:
            os_type = os_type_model.os_type
            source_repo = os_type_model.source_repo
            source_devel = os_type_model.source_devel
            source_debuginfo = os_type_model.source_debuginfo
            image = os_type_model.image
            use_src_package = os_type_model.src_pkg_mark
            self.update_ostype_sync_status(id=id, status=0)
            source_lists = self.get_rpm_list(source_repo, "source")
            devel_lists = self.get_rpm_list(source_devel, "devel")
            debuginfo_lists = self.get_rpm_list(source_debuginfo, "debuginfo")
            for each_rpm in debuginfo_lists:  # find each kernel debuginfo package
                version = self._version_from_debuginfo_rpm(each_rpm)
                # the source package name carries no arch, so match without it
                version_no_arch = self._strip_arch(version)
                kernel_url = source_repo + self._find_package(source_lists, version_no_arch)
                debuginfo_url = source_debuginfo + self._find_package(debuginfo_lists, version)
                devel_url = source_devel + self._find_package(devel_lists, version)
                self.insert_kernel_version_relation_internal(
                    kernel_version=version, os_type=os_type,
                    source=kernel_url, devel_link=devel_url, debuginfo_link=debuginfo_url,
                    image=image, use_src_package=use_src_package)
            self.update_ostype_sync_status(id=id, status=2)
        except Exception as e:
            logger.error(e)
            self.update_ostype_sync_status(id=id, status=1)

    def insert_kernel_version_relation_internal(self, kernel_version, os_type, source, devel_link, debuginfo_link, image, use_src_package):
        """Create or update the kernel-version record for ``kernel_version``.

        The first record with this ``kernel_version`` is overwritten with the
        given values; when none exists a new record is created.
        """
        logger.info("start kernel is :%s" % kernel_version)
        kernel_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
        if kernel_object is None:
            logger.info("start insert kernel is :%s" % kernel_version)
            kernel_object = KernelVersionModel.objects.create(
                kernel_version=kernel_version,
                os_type=os_type,
                source=source,
                devel_link=devel_link,
                debuginfo_link=debuginfo_link,
                image=image,
                use_src_package=use_src_package
            )
        else:
            kernel_object.kernel_version = kernel_version
            kernel_object.os_type = os_type
            kernel_object.source = source
            kernel_object.devel_link = devel_link
            kernel_object.debuginfo_link = debuginfo_link
            kernel_object.image = image
            kernel_object.use_src_package = use_src_package
            kernel_object.save()
            logger.info("update kernel version in record...")

    def get_rpm_list(self, package_url, packge_type):
        """List kernel package names found on the HTML page at ``package_url``.

        The link texts inside the page's ``<tbody>`` are kept when they start
        with ``kernel-<digit>`` ("source"), ``kernel-devel-<digit>``
        ("devel") or ``kernel-debuginfo-<digit>`` ("debuginfo").

        Args:
            package_url: URL of the listing page.
            packge_type: "source", "devel" or "debuginfo".

        Returns:
            The list of matching names, or None when the type is not
            supported or fetching/parsing fails.
        """
        # ``import urllib`` alone does not guarantee the ``request`` submodule.
        import urllib.request
        try:
            pattern = self._RPM_NAME_PATTERNS.get(packge_type)
            if pattern is None:
                logger.error("This package type is not supported")
                return None
            with urllib.request.urlopen(package_url) as response:
                html = response.read()
            soup = BeautifulSoup(html, "html.parser")
            rpm_names = (link.get_text().strip() for link in soup.tbody.select('a'))
            return [rpm_name for rpm_name in rpm_names if pattern.match(rpm_name)]
        except Exception as e:
            logger.error(e)
            return None

    def update_ostype_sync_status(self, id, status):
        """Store ``status`` and the current time on the OS-type record ``id``.

        A missing record or a failing save is logged, not raised.
        """
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=id).first()
            if os_type_object is None:
                logger.error("can not find the OS type record")
                return
            os_type_object.synced_at = human_datetime()
            os_type_object.sync_status = status
            os_type_object.save()
        except Exception as e:
            logger.error(e)

    def query_released_hotfix_by_para(self, search_hotfix_id, search_kernel_version,
                                      search_serious, search_released_time, search_fix_system):
        """Query released-hotfix records, narrowed by the non-None arguments.

        Every filter is an exact match on the corresponding field.

        Returns:
            The filtered queryset, or None when building it raises.
        """
        try:
            objects = ReleasedHotfixListModule.objects.all()
            if search_hotfix_id is not None:
                objects = objects.filter(hotfix_id=search_hotfix_id)
            if search_kernel_version is not None:
                objects = objects.filter(released_kernel_version=search_kernel_version)
            if search_serious is not None:
                objects = objects.filter(serious=search_serious)
            if search_released_time is not None:
                objects = objects.filter(released_time=search_released_time)
            if search_fix_system is not None:
                objects = objects.filter(fix_system=search_fix_system)
            return objects
        except Exception as e:
            logger.error("Error when filtering released hotfix database")
            logger.error("query_released_hotfix_by_para: %s " % str(e))
            return None
|
|
|
|
|
|
"""
|
|
CECListener listen topic of hotfix_msg, which is send by builder
|
|
"""
|
|
class CECListener():
    """Listener for the hotfix message topic fed by the builder.

    Runs a consumer loop in a background thread. Each message carries a
    ``hotfix_id``; messages tagged with the "sync" key update the rpm name
    and/or the stored build log of that hotfix, all other messages update
    its building status.
    """

    # building_status value stored for each status string sent by the builder
    _BUILDING_STATUS = {"waiting": 0, "building": 1, "failed": 2, "success": 3}

    def __init__(self, con, cec_url, listen_topic) -> None:
        """Store the configuration and prepare (but do not start) the thread.

        Args:
            con: configuration object; ``con.cec.max_retry_time`` and
                ``con.cec.sleep_time`` are read by the listener loop.
            cec_url: URL passed to the CEC admin and consumer factories.
            listen_topic: name of the topic to consume.
        """
        try:
            logger.info("Server CECListener init ...")
            self.parameters = con
            self.cec_url = cec_url
            self.listen_topic = listen_topic
            self.sync_key = "sync"  # this key is to tag this message for sync job
            self.rpm_key = "rpm_name"  # this key is to tag this message for sync rpm name
            self.log_key = "log"  # this key is to tag this message for sync log
            self.thread_runner = threading.Thread(target=self.listener, name="hotfix_server_listener")
        except Exception as e:
            # Do not raise from the constructor, but do not hide the cause either.
            logger.exception(e)

    @staticmethod
    def _read_log_file(path):
        """Return the whole text content of the file at ``path``."""
        with open(path) as log_file:
            return log_file.read()

    def run(self):
        """Start the background listener thread."""
        logger.info("Server CEC Listener start...")
        self.thread_runner.start()

    def listener(self):
        """Consume and acknowledge builder messages until the retry budget is used.

        Creates the topic when it does not exist yet, then iterates over the
        consumer up to ``parameters.cec.max_retry_time`` times, sleeping
        ``parameters.cec.sleep_time`` seconds between rounds. A failure
        while handling one message is logged and the message is still
        acknowledged.
        """
        with dispatch_admin(self.cec_url) as admin:
            if not admin.is_topic_exist(self.listen_topic):
                admin.create_topic(self.listen_topic)

        consumer_id = Consumer.generate_consumer_id()
        # one server just need one group id and one consumer is enough
        self.consumer = dispatch_consumer(self.cec_url, self.listen_topic,
                                          consumer_id=consumer_id,
                                          group_id="server_listener")

        logger.info("Server CECListener init finished...")

        retry_time = 0
        try:
            while retry_time < self.parameters.cec.max_retry_time:
                # for each event in hotfix_msg topic, use the key inside to
                # decide the message type
                for event in self.consumer:
                    time.sleep(1)
                    logger.info("processing one msg...")
                    try:
                        cec_values = event.value
                        hotfix_id = cec_values["hotfix_id"]
                        if self.sync_key in cec_values:
                            if self.rpm_key in cec_values:
                                self.sync_hotfix_job_rpm_name(hotfix_id, cec_values[self.rpm_key])
                            if self.log_key in cec_values:
                                self.sync_hotfix_log(hotfix_id)
                        else:
                            self.update_hotfix_job_status(hotfix_id, cec_values["status"])
                    except Exception as e:
                        logger.error(str(e))
                    finally:
                        logger.info("ack one msg from builder...")
                        self.consumer.ack(event=event)
                        time.sleep(0.5)
                time.sleep(self.parameters.cec.sleep_time)
                retry_time += 1
        except Exception as e:
            logger.error(str(e))
            logger.error("Hotfix Server CEC Listener exit ...")

    def update_hotfix_job_status(self, hotfix_id, status):
        """Store the building status reported by the builder.

        Args:
            hotfix_id: id of the hotfix record.
            status: "waiting", "building", "failed" or "success"; any other
                value is logged as unsupported and leaves the stored status
                unchanged.

        Returns:
            ``hotfix_id``, or None when no record with that id exists.
        """
        # filter().first() yields None for a missing record, so the check
        # below is reachable (objects.get() raises instead).
        hotfix = HotfixModel.objects.filter(id=hotfix_id).first()
        logger.info("ack one job status of : %s from builder of hotfix id : %s " % (status, str(hotfix_id)))
        if hotfix is None:
            logger.error("%s : hotfix job is not exist filtered by hotfix id : %s" % ("update_hotfix_job_status", str(hotfix_id)))
            return None
        building_status = self._BUILDING_STATUS.get(status) if isinstance(status, str) else None
        if building_status is None:
            logger.error("%s : hotfix job status update failed. status : %s is not supported" % ("update_hotfix_job_status", status))
        else:
            hotfix.building_status = building_status
        hotfix.save()
        return hotfix_id

    def sync_hotfix_job_rpm_name(self, hotfix_id, rpm_name):
        """Store ``rpm_name`` as the rpm package of hotfix ``hotfix_id``.

        Failures are logged, not raised.

        Returns:
            ``hotfix_id``.
        """
        try:
            logger.info("get rpm_name of %s from builder..." % rpm_name)
            hotfix = HotfixModel.objects.get(id=hotfix_id)
            hotfix.rpm_package = rpm_name
            hotfix.save()
        except Exception as e:
            logger.error("%s : Exception raised..." % "sync_hotfix_job_rpm_name")
            logger.error(str(e))
        return hotfix_id

    def sync_hotfix_log(self, hotfix_id):
        """Copy the build log file of hotfix ``hotfix_id`` into its record.

        The file is read from ``<HOTFIX_FILE_STORAGE_REPO>/log/<log_file>``.
        Read/save failures are logged, not raised.

        Raises:
            HotfixModel.DoesNotExist: when no record has this id.
        """
        hotfix = HotfixModel.objects.get(id=hotfix_id)
        try:
            log_path = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "log", hotfix.log_file)
            hotfix.log = self._read_log_file(log_path)
            hotfix.save()
        except Exception as e:
            logger.error(str(e))
|
|
|
|
|
|
"""
|
|
Hotfix Server Exception
|
|
"""
|
|
class HotfixServerException(Exception):
    """Exception type of the hotfix server."""

    # Marks "second argument not given" so an explicit None is still honoured.
    _UNSET = object()

    def __init__(self, *args: object) -> None:
        super().__init__(*args)

    @staticmethod
    def msg(self=None, msg: str = _UNSET) -> str:
        """Return the given message unchanged.

        This is a static method that historically also declared a ``self``
        parameter, so it had to be called with two arguments. Both forms
        are accepted now:

        * ``msg(anything, text)`` returns ``text`` (original behaviour);
        * ``msg(text)`` returns ``text``.
        """
        if msg is HotfixServerException._UNSET:
            # Single-argument call: the text arrived in the first slot.
            return self
        return msg