categraf/logs/util/kubernetes/kubelet/podwatcher.go

233 lines
7.1 KiB
Go

//go:build !no_logs
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package kubelet
import (
"context"
"hash/fnv"
"log"
"sort"
"strconv"
"sync"
"time"
"flashcat.cloud/categraf/pkg/kubernetes"
)
const unreadinessTimeout = 30 * time.Second
// PodWatcher regularly pools the kubelet for new/changed/removed containers.
// It keeps an internal state to only send the updated pods.
type PodWatcher struct {
sync.Mutex
kubeUtil KubeUtilInterface
expiryDuration time.Duration
lastSeen map[string]time.Time
lastSeenReady map[string]time.Time
tagsDigest map[string]string
oldPhase map[string]string
oldReadiness map[string]bool
}
// NewPodWatcher creates a new watcher given an expiry duration
// and if the watcher should watch label/annotation changes on pods.
// User call must then trigger PullChanges and Expire when needed.
func NewPodWatcher(expiryDuration time.Duration, isWatchingTags bool) (*PodWatcher, error) {
kubeutil, err := GetKubeUtil()
if err != nil {
return nil, err
}
watcher := &PodWatcher{
kubeUtil: kubeutil,
lastSeen: make(map[string]time.Time),
lastSeenReady: make(map[string]time.Time),
oldReadiness: make(map[string]bool),
expiryDuration: expiryDuration,
}
if isWatchingTags {
watcher.tagsDigest = make(map[string]string)
watcher.oldPhase = make(map[string]string)
}
return watcher, nil
}
// isWatchingTags returns true if the pod watcher should
// watch for tag changes on pods
func (w *PodWatcher) isWatchingTags() bool {
return w.tagsDigest != nil
}
// PullChanges pulls a new podList from the kubelet and returns Pod objects for
// new / updated pods. Updated pods will be sent entirely, user must replace
// previous info for these pods.
func (w *PodWatcher) PullChanges(ctx context.Context) ([]*kubernetes.Pod, error) {
var podList []*kubernetes.Pod
podList, err := w.kubeUtil.GetLocalPodList(ctx)
if err != nil {
return podList, err
}
return w.computeChanges(podList)
}
// computeChanges is used by PullChanges, split for testing
func (w *PodWatcher) computeChanges(podList []*kubernetes.Pod) ([]*kubernetes.Pod, error) {
now := time.Now()
var updatedPods []*kubernetes.Pod
w.Lock()
defer w.Unlock()
for _, pod := range podList {
podEntity := PodUIDToEntityName(pod.Metadata.UID)
newPod := false
_, foundPod := w.lastSeen[podEntity]
if w.isWatchingTags() && !foundPod {
w.tagsDigest[podEntity] = digestPodMeta(pod.Metadata)
w.oldPhase[podEntity] = pod.Status.Phase
newPod = true
}
// static pods are included specifically because they won't have any container
// as they're not updated in the pod list after creation
if isPodStatic(pod) && !foundPod {
newPod = true
}
// Refresh last pod seen time
w.lastSeen[podEntity] = now
// Detect updated containers
updatedContainer := false
isPodReady := IsPodReady(pod)
for _, container := range pod.Status.GetAllContainers() {
// We don't check container readiness as init containers are never ready
// We check if the container has an ID instead (has run or is running)
if !container.IsPending() {
// new container are always sent ignoring the pod state
if _, found := w.lastSeen[container.ID]; !found {
updatedContainer = true
}
w.lastSeen[container.ID] = now
// for existing containers, check whether the
// readiness has changed since last time
if oldReadiness, found := w.oldReadiness[container.ID]; !found || oldReadiness != isPodReady {
// the pod has never been seen ready or was removed when
// reaching the unreadinessTimeout
updatedContainer = true
}
w.oldReadiness[container.ID] = isPodReady
// update the readiness expiry cache
if isPodReady {
w.lastSeenReady[container.ID] = now
}
}
}
// Detect changes in labels and annotations values
newLabelsOrAnnotations := false
// Detect changes in the pod phase
newPhase := false
if w.isWatchingTags() {
newTagsDigest := digestPodMeta(pod.Metadata)
if foundPod && newTagsDigest != w.tagsDigest[podEntity] {
// update tags digest
w.tagsDigest[podEntity] = newTagsDigest
newLabelsOrAnnotations = true
}
// compared to our last seen phase the pod phase has changed.
if foundPod && pod.Status.Phase != w.oldPhase[podEntity] {
w.oldPhase[podEntity] = pod.Status.Phase
newPhase = true
}
}
if newPod || updatedContainer || newLabelsOrAnnotations || newPhase {
updatedPods = append(updatedPods, pod)
}
}
log.Printf("Found %d changed pods out of %d", len(updatedPods), len(podList))
return updatedPods, nil
}
// Expire returns a list of entities (containers and pods)
// that are not listed in the podlist anymore. It must be called
// immediately after a PullChanges.
// For containers, string is kubernetes container ID (with runtime name)
// For pods, string is "kubernetes_pod://uid" format
func (w *PodWatcher) Expire() ([]string, error) {
now := time.Now()
w.Lock()
defer w.Unlock()
var expiredContainers []string
for id, lastSeen := range w.lastSeen {
// pod was removed from the pod list, we can safely cleanup everything
if now.Sub(lastSeen) > w.expiryDuration {
delete(w.lastSeen, id)
delete(w.lastSeenReady, id)
delete(w.oldReadiness, id)
if w.isWatchingTags() {
delete(w.tagsDigest, id)
delete(w.oldPhase, id)
}
expiredContainers = append(expiredContainers, id)
}
}
for id, lastSeenReady := range w.lastSeenReady {
// we keep pods gone unready for 25 seconds and then force removal
if now.Sub(lastSeenReady) > unreadinessTimeout {
delete(w.lastSeenReady, id)
expiredContainers = append(expiredContainers, id)
}
}
return expiredContainers, nil
}
// GetPodForEntityID finds the pod corresponding to an entity.
// EntityIDs can be Docker container IDs or pod UIDs (prefixed).
// Returns a nil pointer if not found.
func (w *PodWatcher) GetPodForEntityID(ctx context.Context, entityID string) (*kubernetes.Pod, error) {
return w.kubeUtil.GetPodForEntityID(ctx, entityID)
}
// digestPodMeta returns a unique hash of pod labels
// and annotations.
// it hashes labels then annotations and makes a single hash of both maps
func digestPodMeta(meta kubernetes.PodMetadata) string {
h := fnv.New64()
h.Write([]byte(digestMapValues(meta.Labels))) //nolint:errcheck
h.Write([]byte(digestMapValues(meta.Annotations))) //nolint:errcheck
return strconv.FormatUint(h.Sum64(), 16)
}
// digestMapValues returns a unique hash of map values
// used to track changes in labels and annotations values
// it takes into consideration the random keys order in a map
// by hashing the values after sorting the keys
func digestMapValues(m map[string]string) string {
if len(m) == 0 {
return ""
}
// to store the keys in slice in sorted order
var keys []string
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
h := fnv.New64()
for _, k := range keys {
h.Write([]byte(m[k])) //nolint:errcheck
}
return strconv.FormatUint(h.Sum64(), 16)
}