# sysom1/sysom_server/sysom_hotfix/apps/hotfix/views.py
#
# 669 lines
# 34 KiB
# Python

import threading
from clogger import logger
import re
import os
import time
from typing import Any
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from rest_framework import mixins
from django.db.models import Q
from django.db.models.functions import Concat
from django.db.models import Value
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.exceptions import ValidationError
from django.conf import settings
from rest_framework.viewsets import GenericViewSet
import sys
import pandas as pd
from apps.hotfix import serializer
from apps.hotfix.models import HotfixModel, OSTypeModel, KernelVersionModel, ReleasedHotfixListModule
from apps.hotfix.filters import HotfixReleasedFilter
from lib.response import *
from lib.paginations import Pagination
from lib.utils import human_datetime, datetime, datetime_str
from lib.exception import APIException
from lib.base_view import CommonModelViewSet
from concurrent.futures import ThreadPoolExecutor, as_completed
from channel_job import default_channel_job_executor
from channel_job import ChannelJobExecutor
from django import forms
from django.views.decorators.csrf import csrf_exempt
from cec_base.admin import dispatch_admin
from cec_base.producer import dispatch_producer
from cec_base.event import Event
from django.http import HttpResponse, FileResponse
from lib.function import FunctionClass
from sysom_utils import SysomFramework
class SaveUploadFile(APIView):
    """Receive an uploaded patch or rule script and store it on the server.

    ``.patch`` files are written under ``<HOTFIX_FILE_STORAGE_REPO>/patch``;
    ``.py`` and ``.sh`` files are written under ``HOTFIX_FILE_BRANCH_RULE``.
    The stored file name is the uploaded name with a ``%Y%m%d%H%M%S``
    timestamp inserted before the extension.
    """

    # No authentication classes are applied to this endpoint.
    authentication_classes = []

    @swagger_auto_schema(operation_description="上传文件",
                         request_body=openapi.Schema(
                             type=openapi.TYPE_OBJECT,
                             required=["file"],
                             properties={
                                 "file": openapi.Schema(type=openapi.TYPE_FILE),
                                 "catalogue": openapi.Schema(type=openapi.TYPE_STRING)
                             }
                         ),
                         responses={
                             '200': openapi.Response('save upload success', examples={"application/json": {
                                 "code": 200,
                                 "message": "Upload success",
                                 "data": {}
                             }}),
                             '400': openapi.Response('Fail', examples={"application/json": {
                                 "code": 400,
                                 "message": "Required Field: file",
                                 "data": {}
                             }})
                         }
                         )
    def post(self, request):
        """Validate and persist the uploaded file.

        Reads the ``file`` field from ``request.data``.

        Returns:
            A success response whose result carries ``patch_name`` (the stored
            file name), or a 400 response when the extension is not one of
            ``sh``, ``patch`` or ``py``.

        Raises:
            APIException: when no file was sent or the file cannot be written.
        """
        patch_file = request.data.get('file', None)
        if not patch_file:
            # Raise (not return) the exception, matching the write-failure
            # path below; an exception instance is not a response object.
            raise APIException(message="Upload Failed: file required!")

        requirement_file_type = ["sh", "patch", "py"]
        # Only the final path component of the client-supplied name is used.
        file_upload_name = os.path.basename(patch_file.name)
        logger.info(file_upload_name)
        file_upload_type = file_upload_name.split(".")[-1]
        logger.info(file_upload_type)
        if file_upload_type not in requirement_file_type:
            return other_response(msg=f"file type error, we can not support .{file_upload_type}, please upload correct file type", code=400)

        if file_upload_type == "patch":
            patch_file_repo = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "patch")
        else:
            # The remaining accepted types ("py" and "sh") are rule scripts.
            patch_file_repo = os.path.join(settings.HOTFIX_FILE_BRANCH_RULE)
        os.makedirs(patch_file_repo, exist_ok=True)

        # Remove the extension as a suffix. str.rstrip() would strip any
        # trailing characters contained in ".<ext>" ("fetch.patch" -> "fe").
        suffix = "." + file_upload_type
        if file_upload_name.endswith(suffix):
            base_name = file_upload_name[:-len(suffix)]
        else:
            base_name = file_upload_name
        patch_file_name = base_name + "-" + datetime.now().strftime('%Y%m%d%H%M%S') + suffix
        file_path = os.path.join(patch_file_repo, patch_file_name)

        try:
            with open(file_path, 'wb') as f:
                for chunk in patch_file.chunks():
                    f.write(chunk)
        except Exception as e:
            logger.error(e)
            raise APIException(message=f"Upload Failed: {e}")
        return success(result={"patch_name": patch_file_name}, message="Upload success")
