sysom1/sysom_server/sysom_vul/apps/vul/apps.py

124 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import json
from clogger import logger
from django.apps import AppConfig
from django.db.models.signals import post_migrate, pre_migrate
from django.db import connection
class VulConfig(AppConfig):
    """Django application config for the ``apps.vul`` app.

    Besides the usual app registration it:

    * hooks ``pre_migrate`` to snapshot the cve_id -> host ip relations
      read from ``sys_sa`` / ``sys_sa_host`` / ``sys_host``;
    * hooks ``post_migrate`` to seed a default vulnerability data source
      and to write the snapshot back into ``SecurityAdvisoryModel.hosts``;
    * starts the channel-job executor, the framework plugins and the
      ``VulListener`` when the process is serving (not a plain
      ``manage.py`` sub-command).
    """

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'apps.vul'

    def __init__(self, app_name, app_module) -> None:
        super().__init__(app_name, app_module)
        # Maps cve_id -> list of host ips. Stays None until
        # get_all_sa_bout_host() has run its query successfully.
        self._sa_host_cache: dict = None

    def ready(self) -> None:
        """Connect the migrate signal handlers and start runtime services."""
        pre_migrate.connect(self.get_all_sa_bout_host, sender=self)
        post_migrate.connect(self.initialization_vul_config, sender=self)
        from django.conf import settings
        # Only start the background services when running the server
        # ('runserver') or when not launched through manage.py at all.
        if ('runserver' in sys.argv or 'manage.py' not in sys.argv):
            from channel_job.job import default_channel_job_executor
            from .executor import VulListener
            from sysom_utils import FrameworkPlugMag, CmgPlugin

            # Initialise the channel_job SDK.
            default_channel_job_executor.init_config(settings.CHANNEL_JOB_URL)
            default_channel_job_executor.start()

            # Initialise the framework plugin manager.
            FrameworkPlugMag(settings.YAML_CONFIG) \
                .load_plugin_cls(CmgPlugin) \
                .start()

            try:
                VulListener().start()
            except Exception as e:
                # A listener failure must not prevent the app from loading.
                logger.exception(e)
        # NOTE(review): the nesting level of this log line was reconstructed
        # from an indentation-stripped source - confirm against the original.
        logger.info(">>> Vul module loading success")

    def initialization_vul_config(self, sender, **kwargs):
        """
        ``post_migrate`` handler: initialise the vul tables' data.
        """
        # Step 1: seed the default vul address (data source) row.
        self._initialize_vul_addr()
        # Step 2: restore the host data attached to each security advisory.
        self._fix_migrate_hosts_field()

    def get_all_sa_bout_host(self, sender, **kwargs):
        """``pre_migrate`` handler: cache the host ips of every advisory.

        Runs a raw query joining ``sys_sa`` with ``sys_sa_host`` and
        ``sys_host`` and stores the result in ``self._sa_host_cache`` as
        ``{cve_id: [ip, ...]}``. If the query fails the cache is left
        untouched (``None``) and the method returns quietly.
        """
        sql = """
        SELECT sys_sa.cve_id, ssh.ip
        FROM sys_sa left
        JOIN
        (SELECT sh.securityadvisorymodel_id, h.id, h.ip
        FROM sys_host h
        LEFT JOIN sys_sa_host sh
        ON h.id=sh.hostmodel_id
        ) ssh ON sys_sa.id = ssh.securityadvisorymodel_id;
        """
        try:
            # The context manager closes the cursor even when execute()
            # or fetchall() raises.
            with connection.cursor() as cursor:
                cursor.execute(sql)
                results = cursor.fetchall()
        except Exception as e:
            # Best effort: nothing can be cached if the query fails, so
            # record why and carry on with the migration.
            logger.info(f'skip caching sa hosts, query failed: {str(e)}')
            return

        cache = {}
        for cve_id, ip in results:
            # The LEFT JOIN yields ip = NULL for advisories without hosts;
            # a None entry would later break ','.join(), so skip it.
            if ip is None:
                continue
            cache.setdefault(cve_id, []).append(ip)
        self._sa_host_cache = cache

    def _initialize_vul_addr(self):
        """Insert the default "Anolis" vulnerability data source row.

        Failures (including the model import) are logged and otherwise
        ignored so that they never abort a migration.
        """
        parser = {
            "cve_item_path": "data/data",
            "cve_id_flag": "cve_id",
            "score_flag": "score",
            "pub_time_flag": "publish_date"
        }
        headers = {
            # Implicit concatenation keeps the value independent of the
            # source indentation (unlike a backslash inside the literal).
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"
            )
        }
        params = {
            "format": "json",
            "page_num": 1,
            "page_size": 50,
        }
        fields = {
            "name": "Anolis 漏洞数据",
            "description": "Anolis 漏洞数据",
            "url": "https://anas.openanolis.cn/api/portal/v1/cves/",
            "parser": json.dumps(parser),
            "headers": json.dumps(headers),
            "params": json.dumps(params),
        }
        try:
            from .models import VulAddrModel
            VulAddrModel.objects.create(**fields)
        except Exception as e:
            # Still best effort, but no longer silent.
            logger.info(f'default vul addr not created: {str(e)}')

    def _fix_migrate_hosts_field(self):
        """Write the cached host ips back to ``SecurityAdvisoryModel.hosts``.

        Does nothing when no cache was captured before the migration. Each
        advisory is updated independently; a failure on one is logged and
        does not stop the others.
        """
        if self._sa_host_cache is None:
            return

        from .models import SecurityAdvisoryModel
        for k, v in self._sa_host_cache.items():
            try:
                instance = SecurityAdvisoryModel.objects.get(cve_id=k)
                # hosts is stored as a comma-separated list of ips.
                instance.hosts = ','.join(v)
                instance.save()
                logger.info(f'cve: {k} update done!')
            except SecurityAdvisoryModel.DoesNotExist as e:
                logger.error(f'{k} not found! error: {str(e)}')
            except Exception as e:
                logger.error(f'{k} update fiad, error: {str(e)}')