categraf/logs/input/file/tailer.go

318 lines
9.3 KiB
Go

//go:build !no_logs
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package file
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
logsconfig "flashcat.cloud/categraf/config/logs"
"flashcat.cloud/categraf/logs/decoder"
"flashcat.cloud/categraf/logs/input/kubernetes"
"flashcat.cloud/categraf/logs/message"
"flashcat.cloud/categraf/logs/parser"
"flashcat.cloud/categraf/logs/tag"
)
// DefaultSleepDuration represents the amount of time the tailer waits before
// reading new data when no data is received (see Tailer.wait).
const DefaultSleepDuration = 1 * time.Second
// Tailer tails one file and sends messages to an output channel
type Tailer struct {
// readOffset is the position of the last byte read from the file.
// Accessed with sync/atomic; NOTE(review): the int64 fields appear to be
// placed first for 64-bit alignment on 32-bit platforms — confirm intended.
readOffset int64
// decodedOffset is the position of the last byte decoded; only mutated
// from forwardMessages.
decodedOffset int64
// bytesRead is the running total of bytes read by this tailer.
bytesRead int64
// file contains the logs configuration for the file to parse (path, source, ...)
// If you are looking for the os.file use to read on the FS, see osFile.
file *File
fullpath string
osFile *os.File
// tags are attached to every forwarded message, combined with tagProvider tags.
tags []string
// outputChan receives the decoded, tagged messages.
outputChan chan *message.Message
decoder *decoder.Decoder
tagProvider tag.Provider
// sleepDuration is how long to wait when a read returns no data.
sleepDuration time.Duration
// closeTimeout bounds how long the tailer keeps reading a rotated file.
closeTimeout time.Duration
// shouldStop and didFileRotate are boolean flags accessed with sync/atomic.
shouldStop int32
didFileRotate int32
// stop signals readForever to exit; done is closed once the decoder is flushed.
stop chan struct{}
done chan struct{}
// forwardContext cancels a forward blocked on outputChan; stopForward triggers it.
forwardContext context.Context
stopForward context.CancelFunc
}
// NewDecoderFromSource creates a new decoder from a log source.
// It is the no-multiline-pattern convenience form of
// NewDecoderFromSourceWithPattern.
func NewDecoderFromSource(source *logsconfig.LogSource) *decoder.Decoder {
	return NewDecoderFromSourceWithPattern(source, nil)
}
// NewDecoderFromSourceWithPattern creates a new decoder from a log source with
// a multiline pattern. The source type selects the line parser and end-of-line
// matcher; for plain file sources they are derived from the configured encoding.
func NewDecoderFromSourceWithPattern(source *logsconfig.LogSource, multiLinePattern *regexp.Regexp) *decoder.Decoder {
	// TODO: remove those checks and add to source a reference to a tagProvider and a lineParser.
	var lineParser parser.Parser
	var matcher decoder.EndLineMatcher
	switch source.GetSourceType() {
	// TODO
	case logsconfig.KubernetesSourceType:
		lineParser = kubernetes.JSONParser
		matcher = &decoder.NewLineMatcher{}
	// case logsconfig.DockerSourceType:
	// lineParser = docker.JSONParser
	// matcher = &decoder.NewLineMatcher{}
	default:
		lineParser, matcher = parserForEncoding(source.Config.Encoding)
	}
	return decoder.NewDecoderWithEndLineMatcher(source, lineParser, matcher, multiLinePattern)
}