class HotfixAPIView(GenericViewSet,
                    mixins.ListModelMixin,
                    mixins.RetrieveModelMixin,
                    mixins.CreateModelMixin,
                    mixins.UpdateModelMixin,
                    mixins.DestroyModelMixin
                    ):
    """Hotfix management endpoints.

    Covers hotfix build jobs (create, rebuild, delete, status and log
    handling, downloads), the OS-type and kernel-version relation tables
    consulted for customized kernels, and one-click deployment of a built
    hotfix rpm.
    """

    queryset = HotfixModel.objects.filter(deleted_at=None)
    serializer_class = serializer.HotfixSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_fields = ['created_at', 'creator', 'building_status', 'arch']
    http_method_names = ['get', 'post', 'patch', 'delete']

    def __init__(self, **kwargs: Any) -> None:
        """Set up the helper objects and the build-state constants."""
        super().__init__(**kwargs)
        self.event_id = None
        self.function = FunctionClass()
        self.cec = CommonModelViewSet()
        # initial build state
        self.build_success = 3
        self.build_failed = 2
        self.build_wait = 0

    @staticmethod
    def _read_text_file(path):
        """Return the full content of the text file at ``path`` and close it."""
        with open(path) as f:
            return f.read()

    @staticmethod
    def _build_log_path(hotfix):
        """Return the path of ``hotfix``'s build log under the storage repo."""
        return os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "log", hotfix.log_file)

    def create_hotfix(self, request, **kwargs):
        """Create a hotfix record and publish its build job to CEC.

        Reads ``kernel_version``, ``hotfix_name`` and ``patch_file_name`` from
        ``request.data``. The build log file is named
        ``<hotfix_name>-<timestamp>.log`` (patch_file-time.log style) and is
        used to output the build log.

        A kernel version found in ``KernelVersionModel`` is handled as a
        customized kernel and its source/devel/debuginfo/image information is
        sent with the job. Otherwise the version must carry an ``an`` release
        tag (e.g. ``4.19.91-26.5.an8.x86_64`` -> ``an8``), or a 400 response
        is returned.
        """
        raw_hotfix_name = request.data["hotfix_name"]
        kernel_version = request.data['kernel_version'].replace(" ", "")
        patch_file_name = request.data['patch_file_name']

        # Derive the architecture from the space-stripped version string.
        arch = kernel_version.split(".")[-1]
        log_file = "{}-{}.log".format(raw_hotfix_name, time.strftime("%Y%m%d%H%M%S"))
        # str.replace() returns a new string, so the result must be kept.
        hotfix_name = raw_hotfix_name.replace(" ", "-")

        # check if this kernel_version is customize
        try:
            customize_version_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
        except Exception as e:
            logger.error(str(e))
            return other_response(message="Error when operating db", code=400)

        patch_path = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, patch_file_name)
        patch_file = patch_file_name
        hotfix_necessary = 0
        hotfix_risk = 2

        if customize_version_object is None:
            # This is not a customize kernel version
            version_parts = kernel_version.split('.')
            # 4.19.91-26.5.an8.x86_64 -> an8; a version with fewer than two
            # dot-separated parts has no release tag.
            release = version_parts[-2] if len(version_parts) >= 2 else ""
            if not re.search('an', release):
                return other_response(msg=f"This custom kernel version :{kernel_version} needs to be configured before use", code=400)
            # this is a anolis kernel
            os_type = "anolis"
            try:
                res = self.function.create_hotfix_object_to_database(
                    os_type, kernel_version, hotfix_name, patch_file, patch_path,
                    hotfix_necessary, hotfix_risk, log_file, arch)
                status = self.function.create_message_to_cec(
                    customize=False, cec_topic=settings.SYSOM_CEC_HOTFIX_TOPIC, os_type=os_type,
                    hotfix_id=res.id, kernel_version=res.kernel_version, hotfix_name=res.hotfix_name,
                    patch_file=res.patch_file, patch_path=res.patch_path,
                    arch=res.arch, log_file=res.log_file)
            except Exception as e:
                return other_response(msg=str(e), code=400)
        else:
            # This is a customize kernel version
            os_type = self.function.get_info_from_version(kernel_version)
            source_repo = self.function.get_sourcerepo_of_os(os_type)
            image = self.function.get_image_of_os(os_type)
            is_src_package = self.function.get_src_pkg_mark_of_os(os_type)
            source = self.function.get_info_from_version(kernel_version, "source")
            devel_link = self.function.get_info_from_version(kernel_version, "devel_link")
            debuginfo_link = self.function.get_info_from_version(kernel_version, "debuginfo_link")
            try:
                res = self.function.create_hotfix_object_to_database(
                    os_type, kernel_version, hotfix_name, patch_file, patch_path,
                    hotfix_necessary, hotfix_risk, log_file, arch)
                # if this is customize kernel, it should provide the git repo and the branch of this version
                # with devel-package and debuginfo-package
                status = self.function.create_message_to_cec(
                    customize=True, cec_topic=settings.SYSOM_CEC_HOTFIX_TOPIC, os_type=res.os_type,
                    hotfix_id=res.id, kernel_version=res.kernel_version, hotfix_name=res.hotfix_name,
                    patch_file=res.patch_file, patch_path=res.patch_path, arch=res.arch,
                    log_file=res.log_file, source_repo=source_repo, source=source, devel_link=devel_link,
                    debuginfo_link=debuginfo_link, image=image, is_src_package=is_src_package)
            except Exception as e:
                return other_response(msg=str(e), code=400)

        if not status:
            # rebuild_hotfix treats a falsy status as a failed send; the
            # response here is unchanged, but the failure is now logged.
            logger.error("create_message_to_cec returned a falsy status for hotfix id : %s" % str(res.id))
        return success(result={"msg": "success", "id": res.id, "event_id": self.event_id}, message="create hotfix job success")

    def rebuild_hotfix(self, request):
        """Re-publish the build job of a failed hotfix.

        Reads ``hotfix_id`` from ``request.data``. Only a hotfix whose
        ``building_status`` equals ``self.build_failed`` can be rebuilt. The
        stored log is cleared first; when the message is sent successfully the
        hotfix goes back to ``self.build_wait`` and ``created_at`` is reset.
        """
        hotfix_id = request.data['hotfix_id']
        hotfix_object = self.function.get_hotfix_object_by_id(hotfix_id)
        if hotfix_object is None:
            return other_response(message="Hotfix job not found", code=400)
        if hotfix_object.building_status != self.build_failed:
            return other_response(message="This Hotfix Job is Not Failed", code=401)

        kernel_version = hotfix_object.kernel_version
        os_type = hotfix_object.os_type

        # clear the original build log
        hotfix_object.log = ""
        hotfix_object.save()

        try:
            customize_version_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
        except Exception as e:
            return other_response(message="Error when querying customize kernel", code=400)

        try:
            # Arguments shared by the customized and non-customized messages.
            message_kwargs = dict(
                cec_topic=settings.SYSOM_CEC_HOTFIX_TOPIC,
                os_type=hotfix_object.os_type,
                hotfix_id=hotfix_object.id,
                kernel_version=hotfix_object.kernel_version,
                hotfix_name=hotfix_object.hotfix_name,
                patch_file=hotfix_object.patch_file,
                patch_path=hotfix_object.patch_path,
                arch=hotfix_object.arch,
                log_file=hotfix_object.log_file,
            )
            if customize_version_object is None:
                status = self.function.create_message_to_cec(customize=False, **message_kwargs)
            else:
                source_repo = self.function.get_sourcerepo_of_os(os_type)
                source = self.function.get_info_from_version(kernel_version, "source")
                devel_link = self.function.get_info_from_version(kernel_version, "devel_link")
                debuginfo_link = self.function.get_info_from_version(kernel_version, "debuginfo_link")
                image = self.function.get_image_of_os(os_type)
                is_src_package = self.function.get_src_pkg_mark_of_os(os_type)
                status = self.function.create_message_to_cec(
                    customize=True, source_repo=source_repo, source=source,
                    devel_link=devel_link, debuginfo_link=debuginfo_link, image=image,
                    is_src_package=is_src_package, **message_kwargs)

            if not status:
                return other_response(message="Error raised when sending msg to cec...", result={"msg": "Error raised when operating msg to cec..."}, code=400)
            hotfix_object.building_status = self.build_wait
            hotfix_object.created_at = human_datetime()
            hotfix_object.save()
            return success(result="success", message="rebuild success")
        except Exception as e:
            logger.error(str(e))
            logger.error("Rebuild Hotfix Failed")
            return other_response(message=str(e), result={"msg": "Error raised when operating msg to cec..."}, code=400)

    def get_hotfixlist(self, request):
        """Return every hotfix record whose ``deleted_at`` is unset."""
        try:
            queryset = HotfixModel.objects.all().filter(deleted_at=None)
            response = serializer.HotfixSerializer(queryset, many=True)
        except Exception as e:
            logger.exception(e)
            return other_response(message=str(e), result={"msg": "invoke get_hotfixlist failed"}, code=400)
        return success(result=response.data, message="invoke get_hotfixlist")

    def get_formal_hotfixlist(self, request):
        """Return the formal hotfix information list based on the given parameters.

        The optional query parameters ``created_at``, ``kernel_version``,
        ``patch_file`` and ``hotfix_name`` are passed to
        ``query_formal_hotfix_by_parameters``.
        """
        try:
            created_time = request.GET.get('created_at')
            kernel_version = request.GET.get('kernel_version')
            patch_file = request.GET.get('patch_file')
            hotfix_name = request.GET.get('hotfix_name')
            queryset = self.function.query_formal_hotfix_by_parameters(created_time, kernel_version, patch_file, hotfix_name)
            response = serializer.HotfixSerializer(queryset, many=True)
        except Exception as e:
            logger.exception(e)
            return other_response(message=str(e), result={"msg": "invoke get_formal_hotfixlist failed"}, code=400)
        return success(result=response.data, message="invoke get_formal_hotfixlist")

    def delete_hotfix(self, request):
        """Delete the hotfix identified by ``request.data["id"]``.

        A hotfix with ``building_status == 1`` (building) cannot be deleted.
        Before deletion the id is stored in the ``deleted_hotfix`` cache for
        three hours; if that fails the hotfix is left untouched.
        """
        hotfix = HotfixModel.objects.filter(id=request.data["id"], deleted_at=None).first()
        if hotfix is None:
            return other_response(message="can not delete this hotfix", result={"msg": "Hotfix not found"}, code=400)
        if hotfix.building_status == 1:
            return other_response(message="can not delete this hotfix:this hotfix status is running..", code=400)
        try:
            delete_hotfix_cache = SysomFramework.gcache("deleted_hotfix")
            delete_hotfix_cache.store(str(hotfix.id), str(hotfix.id), expire=3600*3)  # expire after 3 hours
            logger.info("deleted a hotfix, id : %s" % str(hotfix.id))
        except Exception as e:
            logger.error(str(e))
            return other_response(message="communication to redis of delete pool failed...", code=400)
        hotfix.deleted_at = human_datetime()
        hotfix.delete()
        return success(result={}, message="invoke delete_hotfix")

    def set_formal(self, request):
        """Mark the hotfix identified by ``request.data["id"]`` as formal."""
        hotfix = HotfixModel.objects.filter(id=request.data["id"]).first()
        if hotfix is None:
            # Answer with a 400 like the sibling endpoints instead of failing
            # with an AttributeError on a missing record.
            return other_response(message="No such hotfix id", result={"msg": "update formal status failed"}, code=400)
        hotfix.formal = 1
        hotfix.save()
        return success(result={"msg": "scuuessfully update formal status"}, message="formal status updated")

    def update_building_status(self, request):
        """Set ``building_status`` from the status name in ``request.data["status"]``.

        Supported names are ``waiting`` (0), ``building`` (1), ``failed`` (2)
        and ``success`` (3); any other value yields a 401 response.
        """
        status_codes = {"waiting": 0, "building": 1, "failed": 2, "success": 3}
        hotfix = HotfixModel.objects.filter(id=request.data["id"]).first()
        status = request.data["status"]
        if hotfix is None:
            return other_response(message="No such hotfix id", result={"mgs": "update building status failed"}, code=400)
        # Only string names are looked up, so unhashable input is rejected too.
        building_status = status_codes.get(status) if isinstance(status, str) else None
        if building_status is None:
            return other_response(message="unsupported status", result={"mgs": "update building status failed"}, code=401)
        hotfix.building_status = building_status
        hotfix.save()
        return success(result={"msg": "update building status successfully"}, message="update building status success")

    def get_build_log(self, request):
        """Return the build log of the hotfix given by the ``id`` query parameter.

        For a finished job (failed or success) the log file is read and copied
        into ``hotfix.log``; if the file no longer exists the stored
        ``hotfix.log`` is returned instead. For an unfinished job the log file
        is read directly. Any error while reading yields a 400 response.
        """
        hotfix_id = request.GET.get('id')
        hotfix = HotfixModel.objects.filter(id=hotfix_id).first()
        if not hotfix:
            return other_response(message="No such record", result={"msg": "Hotfix not found"}, code=400)
        try:
            finished = hotfix.building_status in (self.build_failed, self.build_success)
            log_path = self._build_log_path(hotfix)
            if not finished:
                # this job is not finished.. read from the log file
                return success(result=self._read_text_file(log_path), message="hotfix build log return")
            # this hotfix task is finished
            if os.path.exists(log_path):
                msg = self._read_text_file(log_path)
                hotfix.log = msg
                hotfix.save()
                return success(result=msg, message="hotfix build log return")
            msg = hotfix.log
            if msg:
                return success(result=msg, message="hotfix build log return")
            return other_response(message="No build log found", result={"msg": "No build log found"}, code=400)
        except Exception as e:
            logger.error(str(e))
            return other_response(message="No such log", result={"msg": "Log not found"}, code=400)

    def insert_building_log(self, request):
        """Append ``request.data["log"]`` to the stored log of a hotfix."""
        hotfix = HotfixModel.objects.filter(id=request.data["id"]).first()
        log = request.data["log"]
        if hotfix is None:
            return other_response(message="No such hotfix id", result={"mgs": "insert build log failed"}, code=400)
        if not log:
            return other_response(message="log is blank", result={"mgs": "insert build log failed"}, code=400)
        # Treat a missing stored log as empty so the first append cannot fail.
        hotfix.log = (hotfix.log or "") + log
        hotfix.save()
        return success(result={"msg": "inserted hotfix log"}, message="insert build log success")

    def sync_build_log(self, request):
        """Copy the build log file into ``hotfix.log``.

        this function is invoked when job finished..
        """
        hotfix = HotfixModel.objects.filter(id=request.data["id"]).first()
        if hotfix is None:
            return other_response(message="No such hotfix id", code=400)
        try:
            hotfix.log = self._read_text_file(self._build_log_path(hotfix))
            hotfix.save()
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "SUCCESS"}, message="sync build log success")

    def update_hotfix_name(self, request):
        """Append ``request.data["rpm"]`` to the hotfix's ``rpm_package`` field."""
        try:
            hotfix = HotfixModel.objects.filter(id=request.data["id"]).first()
            rpm_package = request.data["rpm"]
            hotfix.rpm_package += rpm_package
            hotfix.save()
            logger.info("invoking update hotfix name, this function is duplicated...")
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "update hotfix name success"}, message="updated hotfix name")

    def download_hotfix_file(self, request):
        """Stream the rpm package of the hotfix given by the ``id`` query parameter."""
        try:
            hotfix_id = request.GET.get('id')
            hotfix = HotfixModel.objects.filter(id=hotfix_id).first()
            rpm_package = hotfix.rpm_package
            response = FileResponse(open(os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "rpm", rpm_package), "rb"), as_attachment=True)
            # The header is spelled "Content-Type"; "content_type" would be
            # sent as an unrelated header and leave the real one untouched.
            response['Content-Type'] = "application/octet-stream"
            response['Content-Disposition'] = 'attachment;filename=' + rpm_package
            return response
        except Exception as e:
            logger.exception(e)
            return other_response(message=str(e), code=400)

    def download_file(self, request):
        """Stream an uploaded ``.patch``, ``.py`` or ``.sh`` file by name.

        The ``file_name`` query parameter must be a bare file name: values
        containing directory components are rejected, because the name is
        joined onto a server-side directory.
        """
        try:
            file_name = request.GET.get('file_name')
            logger.info(file_name)
            if not file_name or os.path.basename(file_name) != file_name:
                return other_response(message="invalid file name", code=400)
            file_upload_type = file_name.split(".")[-1]
            if file_upload_type == "patch":
                rule_file_path = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "patch", file_name)
            elif file_upload_type in ("py", "sh"):
                rule_file_path = os.path.join(settings.HOTFIX_FILE_BRANCH_RULE, file_name)
            else:
                return other_response(message=f"unsupported file type: .{file_upload_type}", code=400)
            response = FileResponse(open(rule_file_path, "rb"), as_attachment=True)
            response['Content-Type'] = "application/octet-stream"
            response['Content-Disposition'] = 'attachment;filename=' + file_name
            return response
        except Exception as e:
            logger.exception(e)
            return other_response(message=str(e), code=400)

    def insert_os_type_relation(self, request):
        """Create an ``OSTypeModel`` record and optionally start a kernel sync.

        Requires ``os_type``, ``source_repo``, ``src_pkg_mark``,
        ``source_devel`` and ``source_debuginfo`` in ``request.data``;
        ``image`` defaults to an empty string. When ``sync_conf`` is truthy a
        ``sync_kernel`` thread is started for the new record, unless its
        ``sync_status`` is 0.
        """
        os_type = request.data["os_type"]
        source_repo = request.data["source_repo"]
        src_pkg_mark = request.data["src_pkg_mark"]
        source_devel = request.data["source_devel"]
        source_debuginfo = request.data["source_debuginfo"]
        sync_conf = request.data.get("sync_conf", None)
        logger.info(sync_conf)
        image = request.data.get('image', "")

        if not (os_type and source_repo):
            return other_response(message="one or more of the key parameter is null", code=400)
        try:
            os_object = OSTypeModel.objects.all().filter(os_type=os_type).first()
            if os_object is not None:
                return other_response(message="same OS Type found in record..", code=400)
            os_type_object = OSTypeModel.objects.create(
                os_type=os_type,
                source_repo=source_repo,
                image=image,
                src_pkg_mark=src_pkg_mark,
                source_devel=source_devel,
                source_debuginfo=source_debuginfo
            )
            if sync_conf:
                if os_type_object.sync_status == 0:
                    return other_response(msg="Synchronizing, please wait for synchronization to complete", code=400)
                thread_runner = threading.Thread(target=self.function.sync_kernel, name="sync_kernel", args=(os_type_object.id,))
                thread_runner.start()
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "create os_type relation successfully"}, message="invoke insert_os_type_relation success")

    def insert_kernel_version_relation(self, request):
        """Create a ``KernelVersionModel`` record for a customized kernel.

        ``kernel_version``, ``source``, ``devel_link``, ``debuginfo_link``,
        ``os_type`` and ``image`` must all be non-empty; a kernel version that
        already exists is rejected.
        """
        kernel_version = request.data['kernel_version']
        source = request.data['source']
        devel_link = request.data['devel_link']
        debuginfo_link = request.data['debuginfo_link']
        os_type = request.data['os_type']
        image = request.data['image']
        use_src_package = request.data['use_src_package']

        if not all((kernel_version, source, devel_link, debuginfo_link, os_type, image)):
            return other_response(message="one or more of the key parameters is null", code=400)
        try:
            kernel_object = KernelVersionModel.objects.all().filter(kernel_version=kernel_version).first()
            if kernel_object is not None:
                # code=400 added to match the duplicate check in
                # insert_os_type_relation.
                return other_response(message="same kernel version found in record...", code=400)
            KernelVersionModel.objects.create(
                kernel_version=kernel_version,
                os_type=os_type,
                source=source,
                devel_link=devel_link,
                debuginfo_link=debuginfo_link,
                image=image,
                use_src_package=use_src_package
            )
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "create kernel version relation successfully"}, message="invoke insert_kernel_version_relation success")

    def get_os_type_relation(self, request):
        """Return OS-type records.

        With an ``id`` query parameter only that record is returned; without
        it every record whose ``deleted_at`` is unset is returned.
        """
        record_id = request.GET.get('id')
        try:
            if record_id is None:
                # no id given: list all the os_type records
                queryset = OSTypeModel.objects.all().filter(deleted_at=None)
                response = serializer.OSTypeSerializer(queryset, many=True)
            else:
                # an id is given: get this one specific os_type
                os_type_object = OSTypeModel.objects.all().filter(id=record_id).first()
                if os_type_object is None:
                    return other_response(msg="can not find the OS type record", code=400)
                response = serializer.OSTypeSerializer(os_type_object)
        except Exception as e:
            logger.error(e)
            return other_response(message=str(e), result={"msg": "get_os_type_relation failed"}, code=400)
        return success(result=response.data, message="get_os_type_relation")

    def get_kernel_relation(self, request):
        """Return every kernel-version record whose ``deleted_at`` is unset."""
        try:
            queryset = KernelVersionModel.objects.all().filter(deleted_at=None)
            response = serializer.KernelSerializer(queryset, many=True)
        except Exception as e:
            logger.error(e)
            return other_response(message=str(e), result={"msg": "get_kernel_relation failed"}, code=400)
        return success(result=response.data, message="get_kernel_relation")

    def delete_os_type(self, request):
        """Delete an OS type and every kernel version that belongs to it."""
        object_id = request.data['id']
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=object_id).first()
            if os_type_object is None:
                return other_response(message="can not find the record with the given id", code=400)
            kernel_sets = KernelVersionModel.objects.all().filter(os_type=os_type_object.os_type)
            # delete all the kernel belongs to the delete os_type
            for each_kernel in kernel_sets:
                each_kernel.delete()
            os_type_object.delete()
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "successfully deleted object"}, message="invoked delete os type")

    def delete_kernel_version(self, request):
        """Delete the kernel-version record identified by ``request.data['id']``."""
        object_id = request.data['id']
        try:
            kernel_object = KernelVersionModel.objects.all().filter(id=object_id).first()
            if kernel_object is None:
                return other_response(message="can not find the record with the given id", code=400)
            kernel_object.delete()
        except Exception as e:
            return other_response(message=str(e), code=400)
        return success(result={"msg": "successfully deleted object"}, message="invoked delete kernel_version")

    def update_kernel_version(self, request):
        """Overwrite every editable field of a kernel-version record."""
        try:
            kernel_object = KernelVersionModel.objects.all().filter(id=request.data['id']).first()
            if kernel_object is None:
                return other_response(msg="can not find the kernelversion record", code=400)
            kernel_object.kernel_version = request.data['kernel_version']
            kernel_object.os_type = request.data['os_type']
            kernel_object.source = request.data['source']
            kernel_object.devel_link = request.data['devel_link']
            kernel_object.debuginfo_link = request.data['debuginfo_link']
            kernel_object.image = request.data['image']
            kernel_object.use_src_package = request.data['use_src_package']
            kernel_object.save()
        except Exception as e:
            return other_response(msg=str(e), code=400)
        return success(result={"msg": "successfully update kernelversion object"}, message="invoke update_kernelversion")

    def update_ostype(self, request):
        """Update an OS-type record and start a ``sync_kernel`` thread for it.

        The update is refused while the record's ``sync_status`` is 0.
        """
        try:
            os_type_object = OSTypeModel.objects.all().filter(id=request.data['id']).first()
            if os_type_object is None:
                return other_response(msg="can not find the OS type record", code=400)
            if os_type_object.sync_status == 0:
                return other_response(msg="Synchronizing, please wait for synchronization to complete", code=400)
            os_type_object.os_type = request.data['os_type']
            os_type_object.source_repo = request.data['source_repo']
            os_type_object.image = request.data['image']
            os_type_object.source_devel = request.data['source_devel']
            os_type_object.source_debuginfo = request.data['source_debuginfo']
            os_type_object.save()
            thread_runner = threading.Thread(target=self.function.sync_kernel, name="sync_kernel", args=(os_type_object.id,))
            thread_runner.start()
        except Exception as e:
            return other_response(msg=str(e), code=400)
        return success(result={"msg": "successfully update os_type object"}, message="invoke update_ostype")

    def sync_kernel_by_os_type(self, request):
        """Start a ``sync_kernel`` thread for the OS type in ``request.data["id"]``.

        The request is refused while the record's ``sync_status`` is 0.
        """
        os_type_id = request.data.get("id")
        os_type_object = OSTypeModel.objects.all().filter(id=os_type_id).first()
        if os_type_object is None:
            return other_response(msg="can not find the OS type record", code=400)
        if os_type_object.sync_status == 0:
            return other_response(msg="Synchronizing, please wait for synchronization to complete", code=400)
        thread_runner = threading.Thread(target=self.function.sync_kernel, name="sync_kernel", args=(os_type_id,))
        thread_runner.start()
        return success(result={"msg": "success trigger sync kernel"})

    def oneclick_deploy(self, request):
        """Deploy a built hotfix rpm to the hosts matching ``kernel_version``.

        ``rpm_package`` must be a bare file name located in the ``rpm``
        directory of the storage repo.
        """
        kernel_version = request.data.get("kernel_version")
        rpm_package = request.data.get("rpm_package")
        # The value is joined onto a server-side directory, so it must be
        # present and free of directory components.
        if not rpm_package or os.path.basename(rpm_package) != rpm_package:
            return other_response(msg="invalid rpm_package", code=400)
        server_hotfix_path = os.path.join(settings.HOTFIX_FILE_STORAGE_REPO, "rpm", rpm_package)
        if not os.path.exists(server_hotfix_path):
            logger.error("hotfix file %s not exist..." % server_hotfix_path)
            return other_response(msg="hotfix file %s not found." % server_hotfix_path, code=400)
        host_list = self.function.get_host_list(kernel_version=kernel_version)
        host_num = len(host_list)
        success_num = self.function.deploy_hotfix_to_machine(kernel_version=kernel_version, hotfix_path=server_hotfix_path)
        if success_num == -1:
            return other_response(msg="No host matches kernel version : %s" % kernel_version, code=400)
        if success_num is None:
            return other_response(msg="deploy to hotfix failed", code=400)
        return success(result={"msg": "deploy hotfix to machine, %s of %s success" % (success_num, host_num)})
