sysom1/sysom_server/sysom_hotfix/lib/function.py

568 lines
24 KiB
Python

import os
import re
import urllib
import subprocess
import sys
from clogger import logger
from django.conf import settings
import threading
import redis
import requests
import json
import time
from apps.hotfix.models import HotfixModel, OSTypeModel, KernelVersionModel, ReleasedHotfixListModule
from cec_base.producer import dispatch_producer, Producer
from cec_base.consumer import Consumer, dispatch_consumer
from cec_base.admin import dispatch_admin
from bs4 import BeautifulSoup
from lib.utils import human_datetime
from sysom_utils import SysomFramework
from channel_job import default_channel_job_executor
"""
Function class
This class contains the support/tool function
"""
class FunctionClass():
    """Support/tool functions for the hotfix service.

    Groups the database queries on the hotfix models, the publishing of
    build requests to CEC, the deployment of hotfix packages to hosts and
    the synchronisation of kernel versions from RPM repository listings.
    """

    # Anchored name patterns selecting kernel packages from a repository
    # listing, keyed by the ``packge_type`` values accepted by get_rpm_list().
    _RPM_NAME_PATTERNS = {
        "source": re.compile(r"kernel-\d"),
        "devel": re.compile(r"kernel-devel-\d"),
        "debuginfo": re.compile(r"kernel-debuginfo-\d"),
    }

    # Attributes of a kernel version record that get_info_from_version() exposes.
    _VERSION_INFO_FIELDS = ("os_type", "source", "devel_link", "debuginfo_link")

    def __init__(self):
        # Producer used by create_message_to_cec() to publish build requests.
        self.producer = SysomFramework.cec_producer()

    def delete_document(self, doc_path, doc_name):
        """Remove the file ``doc_name`` located in ``doc_path`` if it exists.

        A failure to remove the file is logged, never raised.

        Returns:
            Always None.
        """
        document = os.path.join(doc_path, doc_name)
        if os.path.exists(document):
            try:
                os.remove(document)
            except Exception as e:
                logger.error(str(e))
        return None

    def query_formal_hotfix_by_parameters(self, created_time, kernel_version, patch_file, hotfix_name):
        """Return the formal hotfix records matching the given criteria.

        Every criterion is optional: ``None`` or an empty value disables it.

        Args:
            created_time: keep records created strictly before this time.
            kernel_version: exact kernel version to match.
            patch_file: substring the patch file name must contain.
            hotfix_name: exact hotfix name to match.

        Returns:
            A HotfixModel queryset restricted to ``formal=1``.
        """
        def blank_to_none(value):
            # An empty value means "criterion not supplied".
            if value is not None and len(value) == 0:
                return None
            return value

        criteria = (
            ("created_at__lt", blank_to_none(created_time)),
            ("kernel_version", blank_to_none(kernel_version)),
            ("patch_file__contains", blank_to_none(patch_file)),
            ("hotfix_name", blank_to_none(hotfix_name)),
        )
        objects = HotfixModel.objects.all().filter(formal=1)
        for lookup, value in criteria:
            if value is not None:
                objects = objects.filter(**{lookup: value})
        return objects

    def get_info_from_version(self, kernel_version, info="os_type"):
        """Return one attribute of the record stored for ``kernel_version``.

        Args:
            kernel_version: kernel version used to look up the record.
            info: one of "os_type", "source", "devel_link", "debuginfo_link".

        Returns:
            The requested attribute, or None when the record is missing,
            ``info`` is not one of the supported names, or the query fails.
        """
        try:
            version_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
            if version_object is None:
                logger.error("query kernel version from record failed")
                return None
            if info in self._VERSION_INFO_FIELDS:
                return getattr(version_object, info)
            return None
        except Exception as e:
            logger.error(str(e))
            return None

    def _get_os_type_attr(self, os_type, attr):
        """Return attribute ``attr`` of the OS type record named ``os_type``.

        Returns None when no record matches or when the query fails (the
        failure is logged).
        """
        try:
            os_object = OSTypeModel.objects.all().filter(os_type=os_type).first()
            if os_object is None:
                return None
            return getattr(os_object, attr)
        except Exception as e:
            logger.exception(e)
            return None

    def get_sourcerepo_of_os(self, os_type):
        """Return ``source_repo`` of the given OS type, or None if unavailable."""
        return self._get_os_type_attr(os_type, "source_repo")

    def get_image_of_os(self, os_type):
        """Return ``image`` of the given OS type, or None if unavailable."""
        return self._get_os_type_attr(os_type, "image")

    def get_src_pkg_mark_of_os(self, os_type):
        """Return ``src_pkg_mark`` of the given OS type, or None if unavailable."""
        return self._get_os_type_attr(os_type, "src_pkg_mark")

    def create_hotfix_object_to_database(self, os_type, kernel_version, hotfix_name, patch_file, patch_path, hotfix_necessary, hotfix_risk,
                                         log_file, arch):
        """Create and return a new hotfix record.

        building status and formal is set to be 0 when creating a hotfix.

        NOTE(review): ``hotfix_necessary`` and ``hotfix_risk`` are accepted
        but not stored; the record is always created with
        ``hotfix_necessary=0`` and ``hotfix_risk=2``. Confirm whether that is
        intentional before wiring the parameters through.
        """
        res = HotfixModel.objects.create(
            kernel_version=kernel_version,
            os_type=os_type,
            patch_file=patch_file,
            hotfix_name=hotfix_name,
            patch_path=patch_path,
            building_status=0,
            hotfix_necessary=0,
            hotfix_risk=2,
            formal=0,
            log_file=log_file,
            arch=arch
        )
        return res

    def create_message_to_cec(self, **kwargs):
        """Publish a hotfix build request to the CEC topic ``cec_topic``.

        Required keyword arguments: ``customize``, ``cec_topic``, ``os_type``,
        ``hotfix_id``, ``kernel_version``, ``patch_file``, ``hotfix_name``,
        ``patch_path``, ``arch``, ``log_file``. A customized kernel
        additionally needs ``source_repo``, ``source``, ``devel_link``,
        ``debuginfo_link``, ``image`` and ``is_src_package``.

        Returns:
            True once the producer has been flushed, False when producing the
            message failed (the error is logged).

        Raises:
            KeyError: when one of the always-required keyword arguments is
                missing.
        """
        customize = kwargs['customize']
        cec_topic = kwargs['cec_topic']
        os_type = kwargs['os_type']
        hotfix_id = kwargs['hotfix_id']
        kernel_version = kwargs['kernel_version']
        patch_file = kwargs['patch_file']
        hotfix_name = kwargs['hotfix_name']
        patch_path = kwargs['patch_path']
        arch = kwargs['arch']
        log_file = kwargs['log_file']
        try:
            if not customize:
                if re.search('anolis', os_type):
                    self.producer.produce(cec_topic, {
                        "hotfix_id": hotfix_id,
                        "kernel_version": kernel_version,
                        "patch_name": patch_file,
                        "hotfix_name": hotfix_name,
                        "patch_path": patch_path,
                        "arch": arch,
                        "log_file": log_file,
                        "os_type": os_type,
                        "git_repo": "git@gitee.com:anolis/cloud-kernel.git",
                        "customize": 0
                    })
                else:
                    # NOTE(review): nothing is produced for a non-customized,
                    # non-anolis OS type, yet True is still returned below.
                    logger.warning("no build message produced for os_type: %s" % os_type)
            else:
                # this is customize kernel
                self.producer.produce(cec_topic, {
                    "hotfix_id": hotfix_id,
                    "kernel_version": kernel_version,
                    "hotfix_name": hotfix_name,
                    "patch_name": patch_file,
                    "patch_path": patch_path,
                    "arch": arch,
                    "log_file": log_file,
                    "os_type": os_type,
                    "customize": 1,
                    "src_repo": kwargs['source_repo'],
                    "src_origin": kwargs['source'],
                    "devel_link": kwargs['devel_link'],
                    "debuginfo_link": kwargs['debuginfo_link'],
                    "image": kwargs['image'],
                    "is_src_package": kwargs["is_src_package"]
                })
            self.producer.flush()
            return True
        except Exception as e:
            logger.exception(e)
            return False

    def get_hotfix_object_by_id(self, hotfix_id):
        """Return the hotfix record with this id, or None if absent or on error."""
        try:
            return HotfixModel.objects.all().filter(id=hotfix_id).first()
        except Exception as e:
            logger.error(str(e))
            return None

    def get_host_list(self, kernel_version):
        """Return the IPs of the hosts running ``kernel_version``.

        The host inventory is fetched from ``settings.HOST_URL``.

        Returns:
            A list of IPs (possibly empty), or None when the inventory
            request does not answer with HTTP 200.
        """
        res = requests.get(settings.HOST_URL)
        if res.status_code != 200:
            return None
        host_list = []
        for each_host in res.json()['data']:
            try:
                if each_host['host_info']['kernel_version'] == kernel_version:
                    host_list.append(each_host["ip"])
            except (KeyError, TypeError):
                # Best effort: a host entry without the expected fields is skipped.
                continue
        return host_list

    def dispatch_hotfix_cmd(self, ip, hotfix_name):
        """Install ``/root/<hotfix_name>`` on host ``ip`` with ``rpm -ivh`` over ssh.

        Returns:
            True when the channel job result code is 1, False otherwise.
        """
        machine_hotfix_path = "/root/%s" % hotfix_name
        job = default_channel_job_executor.dispatch_job(
            channel_type="ssh",
            params={
                "instance": ip,
                "command": "rpm -ivh %s" % machine_hotfix_path,
            },
            timeout=1000,
            auto_retry=False
        )
        channel_result = job.execute()
        # the result is the output of executing the cmd
        # result = channel_result.result
        # NOTE(review): success is taken to be code == 1 here; confirm this
        # against the channel job result convention.
        return channel_result.code == 1

    def deploy_hotfix_to_machine(self, kernel_version, hotfix_path):
        """Copy a hotfix package to every matching host and install it.

        Args:
            kernel_version: only hosts running this kernel version are targeted.
            hotfix_path: local path of the hotfix package.

        Returns:
            The number of hosts on which the install command succeeded, -1
            when no host runs ``kernel_version`` (or the host inventory is
            unavailable), or None when an error occurred (it is logged).
        """
        try:
            host_list = self.get_host_list(kernel_version)
            # Check the targets first: there is nothing to dispatch to
            # without at least one host.
            if not host_list:
                return -1
            hotfix_name = hotfix_path.split("/")[-1]
            # dispatch the hotfix package to the machines
            res = default_channel_job_executor.dispatch_file_job(params={
                "local_path": hotfix_path,
                "remote_path": "/root/%s" % hotfix_name,
                "instances": host_list
            }).execute().result
            for each_instance in json.loads(res):
                if not each_instance["success"]:
                    logger.error("Instance ip: {} dispatch hotfix:{} failed".format(each_instance["instance"], hotfix_path))
            success_num = 0
            for each_host in host_list:
                if self.dispatch_hotfix_cmd(each_host, hotfix_name):
                    success_num += 1
            return success_num
        except Exception as e:
            logger.error(str(e))
            return None

    def sync_kernel(self, id):
        """Synchronise the kernel versions of the OS type record ``id``.

        Uses sync_source() when the record has ``src_pkg_mark`` set and
        sync_git() otherwise. Errors are logged, never raised.
        """
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=id).first()
            if os_type_object is None:
                logger.error("can not find the OS type record")
                return
            if os_type_object.src_pkg_mark:
                self.sync_source(id, os_type_object)
            else:
                self.sync_git(id, os_type_object)
        except Exception as e:
            logger.error(e)

    @staticmethod
    def _first_match(fragment, names):
        """Return the first entry of ``names`` that contains ``fragment``.

        Raises:
            IndexError: when no entry contains ``fragment``.
        """
        for name in names:
            if fragment in name:
                return name
        raise IndexError("no package matching %s found" % fragment)

    def sync_git(self, id, os_type_model: "OSTypeModel"):
        """Record one kernel version per debuginfo package of an OS type.

        The sync status of record ``id`` is set to 0 once the package lists
        are fetched, to 2 when every version has been recorded, and to 1 on
        any error.
        """
        try:
            os_type = os_type_model.os_type
            source_devel = os_type_model.source_devel
            source_debuginfo = os_type_model.source_debuginfo
            # The kernel version record requires these two values as well;
            # take them from the OS type record, as sync_source() does.
            image = os_type_model.image
            use_src_package = os_type_model.src_pkg_mark
            devel_lists = self.get_rpm_list(source_devel, "devel")
            debuginfo_lists = self.get_rpm_list(source_debuginfo, "debuginfo")
            self.update_ostype_sync_status(id=id, status=0)
            for each_rpm in debuginfo_lists:
                version = each_rpm.replace("kernel-debuginfo-", '').replace(".rpm", '')
                debuginfo_url = source_debuginfo + each_rpm
                devel_url = source_devel + self._first_match(version, devel_lists)
                self.insert_kernel_version_relation_internal(kernel_version=version, os_type=os_type,
                                                             source="", devel_link=devel_url, debuginfo_link=debuginfo_url,
                                                             image=image, use_src_package=use_src_package)
            self.update_ostype_sync_status(id=id, status=2)
        except Exception as e:
            logger.error(e)
            self.update_ostype_sync_status(id=id, status=1)

    def insert_kernel_version_relation_git(self, os_type, git_rule, devel_source, source_devel_list, debuginfo_source):
        """deprecated

        NOTE(review): this method calls ``self.get_debuginfo_rpm`` (not
        defined in this class as far as visible here) and omits the
        ``image`` / ``use_src_package`` arguments required by
        insert_kernel_version_relation_internal(); it is kept unchanged
        because it is deprecated.
        """
        for kernel_devel_rpm in source_devel_list:
            version = ".".join(kernel_devel_rpm.replace("kernel-devel-", '').split(".")[:-1])
            if not git_rule:
                source = ""
            else:
                branch = self.git_branch_by_git_rule(git_rule, version)
                if not branch:
                    continue
                source = branch
            debuginfo_link = self.get_debuginfo_rpm(debuginfo_source, version)
            debuginfo_link = debuginfo_source + debuginfo_link
            develinfo_link = devel_source + kernel_devel_rpm
            self.insert_kernel_version_relation_internal(kernel_version=version, os_type=os_type,
                                                         source=source, devel_link=develinfo_link, debuginfo_link=debuginfo_link)

    def git_branch_by_git_rule(self, git_rule, version):
        """deprecated

        Run the rule script ``git_rule`` (a ``.sh`` or ``.py`` file stored in
        ``settings.HOTFIX_FILE_BRANCH_RULE``) with ``version`` as its single
        argument and return what it prints.

        Returns:
            The script output with surrounding CR/LF stripped (may be an
            empty string), or None when the rule type is unsupported or the
            script could not be run or exited with a non-zero status.
        """
        rule_path = os.path.join(settings.HOTFIX_FILE_BRANCH_RULE, git_rule)
        if git_rule.endswith(".py"):
            interpreter = "python"
        elif git_rule.endswith(".sh"):
            interpreter = "sh"
        else:
            logger.error("unsupported git rule file: {}".format(git_rule))
            return None
        # An argument list (no shell) keeps the path and version from being
        # re-interpreted by a shell.
        cmd = [interpreter, rule_path, str(version)]
        logger.info("The command executed is: {}".format(" ".join(cmd)))
        try:
            completed = subprocess.run(cmd, stdout=subprocess.PIPE)
        except OSError as e:
            logger.error(str(e))
            return None
        if completed.returncode != 0:
            # The message reads "execution failed".
            logger.info("执行失败")
            return None
        branch = completed.stdout.decode("utf-8").strip("\r\n")
        if not branch:
            logger.error("branch of version {} not found".format(version))
        return branch

    def sync_source(self, id, os_type_model: "OSTypeModel"):
        """Record one kernel version per debuginfo package, with its source package.

        The sync status of record ``id`` is set to 0 before the package
        lists are fetched, to 2 when every version has been recorded, and to
        1 on any error.
        """
        try:
            os_type = os_type_model.os_type
            source_repo = os_type_model.source_repo
            source_devel = os_type_model.source_devel
            source_debuginfo = os_type_model.source_debuginfo
            image = os_type_model.image
            use_src_package = os_type_model.src_pkg_mark
            self.update_ostype_sync_status(id=id, status=0)
            source_lists = self.get_rpm_list(source_repo, "source")
            devel_lists = self.get_rpm_list(source_devel, "devel")
            debuginfo_lists = self.get_rpm_list(source_debuginfo, "debuginfo")
            for each_rpm in debuginfo_lists:  # find each kernel debuginfo package
                # kernel-debuginfo-5.10.60-9.an8.x86_64.rpm -> 5.10.60-9.an8.x86_64
                version = each_rpm.replace("kernel-debuginfo-", '').replace(".rpm", '')
                # Drop the last dot-separated field (x86_64 in the example
                # above); the trailing dot is kept.
                version_no_arch = version.replace(version.split(".")[-1], '')
                # find the specific kernel version rpm from kernel package
                kernel_url = source_repo + self._first_match(version_no_arch, source_lists)
                debuginfo_url = source_debuginfo + self._first_match(version, debuginfo_lists)
                devel_url = source_devel + self._first_match(version, devel_lists)
                self.insert_kernel_version_relation_internal(kernel_version=version, os_type=os_type,
                                                             source=kernel_url, devel_link=devel_url, debuginfo_link=debuginfo_url,
                                                             image=image, use_src_package=use_src_package)
            self.update_ostype_sync_status(id=id, status=2)
        except Exception as e:
            logger.error(e)
            self.update_ostype_sync_status(id=id, status=1)

    def insert_kernel_version_relation_internal(self, kernel_version, os_type, source, devel_link, debuginfo_link, image, use_src_package):
        """Create the record of ``kernel_version``, or update it if it exists."""
        logger.info("start kernel is :%s" % kernel_version)
        fields = {
            "kernel_version": kernel_version,
            "os_type": os_type,
            "source": source,
            "devel_link": devel_link,
            "debuginfo_link": debuginfo_link,
            "image": image,
            "use_src_package": use_src_package,
        }
        kernel_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
        if kernel_object is None:
            logger.info("start insert kernel is :%s" % kernel_version)
            KernelVersionModel.objects.create(**fields)
            return
        for field_name, value in fields.items():
            setattr(kernel_object, field_name, value)
        kernel_object.save()
        logger.info("update kernel version in record...")

    def get_rpm_list(self, package_url, packge_type):
        """List the kernel package names found on a repository index page.

        Args:
            package_url: URL of the HTML listing to read.
            packge_type: "source", "devel" or "debuginfo"; selects names
                starting with ``kernel-<digit>``, ``kernel-devel-<digit>`` or
                ``kernel-debuginfo-<digit>`` respectively.

        Returns:
            The matching link texts found inside the page's ``<tbody>``, or
            None when the package type is unsupported or the page cannot be
            fetched or parsed (the error is logged).
        """
        # A bare ``import urllib`` does not guarantee that the ``request``
        # submodule is loaded, so import it explicitly.
        import urllib.request

        pattern = self._RPM_NAME_PATTERNS.get(packge_type)
        if pattern is None:
            logger.error("This package type is not supported")
            return None
        try:
            with urllib.request.urlopen(package_url) as response:
                html = response.read()
            soup = BeautifulSoup(html, "html.parser")
            rpm_list = []
            for each_rpm in soup.tbody.select('a'):
                rpm_name = each_rpm.get_text().strip()
                if pattern.match(rpm_name):
                    rpm_list.append(rpm_name)
            return rpm_list
        except Exception as e:
            logger.error(e)
            return None

    def update_ostype_sync_status(self, id, status):
        """Store ``status`` and the current time on the OS type record ``id``.

        A missing record or a failing save is logged, never raised.
        """
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=id).first()
            if os_type_object is None:
                logger.error("can not find the OS type record")
                return
            os_type_object.synced_at = human_datetime()
            os_type_object.sync_status = status
            os_type_object.save()
        except Exception as e:
            logger.error(e)

    def query_released_hotfix_by_para(self, search_hotfix_id, search_kernel_version,
                                      search_serious, search_released_time, search_fix_system):
        """Return the released hotfix records matching the given criteria.

        Each argument is an exact-match filter that is skipped when None.

        Returns:
            The filtered queryset, or None when building it failed.
        """
        try:
            criteria = (
                ("hotfix_id", search_hotfix_id),
                ("released_kernel_version", search_kernel_version),
                ("serious", search_serious),
                ("released_time", search_released_time),
                ("fix_system", search_fix_system),
            )
            objects = ReleasedHotfixListModule.objects.all()
            for field_name, value in criteria:
                if value is not None:
                    objects = objects.filter(**{field_name: value})
            return objects
        except Exception as e:
            logger.error("Error when filtering released hotfix database")
            logger.error("query_released_hotfix_by_para: %s " % str(e))
            return None
