forked from flashcat/categraf
468 lines
15 KiB
Go
468 lines
15 KiB
Go
//go:build !no_logs
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package kubelet
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
coreconfig "flashcat.cloud/categraf/config"
|
|
"flashcat.cloud/categraf/logs/errors"
|
|
"flashcat.cloud/categraf/logs/util/containers"
|
|
"flashcat.cloud/categraf/logs/util/containers/providers"
|
|
"flashcat.cloud/categraf/pkg/cache"
|
|
"flashcat.cloud/categraf/pkg/kubernetes"
|
|
"flashcat.cloud/categraf/pkg/retry"
|
|
)
|
|
|
|
// Kubelet API paths queried by this package.
const (
	// kubeletPodPath is the path queried by GetLocalPodList.
	kubeletPodPath = "/pods"
	// kubeletMetricsPath is the path queried by GetRawMetrics.
	kubeletMetricsPath = "/metrics"
)

// Header names, cache keys and pod annotation keys.
const (
	// authorizationHeaderKey is the name of the HTTP authorization header.
	authorizationHeaderKey = "Authorization"
	// podListCacheKey is the cache key under which the pod list is stored.
	podListCacheKey = "KubeletPodListCacheKey"
	// unreadyAnnotation, when set to "true" on a running pod, makes
	// IsPodReady report the pod as ready regardless of its conditions.
	unreadyAnnotation = "ad.datadoghq.com/tolerate-unready"
	// configSourceAnnotation is the annotation isPodStatic inspects to
	// recognise pods whose config source is "file" or "http".
	configSourceAnnotation = "kubernetes.io/config.source"
)
|
|
|
|
var (
|
|
globalKubeUtil *KubeUtil
|
|
globalKubeUtilMutex sync.Mutex
|
|
)
|
|
|
|
// KubeUtil is a struct to hold the kubelet api url
|
|
// Instantiate with GetKubeUtil
|
|
type KubeUtil struct {
|
|
// used to setup the KubeUtil
|
|
initRetry retry.Retrier
|
|
|
|
kubeletClient *kubeletClient
|
|
rawConnectionInfo map[string]string // kept to pass to the python kubelet check
|
|
podListCacheDuration time.Duration
|
|
filter *containers.Filter
|
|
waitOnMissingContainer time.Duration
|
|
podUnmarshaller *podUnmarshaller
|
|
}
|
|
|
|
func (ku *KubeUtil) init() error {
|
|
var err error
|
|
ku.filter, err = containers.GetSharedMetricFilter()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ku.kubeletClient, err = getKubeletClient(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ku.rawConnectionInfo["url"] = ku.kubeletClient.kubeletURL
|
|
if ku.kubeletClient.config.scheme == "https" {
|
|
ku.rawConnectionInfo["verify_tls"] = fmt.Sprintf("%v", ku.kubeletClient.config.tlsVerify)
|
|
if ku.kubeletClient.config.caPath != "" {
|
|
ku.rawConnectionInfo["ca_cert"] = ku.kubeletClient.config.caPath
|
|
}
|
|
if ku.kubeletClient.config.clientCertPath != "" && ku.kubeletClient.config.clientKeyPath != "" {
|
|
ku.rawConnectionInfo["client_crt"] = ku.kubeletClient.config.clientCertPath
|
|
ku.rawConnectionInfo["client_key"] = ku.kubeletClient.config.clientKeyPath
|
|
}
|
|
if ku.kubeletClient.config.token != "" {
|
|
ku.rawConnectionInfo["token"] = ku.kubeletClient.config.token
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewKubeUtil() *KubeUtil {
|
|
ku := &KubeUtil{
|
|
rawConnectionInfo: make(map[string]string),
|
|
podListCacheDuration: 5 * time.Second,
|
|
podUnmarshaller: newPodUnmarshaller(),
|
|
}
|
|
|
|
waitOnMissingContainer := 0
|
|
if waitOnMissingContainer > 0 {
|
|
ku.waitOnMissingContainer = time.Duration(waitOnMissingContainer) * time.Second
|
|
}
|
|
|
|
return ku
|
|
}
|
|
|
|
// ResetGlobalKubeUtil is a helper to remove the current KubeUtil global
|
|
// It is ONLY to be used for tests
|
|
func ResetGlobalKubeUtil() {
|
|
globalKubeUtilMutex.Lock()
|
|
defer globalKubeUtilMutex.Unlock()
|
|
globalKubeUtil = nil
|
|
}
|
|
|
|
// ResetCache deletes existing kubeutil related cache
|
|
func ResetCache() {
|
|
cache.Cache.Delete(podListCacheKey)
|
|
}
|
|
|
|
// GetKubeUtilWithRetrier returns an instance of KubeUtil or a retrier
|
|
func GetKubeUtilWithRetrier() (KubeUtilInterface, *retry.Retrier) {
|
|
globalKubeUtilMutex.Lock()
|
|
defer globalKubeUtilMutex.Unlock()
|
|
if globalKubeUtil == nil {
|
|
globalKubeUtil = NewKubeUtil()
|
|
globalKubeUtil.initRetry.SetupRetrier(&retry.Config{ //nolint:errcheck
|
|
Name: "kubeutil",
|
|
AttemptMethod: globalKubeUtil.init,
|
|
Strategy: retry.Backoff,
|
|
InitialRetryDelay: 1 * time.Second,
|
|
MaxRetryDelay: 5 * time.Minute,
|
|
})
|
|
}
|
|
err := globalKubeUtil.initRetry.TriggerRetry()
|
|
if err != nil {
|
|
log.Printf("Kube util init error: %s", err)
|
|
return nil, &globalKubeUtil.initRetry
|
|
}
|
|
return globalKubeUtil, nil
|
|
}
|
|
|
|
// GetKubeUtil returns an instance of KubeUtil.
|
|
func GetKubeUtil() (KubeUtilInterface, error) {
|
|
util, retrier := GetKubeUtilWithRetrier()
|
|
if retrier != nil {
|
|
return nil, retrier.LastError()
|
|
}
|
|
return util, nil
|
|
}
|
|
|
|
// GetNodeInfo returns the IP address and the hostname of the first valid pod in the PodList
|
|
func (ku *KubeUtil) GetNodeInfo(ctx context.Context) (string, string, error) {
|
|
pods, err := ku.GetLocalPodList(ctx)
|
|
if err != nil {
|
|
return "", "", fmt.Errorf("error getting pod list from kubelet: %s", err)
|
|
}
|
|
|
|
for _, pod := range pods {
|
|
if pod.Status.HostIP == "" || pod.Spec.NodeName == "" {
|
|
continue
|
|
}
|
|
return pod.Status.HostIP, pod.Spec.NodeName, nil
|
|
}
|
|
|
|
return "", "", fmt.Errorf("failed to get node info, pod list length: %d", len(pods))
|
|
}
|
|
|
|
// GetNodename returns the nodename of the first pod.spec.nodeName in the PodList
|
|
func (ku *KubeUtil) GetNodename(ctx context.Context) (string, error) {
|
|
pods, err := ku.GetLocalPodList(ctx)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error getting pod list from kubelet: %s", err)
|
|
}
|
|
|
|
for _, pod := range pods {
|
|
if pod.Spec.NodeName == "" {
|
|
continue
|
|
}
|
|
return pod.Spec.NodeName, nil
|
|
}
|
|
|
|
return "", fmt.Errorf("failed to get the kubernetes nodename, pod list length: %d", len(pods))
|
|
}
|
|
|
|
// GetLocalPodList returns the list of pods running on the node.
|
|
// If kubernetes_pod_expiration_duration is set, old exited pods
|
|
// will be filtered out to keep the podlist size down: see json.go
|
|
func (ku *KubeUtil) GetLocalPodList(ctx context.Context) ([]*kubernetes.Pod, error) {
|
|
var ok bool
|
|
pods := kubernetes.PodList{}
|
|
|
|
if cached, hit := cache.Cache.Get(podListCacheKey); hit {
|
|
pods, ok = cached.(kubernetes.PodList)
|
|
if !ok {
|
|
log.Printf("Invalid pod list cache format, forcing a cache miss")
|
|
} else {
|
|
return pods.Items, nil
|
|
}
|
|
}
|
|
|
|
data, code, err := ku.QueryKubelet(ctx, kubeletPodPath)
|
|
if err != nil {
|
|
return nil, errors.NewRetriable("podlist", fmt.Errorf("error performing kubelet query %s%s: %w", ku.kubeletClient.kubeletURL, kubeletPodPath, err))
|
|
}
|
|
if code != http.StatusOK {
|
|
return nil, errors.NewRetriable("podlist", fmt.Errorf("unexpected status code %d on %s%s: %s", code, ku.kubeletClient.kubeletURL, kubeletPodPath, string(data)))
|
|
}
|
|
|
|
err = ku.podUnmarshaller.unmarshal(data, &pods)
|
|
if err != nil {
|
|
return nil, errors.NewRetriable("podlist", fmt.Errorf("unable to unmarshal podlist, invalid or null: %w", err))
|
|
}
|
|
|
|
// ensure we dont have nil pods
|
|
tmpSlice := make([]*kubernetes.Pod, 0, len(pods.Items))
|
|
for _, pod := range pods.Items {
|
|
if pod != nil {
|
|
allContainers := make([]kubernetes.ContainerStatus, 0, len(pod.Status.InitContainers)+len(pod.Status.Containers))
|
|
allContainers = append(allContainers, pod.Status.InitContainers...)
|
|
allContainers = append(allContainers, pod.Status.Containers...)
|
|
pod.Status.AllContainers = allContainers
|
|
if !ku.filterPod(pod) {
|
|
if coreconfig.Config.DebugMode {
|
|
log.Printf("D! filter include, pod name: %s, pod namespace: %s. pod image:[%v]", pod.Metadata.Name, pod.Metadata.Namespace, pod.Spec.Containers)
|
|
}
|
|
tmpSlice = append(tmpSlice, pod)
|
|
}
|
|
}
|
|
}
|
|
pods.Items = tmpSlice
|
|
|
|
// cache the podList to reduce pressure on the kubelet
|
|
cache.Cache.Set(podListCacheKey, pods, ku.podListCacheDuration)
|
|
|
|
return pods.Items, nil
|
|
}
|
|
|
|
func (ku *KubeUtil) filterPod(pod *kubernetes.Pod) bool {
|
|
for _, c := range pod.Status.GetAllContainers() {
|
|
if ku.filter.IsExcluded(c.Name, c.Image, pod.Metadata.Namespace) {
|
|
if coreconfig.Config.DebugMode {
|
|
log.Printf("D! container name:%s image:%s, ns:%s, exclude:true", c.Name, c.Image, pod.Metadata.Namespace)
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ForceGetLocalPodList reset podList cache and call GetLocalPodList
|
|
func (ku *KubeUtil) ForceGetLocalPodList(ctx context.Context) ([]*kubernetes.Pod, error) {
|
|
ResetCache()
|
|
return ku.GetLocalPodList(ctx)
|
|
}
|
|
|
|
// GetPodForContainerID fetches the podList and returns the pod running
|
|
// a given container on the node. Reset the cache if needed.
|
|
// Returns a nil pointer if not found.
|
|
func (ku *KubeUtil) GetPodForContainerID(ctx context.Context, containerID string) (*kubernetes.Pod, error) {
|
|
// Best case scenario
|
|
pods, err := ku.GetLocalPodList(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pod, err := ku.searchPodForContainerID(pods, containerID)
|
|
if err == nil {
|
|
return pod, nil
|
|
}
|
|
|
|
// Retry with cache invalidation
|
|
if err != nil && errors.IsNotFound(err) {
|
|
log.Printf("Cannot get container %q: %s, retrying without cache...", containerID, err)
|
|
pods, err = ku.ForceGetLocalPodList(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pod, err = ku.searchPodForContainerID(pods, containerID)
|
|
if err == nil {
|
|
return pod, nil
|
|
}
|
|
}
|
|
|
|
// On some kubelet versions, containers can take up to a second to
|
|
// register in the podlist, retry a few times before failing
|
|
if ku.waitOnMissingContainer == 0 {
|
|
log.Printf("Still cannot get container %q, wait disabled", containerID)
|
|
return pod, err
|
|
}
|
|
timeout := time.NewTimer(ku.waitOnMissingContainer)
|
|
defer timeout.Stop()
|
|
retryTicker := time.NewTicker(250 * time.Millisecond)
|
|
defer retryTicker.Stop()
|
|
for {
|
|
log.Printf("Still cannot get container %q: %s, retrying in 250ms", containerID, err)
|
|
select {
|
|
case <-retryTicker.C:
|
|
pods, err = ku.ForceGetLocalPodList(ctx)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
pod, err = ku.searchPodForContainerID(pods, containerID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
return pod, nil
|
|
case <-timeout.C:
|
|
// Return the latest error on timeout
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ku *KubeUtil) searchPodForContainerID(podList []*kubernetes.Pod, containerID string) (*kubernetes.Pod, error) {
|
|
if containerID == "" {
|
|
return nil, fmt.Errorf("containerID is empty")
|
|
}
|
|
|
|
// We will match only on the id itself, without runtime identifier, it should be quite unlikely on a Kube node
|
|
// to have a container in the runtime used by Kube to match a container in another runtime...
|
|
if containers.IsEntityName(containerID) {
|
|
containerID = containers.ContainerIDForEntity(containerID)
|
|
}
|
|
|
|
for _, pod := range podList {
|
|
for _, container := range pod.Status.GetAllContainers() {
|
|
if container.ID != "" && containers.ContainerIDForEntity(container.ID) == containerID {
|
|
return pod, nil
|
|
}
|
|
}
|
|
}
|
|
return nil, errors.NewNotFound(fmt.Sprintf("container %s in PodList", containerID))
|
|
}
|
|
|
|
// GetStatusForContainerID returns the container status from the pod given an ID
|
|
func (ku *KubeUtil) GetStatusForContainerID(pod *kubernetes.Pod, containerID string) (kubernetes.ContainerStatus, error) {
|
|
for _, container := range pod.Status.GetAllContainers() {
|
|
if containerID == container.ID {
|
|
return container, nil
|
|
}
|
|
}
|
|
return kubernetes.ContainerStatus{}, errors.NewNotFound(fmt.Sprintf("container %s in pod", containerID))
|
|
}
|
|
|
|
// GetSpecForContainerName returns the container spec from the pod given a name
|
|
// It searches spec.containers then spec.initContainers
|
|
func (ku *KubeUtil) GetSpecForContainerName(pod *kubernetes.Pod, containerName string) (kubernetes.ContainerSpec, error) {
|
|
for _, containerSpec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
|
|
if containerName == containerSpec.Name {
|
|
return containerSpec, nil
|
|
}
|
|
}
|
|
return kubernetes.ContainerSpec{}, errors.NewNotFound(fmt.Sprintf("container %s in pod", containerName))
|
|
}
|
|
|
|
func (ku *KubeUtil) GetPodFromUID(ctx context.Context, podUID string) (*kubernetes.Pod, error) {
|
|
if podUID == "" {
|
|
return nil, fmt.Errorf("pod UID is empty")
|
|
}
|
|
pods, err := ku.GetLocalPodList(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range pods {
|
|
if pod.Metadata.UID == podUID {
|
|
return pod, nil
|
|
}
|
|
}
|
|
log.Printf("cannot get the pod uid %q: %s, retrying without cache...", podUID, err)
|
|
|
|
pods, err = ku.ForceGetLocalPodList(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, pod := range pods {
|
|
if pod.Metadata.UID == podUID {
|
|
return pod, nil
|
|
}
|
|
}
|
|
return nil, errors.NewNotFound(fmt.Sprintf("pod %s in podlist", podUID))
|
|
}
|
|
|
|
// GetPodForEntityID returns a pointer to the pod that corresponds to an entity ID.
|
|
// If the pod is not found it returns nil and an error.
|
|
func (ku *KubeUtil) GetPodForEntityID(ctx context.Context, entityID string) (*kubernetes.Pod, error) {
|
|
if strings.HasPrefix(entityID, KubePodPrefix) {
|
|
uid := strings.TrimPrefix(entityID, KubePodPrefix)
|
|
return ku.GetPodFromUID(ctx, uid)
|
|
}
|
|
return ku.GetPodForContainerID(ctx, entityID)
|
|
}
|
|
|
|
// QueryKubelet allows to query the KubeUtil registered kubelet API on the parameter path
|
|
// path commonly used are /healthz, /pods, /metrics
|
|
// return the content of the response, the response HTTP status code and an error in case of
|
|
func (ku *KubeUtil) QueryKubelet(ctx context.Context, path string) ([]byte, int, error) {
|
|
return ku.kubeletClient.query(ctx, path)
|
|
}
|
|
|
|
// GetKubeletAPIEndpoint returns the current endpoint used to perform QueryKubelet
|
|
func (ku *KubeUtil) GetKubeletAPIEndpoint() string {
|
|
return ku.kubeletClient.kubeletURL
|
|
}
|
|
|
|
// GetRawConnectionInfo returns a map containging the url and credentials to connect to the kubelet
|
|
// Possible map entries:
|
|
// - url: full url with scheme (required)
|
|
// - verify_tls: "true" or "false" string
|
|
// - ca_cert: path to the kubelet CA cert if set
|
|
// - token: content of the bearer token if set
|
|
// - client_crt: path to the client cert if set
|
|
// - client_key: path to the client key if set
|
|
func (ku *KubeUtil) GetRawConnectionInfo() map[string]string {
|
|
return ku.rawConnectionInfo
|
|
}
|
|
|
|
// GetRawMetrics returns the raw kubelet metrics payload
|
|
func (ku *KubeUtil) GetRawMetrics(ctx context.Context) ([]byte, error) {
|
|
data, code, err := ku.QueryKubelet(ctx, kubeletMetricsPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error performing kubelet query %s%s: %s", ku.kubeletClient.kubeletURL, kubeletMetricsPath, err)
|
|
}
|
|
if code != http.StatusOK {
|
|
return nil, fmt.Errorf("unexpected status code %d on %s%s: %s", code, ku.kubeletClient.kubeletURL, kubeletMetricsPath, string(data))
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// IsAgentHostNetwork returns whether the agent is running inside a container with `hostNetwork` or not
|
|
func (ku *KubeUtil) IsAgentHostNetwork(ctx context.Context) (bool, error) {
|
|
cid, err := providers.ContainerImpl().GetAgentCID()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
pod, err := ku.GetPodForContainerID(ctx, cid)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return pod.Spec.HostNetwork, nil
|
|
}
|
|
|
|
// IsPodReady return a bool if the Pod is ready
|
|
func IsPodReady(pod *kubernetes.Pod) bool {
|
|
// static pods are always reported as Pending, so we make an exception there
|
|
if pod.Status.Phase == "Pending" && isPodStatic(pod) {
|
|
return true
|
|
}
|
|
|
|
if pod.Status.Phase != "Running" {
|
|
return false
|
|
}
|
|
|
|
if tolerate, ok := pod.Metadata.Annotations[unreadyAnnotation]; ok && tolerate == "true" {
|
|
return true
|
|
}
|
|
for _, status := range pod.Status.Conditions {
|
|
if status.Type == "Ready" && status.Status == "True" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isPodStatic identifies whether a pod is static or not based on an annotation
|
|
// Static pods can be sent to the kubelet from files or an http endpoint.
|
|
func isPodStatic(pod *kubernetes.Pod) bool {
|
|
if source, ok := pod.Metadata.Annotations[configSourceAnnotation]; ok == true && (source == "file" || source == "http") {
|
|
return len(pod.Status.Containers) == 0
|
|
}
|
|
return false
|
|
}
|