categraf/logs/util/docker/containers.go

378 lines
12 KiB
Go

//go:build !no_logs
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package docker
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"regexp"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/go-connections/nat"
coreconfig "flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/logs/util/containers"
"flashcat.cloud/categraf/logs/util/containers/providers"
)
var healthRe = regexp.MustCompile(`\(health: (\w+)\)`)
// ContainerListConfig allows to pass listing options
type ContainerListConfig struct {
IncludeExited bool
FlagExcluded bool
}
// ListContainers gets a list of all containers on the current node using a mix of
// the Docker APIs and cgroups stats. We attempt to limit syscalls where possible.
func (d *DockerUtil) ListContainers(ctx context.Context, cfg *ContainerListConfig) ([]*containers.Container, error) {
err := providers.ContainerImpl().Prefetch()
if err != nil {
return nil, fmt.Errorf("could not fetch container metrics: %s", err)
}
cList, err := d.dockerContainers(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("could not get docker containers: %s", err)
}
for _, container := range cList {
if container.State != containers.ContainerRunningState || container.Excluded || !providers.ContainerImpl().ContainerExists(container.ID) {
continue
}
d.getContainerDetails(container)
if isMissingIP(container.AddressList) {
hostIPs := GetDockerHostIPs()
container.AddressList = correctMissingIPs(container.AddressList, hostIPs)
// this can mean one of two things: the container is in host mode, or in awsvpc
// in both cases we can't get the IP address in parseContainerNetworkAddresses
} else if len(container.AddressList) == 0 {
// the inspect should be in the cache already so this is not a problem
inspect, err := d.Inspect(ctx, container.ID, false)
if err != nil {
log.Printf("Error inspecting container %s: %s", container.ID, err)
continue
}
networkMode, err := GetContainerNetworkMode(ctx, container.ID)
log.Printf("container %s network mode: %s", container.Name, networkMode)
if err != nil {
log.Printf("Failed to get network mode for container %s. Network info will be missing. Error: %s", container.ID, err)
continue
}
// in awsvpc, and host mode, we assume that those ports are listening to all ip addresses
// which means to the task IPs, and to the host IPs respectively.
// If this turns out to not be the case (it was in our tests)
// we'll need to inspect the PortSet more deeply.
exposedPorts := []nat.Port{}
for p := range inspect.Config.ExposedPorts {
exposedPorts = append(exposedPorts, p)
}
if networkMode == containers.HostNetworkMode {
ips := GetDockerHostIPs()
if len(ips) == 0 {
log.Printf("Failed to get host IPs. Container %s will be missing network info: %s", container.Name, err)
continue
}
ipAddr := []containers.NetworkAddress{}
for _, ip := range ips {
ipAddr = append(ipAddr, containers.NetworkAddress{
IP: net.ParseIP(ip),
})
}
container.AddressList = crossIPsWithPorts(ipAddr, exposedPorts)
}
}
}
return cList, err
}
// UpdateContainerMetrics updates cgroup / network performance metrics for
// a provided list of Container objects
func (d *DockerUtil) UpdateContainerMetrics(cList []*containers.Container) error {
err := providers.ContainerImpl().Prefetch()
if err != nil {
return fmt.Errorf("could not fetch container metrics: %s", err)
}
for _, ctn := range cList {
if ctn == nil || ctn.State != containers.ContainerRunningState || ctn.Excluded || !providers.ContainerImpl().ContainerExists(ctn.ID) {
continue
}
}
return nil
}
// getContainerMetrics calls a ContainerImplementation, caller should always call Prefetch() before
func (d *DockerUtil) getContainerDetails(ctn *containers.Container) {
var err error
ctn.StartedAt, err = providers.ContainerImpl().GetContainerStartTime(ctn.ID)
if err != nil {
log.Printf("ContainerImplementation cannot get StartTime for container %s, err: %s", ctn.ID[:12], err)
return
}
}
func (d *DockerUtil) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return d.cli.ContainerLogs(ctx, container, options)
}
// dockerContainers returns the running container list from the docker API
func (d *DockerUtil) dockerContainers(ctx context.Context, cfg *ContainerListConfig) ([]*containers.Container, error) {
if cfg == nil {
return nil, errors.New("configuration is nil")
}
ctx, cancel := context.WithTimeout(ctx, d.queryTimeout)
defer cancel()
cList, err := d.cli.ContainerList(ctx, types.ContainerListOptions{All: cfg.IncludeExited})
if err != nil {
return nil, fmt.Errorf("error listing containers: %s", err)
}
ret := make([]*containers.Container, 0, len(cList))
for _, c := range cList {
if d.cfg.CollectNetwork && c.State == containers.ContainerRunningState {
// FIXME: We might need to invalidate this cache if a containers networks are changed live.
d.Lock()
if _, ok := d.networkMappings[c.ID]; !ok {
i, err := d.Inspect(ctx, c.ID, false)
if err != nil {
d.Unlock()
log.Printf("Error inspecting container %s: %s", c.ID, err)
continue
}
d.networkMappings[c.ID] = findDockerNetworks(c.ID, i.State.Pid, c)
}
d.Unlock()
}
image, err := d.ResolveImageName(ctx, c.Image)
if err != nil {
log.Printf("Can't resolve image name %s: %s", c.Image, err)
}
pauseContainerExcluded := containers.IsPauseContainer(c.Labels)
excluded := pauseContainerExcluded || d.cfg.filter.IsExcluded(c.Names[0], image, c.Labels["io.kubernetes.pod.namespace"])
if excluded && !cfg.FlagExcluded {
continue
}
entityID := ContainerIDToTaggerEntityName(c.ID)
container := &containers.Container{
Type: "Docker",
ID: c.ID,
EntityID: entityID,
Name: c.Names[0],
Image: image,
ImageID: c.ImageID,
Created: c.Created,
State: c.State,
Excluded: excluded,
Health: parseContainerHealth(c.Status),
AddressList: d.parseContainerNetworkAddresses(c.ID, c.Ports, c.NetworkSettings, c.Names[0]),
}
ret = append(ret, container)
}
// Resolve docker networks after we've processed all containers so all
// routing maps are available.
if d.cfg.CollectNetwork {
d.Lock()
resolveDockerNetworks(d.networkMappings)
d.Unlock()
}
if time.Now().Sub(d.lastInvalidate) > invalidationInterval {
d.cleanupCaches(cList)
}
return ret, nil
}
// Parse the health out of a container status. The format is either:
// - 'Up 5 seconds (health: starting)'
// - 'Up 18 hours (unhealthy)'
// - 'Up about an hour'
func parseContainerHealth(status string) string {
// Avoid allocations in most cases by just checking for '('
if strings.Index(status, "unhealthy") >= 0 {
return "unhealthy"
}
if strings.IndexByte(status, '(') == -1 {
return ""
}
all := healthRe.FindAllStringSubmatch(status, -1)
if len(all) < 1 || len(all[0]) < 2 {
return ""
}
return all[0][1]
}
// parseContainerNetworkAddresses converts docker ports
// and network settings into a list of NetworkAddress
func (d *DockerUtil) parseContainerNetworkAddresses(cID string, ports []types.Port, netSettings *types.SummaryNetworkSettings, container string) []containers.NetworkAddress {
addrList := []containers.NetworkAddress{}
tempAddrList := []containers.NetworkAddress{}
if netSettings == nil || len(netSettings.Networks) == 0 {
log.Println("No network settings available from docker")
return addrList
}
for _, port := range ports {
if isExposed(port) {
IP := net.ParseIP(port.IP)
if IP == nil {
log.Println("Unable to parse IP: %v for container: %s", port.IP, container)
continue
}
addrList = append(addrList, containers.NetworkAddress{
IP: IP, // Host IP, since the port is exposed
Port: int(port.PublicPort), // Exposed port
Protocol: port.Type,
})
}
// Cache container ports
tempAddrList = append(tempAddrList, containers.NetworkAddress{
Port: int(port.PrivatePort),
Protocol: port.Type,
})
}
// Retieve IPs from network settings for the cached ports
for _, network := range netSettings.Networks {
if network.IPAddress == "" {
log.Println("No IP found for container %s in network %s", container, network.NetworkID)
continue
}
IP := net.ParseIP(network.IPAddress)
if IP == nil {
log.Println("Unable to parse IP: %v for container: %s", network.IPAddress, container)
continue
}
for _, addr := range tempAddrList {
// Add IP to the cached and not exposed ports
addrList = append(addrList, containers.NetworkAddress{
IP: IP,
Port: addr.Port,
Protocol: addr.Protocol,
})
}
}
return addrList
}
// isExposed returns if a docker port is exposed to the host
func isExposed(port types.Port) bool {
return port.PublicPort > 0 && port.IP != ""
}
// getECSMetadataURL inspects a given container ID and returns its ECS container metadata URI
// if found in its environment. It returns an empty string and an error on failure.
func (d *DockerUtil) getECSMetadataURL(ctx context.Context, cID string) (string, error) {
i, err := d.Inspect(ctx, cID, false)
if err != nil {
return "", err
}
for _, e := range i.Config.Env {
if strings.HasPrefix(e, "ECS_CONTAINER_METADATA_URI=") {
return strings.Split(e, "=")[1], nil
}
}
return "", errors.New("ecs container metadata uri not found")
}
// cleanupCaches removes cache entries for unknown containers and images
func (d *DockerUtil) cleanupCaches(containers []types.Container) {
liveContainers := make(map[string]struct{})
liveImages := make(map[string]struct{})
for _, c := range containers {
liveContainers[c.ID] = struct{}{}
liveImages[c.Image] = struct{}{}
}
d.Lock()
for cid := range d.networkMappings {
if _, ok := liveContainers[cid]; !ok {
delete(d.networkMappings, cid)
}
}
for image := range d.imageNameBySha {
if _, ok := liveImages[image]; !ok {
delete(d.imageNameBySha, image)
}
}
d.lastInvalidate = time.Now()
d.Unlock()
}
var missingIP = net.ParseIP("0.0.0.0")
func isMissingIP(addrs []containers.NetworkAddress) bool {
for _, addr := range addrs {
if addr.IP.Equal(missingIP) {
return true
}
}
return false
}
func correctMissingIPs(addrs []containers.NetworkAddress, hostIPs []string) []containers.NetworkAddress {
if len(hostIPs) == 0 {
return addrs // cannot detect host list, will return the addresses as is
}
var correctedAddrs []containers.NetworkAddress
for _, addr := range addrs {
if addr.IP.Equal(missingIP) {
for _, hip := range hostIPs {
correctedAddr := addr // this will copy addr
correctedAddr.IP = net.ParseIP(hip)
correctedAddrs = append(correctedAddrs, correctedAddr)
}
} else {
correctedAddrs = append(correctedAddrs, addr)
}
}
return correctedAddrs
}
// crossIPsWithPorts returns the product of a list of IP addresses and a list of ports
func crossIPsWithPorts(addrs []containers.NetworkAddress, ports []nat.Port) []containers.NetworkAddress {
res := make([]containers.NetworkAddress, len(addrs)*len(ports))
c := 0
for _, addr := range addrs {
if len(ports) == 0 {
res = append(res, addr)
}
for _, port := range ports {
res[c] = containers.NetworkAddress{
IP: addr.IP,
Port: port.Int(),
Protocol: port.Proto(),
}
c++
}
}
return res
}
func GetDockerHostIPs() []string {
ip, err := coreconfig.GetOutboundIP()
if err != nil {
return []string{coreconfig.Config.GetHostname()}
}
return []string{ip.String()}
}