"""
CECListener listen topic of hotfix_msg, which is send by builder
"""
class CECListener():
    """Consume the messages the builder sends on the hotfix topic.

    Each message carries a ``hotfix_id``. A message containing the ``sync``
    key asks to store the built rpm name and/or reload the build log; any
    other message carries a ``status`` to record on the hotfix.
    """

    # Builder status string -> value stored in HotfixModel.building_status.
    _BUILDING_STATUS_CODES = {
        "waiting": 0,
        "building": 1,
        "failed": 2,
        "success": 3,
    }

    def __init__(self, con, cec_url, listen_topic) -> None:
        """Store the configuration and prepare (without starting) the listener thread.

        Args:
            con: configuration object; ``con.cec.max_retry_time`` and
                ``con.cec.sleep_time`` are read by listener().
            cec_url: CEC connection URL.
            listen_topic: topic to consume.
        """
        try:
            logger.info("Server CECListener init ...")
            self.parameters = con
            self.cec_url = cec_url
            self.listen_topic = listen_topic
            self.sync_key = "sync"  # this key is to tag this message for sync job
            self.rpm_key = "rpm_name"  # this key is to tag this message for sync rpm name
            self.log_key = "log"  # this key is to tag this message for sync log
            self.thread_runner = threading.Thread(target=self.listener, name="hotfix_server_listener")
        except Exception as e:
            # Keep the constructor non-raising, but leave a trace of the failure.
            logger.exception(e)

    def run(self):
        """Start the listener thread."""
        logger.info("Server CEC Listener start...")
        self.thread_runner.start()

    def _dispatch_message(self, cec_values):
        """Apply one builder message to the hotfix it refers to."""
        hotfix_id = cec_values["hotfix_id"]
        if self.sync_key not in cec_values:
            self.update_hotfix_job_status(hotfix_id, cec_values["status"])
            return
        if self.rpm_key in cec_values:
            self.sync_hotfix_job_rpm_name(hotfix_id, cec_values[self.rpm_key])
        if self.log_key in cec_values:
            self.sync_hotfix_log(hotfix_id)

    def listener(self):
        """Thread body: create the topic if needed, then consume and ack messages.

        The consumer is iterated at most ``cec.max_retry_time`` times, with a
        pause of ``cec.sleep_time`` seconds between two iterations. A failure
        while handling one message is logged and the message is still
        acknowledged; a failure of the consuming loop itself ends the listener.
        """
        with dispatch_admin(self.cec_url) as admin:
            if not admin.is_topic_exist(self.listen_topic):
                admin.create_topic(self.listen_topic)
        consumer_id = Consumer.generate_consumer_id()
        # one server just need one group id and one consumer is enough
        self.consumer = dispatch_consumer(self.cec_url, self.listen_topic,
                                          consumer_id=consumer_id,
                                          group_id="server_listener")
        logger.info("Server CECListener init finished...")
        retry_time = 0
        try:
            while retry_time < self.parameters.cec.max_retry_time:
                # for each event in hotfix_msg topic, use the key inside to
                # decide the message type
                for event in self.consumer:
                    time.sleep(1)
                    logger.info("processing one msg...")
                    try:
                        self._dispatch_message(event.value)
                    except Exception as e:
                        logger.error(str(e))
                    finally:
                        logger.info("ack one msg from builder...")
                        self.consumer.ack(event=event)
                    time.sleep(0.5)
                time.sleep(self.parameters.cec.sleep_time)
                retry_time += 1
        except Exception as e:
            logger.error(str(e))
            logger.error("Hotfix Server CEC Listener exit ...")

    def update_hotfix_job_status(self, hotfix_id, status):
        """Record the build status reported by the builder.

        Args:
            hotfix_id: id of the hotfix record to update.
            status: "waiting", "building", "failed" or "success".

        Returns:
            ``hotfix_id``, or None when no hotfix has this id. An unsupported
            status is logged and leaves ``building_status`` untouched.
        """
        # first() yields None for an unknown id, where get() would raise.
        hotfix = HotfixModel.objects.filter(id=hotfix_id).first()
        logger.info("ack one job status of : %s from builder of hotfix id : %s " % (status, str(hotfix_id)))
        if hotfix is None:
            logger.error("%s : hotfix job is not exist filtered by hotfix id : %s" % ("update_hotfix_job_status", str(hotfix_id)))
            return None
        if status in self._BUILDING_STATUS_CODES:
            hotfix.building_status = self._BUILDING_STATUS_CODES[status]
        else:
            logger.error("%s : hotfix job status update failed. status : %s is not supported" % ("update_hotfix_job_status", status))
        hotfix.save()
        return hotfix_id

    def sync_hotfix_job_rpm_name(self, hotfix_id, rpm_name):
        """Store the rpm package name reported by the builder on the hotfix.

        Errors are logged, never raised.

        Returns:
            ``hotfix_id`` in every case.
        """
        try:
            logger.info("get rpm_name of %s from builder..." % rpm_name)
            hotfix = HotfixModel.objects.get(id=hotfix_id)
            hotfix.rpm_package = rpm_name
            hotfix.save()
        except Exception as e:
            logger.error("%s : Exception raised..." % "sync_hotfix_job_rpm_name")
            logger.error(str(e))
        return hotfix_id

    def sync_hotfix_log(self, hotfix_id):
        """Load the build log file of the hotfix into its ``log`` field.

        The file is read from ``<settings.HOTFIX_FILE_STORAGE_REPO>/log/``.
        A read or save failure is logged; an unknown ``hotfix_id`` raises
        from the lookup.
        """
        hotfix = HotfixModel.objects.get(id=hotfix_id)
        try:
            log_path = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "log", hotfix.log_file)
            # The context manager closes the file even when reading fails.
            with open(log_path) as log_file:
                hotfix.log = log_file.read()
            hotfix.save()
        except Exception as e:
            logger.error(str(e))
"""
Hotfix Server Exception
"""
class HotfixServerException(Exception):
    """Hotfix Server Exception"""

    def __init__(self, *args: object) -> None:
        super().__init__(*args)

    # A plain method rather than a @staticmethod: the signature declares
    # ``self``, so as a static method ``exc.msg("text")`` failed with a
    # missing-argument TypeError. The two-argument class-level call
    # ``HotfixServerException.msg(obj, "text")`` keeps working.
    def msg(self, msg: str) -> str:
        """Return ``msg`` unchanged."""
        return msg