forked from flashcat/categraf
313 lines
10 KiB
Go
313 lines
10 KiB
Go
//go:build !no_logs
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package kubelet
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"expvar"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
coreconfig "flashcat.cloud/categraf/config"
|
|
"flashcat.cloud/categraf/logs/util/kubernetes"
|
|
)
|
|
|
|
// kubeletExpVar is the expvar counter published as "kubeletQueries"; it is
// incremented once for every HTTP request sent to the kubelet.
var kubeletExpVar = expvar.NewInt("kubeletQueries")
|
|
|
|
// kubeletClientConfig gathers the settings used to build a kubeletClient.
type kubeletClientConfig struct {
	// scheme is the URL scheme ("https" or "http"); baseURL is the
	// "host:port[/prefix]" part that follows it.
	scheme, baseURL string
	// tlsVerify enables verification of the kubelet server certificate.
	tlsVerify bool
	// caPath is the CA bundle used to verify the server; clientCertPath and
	// clientKeyPath point to an optional client certificate pair; token is
	// the bearer token attached to HTTPS requests.
	caPath, clientCertPath, clientKeyPath, token string
}
|
|
|
|
type kubeletClient struct {
|
|
client http.Client
|
|
kubeletURL string
|
|
headers http.Header
|
|
config kubeletClientConfig
|
|
}
|
|
|
|
func newForConfig(config kubeletClientConfig, timeout time.Duration) (*kubeletClient, error) {
|
|
var err error
|
|
|
|
// Building transport based on options
|
|
customTransport := http.DefaultTransport.(*http.Transport).Clone()
|
|
|
|
// Building custom TLS config
|
|
tlsConfig := &tls.Config{}
|
|
tlsConfig.InsecureSkipVerify = !config.tlsVerify
|
|
|
|
if config.caPath == "" && FileExists(kubernetes.DefaultServiceAccountCAPath) {
|
|
config.caPath = kubernetes.DefaultServiceAccountCAPath
|
|
}
|
|
|
|
if config.caPath != "" {
|
|
tlsConfig.RootCAs, err = kubernetes.GetCertificateAuthority(config.caPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if config.clientCertPath != "" && config.clientKeyPath != "" {
|
|
tlsConfig.Certificates, err = kubernetes.GetCertificates(config.clientCertPath, config.clientKeyPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
customTransport.TLSClientConfig = tlsConfig
|
|
|
|
// Do not use token in plain text
|
|
headers := http.Header{}
|
|
if config.scheme == "https" {
|
|
if config.token != "" {
|
|
headers.Set(authorizationHeaderKey, fmt.Sprintf("bearer %s", config.token))
|
|
}
|
|
}
|
|
|
|
// Defaulting timeout
|
|
if timeout == 0 {
|
|
timeout = 30 * time.Second
|
|
}
|
|
|
|
return &kubeletClient{
|
|
client: http.Client{
|
|
Transport: customTransport,
|
|
Timeout: timeout,
|
|
},
|
|
kubeletURL: fmt.Sprintf("%s://%s", config.scheme, config.baseURL),
|
|
config: config,
|
|
headers: headers,
|
|
}, nil
|
|
}
|
|
|
|
func (kc *kubeletClient) checkConnection(ctx context.Context) error {
|
|
_, statusCode, err := kc.query(ctx, "/spec")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if statusCode == http.StatusUnauthorized {
|
|
return fmt.Errorf("unauthorized to request test kubelet endpoint (/spec) - token used: %t", kc.headers.Get("Authorization") != "")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (kc *kubeletClient) query(ctx context.Context, path string) ([]byte, int, error) {
|
|
req, err := http.NewRequestWithContext(ctx, "GET",
|
|
fmt.Sprintf("%s%s", kc.kubeletURL, path), nil)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("Failed to create new request: %w", err)
|
|
}
|
|
req.Header = kc.headers
|
|
|
|
response, err := kc.client.Do(req)
|
|
kubeletExpVar.Add(1)
|
|
if err != nil {
|
|
log.Printf("Cannot request %s: %s", req.URL.String(), err)
|
|
return nil, 0, err
|
|
}
|
|
defer response.Body.Close()
|
|
|
|
b, err := ioutil.ReadAll(response.Body)
|
|
if err != nil {
|
|
log.Printf("Fail to read request %s body: %s", req.URL.String(), err)
|
|
return nil, 0, err
|
|
}
|
|
|
|
log.Printf("Successfully queried %s, status code: %d, body len: %d", req.URL.String(), response.StatusCode, len(b))
|
|
return b, response.StatusCode, nil
|
|
}
|
|
|
|
func getKubeletClient(ctx context.Context) (*kubeletClient, error) {
|
|
var err error
|
|
|
|
kubeletTimeout := 30 * time.Second
|
|
kubeletProxyEnabled := false // ("eks_fargate")
|
|
kubeletHost := "127.0.0.1" // ("kubernetes_kubelet_host")
|
|
kubeletHTTPSPort := coreconfig.Config.Logs.KubeletHTTPSPort // ("kubernetes_https_kubelet_port")
|
|
kubeletHTTPPort := coreconfig.Config.Logs.KubeletHTTPPort // ("kubernetes_http_kubelet_port")
|
|
kubeletTLSVerify := false
|
|
kubeletCAPath := coreconfig.Config.Logs.KubeletCAPath // ("kubelet_client_ca")
|
|
kubeletTokenPath := coreconfig.Config.Logs.KubeletTokenPath // ("kubelet_auth_token_path")
|
|
kubeletClientCertPath := "" // ("kubelet_client_crt")
|
|
kubeletClientKeyPath := "" // ("kubelet_client_key")
|
|
kubeletNodeName := "" // ("kubernetes_kubelet_nodename")
|
|
var kubeletPathPrefix string
|
|
var kubeletToken string
|
|
|
|
// For some reason, token is not given as a path to Python part, so we need to read it here
|
|
if kubeletTokenPath == "" && FileExists(kubernetes.DefaultServiceAccountTokenPath) {
|
|
kubeletTokenPath = kubernetes.DefaultServiceAccountTokenPath
|
|
}
|
|
|
|
if kubeletTokenPath != "" {
|
|
kubeletToken, err = kubernetes.GetBearerToken(kubeletTokenPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kubelet token defined (%s) but unable to read, err: %w", kubeletTokenPath, err)
|
|
}
|
|
}
|
|
if coreconfig.Config.Logs.KubeletHTTPSPort == 0 {
|
|
kubeletHTTPSPort = 10250
|
|
}
|
|
|
|
if coreconfig.Config.Logs.KubeletHTTPPort == 0 {
|
|
kubeletHTTPPort = 10255
|
|
}
|
|
|
|
if kubeletTokenPath == "" {
|
|
kubeletTokenPath = kubernetes.DefaultServiceAccountTokenPath
|
|
}
|
|
|
|
if kubeletCAPath == "" {
|
|
kubeletCAPath = kubernetes.DefaultServiceAccountCAPath
|
|
}
|
|
|
|
clientConfig := kubeletClientConfig{
|
|
tlsVerify: kubeletTLSVerify,
|
|
caPath: kubeletCAPath,
|
|
clientCertPath: kubeletClientCertPath,
|
|
clientKeyPath: kubeletClientKeyPath,
|
|
token: kubeletToken,
|
|
}
|
|
|
|
// Kubelet is unavailable, proxying calls through the APIServer (for instance EKS Fargate)
|
|
var potentialHosts *connectionInfo
|
|
if kubeletProxyEnabled {
|
|
// Explicitly disable HTTP to reach APIServer
|
|
kubeletHTTPPort = 0
|
|
httpsPort, err := strconv.ParseInt(os.Getenv("KUBERNETES_SERVICE_PORT"), 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to get APIServer port: %w", err)
|
|
}
|
|
kubeletHTTPSPort = int(httpsPort)
|
|
|
|
if kubeletHost != "" {
|
|
kubeletPathPrefix = fmt.Sprintf("/api/v1/nodes/%s/proxy", kubeletNodeName)
|
|
apiServerHost := os.Getenv("KUBERNETES_SERVICE_HOST")
|
|
|
|
potentialHosts = &connectionInfo{
|
|
hostnames: []string{apiServerHost},
|
|
}
|
|
log.Printf("EKS on Fargate mode detected, will proxy calls to the Kubelet through the APIServer at %s:%d%s", apiServerHost, kubeletHTTPSPort, kubeletPathPrefix)
|
|
} else {
|
|
return nil, errors.New("kubelet proxy mode enabled but nodename is empty - unable to query")
|
|
}
|
|
} else {
|
|
// Building a list of potential ips/hostnames to reach Kubelet
|
|
potentialHosts = getPotentialKubeletHosts(kubeletHost)
|
|
}
|
|
|
|
// Checking HTTPS first if port available
|
|
var httpsErr error
|
|
if kubeletHTTPSPort > 0 {
|
|
httpsErr = checkKubeletConnection(ctx, "https", kubeletHTTPSPort, kubeletPathPrefix, potentialHosts, &clientConfig)
|
|
if httpsErr != nil {
|
|
log.Println("Impossible to reach Kubelet through HTTPS")
|
|
if kubeletHTTPPort <= 0 {
|
|
return nil, httpsErr
|
|
}
|
|
} else {
|
|
return newForConfig(clientConfig, kubeletTimeout)
|
|
}
|
|
}
|
|
|
|
// Check HTTP now if port available
|
|
var httpErr error
|
|
if kubeletHTTPPort > 0 {
|
|
httpErr = checkKubeletConnection(ctx, "http", kubeletHTTPPort, kubeletPathPrefix, potentialHosts, &clientConfig)
|
|
if httpErr != nil {
|
|
log.Println("Impossible to reach Kubelet through HTTP")
|
|
return nil, fmt.Errorf("impossible to reach Kubelet with host: %s. Please check if your setup requires kubelet_tls_verify = false. Activate debug logs to see all attempts made", kubeletHost)
|
|
}
|
|
|
|
if httpsErr != nil {
|
|
log.Println("Unable to access Kubelet through HTTPS - Using HTTP connection instead. Please check if your setup requires kubelet_tls_verify = false")
|
|
}
|
|
|
|
return newForConfig(clientConfig, kubeletTimeout)
|
|
}
|
|
|
|
return nil, fmt.Errorf("Invalid Kubelet configuration: both HTTPS and HTTP ports are disabled")
|
|
}
|
|
|
|
func checkKubeletConnection(ctx context.Context, scheme string, port int, prefix string, hosts *connectionInfo, clientConfig *kubeletClientConfig) error {
|
|
var err error
|
|
var kubeClient *kubeletClient
|
|
|
|
log.Printf("Trying to reach Kubelet with scheme: %s", scheme)
|
|
clientConfig.scheme = scheme
|
|
|
|
for _, ip := range hosts.ips {
|
|
clientConfig.baseURL = fmt.Sprintf("%s:%d", ip, port)
|
|
|
|
log.Printf("Trying to reach Kubelet at: %s", clientConfig.baseURL)
|
|
kubeClient, err = newForConfig(*clientConfig, time.Second)
|
|
if err != nil {
|
|
log.Printf("Failed to create Kubelet client for host: %s - error: %v", clientConfig.baseURL, err)
|
|
continue
|
|
}
|
|
|
|
err = kubeClient.checkConnection(ctx)
|
|
if err != nil {
|
|
logConnectionError(clientConfig, err)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Successful configuration found for Kubelet, using URL: %s", kubeClient.kubeletURL)
|
|
return nil
|
|
}
|
|
|
|
for _, host := range hosts.hostnames {
|
|
clientConfig.baseURL = fmt.Sprintf("%s:%d%s", host, port, prefix)
|
|
|
|
log.Printf("Trying to reach Kubelet at: %s", clientConfig.baseURL)
|
|
kubeClient, err = newForConfig(*clientConfig, time.Second)
|
|
if err != nil {
|
|
log.Printf("Failed to create Kubelet client for host: %s - error: %v", clientConfig.baseURL, err)
|
|
continue
|
|
}
|
|
|
|
err = kubeClient.checkConnection(ctx)
|
|
if err != nil {
|
|
logConnectionError(clientConfig, err)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Successful configuration found for Kubelet, using URL: %s", kubeClient.kubeletURL)
|
|
return nil
|
|
}
|
|
|
|
return errors.New("Kubelet connection check failed")
|
|
}
|
|
|
|
func logConnectionError(clientConfig *kubeletClientConfig, err error) {
|
|
switch {
|
|
case strings.Contains(err.Error(), "x509: certificate is valid for"):
|
|
log.Printf(`Invalid x509 settings, the kubelet server certificate is not valid for this subject alternative name: %s, %v, Please check the SAN of the kubelet server certificate with "openssl x509 -in ${KUBELET_CERTIFICATE} -text -noout". `, clientConfig.baseURL, err)
|
|
case strings.Contains(err.Error(), "x509: certificate signed by unknown authority"):
|
|
log.Printf(`The kubelet server certificate is signed by unknown authority, the current cacert is %s. Is the kubelet issuing self-signed certificates? Please validate the kubelet certificate with "openssl verify -CAfile %s ${KUBELET_CERTIFICATE}" to avoid this error: %v`, clientConfig.caPath, clientConfig.caPath, err)
|
|
default:
|
|
log.Printf("Failed to reach Kubelet at: %s - error: %v", clientConfig.baseURL, err)
|
|
}
|
|
}
|