mirror of https://gitee.com/anolis/sysom.git
380 lines
14 KiB
Python
380 lines
14 KiB
Python
# -*- encoding: utf-8 -*-
|
||
"""
|
||
@File : vul.py
|
||
@Time : 2022/2/16 下午3:37
|
||
@Author : weidongkl
|
||
@Email : weidong@uniontech.com
|
||
@Software: PyCharm
|
||
"""
|
||
import datetime
|
||
import json
|
||
import re
|
||
from clogger import logger
|
||
from django.utils import timezone
|
||
from django.db.models import Q
|
||
from rest_framework import status
|
||
from apps.vul.models import *
|
||
from apps.vul.ssh_pool import VulTaskManager
|
||
|
||
from lib.utils import human_datetime
|
||
from .async_fetch import FetchVulData
|
||
|
||
|
||
def update_vul():
|
||
job_start_time = timezone.now()
|
||
job_id = "update_vul_{:%Y%m%d%H%M%S%f}".format(datetime.datetime.utcnow())
|
||
update_vul_db()
|
||
job_end_time = timezone.now()
|
||
VulJobModel.objects.create(
|
||
job_id=job_id,
|
||
job_name="update_vul",
|
||
job_desc="Start updating the vulnerability database",
|
||
job_start_time=job_start_time,
|
||
job_end_time=job_end_time,
|
||
)
|
||
|
||
|
||
def update_vul_db():
|
||
"""
|
||
更新漏洞数据库数据
|
||
"""
|
||
logger.info("Begin to get vul db address")
|
||
vul_addrs = VulAddrModel.objects.all() # 获取漏洞数据库中所有的漏洞库信息
|
||
for vul_addr in vul_addrs:
|
||
logger.info("Try to get vul db info")
|
||
vul_addr_obj = VulDataParse(vul_addr) # 生成漏洞库操作实例
|
||
try:
|
||
for res in vul_addr_obj._get_vul_data(): # 获取每一项cve数据,解析返回数据,根据cve是否存在进行更新或者插入
|
||
vul_addr_obj.parse_and_store_vul_data(res)
|
||
except Exception as e:
|
||
logger.warning(e)
|
||
logger.warning(f"failed in {vul_addr.url}")
|
||
|
||
"""
|
||
VulDataParse:CVE漏洞库操作实例
|
||
"""
|
||
class VulDataParse(object):
|
||
def __init__(self, vul_addr_obj: VulAddrModel):
|
||
self.vul_addr_obj = vul_addr_obj
|
||
self.cve_data_path = list(filter(None, self._parser_cve_item_path))
|
||
|
||
@property
|
||
def _parser_cve_item_path(self) -> list:
|
||
json_attr: dict = json.loads(self.vul_addr_obj.parser)
|
||
cve_item_path = json_attr.get('cve_item_path', None)
|
||
if not cve_item_path:
|
||
return []
|
||
return cve_item_path.split('/')
|
||
|
||
def _generate_url(self, instance: VulAddrModel, url) -> dict:
|
||
"""
|
||
构造请求
|
||
"""
|
||
authorization_body = json.loads(instance.authorization_body)
|
||
if instance.authorization_type.lower() == "basic":
|
||
auth = (
|
||
authorization_body.get('username'),
|
||
authorization_body.get('password')
|
||
)
|
||
else:
|
||
auth = ()
|
||
return {
|
||
'method': instance.get_method_display(),
|
||
'url': url,
|
||
'headers': json.loads(instance.headers),
|
||
'data': json.loads(instance.body),
|
||
'params': json.loads(instance.params),
|
||
'auth': auth
|
||
}
|
||
|
||
def _get_vul_data(self):
|
||
"""
|
||
异步获取vul
|
||
"""
|
||
return FetchVulData.run(
|
||
instance=self.vul_addr_obj,
|
||
cve_data_path=self.cve_data_path
|
||
)
|
||
|
||
def _update_exist_vul(self, instance: VulModel, **kwargs):
|
||
"""If the cve exists and the level changes, modify the cve level
|
||
"""
|
||
level = kwargs.get('vul_level')
|
||
if instance.vul_level != level:
|
||
try:
|
||
instance.__dict__.update(kwargs)
|
||
instance.save()
|
||
except Exception as e:
|
||
logger.error(e)
|
||
|
||
def _create_new_vul(self, **kwargs):
|
||
"""if vul not exist then create new cve"""
|
||
try:
|
||
VulModel.objects.create(**kwargs)
|
||
except Exception as e:
|
||
logger.error('vul add fiald')
|
||
|
||
def parse_and_store_vul_data(self, cve: dict):
|
||
"""
|
||
parse cve update or save
|
||
"""
|
||
def _get_vul_level(item):
|
||
"""
|
||
get cve level
|
||
"""
|
||
vul_level = None
|
||
|
||
if "level_flag" in parser:
|
||
vul_level = item.get(parser["level_flag"], None)
|
||
|
||
if vul_level is None:
|
||
vul_score = item.get(parser["score_flag"], None)
|
||
if vul_score is None:
|
||
vul_level = ""
|
||
else:
|
||
if int(vul_score) < 4:
|
||
vul_level = "low"
|
||
elif int(vul_score) < 7:
|
||
vul_level = "medium"
|
||
elif int(vul_score) < 9:
|
||
vul_level = "high"
|
||
else:
|
||
vul_level = "critical"
|
||
return vul_level
|
||
|
||
parser = json.loads(self.vul_addr_obj.parser)
|
||
_now = timezone.now()
|
||
cve_id = cve[parser["cve_id_flag"]]
|
||
pub_time = cve.get(parser["pub_time_flag"], None)
|
||
detail = cve.get('abstract', '')
|
||
level = _get_vul_level(cve)
|
||
|
||
common_kwargs = dict(
|
||
pub_time=pub_time,
|
||
vul_level=level,
|
||
update_time=_now,
|
||
detail=detail
|
||
)
|
||
|
||
try:
|
||
instance = VulModel.objects.get(cve_id=cve_id)
|
||
self._update_exist_vul(instance, **common_kwargs)
|
||
except VulModel.DoesNotExist:
|
||
common_kwargs['cve_id'] = cve_id
|
||
self._create_new_vul(**common_kwargs)
|
||
|
||
def set_vul_data_status(self, status):
|
||
self.vul_addr_obj.status = status
|
||
self.vul_addr_obj.save()
|
||
|
||
def set_vul_data_status_down(self):
|
||
self.set_vul_data_status(1)
|
||
|
||
def set_vul_data_status_up(self):
|
||
self.set_vul_data_status(0)
|
||
|
||
|
||
def get_unfix_cve():
|
||
"""
|
||
Returns: QuerySet in django
|
||
"""
|
||
unfix_cve_obj_search = VulModel.objects.filter(status="unfix")
|
||
return unfix_cve_obj_search
|
||
|
||
|
||
def get_unfix_cve_format():
|
||
"""
|
||
获取python 对象格式的未修复cve信息
|
||
Returns: []
|
||
"""
|
||
queryset = get_unfix_cve()
|
||
data = []
|
||
for i in queryset:
|
||
data.append({"cve_id": i.cve_id,
|
||
"os": i.os,
|
||
"software_name": i.software_name})
|
||
return data
|
||
|
||
|
||
def update_sa(hosts: list):
|
||
job_start_time = timezone.now()
|
||
job_id = "update_vul_{:%Y%m%d%H%M%S%f}".format(datetime.datetime.utcnow())
|
||
update_sa_job_obj = VulJobModel.objects.create(
|
||
job_id=job_id,
|
||
job_name="update_sa",
|
||
job_desc="Start updating the sa database",
|
||
job_start_time=job_start_time,
|
||
)
|
||
cmd = r'''
|
||
#!/bin/bash
|
||
# 获取版本信息
|
||
dist=$(cat /etc/os-release | grep PLATFORM_ID | awk -F '"|:' '{print $3}')
|
||
if [ -z $dist ]; then
|
||
dist="unknow"
|
||
fi
|
||
# 获取errata信息
|
||
declare -a cve_array
|
||
mapfile -t cve_array <<<$(dnf updateinfo list --with-cve 2>/dev/null | grep ^CVE | sort -k 1,1 -u | awk '{print $1 " " $3}')
|
||
for i in "${cve_array[@]}"; do
|
||
cve_id=$(echo $i | awk '{print $1}')
|
||
# 使用sed正则匹配rpm的包名,版本号,release
|
||
rpm_pkg=$(echo $i | awk '{print $2}' | sed -e 's/^\(.*\)-\([^-]\{1,\}\)-\([^-]\{1,\}\)$/\1 \2 \3/' -e 's/\.\(el8\|el7\|an8\|oe\|uel20\|uelc20\).*$//g')
|
||
rpm_version=$(echo $rpm_pkg | awk '{print $2"-"$3}')
|
||
rpm_bin_name=$(echo $rpm_pkg | awk '{print $1}')
|
||
# 根据包名字获取source包名称
|
||
rpm_source_name=$(rpm -q $rpm_bin_name --queryformat "%{sourcerpm}\n" | head -n 1 | awk -F "-$(rpm -q $rpm_bin_name --queryformat "%{version}\n" | head -n 1)" '{print $1}')
|
||
echo $cve_id $rpm_source_name $rpm_version $dist
|
||
done
|
||
'''
|
||
# hosts = [item['ip'] for item in FetchHost.get_host_list()]
|
||
vtm = VulTaskManager(hosts, cmd)
|
||
results = vtm.run(VulTaskManager.run_command)
|
||
|
||
# cve2host_info={"cve1":[(host,software,version,os)]}
|
||
# [{'host': 'GqYLM32pIZaNH0rOjd7JViwxPs', 'ret': {'status': 1, 'result': timeout('timed out')}}]
|
||
cve2host_info = {}
|
||
for result in results:
|
||
host = result["host"]
|
||
if result["ret"]['status'] == 0:
|
||
for cve_info in parse_sa_result(result["ret"]['result']):
|
||
cve = cve_info.pop(0)
|
||
cve_info.insert(0, host)
|
||
if cve in cve2host_info.keys():
|
||
cve2host_info[cve].append(tuple(cve_info))
|
||
else:
|
||
cve2host_info[cve] = [tuple(cve_info)]
|
||
|
||
if len(cve2host_info) > 0:
|
||
update_sa_db(cve2host_info)
|
||
job_end_time = timezone.now()
|
||
update_sa_job_obj.job_end_time = job_end_time
|
||
update_sa_job_obj.save()
|
||
|
||
|
||
def update_sa_db(cveinfo):
|
||
#
|
||
# cve2host_info={"cve1":[(host,software,version,os)]}
|
||
#
|
||
current_cveinfo = SecurityAdvisoryModel.objects.values_list("cve_id", "software_name", "fixed_version", "os")
|
||
current_cves = set([cve for cve in current_cveinfo])
|
||
new_cveinfo = cveinfo
|
||
new_cves = set([(k, item[1], item[2], item[3]) for k, v in new_cveinfo.items() for item in v])
|
||
delete_cves = current_cves - new_cves
|
||
# 删除无效的关联关系,用于更新客户手动修复漏洞后,导致的数据库不匹配问题
|
||
for cve in list(delete_cves):
|
||
cve_id, software_name, fixed_version, os = cve
|
||
sacve_obj = SecurityAdvisoryModel.objects.filter(cve_id=cve_id,
|
||
software_name=software_name,
|
||
fixed_version=fixed_version,
|
||
os=os).first()
|
||
sacve_obj.hosts = ""
|
||
sacve_obj.save()
|
||
add_cves = new_cves - current_cves
|
||
# [("cve_id", "software_name", "fixed_version", "os")]alios#123
|
||
for cve in list(add_cves):
|
||
cve_id, software_name, fixed_version, os = cve
|
||
hosts = ",".join([cve_detail[0] for cve_detail in new_cveinfo[cve_id] if
|
||
cve_detail[1] == software_name and cve_detail[2] == fixed_version and cve_detail[3] == os])
|
||
|
||
try:
|
||
SecurityAdvisoryModel.objects.get(cve_id=cve_id)
|
||
except SecurityAdvisoryModel.DoesNotExist:
|
||
logger.info(f'create new cve: {cve_id}')
|
||
SecurityAdvisoryModel.objects.create(
|
||
cve_id=cve_id, update_time=timezone.now(), hosts=hosts,
|
||
os=os, fixed_version=fixed_version, software_name=software_name,
|
||
)
|
||
|
||
update_cves = new_cves & current_cves
|
||
for cve in list(update_cves):
|
||
cve_id, software_name, fixed_version, os = cve
|
||
hosts = [cve_detail[0] for cve_detail in new_cveinfo[cve_id] if
|
||
cve_detail[1] == software_name and cve_detail[2] == fixed_version and cve_detail[3] == os]
|
||
sacve_obj = SecurityAdvisoryModel.objects.filter(cve_id=cve_id,
|
||
software_name=software_name,
|
||
fixed_version=fixed_version,
|
||
os=os).first()
|
||
|
||
_hosts = [] if sacve_obj.hosts == "" else sacve_obj.hosts.split(',')
|
||
sacve_obj.hosts = ','.join(list(set(hosts + _hosts)))
|
||
sacve_obj.save()
|
||
|
||
# # 更新漏洞数据库数据至sa
|
||
for sacve_obj in set(
|
||
SecurityAdvisoryModel.objects.filter(Q(pub_time='') | Q(vul_level='')).values_list("cve_id")):
|
||
cve_obj_search = VulModel.objects.filter(cve_id=sacve_obj[0])
|
||
if len(cve_obj_search) != 0:
|
||
cve_obj = cve_obj_search.first()
|
||
SecurityAdvisoryModel.objects.filter(cve_id=sacve_obj[0]).update(
|
||
score=cve_obj.score,
|
||
description=cve_obj.description,
|
||
pub_time=cve_obj.pub_time,
|
||
vul_level=cve_obj.vul_level,
|
||
detail=cve_obj.detail,
|
||
update_time=timezone.now(),
|
||
)
|
||
|
||
|
||
def parse_sa_result(result):
|
||
"""解析dnf获取的sa数据"""
|
||
for i in result.split("\n"):
|
||
sa_re = "CVE-\d{4}-\d{4,7}(\s+\S+){3}"
|
||
if re.match(sa_re, i, re.I):
|
||
yield i.split()
|
||
|
||
|
||
def fix_cve(hosts, cve_id, user):
|
||
cmd = 'dnf update --cve {} -y'.format(cve_id)
|
||
vtm = VulTaskManager(hosts, cmd)
|
||
results = vtm.run(VulTaskManager.run_command)
|
||
fixed_time = human_datetime()
|
||
user_obj = user.get('id', 1)
|
||
vul_level = SecurityAdvisoryModel.objects.filter(cve_id=cve_id).first().vul_level
|
||
cve_status = "success"
|
||
init = True
|
||
for ret in results:
|
||
if ret["ret"]["status"] == 0:
|
||
status = "success"
|
||
details = ret["ret"]["result"]
|
||
sa_obj = SecurityAdvisoryModel.objects.filter(hosts__contains=ret['host'], cve_id=cve_id).first()
|
||
_hosts = sa_obj.hosts.split(',')
|
||
_hosts.remove(ret["host"])
|
||
sa_obj.hosts = ",".join(_hosts)
|
||
sa_obj.save()
|
||
else:
|
||
status = "fail"
|
||
cve_status = "fail"
|
||
details = ret["ret"]["result"]
|
||
|
||
if init:
|
||
safh = SecurityAdvisoryFixHistoryModel.objects.create(fixed_at=fixed_time,
|
||
cve_id=cve_id,
|
||
vul_level=vul_level,
|
||
created_by=user_obj,
|
||
status=cve_status)
|
||
init = False
|
||
elif cve_status == "fail":
|
||
safh.status = cve_status
|
||
|
||
# safh.host.add(host_obj, through_defaults={'status': status, "details": details})
|
||
safh.hosts = ret["host"]
|
||
safh.save()
|
||
|
||
sfh_kwargs = {}
|
||
sfh_kwargs['sa_fix_hist'] = safh
|
||
sfh_kwargs['status'] = status
|
||
sfh_kwargs['details'] = details
|
||
sfh_kwargs['hosts'] = ret["host"]
|
||
_save_sa_fix_hist_host(**sfh_kwargs)
|
||
return results
|
||
|
||
|
||
def _save_sa_fix_hist_host(**kwargs):
|
||
"""
|
||
create fix hist host
|
||
"""
|
||
try:
|
||
SaFixHistToHost.objects.create(**kwargs)
|
||
except Exception as e:
|
||
logger.error(str(e))
|