class HealthViewset(CommonModelViewSet):
    """Health-check endpoint of the hotfix service."""

    def health_check(self, request, *args, **kwargs):
        """Return a success response carrying an empty result."""
        empty_result = {}
        return success(result=empty_result)
class ReleaseHotfixListAPIView(GenericViewSet,
                               mixins.ListModelMixin,
                               mixins.RetrieveModelMixin,
                               mixins.CreateModelMixin,
                               mixins.UpdateModelMixin,
                               mixins.DestroyModelMixin):
    """CRUD, filtering and bulk-import endpoints for released hotfix records."""

    queryset = ReleasedHotfixListModule.objects.all()
    pagination_class = Pagination
    serializer_class = serializer.ReleasedHotfixSerializer
    # django-filter 2.x naming, matching ``filterset_fields`` used by
    # HotfixAPIView in this file; the legacy attribute stays as an alias so
    # any code still reading ``filter_class`` keeps working.
    filterset_class = HotfixReleasedFilter
    filter_class = filterset_class
    filter_backends = [DjangoFilterBackend]
    http_method_names = ['get', 'post', 'patch', 'put', 'delete']

    def get_serializer_class(self):
        """Pick the serializer for the current HTTP method.

        Methods without a dedicated serializer (e.g. DELETE) fall back to
        ``serializer_class`` instead of raising ``KeyError``.
        """
        request_method_dict = {
            "POST": serializer.CreateReleasedHotfixSerializer,
            "PUT": serializer.UpdatePutReleasedHotfixSerializer,
            "PATCH": serializer.UpdateReleasedHotfixSerializer,
            "GET": serializer.ReleasedHotfixSerializer,
        }
        return request_method_dict.get(self.request.method, self.serializer_class)

    def get_filter_released_hotfixs(self, request: Request, *args, **kwargs):
        """Return the filtered records in the database.

        An empty result set is answered directly with an empty list and
        ``total=0``; otherwise the standard ``list`` response is returned.
        """
        queryset = self.filter_queryset(self.get_queryset())
        # exists() checks for emptiness without loading every row.
        if not queryset.exists():
            return success([], total=0)
        return super().list(request, *args, **kwargs)

    def add_one_released_hotfix(self, request, *args, **kwarg):
        """Insert one released hotfix record into the database."""
        create_serializer = self.get_serializer(data=request.data)
        create_serializer.is_valid(raise_exception=True)
        self.perform_create(create_serializer)
        ser = serializer.ReleasedHotfixSerializer(create_serializer.instance)
        return success(result=ser.data)

    def update_released_hotfix_record(self, request, *args, **kwargs):
        """Update one released hotfix record.

        The information inside can only be updated, not deleted. A field may
        be updated to a blank message " ", but the record itself is meant to
        be kept. Pass ``partial=True`` in ``kwargs`` for a partial update.
        """
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        update_serializer = self.get_serializer(instance, data=request.data, partial=partial)
        update_serializer.is_valid(raise_exception=True)
        self.perform_update(update_serializer)
        ser = serializer.ReleasedHotfixSerializer(update_serializer.instance, many=False)
        return success(result=ser.data)

    def import_from_table_v2(self, request):
        """Validate ``request.data`` with the bulk-import serializer and save it."""
        ser = serializer.BulkImportHotfixReleasedSerializer(data=request.data)
        ser.is_valid(raise_exception=True)
        self.perform_create(ser)
        return success(result={}, message="save files successful!")