sysom1/sysom_server/sysom_vul/apps/vul/vul.py

380 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding: utf-8 -*-
"""
@File : vul.py
@Time : 2022/2/16 下午3:37
@Author : weidongkl
@Email : weidong@uniontech.com
@Software: PyCharm
"""
import datetime
import json
import re
from clogger import logger
from django.utils import timezone
from django.db.models import Q
from rest_framework import status
from apps.vul.models import *
from apps.vul.ssh_pool import VulTaskManager
from lib.utils import human_datetime
from .async_fetch import FetchVulData
def update_vul():
job_start_time = timezone.now()
job_id = "update_vul_{:%Y%m%d%H%M%S%f}".format(datetime.datetime.utcnow())
update_vul_db()
job_end_time = timezone.now()
VulJobModel.objects.create(
job_id=job_id,
job_name="update_vul",
job_desc="Start updating the vulnerability database",
job_start_time=job_start_time,
job_end_time=job_end_time,
)
def update_vul_db():
"""
更新漏洞数据库数据
"""
logger.info("Begin to get vul db address")
vul_addrs = VulAddrModel.objects.all() # 获取漏洞数据库中所有的漏洞库信息
for vul_addr in vul_addrs:
logger.info("Try to get vul db info")
vul_addr_obj = VulDataParse(vul_addr) # 生成漏洞库操作实例
try:
for res in vul_addr_obj._get_vul_data(): # 获取每一项cve数据,解析返回数据,根据cve是否存在进行更新或者插入
vul_addr_obj.parse_and_store_vul_data(res)
except Exception as e:
logger.warning(e)
logger.warning(f"failed in {vul_addr.url}")
"""
VulDataParse:CVE漏洞库操作实例
"""
class VulDataParse(object):
def __init__(self, vul_addr_obj: VulAddrModel):
self.vul_addr_obj = vul_addr_obj
self.cve_data_path = list(filter(None, self._parser_cve_item_path))
@property
def _parser_cve_item_path(self) -> list:
json_attr: dict = json.loads(self.vul_addr_obj.parser)
cve_item_path = json_attr.get('cve_item_path', None)
if not cve_item_path:
return []
return cve_item_path.split('/')
def _generate_url(self, instance: VulAddrModel, url) -> dict:
"""
构造请求
"""
authorization_body = json.loads(instance.authorization_body)
if instance.authorization_type.lower() == "basic":
auth = (
authorization_body.get('username'),
authorization_body.get('password')
)
else:
auth = ()
return {
'method': instance.get_method_display(),
'url': url,
'headers': json.loads(instance.headers),
'data': json.loads(instance.body),
'params': json.loads(instance.params),
'auth': auth
}
def _get_vul_data(self):
"""
异步获取vul
"""
return FetchVulData.run(
instance=self.vul_addr_obj,
cve_data_path=self.cve_data_path
)
def _update_exist_vul(self, instance: VulModel, **kwargs):
"""If the cve exists and the level changes, modify the cve level
"""
level = kwargs.get('vul_level')
if instance.vul_level != level:
try:
instance.__dict__.update(kwargs)
instance.save()
except Exception as e:
logger.error(e)
def _create_new_vul(self, **kwargs):
"""if vul not exist then create new cve"""
try:
VulModel.objects.create(**kwargs)
except Exception as e:
logger.error('vul add fiald')
def parse_and_store_vul_data(self, cve: dict):
"""
parse cve update or save
"""
def _get_vul_level(item):
"""
get cve level
"""
vul_level = None
if "level_flag" in parser:
vul_level = item.get(parser["level_flag"], None)
if vul_level is None:
vul_score = item.get(parser["score_flag"], None)
if vul_score is None:
vul_level = ""
else:
if int(vul_score) < 4:
vul_level = "low"
elif int(vul_score) < 7:
vul_level = "medium"
elif int(vul_score) < 9:
vul_level = "high"
else:
vul_level = "critical"
return vul_level
parser = json.loads(self.vul_addr_obj.parser)
_now = timezone.now()
cve_id = cve[parser["cve_id_flag"]]
pub_time = cve.get(parser["pub_time_flag"], None)
detail = cve.get('abstract', '')
level = _get_vul_level(cve)
common_kwargs = dict(
pub_time=pub_time,
vul_level=level,
update_time=_now,
detail=detail
)
try:
instance = VulModel.objects.get(cve_id=cve_id)
self._update_exist_vul(instance, **common_kwargs)
except VulModel.DoesNotExist:
common_kwargs['cve_id'] = cve_id
self._create_new_vul(**common_kwargs)
def set_vul_data_status(self, status):
self.vul_addr_obj.status = status
self.vul_addr_obj.save()
def set_vul_data_status_down(self):
self.set_vul_data_status(1)
def set_vul_data_status_up(self):
self.set_vul_data_status(0)
def get_unfix_cve():
"""
Returns: QuerySet in django
"""
unfix_cve_obj_search = VulModel.objects.filter(status="unfix")
return unfix_cve_obj_search
def get_unfix_cve_format():
"""
获取python 对象格式的未修复cve信息
Returns: []
"""
queryset = get_unfix_cve()
data = []
for i in queryset:
data.append({"cve_id": i.cve_id,
"os": i.os,
"software_name": i.software_name})
return data
def update_sa(hosts: list):
job_start_time = timezone.now()
job_id = "update_vul_{:%Y%m%d%H%M%S%f}".format(datetime.datetime.utcnow())
update_sa_job_obj = VulJobModel.objects.create(
job_id=job_id,
job_name="update_sa",
job_desc="Start updating the sa database",
job_start_time=job_start_time,
)
cmd = r'''
#!/bin/bash
# 获取版本信息
dist=$(cat /etc/os-release | grep PLATFORM_ID | awk -F '"|:' '{print $3}')
if [ -z $dist ]; then
dist="unknow"
fi
# 获取errata信息
declare -a cve_array
mapfile -t cve_array <<<$(dnf updateinfo list --with-cve 2>/dev/null | grep ^CVE | sort -k 1,1 -u | awk '{print $1 " " $3}')
for i in "${cve_array[@]}"; do
cve_id=$(echo $i | awk '{print $1}')
# 使用sed正则匹配rpm的包名版本号release
rpm_pkg=$(echo $i | awk '{print $2}' | sed -e 's/^\(.*\)-\([^-]\{1,\}\)-\([^-]\{1,\}\)$/\1 \2 \3/' -e 's/\.\(el8\|el7\|an8\|oe\|uel20\|uelc20\).*$//g')
rpm_version=$(echo $rpm_pkg | awk '{print $2"-"$3}')
rpm_bin_name=$(echo $rpm_pkg | awk '{print $1}')
# 根据包名字获取source包名称
rpm_source_name=$(rpm -q $rpm_bin_name --queryformat "%{sourcerpm}\n" | head -n 1 | awk -F "-$(rpm -q $rpm_bin_name --queryformat "%{version}\n" | head -n 1)" '{print $1}')
echo $cve_id $rpm_source_name $rpm_version $dist
done
'''
# hosts = [item['ip'] for item in FetchHost.get_host_list()]
vtm = VulTaskManager(hosts, cmd)
results = vtm.run(VulTaskManager.run_command)
# cve2host_info={"cve1":[(host,software,version,os)]}
# [{'host': 'GqYLM32pIZaNH0rOjd7JViwxPs', 'ret': {'status': 1, 'result': timeout('timed out')}}]
cve2host_info = {}
for result in results:
host = result["host"]
if result["ret"]['status'] == 0:
for cve_info in parse_sa_result(result["ret"]['result']):
cve = cve_info.pop(0)
cve_info.insert(0, host)
if cve in cve2host_info.keys():
cve2host_info[cve].append(tuple(cve_info))
else:
cve2host_info[cve] = [tuple(cve_info)]
if len(cve2host_info) > 0:
update_sa_db(cve2host_info)
job_end_time = timezone.now()
update_sa_job_obj.job_end_time = job_end_time
update_sa_job_obj.save()
def update_sa_db(cveinfo):
#
# cve2host_info={"cve1":[(host,software,version,os)]}
#
current_cveinfo = SecurityAdvisoryModel.objects.values_list("cve_id", "software_name", "fixed_version", "os")
current_cves = set([cve for cve in current_cveinfo])
new_cveinfo = cveinfo
new_cves = set([(k, item[1], item[2], item[3]) for k, v in new_cveinfo.items() for item in v])
delete_cves = current_cves - new_cves
# 删除无效的关联关系,用于更新客户手动修复漏洞后,导致的数据库不匹配问题
for cve in list(delete_cves):
cve_id, software_name, fixed_version, os = cve
sacve_obj = SecurityAdvisoryModel.objects.filter(cve_id=cve_id,
software_name=software_name,
fixed_version=fixed_version,
os=os).first()
sacve_obj.hosts = ""
sacve_obj.save()
add_cves = new_cves - current_cves
# [("cve_id", "software_name", "fixed_version", "os")]alios#123
for cve in list(add_cves):
cve_id, software_name, fixed_version, os = cve
hosts = ",".join([cve_detail[0] for cve_detail in new_cveinfo[cve_id] if
cve_detail[1] == software_name and cve_detail[2] == fixed_version and cve_detail[3] == os])
try:
SecurityAdvisoryModel.objects.get(cve_id=cve_id)
except SecurityAdvisoryModel.DoesNotExist:
logger.info(f'create new cve: {cve_id}')
SecurityAdvisoryModel.objects.create(
cve_id=cve_id, update_time=timezone.now(), hosts=hosts,
os=os, fixed_version=fixed_version, software_name=software_name,
)
update_cves = new_cves & current_cves
for cve in list(update_cves):
cve_id, software_name, fixed_version, os = cve
hosts = [cve_detail[0] for cve_detail in new_cveinfo[cve_id] if
cve_detail[1] == software_name and cve_detail[2] == fixed_version and cve_detail[3] == os]
sacve_obj = SecurityAdvisoryModel.objects.filter(cve_id=cve_id,
software_name=software_name,
fixed_version=fixed_version,
os=os).first()
_hosts = [] if sacve_obj.hosts == "" else sacve_obj.hosts.split(',')
sacve_obj.hosts = ','.join(list(set(hosts + _hosts)))
sacve_obj.save()
# # 更新漏洞数据库数据至sa
for sacve_obj in set(
SecurityAdvisoryModel.objects.filter(Q(pub_time='') | Q(vul_level='')).values_list("cve_id")):
cve_obj_search = VulModel.objects.filter(cve_id=sacve_obj[0])
if len(cve_obj_search) != 0:
cve_obj = cve_obj_search.first()
SecurityAdvisoryModel.objects.filter(cve_id=sacve_obj[0]).update(
score=cve_obj.score,
description=cve_obj.description,
pub_time=cve_obj.pub_time,
vul_level=cve_obj.vul_level,
detail=cve_obj.detail,
update_time=timezone.now(),
)
def parse_sa_result(result):
"""解析dnf获取的sa数据"""
for i in result.split("\n"):
sa_re = "CVE-\d{4}-\d{4,7}(\s+\S+){3}"
if re.match(sa_re, i, re.I):
yield i.split()
def fix_cve(hosts, cve_id, user):
cmd = 'dnf update --cve {} -y'.format(cve_id)
vtm = VulTaskManager(hosts, cmd)
results = vtm.run(VulTaskManager.run_command)
fixed_time = human_datetime()
user_obj = user.get('id', 1)
vul_level = SecurityAdvisoryModel.objects.filter(cve_id=cve_id).first().vul_level
cve_status = "success"
init = True
for ret in results:
if ret["ret"]["status"] == 0:
status = "success"
details = ret["ret"]["result"]
sa_obj = SecurityAdvisoryModel.objects.filter(hosts__contains=ret['host'], cve_id=cve_id).first()
_hosts = sa_obj.hosts.split(',')
_hosts.remove(ret["host"])
sa_obj.hosts = ",".join(_hosts)
sa_obj.save()
else:
status = "fail"
cve_status = "fail"
details = ret["ret"]["result"]
if init:
safh = SecurityAdvisoryFixHistoryModel.objects.create(fixed_at=fixed_time,
cve_id=cve_id,
vul_level=vul_level,
created_by=user_obj,
status=cve_status)
init = False
elif cve_status == "fail":
safh.status = cve_status
# safh.host.add(host_obj, through_defaults={'status': status, "details": details})
safh.hosts = ret["host"]
safh.save()
sfh_kwargs = {}
sfh_kwargs['sa_fix_hist'] = safh
sfh_kwargs['status'] = status
sfh_kwargs['details'] = details
sfh_kwargs['hosts'] = ret["host"]
_save_sa_fix_hist_host(**sfh_kwargs)
return results
def _save_sa_fix_hist_host(**kwargs):
"""
create fix hist host
"""
try:
SaFixHistToHost.objects.create(**kwargs)
except Exception as e:
logger.error(str(e))