forked from flashcat/categraf
104 lines
2.8 KiB
Go
104 lines
2.8 KiB
Go
//go:build !no_logs
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package kubelet
|
|
|
|
import (
|
|
"time"
|
|
"unsafe"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
|
|
"flashcat.cloud/categraf/pkg/kubernetes"
|
|
)
|
|
|
|
// jsoniterConfig mirrors jsoniter.ConfigFastest
|
|
var jsonConfig = jsoniter.Config{
|
|
EscapeHTML: false,
|
|
MarshalFloatWith6Digits: true,
|
|
ObjectFieldMustBeSimpleString: true,
|
|
}
|
|
|
|
// podUnmarshaller handles unmarshalling and filtering the podlist contents
|
|
// according to the kubernetes_pod_expiration_duration setting. It uses jsoniter
|
|
// under the hood, with a custom decoder.
|
|
type podUnmarshaller struct {
|
|
jsonConfig jsoniter.API
|
|
podExpirationDuration time.Duration
|
|
timeNowFunction func() time.Time // Allows to mock time in tests
|
|
}
|
|
|
|
func newPodUnmarshaller() *podUnmarshaller {
|
|
pu := &podUnmarshaller{
|
|
podExpirationDuration: 15 * 60 * time.Second, // ("kubernetes_pod_expiration_duration") * time.Second,
|
|
timeNowFunction: time.Now,
|
|
}
|
|
|
|
if pu.podExpirationDuration > 0 {
|
|
jsoniter.RegisterTypeDecoderFunc("kubelet.PodList", pu.filteringDecoder)
|
|
} else {
|
|
// Force-unregister for unit tests to pick up the right state
|
|
jsoniter.RegisterTypeDecoder("kubelet.PodList", nil)
|
|
}
|
|
|
|
// Build a new frozen config to invalidate type decoder cache
|
|
pu.jsonConfig = jsonConfig.Froze()
|
|
|
|
return pu
|
|
}
|
|
|
|
// unmarshal is a drop-in replacement for json.Unmarshall
|
|
func (pu *podUnmarshaller) unmarshal(data []byte, v interface{}) error {
|
|
return pu.jsonConfig.Unmarshal(data, v)
|
|
}
|
|
|
|
func (pu *podUnmarshaller) filteringDecoder(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
|
|
p := (*kubernetes.PodList)(ptr)
|
|
cutoffTime := pu.timeNowFunction().Add(-1 * pu.podExpirationDuration)
|
|
|
|
podCallback := func(iter *jsoniter.Iterator) bool {
|
|
pod := &kubernetes.Pod{}
|
|
iter.ReadVal(pod)
|
|
|
|
// Quick exit for running/pending containers
|
|
if pod.Status.Phase == "Running" || pod.Status.Phase == "Pending" {
|
|
p.Items = append(p.Items, pod)
|
|
return true
|
|
}
|
|
|
|
// Only keep terminated pods where at least one container
|
|
// terminated after the cutoffTime
|
|
expired := true
|
|
for _, ctr := range pod.Status.Containers {
|
|
if ctr.State.Terminated == nil ||
|
|
ctr.State.Terminated.FinishedAt.IsZero() ||
|
|
ctr.State.Terminated.FinishedAt.After(cutoffTime) {
|
|
expired = false
|
|
break
|
|
}
|
|
}
|
|
|
|
if !expired {
|
|
p.Items = append(p.Items, pod)
|
|
}
|
|
return true
|
|
}
|
|
|
|
iter.ReadObjectCB(func(iter *jsoniter.Iterator, field string) bool {
|
|
if field == "items" {
|
|
// consider null pod list as an error
|
|
if iter.WhatIsNext() == jsoniter.NilValue {
|
|
return false
|
|
}
|
|
iter.ReadArrayCB(podCallback)
|
|
} else {
|
|
iter.Skip()
|
|
}
|
|
return true
|
|
})
|
|
}
|