// parserForEncoding maps a configured character encoding (case-insensitive) to
// the line parser and end-of-line matcher used to split the byte stream into
// lines. Unknown or empty encodings fall back to a no-op parser on newlines.
func parserForEncoding(encoding string) (parser.Parser, decoder.EndLineMatcher) {
	newline := &decoder.NewLineMatcher{}
	switch strings.ToLower(encoding) {
	case logsconfig.UTF16BE:
		return parser.NewDecodingParser(parser.UTF16BE), decoder.NewBytesSequenceMatcher(decoder.Utf16beEOL)
	case logsconfig.UTF16LE:
		return parser.NewDecodingParser(parser.UTF16LE), decoder.NewBytesSequenceMatcher(decoder.Utf16leEOL)
	case logsconfig.GB18030:
		return parser.NewDecodingParser(parser.GBK18030), newline
	case logsconfig.HZGB2312:
		return parser.NewDecodingParser(parser.HZGB2312), newline
	case logsconfig.GBK, logsconfig.GB2312:
		return parser.NewDecodingParser(parser.GBK), newline
	case logsconfig.BIG5:
		return parser.NewDecodingParser(parser.BIG5), newline
	default:
		return parser.NoopParser, newline
	}
}
// NewTailer returns an initialized Tailer that will read file and push
// decoded messages to outputChan, sleeping sleepDuration between empty reads.
// Start (or StartFromBeginning) must be called to begin tailing.
func NewTailer(outputChan chan *message.Message, file *File, sleepDuration time.Duration, decoder *decoder.Decoder) *Tailer {
	// TODO get value from coreConfig instead of hard-coding the rotation
	// close timeout.
	const closeTimeout = 60 * time.Second
	forwardContext, stopForward := context.WithCancel(context.Background())
	return &Tailer{
		file:           file,
		outputChan:     outputChan,
		decoder:        decoder,
		tagProvider:    tag.NewLocalProvider([]string{}),
		readOffset:     0,
		sleepDuration:  sleepDuration,
		closeTimeout:   closeTimeout,
		stop:           make(chan struct{}, 1),
		done:           make(chan struct{}, 1),
		forwardContext: forwardContext,
		stopForward:    stopForward,
	}
}
// Identifier returns a string that uniquely identifies a source.
// This is the identifier used in the registry.
// FIXME(remy): during container rotation, this Identifier() method could return
// the same value for different tailers. It is happening during container rotation
// where the dead container still has a tailer running on the log file, and the tailer
// of the freshly spawned container starts tailing this file as well.
func (t *Tailer) Identifier() string {
	// Plain concatenation; equivalent to fmt.Sprintf("file:%s", path).
	return "file:" + t.file.Path
}
// Start lets the tailer open a file and tail from whence: it seeks to the
// given offset/whence (applied in setup — not visible here; presumably
// io.Seek* semantics, confirm), then launches the forwarding and reading
// goroutines. On setup failure the source status is flagged and the error
// is returned without starting anything.
func (t *Tailer) Start(offset int64, whence int) error {
err := t.setup(offset, whence)
if err != nil {
t.file.Source.Status.Error(err)
return err
}
t.file.Source.Status.Success()
t.file.Source.AddInput(t.file.Path)
// The forwarder must be draining decoder output before the decoder and
// the reader start producing.
go t.forwardMessages()
t.decoder.Start()
go t.readForever()
return nil
}
// readForever lets the tailer tail the content of a file
// until it is closed or the tailer is stopped.
// It runs in its own goroutine; the deferred onStop closes the file and
// stops the decoder whichever way the loop exits.
func (t *Tailer) readForever() {
defer t.onStop()
for {
n, err := t.read()
if err != nil {
// Read errors end the tailer; cleanup happens in the deferred onStop.
return
}
t.recordBytes(int64(n))
select {
case <-t.stop:
// A rotated file may still contain unread data when the close
// timeout fires; warn so the data loss is visible.
if n != 0 && atomic.LoadInt32(&t.didFileRotate) == 1 {
log.Println("W! Tailer stopped after rotation close timeout with remaining unread data")
}
// stop reading data from file
return
default:
if n == 0 {
// wait for new data to come
t.wait()
}
}
}
}
// buildTailerTags groups the file tag, directory (if wildcard path) and user tags
func (t *Tailer) buildTailerTags() []string {
	fileTags := make([]string, 0, 2)
	fileTags = append(fileTags, "filename:"+filepath.Base(t.file.Path))
	if t.file.IsWildcardPath {
		// For wildcard paths the directory disambiguates same-named files.
		fileTags = append(fileTags, "dirname:"+filepath.Dir(t.file.Path))
	}
	return fileTags
}
// StartFromBeginning lets the tailer start tailing its file from the
// beginning, i.e. offset 0 relative to the start of the file.
func (t *Tailer) StartFromBeginning() error {
	return t.Start(0, io.SeekStart)
}
// Stop stops the tailer and returns only when the decoder is flushed
func (t *Tailer) Stop() {
// Clear the rotation flag so the final offset is tracked normally.
atomic.StoreInt32(&t.didFileRotate, 0)
// The stop channel is buffered (size 1), so this send does not block
// even if readForever has already exited.
t.stop <- struct{}{}
t.file.Source.RemoveInput(t.file.Path)
// wait for the decoder to be flushed
<-t.done
}
// StopAfterFileRotation prepares the tailer to stop after a timeout
// to finish reading its file that has been log-rotated.
// Unlike Stop, it returns immediately; the actual stop is driven by the
// timer goroutine started here.
func (t *Tailer) StopAfterFileRotation() {
// Mark the rotation so offsets stop being tracked (see shouldTrackOffset).
atomic.StoreInt32(&t.didFileRotate, 1)
go t.startStopTimer()
t.file.Source.RemoveInput(t.file.Path)
}
// startStopTimer waits for the close timeout and then stops the tailer,
// cancelling the forward context first so a send blocked on the output
// channel cannot prevent shutdown.
func (t *Tailer) startStopTimer() {
	// time.Sleep is equivalent to (and simpler than) allocating a one-shot
	// Timer and immediately blocking on its channel.
	time.Sleep(t.closeTimeout)
	t.stopForward()
	t.stop <- struct{}{}
}
// onStop finishes to stop the tailer: it releases the OS file handle and
// stops the decoder so it flushes its remaining output. Called (deferred)
// when readForever exits.
func (t *Tailer) onStop() {
t.osFile.Close()
t.decoder.Stop()
log.Println("Closed", t.file.Path, "for tailer key", t.file.GetScanKey(), "read", t.bytesRead, "bytes and", t.decoder.GetLineCount(), "lines")
}
// forwardMessages lets the Tailer forward log messages to the output channel.
// It consumes the decoder output until the decoder closes its channel
// (triggered by onStop), tracking the decoded offset for the registry.
func (t *Tailer) forwardMessages() {
defer func() {
// the decoder has successfully been flushed
atomic.StoreInt32(&t.shouldStop, 1)
close(t.done)
}()
for output := range t.decoder.OutputChan {
offset := t.decodedOffset + int64(output.RawDataLen)
identifier := t.Identifier()
// After a file rotation the offset would point into the old file:
// blank both offset and identifier so no stale position is recorded.
if !t.shouldTrackOffset() {
offset = 0
identifier = ""
}
t.decodedOffset = offset
origin := message.NewOrigin(t.file.Source)
origin.Identifier = identifier
origin.Offset = strconv.FormatInt(offset, 10)
origin.SetTags(append(t.tags, t.tagProvider.GetTags()...))
// Ignore empty lines once the registry offset is updated
if len(output.Content) == 0 {
continue
}
// Make the write to the output chan cancellable to be able to stop the tailer
// after a file rotation when it is stuck on it.
// We don't return directly to keep the same shutdown sequence that in the
// normal case.
select {
case t.outputChan <- message.NewMessage(output.Content, origin, output.Status, output.IngestionTimestamp):
case <-t.forwardContext.Done():
}
}
}
// incrementReadOffset advances the recorded read position by delta bytes.
func (t *Tailer) incrementReadOffset(delta int) {
	atomic.AddInt64(&t.readOffset, int64(delta))
}
// SetReadOffset sets the position of the last byte read in the file.
func (t *Tailer) SetReadOffset(offset int64) {
	atomic.StoreInt64(&t.readOffset, offset)
}
// GetReadOffset returns the position of the last byte read in the file.
func (t *Tailer) GetReadOffset() int64 {
	return atomic.LoadInt64(&t.readOffset)
}
// SetDecodedOffset sets the position of the last byte decoded in the file.
func (t *Tailer) SetDecodedOffset(offset int64) {
	atomic.StoreInt64(&t.decodedOffset, offset)
}
// GetDetectedPattern returns a regexp if a multiline pattern was detected
// by the decoder, nil otherwise.
func (t *Tailer) GetDetectedPattern() *regexp.Regexp {
	return t.decoder.GetDetectedPattern()
}
// shouldTrackOffset returns whether the tailer should track the file offset
// or not. Tracking is disabled once the file has been rotated, since the
// recorded offset would point into the old file.
func (t *Tailer) shouldTrackOffset() bool {
	// Single boolean return instead of `if cond { return false }; return true`
	// (staticcheck S1008).
	return atomic.LoadInt32(&t.didFileRotate) == 0
}
// wait lets the tailer sleep for a bit between empty reads, pacing the
// polling loop in readForever.
func (t *Tailer) wait() {
	time.Sleep(t.sleepDuration)
}
// recordBytes accumulates the number of bytes read into the tailer and
// source counters.
func (t *Tailer) recordBytes(n int64) {
// NOTE(review): unlike readOffset, bytesRead is incremented without
// sync/atomic. It appears to be written only from the readForever
// goroutine and read in onStop (deferred in that same goroutine), which
// would be race-free — confirm no concurrent reader exists elsewhere.
t.bytesRead += n
t.file.Source.BytesRead.Add(n)
if t.file.Source.ParentSource != nil {
t.file.Source.ParentSource.BytesRead.Add(n)
}
}