//go:build !no_logs

// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package file

import (
	"log"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	coreconfig "flashcat.cloud/categraf/config"
	logsconfig "flashcat.cloud/categraf/config/logs"
	"flashcat.cloud/categraf/logs/auditor"
	"flashcat.cloud/categraf/logs/message"
	"flashcat.cloud/categraf/logs/pipeline"
	"flashcat.cloud/categraf/logs/restart"
)

// rxContainerID is used in the shouldIgnore func to do a best-effort validation
// that the file currently scanned for a source is attached to the proper container.
// If the container ID we parse from the filename doesn't match this regexp, we *will*
// tail the file because we prefer a false negative over a false positive (best-effort).
var rxContainerID = regexp.MustCompile("^[a-fA-F0-9]{64}$")

// ContainersLogsDir is the directory in which we should find container log files
// with the container ID in their filename.
// Public to be able to change it while running unit tests.
var ContainersLogsDir = "/var/log/containers"

// Scanner checks all files provided by fileProvider and creates new tailers
// or updates the existing ones if needed.
type Scanner struct {
	pipelineProvider    pipeline.Provider
	addedSources        chan *logsconfig.LogSource
	removedSources      chan *logsconfig.LogSource
	activeSources       []*logsconfig.LogSource
	tailingLimit        int
	fileProvider        *Provider
	tailers             map[string]*Tailer
	registry            auditor.Registry
	tailerSleepDuration time.Duration
	stop                chan struct{}
	// set to true if we want to use `ContainersLogsDir` to validate that a new
	// pod log file is being attached to the correct containerID.
	// Feature flag defaulting to false, use `logs_config.validate_pod_container_id`.
	validatePodContainerID bool
	scanPeriod             time.Duration
}

// NewScanner returns a new scanner.
func NewScanner(sources *logsconfig.LogSources, tailingLimit int, pipelineProvider pipeline.Provider, registry auditor.Registry,
	tailerSleepDuration time.Duration, validatePodContainerID bool, scanPeriod time.Duration) *Scanner {
	return &Scanner{
		pipelineProvider:       pipelineProvider,
		tailingLimit:           tailingLimit,
		addedSources:           sources.GetAddedForType(logsconfig.FileType),
		removedSources:         sources.GetRemovedForType(logsconfig.FileType),
		fileProvider:           NewProvider(tailingLimit),
		tailers:                make(map[string]*Tailer),
		registry:               registry,
		tailerSleepDuration:    tailerSleepDuration,
		stop:                   make(chan struct{}),
		validatePodContainerID: validatePodContainerID,
		scanPeriod:             scanPeriod,
	}
}

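// A minimal usage sketch (hypothetical wiring: the sources, pipelineProvider and
// registry come from the caller, and the numeric settings below are illustrative
// only, not values taken from this repository):
//
//	scanner := NewScanner(sources, 100, pipelineProvider, registry,
//		10*time.Millisecond, true, 10*time.Second)
//	scanner.Start()
//	defer scanner.Stop()
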
// Start starts the Scanner
func (s *Scanner) Start() {
	go s.run()
}

// Stop stops the Scanner and its tailers in parallel,
// this call returns only when all the tailers are stopped
func (s *Scanner) Stop() {
	s.stop <- struct{}{}
	s.cleanup()
}

// run checks periodically if there are new files to tail and the state of its tailers until stop
func (s *Scanner) run() {
	scanTicker := time.NewTicker(s.scanPeriod)
	defer scanTicker.Stop()
	for {
		select {
		case source := <-s.addedSources:
			s.addSource(source)
		case source := <-s.removedSources:
			s.removeSource(source)
		case <-scanTicker.C:
			// check if there are new files to tail, tailers to stop, and tailers to restart because of file rotation
			s.scan()
		case <-s.stop:
			// no more files should be tailed
			return
		}
	}
}

// cleanup all tailers
func (s *Scanner) cleanup() {
	stopper := restart.NewParallelStopper()
	for _, tailer := range s.tailers {
		stopper.Add(tailer)
		delete(s.tailers, tailer.file.GetScanKey())
	}
	stopper.Stop()
}

// scan checks all the files we're expected to tail, compares them to the currently tailed files,
// and triggers the required updates.
// For instance, when a file is logrotated, its tailer will keep tailing the rotated file.
// The Scanner needs to stop that previous tailer, and start a new one for the new file.
func (s *Scanner) scan() {
	files := s.fileProvider.FilesToTail(s.activeSources)
	filesTailed := make(map[string]bool)
	tailersLen := len(s.tailers)

	for _, file := range files {
		// We're using a generated key here: in case this file has been found while
		// scanning files for a container, the key will use the format:
		//   <filepath>/<containerID>
		// If it has been found while scanning for a regular integration logsconfig,
		// its format will be:
		//   <filepath>
		// It is a hack to let two tailers tail the same file (it happens
		// when a tailer for a dead container is still tailing the file, and another
		// tailer is tailing the file for the new container).
		tailerKey := file.GetScanKey()
		tailer, isTailed := s.tailers[tailerKey]
		if isTailed && atomic.LoadInt32(&tailer.shouldStop) != 0 {
			// skip this tailer as it must be stopped
			continue
		}
		if !isTailed && tailersLen >= s.tailingLimit {
			// can't create a new tailer because tailingLimit is reached
			continue
		}

		if !isTailed && tailersLen < s.tailingLimit {
			// create a new tailer tailing from the beginning of the file if no offset has been recorded
			succeeded := s.startNewTailer(file, logsconfig.Beginning)
			if !succeeded {
				// the setup failed, let's try to tail this file in the next scan
				continue
			}
			tailersLen++
			filesTailed[tailerKey] = true
			continue
		}

		didRotate, err := DidRotate(tailer.osFile, tailer.GetReadOffset())
		if err != nil {
			continue
		}
		if didRotate {
			// restart the tailer because the file has been rotated
			succeeded := s.restartTailerAfterFileRotation(tailer, file)
			if !succeeded {
				// the setup failed, let's try to tail this file in the next scan
				continue
			}
		}

		filesTailed[tailerKey] = true
	}

	for _, tailer := range s.tailers {
		// stop all tailers which have not been selected
		_, shouldTail := filesTailed[tailer.file.GetScanKey()]
		if !shouldTail {
			s.stopTailer(tailer)
		}
	}
}

// addSource keeps track of the new source and launches new tailers for this source.
func (s *Scanner) addSource(source *logsconfig.LogSource) {
	s.activeSources = append(s.activeSources, source)
	s.launchTailers(source)
}

// removeSource removes the source from cache.
func (s *Scanner) removeSource(source *logsconfig.LogSource) {
	for i, src := range s.activeSources {
		if src == source {
			// no need to stop the tailer here, it will be stopped in the next iteration of scan.
			s.activeSources = append(s.activeSources[:i], s.activeSources[i+1:]...)
			break
		}
	}
}

// launchTailers launches new tailers for a new source.
func (s *Scanner) launchTailers(source *logsconfig.LogSource) {
	files, err := s.fileProvider.CollectFiles(source)
	if err != nil {
		source.Status.Error(err)
		log.Println("W! Could not collect files: ", err)
		return
	}
	for _, file := range files {
		if len(s.tailers) >= s.tailingLimit {
			return
		}
		if _, isTailed := s.tailers[file.GetScanKey()]; isTailed {
			continue
		}

		mode, _ := logsconfig.TailingModeFromString(source.Config.TailingMode)

		if source.Config.Identifier != "" {
			// only sources generated from service discovery will contain a logsconfig identifier,
			// in which case we want to collect all logs.
			// FIXME: better detect a source that has been generated from service discovery.
			mode = logsconfig.Beginning
		}

		s.startNewTailer(file, mode)
	}
}

// startNewTailer creates a new tailer, making it tail from the last committed offset, the beginning or the end of the file,
// returns true if the operation succeeded, false otherwise.
func (s *Scanner) startNewTailer(file *File, m logsconfig.TailingMode) bool {
	if file == nil {
		return false
	}

	// We also use the file scanner to look for container and pod log files; because of that
	// we have to make sure that the file we just detected is tagged with the correct
	// container ID. Enabled through `logs_config.validate_pod_container_id`.
	// The way k8s stores files in /var/log/pods doesn't let us do that properly
	// (the filename doesn't contain the container ID).
	// However, the symlinks present in /var/log/containers point to the /var/log/pods files,
	// meaning that we can use them to validate that the file we have found concerns the container we expect.
	// That's what the function shouldIgnore is trying to do when the directory exists and is readable.
	// See these links for more info:
	// - https://github.com/kubernetes/kubernetes/issues/58638
	// - https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter/issues/105
	if s.validatePodContainerID && file.Source != nil &&
		(file.Source.GetSourceType() == logsconfig.KubernetesSourceType || file.Source.GetSourceType() == logsconfig.DockerSourceType) &&
		s.shouldIgnore(file) {
		return false
	}

	tailer := s.createTailer(file, s.pipelineProvider.NextPipelineChan())

	var offset int64
	var whence int
	mode := s.handleTailingModeChange(tailer.Identifier(), m)

	offset, whence, err := Position(s.registry, tailer.Identifier(), mode)
	if err != nil {
		log.Println("W! Could not recover offset for file with path", file.Path, err)
	}

	if coreconfig.Config.DebugMode {
		log.Printf("Starting a new tailer for: %s (offset: %d, whence: %d) for tailer key %s\n", file.Path, offset, whence, file.GetScanKey())
	}
	err = tailer.Start(offset, whence)
	if err != nil {
		log.Println(err)
		return false
	}

	s.tailers[tailer.file.GetScanKey()] = tailer
	return true
}

// shouldIgnore resolves symlinks in /var/log/containers in order to use that redirection
// to validate that we will be reading a file for the correct container.
func (s *Scanner) shouldIgnore(file *File) bool {
	// this method needs a source logsconfig to detect whether we should ignore that file or not
	if file == nil || file.Source == nil || file.Source.Config == nil {
		return false
	}

	infos := make(map[string]string)
	err := filepath.Walk(ContainersLogsDir, func(containerLogFilename string, info os.FileInfo, err error) error {
		// we only want to follow symlinks
		if info == nil || info.Mode()&os.ModeSymlink != os.ModeSymlink || info.IsDir() {
			// not a symlink, we are not interested in this file
			return nil
		}

		// resolve the symlink
		podLogFilename, err2 := os.Readlink(containerLogFilename)
		if err2 != nil {
			return nil
		}

		infos[podLogFilename] = containerLogFilename
		return nil
	})

	// this is not an error if we are not currently looking for container log files,
	// so it's not a problem and we just return false.
	// Still, a debug message helps troubleshoot cases where we're
	// legitimately looking for container logs.
	if err != nil {
		return false
	}

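	// NOTE: symlinks in /var/log/containers are typically named
	// "<pod_name>_<namespace>_<container_name>-<container_id>.log" (an assumption about
	// the kubelet's naming convention), which is why the container ID is parsed below
	// as the part after the last "-" with the file extension trimmed.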
	// container ID extracted from the file found in /var/log/containers
	base := filepath.Base(infos[file.Path]) // only the file name
	ext := filepath.Ext(base)               // file extension
	parts := strings.Split(base, "-")       // get only the container ID part from the file name
	var containerIDFromFilename string
	if len(parts) > 1 {
		containerIDFromFilename = strings.TrimSuffix(parts[len(parts)-1], ext)
	}

	// basic validation of the ID that has been parsed: if it doesn't look like
	// an ID we don't want to compare another ID to it
	if containerIDFromFilename == "" || !rxContainerID.Match([]byte(containerIDFromFilename)) {
		return false
	}

	if file.Source.Config.Identifier != "" && containerIDFromFilename != "" {
		if strings.TrimSpace(strings.ToLower(containerIDFromFilename)) != strings.TrimSpace(strings.ToLower(file.Source.Config.Identifier)) {
			// ignore this file, it does not concern the container stored in file.Source
			return true
		}
	}

	return false
}

// handleTailingModeChange determines the tailing behaviour when the tailing mode for a given file has its
// configuration changed. Two cases may happen: we can switch from "end" to "beginning" (1) or from "beginning" to
// "end" (2). If the tailing mode is set to forceEnd or forceBeginning it will remain unchanged.
// If (1), the resulting tailing mode is "beginning" in order to honor an existing offset and avoid sending duplicated lines.
// If (2), the resulting tailing mode is "forceEnd" to drop any saved offset and tail from the end of the file.
func (s *Scanner) handleTailingModeChange(tailerID string, currentTailingMode logsconfig.TailingMode) logsconfig.TailingMode {
	if currentTailingMode == logsconfig.ForceBeginning || currentTailingMode == logsconfig.ForceEnd {
		return currentTailingMode
	}
	previousMode, _ := logsconfig.TailingModeFromString(s.registry.GetTailingMode(tailerID))
	if previousMode != currentTailingMode {
		log.Printf("Tailing mode changed for %v. Was: %v, Now: %v\n", tailerID, previousMode, currentTailingMode)
		if currentTailingMode == logsconfig.Beginning {
			// end -> beginning, the offset will be honored if it exists
			return logsconfig.Beginning
		}
		// beginning -> end, the offset will be ignored
		return logsconfig.ForceEnd
	}
	return currentTailingMode
}

// stopTailer stops the tailer
func (s *Scanner) stopTailer(tailer *Tailer) {
	go tailer.Stop()
	delete(s.tailers, tailer.file.GetScanKey())
}

// restartTailerAfterFileRotation safely stops the tailer and starts a new one,
// returns true if the new tailer is up and running, false if an error occurred
func (s *Scanner) restartTailerAfterFileRotation(tailer *Tailer, file *File) bool {
	log.Println("Log rotation happened to ", file.Path)
	tailer.StopAfterFileRotation()
	tailer = s.createRotatedTailer(file, tailer.outputChan, tailer.GetDetectedPattern())
	// force reading the file from the beginning since it has been log-rotated
	err := tailer.StartFromBeginning()
	if err != nil {
		log.Println(err)
		return false
	}
	s.tailers[file.GetScanKey()] = tailer
	return true
}

// createTailer returns a new initialized tailer
func (s *Scanner) createTailer(file *File, outputChan chan *message.Message) *Tailer {
	return NewTailer(outputChan, file, s.tailerSleepDuration, NewDecoderFromSource(file.Source))
}

// createRotatedTailer returns a new tailer for a rotated file, reusing the previously detected multiline pattern
func (s *Scanner) createRotatedTailer(file *File, outputChan chan *message.Message, pattern *regexp.Regexp) *Tailer {
	return NewTailer(outputChan, file, s.tailerSleepDuration, NewDecoderFromSourceWithPattern(file.Source, pattern))
}