categraf/agent/logs_agent.go

//go:build !no_logs

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package agent

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"flashcat.cloud/categraf/logs/auditor"
	"flashcat.cloud/categraf/logs/client"
	"flashcat.cloud/categraf/logs/diagnostic"
	"flashcat.cloud/categraf/logs/input/container"
	"flashcat.cloud/categraf/logs/input/file"
	"flashcat.cloud/categraf/logs/input/journald"
	"flashcat.cloud/categraf/logs/input/kubernetes"
	"flashcat.cloud/categraf/logs/input/listener"
	"flashcat.cloud/categraf/logs/pipeline"
	"flashcat.cloud/categraf/logs/restart"
	"flashcat.cloud/categraf/logs/status"

	coreconfig "flashcat.cloud/categraf/config"
	logsconfig "flashcat.cloud/categraf/config/logs"
	logService "flashcat.cloud/categraf/logs/service"
)

const (
	intakeTrackType         = "logs"
	AgentJSONIntakeProtocol = "agent-json"
	invalidProcessingRules  = "invalid_global_processing_rules"
)

// LogsAgent represents the data pipeline that collects, decodes,
// processes and sends logs to the backend
// + ------------------------------------------------------ +
// |                                                          |
// |  Collector -> Decoder -> Processor -> Sender -> Auditor  |
// |                                                          |
// + ------------------------------------------------------ +
type LogsAgent struct {
	sources                   *logsconfig.LogSources
	services                  *logService.Services
	processingRules           []*logsconfig.ProcessingRule
	endpoints                 *logsconfig.Endpoints
	auditor                   auditor.Auditor
	destinationsCtx           *client.DestinationsContext
	pipelineProvider          pipeline.Provider
	inputs                    []restart.Restartable
	diagnosticMessageReceiver *diagnostic.BufferedMessageReceiver
}

// NewLogsAgent returns a new LogsAgent, or nil when the logs pipeline
// is disabled or has nothing to collect.
func NewLogsAgent() AgentModule {
	if coreconfig.Config == nil ||
		!coreconfig.Config.Logs.Enable ||
		(len(coreconfig.Config.Logs.Items) == 0 && !coreconfig.Config.Logs.CollectContainerAll) {
		return nil
	}

	endpoints, err := BuildEndpoints(intakeTrackType, AgentJSONIntakeProtocol, logsconfig.DefaultIntakeOrigin)
	if err != nil {
		message := fmt.Sprintf("Invalid endpoints: %v", err)
		status.AddGlobalError("invalid endpoints", message)
		log.Println("E!", errors.New(message))
		return nil
	}

	processingRules, err := GlobalProcessingRules()
	if err != nil {
		message := fmt.Sprintf("Invalid processing rules: %v", err)
		status.AddGlobalError(invalidProcessingRules, message)
		log.Println("E!", errors.New(message))
		return nil
	}

	sources := logsconfig.NewLogSources()
	services := logService.NewServices()

	log.Println("I! Starting logs-agent...")

	// setup the auditor. The auditor is the end of the pipeline and its most
	// critical part: it records what has been successfully processed in its
	// on-disk registry so that tailing can resume at the right offsets after a restart.
	auditorTTL := time.Duration(23) * time.Hour
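	// make sure the directory that holds the auditor registry exists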
	if _, err := os.Stat(coreconfig.GetLogRunPath()); os.IsNotExist(err) {
		if err := os.MkdirAll(coreconfig.GetLogRunPath(), 0755); err != nil {
			log.Println("W! failed to create logs run path:", err)
		}
	}
	auditor := auditor.New(coreconfig.GetLogRunPath(), auditor.DefaultRegistryFilename, auditorTTL)
	destinationsCtx := client.NewDestinationsContext()
	diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver()

	// setup the pipeline provider that provides pairs of processor and sender
	pipelineProvider := pipeline.NewProvider(logsconfig.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)

	validatePodContainerID := coreconfig.ValidatePodContainerID()

	// container log collection is launched through the kubernetes launcher when it is available
	containerLaunchables := []container.Launchable{
		{
			IsAvailable: kubernetes.IsAvailable,
			Launcher: func() restart.Restartable {
				return kubernetes.NewLauncher(sources, services, coreconfig.GetContainerCollectAll())
			},
		},
	}

	// setup the inputs
	inputs := []restart.Restartable{
		file.NewScanner(sources, coreconfig.OpenLogsLimit(), pipelineProvider, auditor,
			file.DefaultSleepDuration, validatePodContainerID, time.Duration(coreconfig.FileScanPeriod())*time.Second),
		listener.NewLauncher(sources, coreconfig.LogFrameSize(), pipelineProvider),
		journald.NewLauncher(sources, pipelineProvider, auditor),
	}
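	// when CollectContainerAll is enabled, also start the container launcher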
	if coreconfig.GetContainerCollectAll() {
		log.Println("I! collect docker logs...")
		inputs = append(inputs, container.NewLauncher(containerLaunchables))
	}

	return &LogsAgent{
		sources:                   sources,
		services:                  services,
		processingRules:           processingRules,
		endpoints:                 endpoints,
		auditor:                   auditor,
		destinationsCtx:           destinationsCtx,
		pipelineProvider:          pipelineProvider,
		inputs:                    inputs,
		diagnosticMessageReceiver: diagnosticMessageReceiver,
	}
}
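
// Start starts all the elements of the data pipeline and then registers the
// configured log sources.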
func (la *LogsAgent) Start() error {
	la.startInner()

	if coreconfig.GetContainerCollectAll() {
		// when CollectContainerAll is enabled, register a catch-all container source
		if coreconfig.Config.DebugMode {
			log.Println("Adding ContainerCollectAll source to the Logs Agent")
		}
		kubesource := logsconfig.NewLogSource(logsconfig.ContainerCollectAll,
			&logsconfig.LogsConfig{
				Type:    coreconfig.Kubernetes,
				Service: "docker",
				Source:  "docker",
			})
		la.sources.AddSource(kubesource)
		// scan for containers in the background and create the corresponding services
		go kubernetes.NewScanner(la.services).Scan()
	}

	// register the log sources defined in the configuration
	for _, c := range coreconfig.Config.Logs.Items {
		if c == nil {
			continue
		}
		source := logsconfig.NewLogSource(c.Name, c)
		if err := c.Validate(); err != nil {
			log.Println("W! Invalid logs configuration:", err)
			source.Status.Error(err)
			continue
		}
		la.sources.AddSource(source)
	}
	return nil
}

// startInner starts all the elements of the data pipeline
// in the right order to prevent data loss
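// (the downstream components are started before the inputs so that no message is dropped)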
func (a *LogsAgent) startInner() {
	starter := restart.NewStarter(a.destinationsCtx, a.auditor, a.pipelineProvider, a.diagnosticMessageReceiver)
	for _, input := range a.inputs {
		starter.Add(input)
	}
	starter.Start()
}

// Flush synchronously flushes the pipelines managed by the LogsAgent.
func (a *LogsAgent) Flush(ctx context.Context) {
	a.pipelineProvider.Flush(ctx)
}

// Stop stops all the elements of the data pipeline
// in the right order to prevent data loss
func (a *LogsAgent) Stop() error {
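	// stop the inputs in parallel, then stop the rest of the pipeline serially,
	// from the most upstream component to the most downstream one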
	inputs := restart.NewParallelStopper()
	for _, input := range a.inputs {
		inputs.Add(input)
	}
	stopper := restart.NewSerialStopper(
		inputs,
		a.pipelineProvider,
		a.auditor,
		a.destinationsCtx,
		a.diagnosticMessageReceiver,
	)

	// This will try to stop everything in order, including the potentially blocking
	// parts like the sender. After StopTimeout it will just stop the last part of the
	// pipeline, disconnecting it from the auditor, to make sure that the pipeline is
	// flushed before stopping.
	// TODO: Add this feature in the stopper.
	c := make(chan struct{})
	go func() {
		stopper.Stop()
		close(c)
	}()

	timeout := time.Duration(30) * time.Second
	select {
	case <-c:
	case <-time.After(timeout):
		log.Println("I! Timed out when stopping logs-agent, forcing it to stop now")
		// We force all destinations to read/flush all the messages they get without
		// trying to write to the network.
		a.destinationsCtx.Stop()
		// Wait again for the stopper to complete.
		// In some situations the stopper unfortunately never completes; we have
		// already reached the grace period, so give it a few more seconds and
		// then force quit.
		timeout := time.NewTimer(5 * time.Second)
		select {
		case <-c:
		case <-timeout.C:
			log.Println("W! Force close of the logs-agent, dumping the goroutines.")
		}
	}
	return nil
}

// GlobalProcessingRules returns the global processing rules to apply to all logs.
func GlobalProcessingRules() ([]*logsconfig.ProcessingRule, error) {
	rules := coreconfig.Config.Logs.GlobalProcessingRules
	err := logsconfig.ValidateProcessingRules(rules)
	if err != nil {
		return nil, err
	}
	err = logsconfig.CompileProcessingRules(rules)
	if err != nil {
		return nil, err
	}
	return rules, nil
}