mirror of https://gitee.com/anolis/sysom.git
124 lines
4.1 KiB
Python
124 lines
4.1 KiB
Python
import sys
|
||
import json
|
||
from clogger import logger
|
||
from django.apps import AppConfig
|
||
from django.db.models.signals import post_migrate, pre_migrate
|
||
from django.db import connection
|
||
|
||
|
||
class VulConfig(AppConfig):
    """Django application config for the ``apps.vul`` application.

    Besides the usual app metadata, this config:

    * hooks ``pre_migrate`` to snapshot the security-advisory -> host
      relation (as ``cve_id -> [ip, ...]``) before migrations run;
    * hooks ``post_migrate`` to seed the default vulnerability data source
      and to write the snapshot back into ``SecurityAdvisoryModel.hosts``;
    * starts the channel-job executor, the framework plugin manager and the
      ``VulListener`` when the process is a server (not a management
      command other than ``runserver``).
    """

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'apps.vul'

    def __init__(self, app_name, app_module) -> None:
        super().__init__(app_name, app_module)
        # Mapping of cve_id -> list of host IPs captured by the pre_migrate
        # handler. Stays ``None`` when no snapshot could be taken, which
        # tells the post_migrate handler there is nothing to restore.
        self._sa_host_cache: dict = None

    def ready(self) -> None:
        """Register migration signal handlers and start background services."""
        pre_migrate.connect(self.get_all_sa_bout_host, sender=self)
        post_migrate.connect(self.initialization_vul_config, sender=self)

        # Imported lazily: settings and the service modules below should
        # only be touched once the app registry is ready.
        from django.conf import settings
        if 'runserver' in sys.argv or 'manage.py' not in sys.argv:
            from channel_job.job import default_channel_job_executor
            from .executor import VulListener
            from sysom_utils import FrameworkPlugMag, CmgPlugin

            # Initialize the channel_job SDK.
            default_channel_job_executor.init_config(settings.CHANNEL_JOB_URL)
            default_channel_job_executor.start()

            # Framework plugin manager init.
            FrameworkPlugMag(settings.YAML_CONFIG) \
                .load_plugin_cls(CmgPlugin) \
                .start()

            # A listener failure must not prevent the app from loading.
            try:
                VulListener().start()
            except Exception as e:
                logger.exception(e)
        logger.info(">>> Vul module loading success")

    def initialization_vul_config(self, sender, **kwargs):
        """``post_migrate`` handler: initialize the vul database tables.

        Args:
            sender: The app config that was migrated (signal argument).
            **kwargs: Remaining ``post_migrate`` signal arguments (unused).
        """
        # Step 1: seed the default vulnerability data source.
        self._initialize_vul_addr()

        # Step 2: restore the advisory -> hosts data captured before migrate.
        self._fix_migrate_hosts_field()

    def get_all_sa_bout_host(self, sender, **kwargs):
        """``pre_migrate`` handler: snapshot which host IPs each CVE affects.

        Runs a raw query joining ``sys_sa``, ``sys_sa_host`` and ``sys_host``
        and stores the result in ``self._sa_host_cache`` as
        ``{cve_id: [ip, ...]}``. If the query fails the cache is left
        untouched and the handler returns silently (best effort).

        Args:
            sender: The app config about to be migrated (signal argument).
            **kwargs: Remaining ``pre_migrate`` signal arguments (unused).
        """
        sql = """
        SELECT sys_sa.cve_id, ssh.ip
        FROM sys_sa left
        JOIN
            (SELECT sh.securityadvisorymodel_id, h.id, h.ip
            FROM sys_host h
            LEFT JOIN sys_sa_host sh
                ON h.id=sh.hostmodel_id
            ) ssh ON sys_sa.id = ssh.securityadvisorymodel_id;
        """
        try:
            # The context manager closes the cursor even when execute() or
            # fetchall() raises, so the cursor can no longer leak.
            with connection.cursor() as cursor:
                cursor.execute(sql)
                results = cursor.fetchall()
        except Exception as e:
            # Best effort: the query is allowed to fail (NOTE(review):
            # presumably because the queried tables may not exist in every
            # schema version - confirm). Leave a trace instead of hiding it.
            logger.info(f'skip sa host snapshot before migrate: {str(e)}')
            return

        cache = {}
        for cve_id, ip in results:
            # The outer LEFT JOIN yields a NULL ip for advisories without
            # any host; keeping it would break ','.join() during restore.
            if ip is None:
                continue
            cache.setdefault(cve_id, []).append(ip)
        self._sa_host_cache = cache

    def _initialize_vul_addr(self):
        """Ensure the default Anolis vulnerability data source row exists.

        Uses ``get_or_create`` keyed on the source name so that running
        migrations repeatedly neither inserts duplicates nor depends on a
        swallowed constraint error. Failures are logged, never raised.
        """
        parser = {
            "cve_item_path": "data/data",
            "cve_id_flag": "cve_id",
            "score_flag": "score",
            "pub_time_flag": "publish_date"
        }
        headers = {
            # Adjacent literals instead of a backslash continuation inside
            # the string, which embedded source indentation in the header.
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"
            )
        }
        params = {
            "format": "json",
            "page_num": 1,
            "page_size": 50,
        }

        name = "Anolis 漏洞数据"
        defaults = {
            "description": "Anolis 漏洞数据",
            "url": "https://anas.openanolis.cn/api/portal/v1/cves/",
            "parser": json.dumps(parser),
            "headers": json.dumps(headers),
            "params": json.dumps(params),
        }

        try:
            # Imported lazily: models are unavailable at module import time.
            from .models import VulAddrModel
            VulAddrModel.objects.get_or_create(name=name, defaults=defaults)
        except Exception as e:
            # Seeding is best effort, but a failure should be visible.
            logger.error(f'initialize vul addr failed, error: {str(e)}')

    def _fix_migrate_hosts_field(self):
        """Write the pre-migration host snapshot into each advisory.

        For every ``cve_id`` in ``self._sa_host_cache`` the matching
        ``SecurityAdvisoryModel`` gets ``hosts`` set to the comma-joined IP
        list. Does nothing when no snapshot was captured. Per-advisory
        failures are logged and do not stop the remaining updates.
        """
        if self._sa_host_cache is None:
            return

        from .models import SecurityAdvisoryModel
        for k, v in self._sa_host_cache.items():
            try:
                instance = SecurityAdvisoryModel.objects.get(cve_id=k)
                instance.hosts = ','.join(v)
                instance.save()
                logger.info(f'cve: {k} update done!')
            except SecurityAdvisoryModel.DoesNotExist as e:
                logger.error(f'{k} not found! error: {str(e)}')
            except Exception as e:
                logger.error(f'{k} update fiad, error: {str(e)}')
|