修改task调度执行传参,避免出现mysql链接失效的问题
This commit is contained in:
parent
b024977a0b
commit
ada5962256
|
@ -1,6 +1,8 @@
|
|||
import json
|
||||
import os
|
||||
import ast
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
from django.http.response import FileResponse
|
||||
from django_filters.rest_framework import DjangoFilterBackend
|
||||
|
@ -17,8 +19,10 @@ from apps.host.models import HostModel
|
|||
from apps.accounts.models import User
|
||||
from apps.task.filter import TaskFilter
|
||||
from consumer.executors import SshJob
|
||||
from lib import *
|
||||
from lib.response import success, other_response, not_found
|
||||
from lib.utils import human_datetime, uuid_8, scheduler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JobAPIView(GenericViewSet, mixins.ListModelMixin, mixins.RetrieveModelMixin):
|
||||
|
@ -185,10 +189,10 @@ def default_ssh_job(data, task_id):
|
|||
|
||||
def ssh_job(resp_scripts, task_id, user, data=None, **kwargs):
|
||||
if not data:
|
||||
job_model = JobModel.objects.create(command=resp_scripts, task_id=task_id,
|
||||
created_by=user)
|
||||
JobModel.objects.create(command=resp_scripts, task_id=task_id,
|
||||
created_by=user)
|
||||
else:
|
||||
job_model = JobModel.objects.create(command=resp_scripts, task_id=task_id,
|
||||
created_by=user, params=data)
|
||||
sch_job = SshJob(resp_scripts, job_model, **kwargs)
|
||||
JobModel.objects.create(command=resp_scripts, task_id=task_id,
|
||||
created_by=user, params=data)
|
||||
sch_job = SshJob(resp_scripts, task_id, **kwargs)
|
||||
scheduler.add_job(sch_job.run)
|
||||
|
|
|
@ -11,14 +11,15 @@ from sysom import settings
|
|||
|
||||
|
||||
class SshJob:
|
||||
def __init__(self, resp_scripts, job, **kwargs):
|
||||
def __init__(self, resp_scripts, task_id, **kwargs):
|
||||
self.resp_scripts = resp_scripts
|
||||
self.job = job
|
||||
self.task_id = task_id
|
||||
self.kwargs = kwargs
|
||||
|
||||
def run(self):
|
||||
job = JobModel.objects.filter(task_id=self.task_id).first()
|
||||
try:
|
||||
update_job(instance=self.job, status="Running")
|
||||
update_job(instance=job, status="Running")
|
||||
host_ips = []
|
||||
count = 0
|
||||
for script in self.resp_scripts:
|
||||
|
@ -26,10 +27,13 @@ class SshJob:
|
|||
ip = script.get("instance", None)
|
||||
cmd = script.get("cmd", None)
|
||||
if not ip or not cmd:
|
||||
update_job(instance=self.job, status="Fail", result="script result find not instance or cmd")
|
||||
update_job(instance=job, status="Fail", result="script result find not instance or cmd")
|
||||
break
|
||||
host_ips.append(ip)
|
||||
host = HostModel.objects.filter(ip=ip).first()
|
||||
if not host:
|
||||
update_job(instance=job, status="Fail", result="host not found by script return IP:%s" % ip)
|
||||
break
|
||||
ssh_cli = SSH(host.ip, host.port, host.username, host.private_key)
|
||||
with ssh_cli as ssh:
|
||||
status, result = ssh.exec_command(cmd)
|
||||
|
@ -39,10 +43,10 @@ class SshJob:
|
|||
if self.kwargs.get('service_name', None) == "node_delete":
|
||||
host.delete()
|
||||
if str(status) != '0':
|
||||
update_job(instance=self.job, status="Fail", result=result, host_by=host_ips)
|
||||
update_job(instance=job, status="Fail", result=result, host_by=host_ips)
|
||||
break
|
||||
if count == len(self.resp_scripts):
|
||||
params = self.job.params
|
||||
params = job.params
|
||||
if params:
|
||||
params = json.loads(params)
|
||||
service_name = params.get("service_name", None)
|
||||
|
@ -55,22 +59,22 @@ class SshJob:
|
|||
resp = subprocess.run([service_post_path, result], stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
if resp.returncode != 0:
|
||||
update_job(instance=self.job, status="Fail",
|
||||
update_job(instance=job, status="Fail",
|
||||
result=resp.stderr.decode('utf-8'))
|
||||
break
|
||||
stdout = resp.stdout
|
||||
result = stdout.decode('utf-8')
|
||||
except Exception as e:
|
||||
update_job(instance=self.job, status="Fail", result=str(e))
|
||||
update_job(instance=job, status="Fail", result=str(e))
|
||||
break
|
||||
update_job(instance=self.job, status="Success", result=result, host_by=host_ips)
|
||||
update_job(instance=job, status="Success", result=result, host_by=host_ips)
|
||||
except socket.timeout:
|
||||
update_job(instance=self.job, status="Fail", result="socket time out")
|
||||
update_job(instance=job, status="Fail", result="socket time out")
|
||||
except Exception as e:
|
||||
update_job(instance=self.job, status="Fail", result=str(e))
|
||||
update_job(instance=job, status="Fail", result=str(e))
|
||||
|
||||
|
||||
def update_job(instance: JobModel, **kwargs):
|
||||
def update_job(instance, **kwargs):
|
||||
try:
|
||||
instance.__dict__.update(**kwargs)
|
||||
instance.save()
|
||||
|
|
|
@ -19,7 +19,6 @@ const DiagExtra = (props) => {
|
|||
|
||||
export default (props) => {
|
||||
const [responsive, setResponsive] = useState(false);
|
||||
console.log("props.data", props.data)
|
||||
return (
|
||||
<RcResizeObserver
|
||||
key="resize-observer"
|
||||
|
|
Loading…
Reference in New Issue