forked from flashcat/categraf
87 lines
2.2 KiB
Go
87 lines
2.2 KiB
Go
//go:build !no_logs && systemd
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package journald
|
|
|
|
import (
|
|
"log"
|
|
|
|
"flashcat.cloud/categraf/logs/auditor"
|
|
"flashcat.cloud/categraf/logs/pipeline"
|
|
"flashcat.cloud/categraf/logs/restart"
|
|
)
|
|
|
|
// Launcher is in charge of starting and stopping new journald tailers
|
|
type Launcher struct {
|
|
sources chan *config.LogSource
|
|
pipelineProvider pipeline.Provider
|
|
registry auditor.Registry
|
|
tailers map[string]*Tailer
|
|
stop chan struct{}
|
|
}
|
|
|
|
// NewLauncher returns a new Launcher.
|
|
func NewLauncher(sources *config.LogSources, pipelineProvider pipeline.Provider, registry auditor.Registry) *Launcher {
|
|
return &Launcher{
|
|
sources: sources.GetAddedForType(config.JournaldType),
|
|
pipelineProvider: pipelineProvider,
|
|
registry: registry,
|
|
tailers: make(map[string]*Tailer),
|
|
stop: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start starts the launcher.
|
|
func (l *Launcher) Start() {
|
|
go l.run()
|
|
}
|
|
|
|
// run starts new tailers.
|
|
func (l *Launcher) run() {
|
|
for {
|
|
select {
|
|
case source := <-l.sources:
|
|
identifier := source.Config.Path
|
|
if _, exists := l.tailers[identifier]; exists {
|
|
// set up only one tailer per journal
|
|
continue
|
|
}
|
|
tailer, err := l.setupTailer(source)
|
|
if err != nil {
|
|
log.Println("Could not set up journald tailer: ", err)
|
|
} else {
|
|
l.tailers[identifier] = tailer
|
|
}
|
|
case <-l.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop stops all active tailers
|
|
func (l *Launcher) Stop() {
|
|
l.stop <- struct{}{}
|
|
stopper := restart.NewParallelStopper()
|
|
for identifier, tailer := range l.tailers {
|
|
stopper.Add(tailer)
|
|
delete(l.tailers, identifier)
|
|
}
|
|
stopper.Stop()
|
|
}
|
|
|
|
// setupTailer configures and starts a new tailer,
|
|
// returns the tailer or an error.
|
|
func (l *Launcher) setupTailer(source *config.LogSource) (*Tailer, error) {
|
|
tailer := NewTailer(source, l.pipelineProvider.NextPipelineChan())
|
|
cursor := l.registry.GetOffset(tailer.Identifier())
|
|
err := tailer.Start(cursor)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return tailer, nil
|
|
}
|