diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go
index 08e910ad..a6e50f93 100644
--- a/internal/services/executor/executor.go
+++ b/internal/services/executor/executor.go
@@ -761,7 +761,8 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
 	}
 
 	rt := &runningTask{
-		et: et,
+		et:        et,
+		executing: true,
 	}
 
 	rt.Lock()
@@ -771,7 +772,11 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
 		return
 	}
 
-	defer e.runningTasks.delete(et.ID)
+	defer func() {
+		rt.Lock()
+		rt.executing = false
+		rt.Unlock()
+	}()
 
 	et.Status.Phase = types.ExecutorTaskPhaseRunning
 	et.Status.StartTime = util.TimePtr(time.Now())
@@ -1032,7 +1037,7 @@ func (e *Executor) podsCleaner(ctx context.Context) error {
 
 func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
 	for {
-		log.Debugf("executorStatusSender")
+		log.Debugf("executorStatusSenderLoop")
 
 		if err := e.sendExecutorStatus(ctx); err != nil {
 			log.Errorf("err: %+v", err)
@@ -1048,6 +1053,40 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
 	}
 }
 
+func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
+	for {
+		log.Debugf("executorTasksStatusSenderLoop")
+
+		for _, rtID := range e.runningTasks.ids() {
+			rt, ok := e.runningTasks.get(rtID)
+			if !ok {
+				continue
+			}
+
+			rt.Lock()
+			if err := e.sendExecutorTaskStatus(ctx, rt.et); err != nil {
+				log.Errorf("err: %+v", err)
+				rt.Unlock()
+				continue
+			}
+
+			// remove running task if send was successful and it's not executing
+			if !rt.executing {
+				e.runningTasks.delete(rtID)
+			}
+			rt.Unlock()
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		time.Sleep(2 * time.Second)
+	}
+}
+
 func (e *Executor) tasksUpdaterLoop(ctx context.Context) {
 	for {
 		log.Debugf("tasksUpdater")
@@ -1080,6 +1119,18 @@ func (e *Executor) tasksUpdater(ctx context.Context) error {
 		go e.taskUpdater(ctx, et)
 	}
 
+	// remove runningTasks not existing in the runservice
+	etIDsMap := map[string]struct{}{}
+	for _, et := range ets {
+		etIDsMap[et.ID] = struct{}{}
+	}
+
+	for _, rtID := range e.runningTasks.ids() {
+		if _, ok := etIDsMap[rtID]; !ok {
+			e.runningTasks.delete(rtID)
+		}
+	}
+
 	return nil
 }
 
@@ -1176,6 +1227,8 @@ type runningTask struct {
 	et *types.ExecutorTask
 
 	pod driver.Pod
+
+	executing bool
 }
 
 func (r *runningTasks) get(rtID string) (*runningTask, bool) {
@@ -1213,6 +1266,16 @@ func (r *runningTasks) len() int {
 	return len(r.tasks)
 }
 
+func (r *runningTasks) ids() []string {
+	ids := []string{}
+	r.m.Lock()
+	defer r.m.Unlock()
+	for id := range r.tasks {
+		ids = append(ids, id)
+	}
+	return ids
+}
+
 func (e *Executor) handleTasks(ctx context.Context, c <-chan *types.ExecutorTask) {
 	for et := range c {
 		go e.executeTask(ctx, et)
@@ -1337,6 +1400,7 @@ func (e *Executor) Run(ctx context.Context) error {
 	apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
 
 	go e.executorStatusSenderLoop(ctx)
+	go e.executorTasksStatusSenderLoop(ctx)
 	go e.podsCleanerLoop(ctx)
 	go e.tasksUpdaterLoop(ctx)
 	go e.tasksDataCleanerLoop(ctx)
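
The core of this patch is a handoff between two goroutines: executeTask no longer deletes its entry from runningTasks when it finishes, it only clears the executing flag, and the new executorTasksStatusSenderLoop deletes the entry only after the task's status has been successfully sent, so a failed send can no longer drop the task's terminal state. Below is a minimal, self-contained sketch of the same pattern; the names task, registry, senderLoop, and send are illustrative stand-ins, not agola's actual types.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// task plays the role of runningTask: a mutex-protected record whose
// executing flag tells the sender loop when the entry may be removed.
type task struct {
	sync.Mutex
	id        string
	executing bool
}

// registry plays the role of runningTasks: a map guarded by its own mutex.
type registry struct {
	m     sync.Mutex
	tasks map[string]*task
}

// ids returns a snapshot of the keys so the caller can iterate without
// holding the registry lock, like runningTasks.ids() in the diff.
func (r *registry) ids() []string {
	r.m.Lock()
	defer r.m.Unlock()
	ids := make([]string, 0, len(r.tasks))
	for id := range r.tasks {
		ids = append(ids, id)
	}
	return ids
}

func (r *registry) get(id string) (*task, bool) {
	r.m.Lock()
	defer r.m.Unlock()
	t, ok := r.tasks[id]
	return t, ok
}

func (r *registry) delete(id string) {
	r.m.Lock()
	defer r.m.Unlock()
	delete(r.tasks, id)
}

// senderLoop follows the shape of executorTasksStatusSenderLoop: report
// every task's status, then drop entries that have finished executing.
// send stands in for sendExecutorTaskStatus.
func senderLoop(ctx context.Context, r *registry, send func(id string) error) {
	for {
		for _, id := range r.ids() {
			t, ok := r.get(id)
			if !ok {
				continue // deleted between ids() and get()
			}
			t.Lock()
			if err := send(t.id); err != nil {
				t.Unlock()
				continue // keep the entry and retry on the next pass
			}
			// delete only after the final status was sent successfully
			if !t.executing {
				r.delete(t.id)
			}
			t.Unlock()
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(2 * time.Second):
		}
	}
}

func main() {
	r := &registry{tasks: map[string]*task{"t1": {id: "t1", executing: true}}}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Simulate the task finishing: the deferred func added to executeTask
	// does exactly this instead of deleting the entry itself.
	go func() {
		time.Sleep(time.Second)
		if t, ok := r.get("t1"); ok {
			t.Lock()
			t.executing = false
			t.Unlock()
		}
	}()

	senderLoop(ctx, r, func(id string) error {
		fmt.Println("sent status for", id)
		return nil
	})
	fmt.Println("tasks left in registry:", len(r.ids()))
}

One deliberate deviation in the sketch: it waits inside the select via time.After, so cancellation is observed during the pause, whereas the patch uses a non-blocking select followed by time.Sleep, which can delay shutdown by up to the sleep interval.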