sysom1/sysom_server/sysom_vul/apps/vul/executor.py

59 lines
2.2 KiB
Python

from clogger import logger
from django.conf import settings
from cec_base.event import Event
from cec_base.consumer import Consumer
from cec_base.cec_client import MultiConsumer, CecAsyncConsumeTask
from sysom_utils import ConfigParser
from .models import VulModel, SecurityAdvisoryModel, SecurityAdvisoryFixHistoryModel, SaFixHistToHost
class VulListener(MultiConsumer):
def __init__(self):
config: ConfigParser = settings.YAML_CONFIG
super().__init__(
settings.SYSOM_CEC_URL,
custom_callback=self.on_receive_event
)
self.append_group_consume_task(
config.get_server_config().cec.topics.SYSOM_CEC_PLUGIN_TOPIC,
config.get_consumer_group(),
consumer_id=Consumer.generate_consumer_id(),
ensure_topic_exist=True
)
def on_receive_event(self, event: Event, task: CecAsyncConsumeTask):
"""Process received event"""
try:
if event.value.get("type", "") == "clean":
params = event.value.get('params')
self._delete_host_instance(params['instance'])
finally:
task.ack(event)
def _delete_host_instance(self, instance: str):
"""delete host"""
sa_querysets = SecurityAdvisoryModel.objects.filter(hosts__contains=instance)
safh_querysets = SecurityAdvisoryFixHistoryModel.objects.filter(hosts__contains=instance)
sfht_querysets = SaFixHistToHost.objects.filter(hosts__contains=instance)
for querysets in [sa_querysets, safh_querysets, sfht_querysets]:
self._clean_host(instance, querysets)
logger.info(f'node {instance} clean done!')
def _clean_host(self, instance:str, querysets=None):
if not querysets:
return
for queryset in querysets:
try:
if queryset.hosts == instance:
queryset.delete()
else:
_hosts = queryset.hosts.split(',')
_hosts.remove(instance)
queryset.hosts = ",".join(_hosts)
queryset.save()
except Exception as e:
logger.error(f'clean failed, error: {str(e)}')
logger.info(querysets)