categraf/logs/input/journald/launcher.go

87 lines
2.2 KiB
Go

//go:build !no_logs && systemd
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package journald
import (
"log"
"flashcat.cloud/categraf/logs/auditor"
"flashcat.cloud/categraf/logs/pipeline"
"flashcat.cloud/categraf/logs/restart"
)
// Launcher is in charge of starting and stopping new journald tailers
type Launcher struct {
sources chan *config.LogSource
pipelineProvider pipeline.Provider
registry auditor.Registry
tailers map[string]*Tailer
stop chan struct{}
}
// NewLauncher returns a new Launcher.
func NewLauncher(sources *config.LogSources, pipelineProvider pipeline.Provider, registry auditor.Registry) *Launcher {
return &Launcher{
sources: sources.GetAddedForType(config.JournaldType),
pipelineProvider: pipelineProvider,
registry: registry,
tailers: make(map[string]*Tailer),
stop: make(chan struct{}),
}
}
// Start starts the launcher.
func (l *Launcher) Start() {
go l.run()
}
// run starts new tailers.
func (l *Launcher) run() {
for {
select {
case source := <-l.sources:
identifier := source.Config.Path
if _, exists := l.tailers[identifier]; exists {
// set up only one tailer per journal
continue
}
tailer, err := l.setupTailer(source)
if err != nil {
log.Println("Could not set up journald tailer: ", err)
} else {
l.tailers[identifier] = tailer
}
case <-l.stop:
return
}
}
}
// Stop stops all active tailers
func (l *Launcher) Stop() {
l.stop <- struct{}{}
stopper := restart.NewParallelStopper()
for identifier, tailer := range l.tailers {
stopper.Add(tailer)
delete(l.tailers, identifier)
}
stopper.Stop()
}
// setupTailer configures and starts a new tailer,
// returns the tailer or an error.
func (l *Launcher) setupTailer(source *config.LogSource) (*Tailer, error) {
tailer := NewTailer(source, l.pipelineProvider.NextPipelineChan())
cursor := l.registry.GetOffset(tailer.Identifier())
err := tailer.Start(cursor)
if err != nil {
return nil, err
}
return tailer, nil
}