categraf/ibex/tasks.go

123 lines
2.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//go:build !no_ibex
package ibex
import (
"log"
"flashcat.cloud/categraf/ibex/types"
)
type LocalTasksT struct {
M map[int64]*Task
}
var Locals = &LocalTasksT{M: make(map[int64]*Task)}
func (lt *LocalTasksT) ReportTasks() []types.ReportTask {
ret := make([]types.ReportTask, 0, len(lt.M))
for id, t := range lt.M {
rt := types.ReportTask{Id: id, Clock: t.Clock}
rt.Status = t.GetStatus()
if rt.Status == "running" || rt.Status == "killing" {
// intermediate state
continue
}
rt.Stdout = t.GetStdout()
rt.Stderr = t.GetStderr()
stdoutLen := len(rt.Stdout)
stderrLen := len(rt.Stderr)
// 输出太长的话,截断,要不然把数据库撑爆了
if stdoutLen > 65535 {
start := stdoutLen - 65535
rt.Stdout = rt.Stdout[start:]
}
if stderrLen > 65535 {
start := stderrLen - 65535
rt.Stderr = rt.Stderr[start:]
}
ret = append(ret, rt)
}
return ret
}
func (lt *LocalTasksT) GetTask(id int64) (*Task, bool) {
t, found := lt.M[id]
return t, found
}
func (lt *LocalTasksT) SetTask(t *Task) {
lt.M[t.Id] = t
}
func (lt *LocalTasksT) AssignTask(at types.AssignTask) {
local, found := lt.GetTask(at.Id)
if found {
if local.Clock == at.Clock && local.Action == at.Action {
// ignore repeat task
return
}
local.Clock = at.Clock
local.Action = at.Action
} else {
if at.Action == "kill" {
// no process in local, no need kill
return
}
local = &Task{
Id: at.Id,
Clock: at.Clock,
Action: at.Action,
}
lt.SetTask(local)
if local.doneBefore() {
local.loadResult()
return
}
}
if local.Action == "kill" {
local.SetStatus("killing")
local.kill()
} else if local.Action == "start" {
local.SetStatus("running")
local.start()
} else {
log.Printf("W! unknown action: %s of task %d", at.Action, at.Id)
}
}
func (lt *LocalTasksT) Clean(assigned map[int64]struct{}) {
del := make(map[int64]struct{})
for id := range lt.M {
if _, found := assigned[id]; !found {
del[id] = struct{}{}
}
}
for id := range del {
// 远端已经不关注这个任务了但是本地来看任务还是running的
// 可能是远端认为超时了,此时本地不能删除,仍然要继续上报
if lt.M[id].GetStatus() == "running" {
continue
}
lt.M[id].ResetBuff()
cmd := lt.M[id].Cmd
delete(lt.M, id)
if cmd != nil && cmd.Process != nil {
cmd.Process.Release()
}
}
}