forked from flashcat/categraf
313 lines
8.6 KiB
Go
313 lines
8.6 KiB
Go
//go:build !no_logs && systemd
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package journald
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/coreos/go-systemd/sdjournal"
|
|
|
|
logsconfig "flashcat.cloud/categraf/config/logs"
|
|
"flashcat.cloud/categraf/logs/message"
|
|
)
|
|
|
|
// defaultWaitDuration represents the delay before which we try to collect a new log from the journal
|
|
const (
|
|
defaultWaitDuration = 1 * time.Second
|
|
defaultApplicationName = "docker"
|
|
)
|
|
|
|
// Tailer collects logs from a journal.
|
|
type Tailer struct {
|
|
source *logsconfig.LogSource
|
|
outputChan chan *message.Message
|
|
journal *sdjournal.Journal
|
|
blacklist map[string]bool
|
|
stop chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
// NewTailer returns a new tailer.
|
|
func NewTailer(source *logsconfig.LogSource, outputChan chan *message.Message) *Tailer {
|
|
return &Tailer{
|
|
source: source,
|
|
outputChan: outputChan,
|
|
stop: make(chan struct{}, 1),
|
|
done: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// Start starts tailing the journal from a given offset.
|
|
func (t *Tailer) Start(cursor string) error {
|
|
if err := t.setup(); err != nil {
|
|
t.source.Status.Error(err)
|
|
return err
|
|
}
|
|
if err := t.seek(cursor); err != nil {
|
|
t.source.Status.Error(err)
|
|
return err
|
|
}
|
|
t.source.Status.Success()
|
|
t.source.AddInput(t.journalPath())
|
|
log.Println("Start tailing journal ", t.journalPath())
|
|
go t.tail()
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the tailer
|
|
func (t *Tailer) Stop() {
|
|
log.Println("Stop tailing journal ", t.journalPath())
|
|
t.stop <- struct{}{}
|
|
t.source.RemoveInput(t.journalPath())
|
|
<-t.done
|
|
}
|
|
|
|
// setup configures the tailer
|
|
func (t *Tailer) setup() error {
|
|
config := t.source.Config
|
|
var err error
|
|
|
|
t.initializeTagger()
|
|
|
|
if config.Path == "" {
|
|
// open the default journal
|
|
t.journal, err = sdjournal.NewJournal()
|
|
} else {
|
|
t.journal, err = sdjournal.NewJournalFromDir(config.Path)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, unit := range config.IncludeUnits {
|
|
// add filters to collect only the logs of the units defined in the configuration,
|
|
// if no units are defined, collect all the logs of the journal by default.
|
|
match := sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT + "=" + unit
|
|
err := t.journal.AddMatch(match)
|
|
if err != nil {
|
|
return fmt.Errorf("could not add filter %s: %s", match, err)
|
|
}
|
|
}
|
|
|
|
t.blacklist = make(map[string]bool)
|
|
for _, unit := range config.ExcludeUnits {
|
|
// add filters to drop all the logs related to units to exclude.
|
|
t.blacklist[unit] = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// seek seeks to the cursor if it is not empty or the end of the journal,
|
|
// returns an error if the operation failed.
|
|
func (t *Tailer) seek(cursor string) error {
|
|
if cursor != "" {
|
|
err := t.journal.SeekCursor(cursor)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// must skip one entry since the cursor points to the last committed one.
|
|
_, err = t.journal.NextSkip(1)
|
|
return err
|
|
}
|
|
return t.journal.SeekTail()
|
|
}
|
|
|
|
// tail tails the journal until a message stop is received.
|
|
func (t *Tailer) tail() {
|
|
defer func() {
|
|
t.journal.Close()
|
|
t.done <- struct{}{}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-t.stop:
|
|
// stop tailing journal
|
|
return
|
|
default:
|
|
n, err := t.journal.Next()
|
|
t.source.BytesRead.Add(int64(n))
|
|
if err != nil && err != io.EOF {
|
|
err := fmt.Errorf("cant't tail journal %s: %s", t.journalPath(), err)
|
|
t.source.Status.Error(err)
|
|
log.Println(err)
|
|
return
|
|
}
|
|
if n < 1 {
|
|
// no new entry
|
|
t.journal.Wait(defaultWaitDuration)
|
|
continue
|
|
}
|
|
entry, err := t.journal.GetEntry()
|
|
if err != nil {
|
|
log.Printf("Could not retrieve journal entry: %s\n", err)
|
|
continue
|
|
}
|
|
if t.shouldDrop(entry) {
|
|
continue
|
|
}
|
|
t.outputChan <- t.toMessage(entry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// shouldDrop returns true if the entry should be dropped,
|
|
// returns false otherwise.
|
|
func (t *Tailer) shouldDrop(entry *sdjournal.JournalEntry) bool {
|
|
unit, exists := entry.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]
|
|
if !exists {
|
|
return false
|
|
}
|
|
if _, blacklisted := t.blacklist[unit]; blacklisted {
|
|
// drop the entry
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// toMessage transforms a journal entry into a message.
|
|
// A journal entry has different fields that may vary depending on its nature,
|
|
// for more information, see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html.
|
|
func (t *Tailer) toMessage(entry *sdjournal.JournalEntry) *message.Message {
|
|
return message.NewMessage(t.getContent(entry), t.getOrigin(entry), t.getStatus(entry), time.Now().UnixNano())
|
|
}
|
|
|
|
// getContent returns all the fields of the entry as a json-string,
|
|
// remapping "MESSAGE" into "message" and bundling all the other keys in a "journald" attribute.
|
|
// ex:
|
|
// - journal-entry:
|
|
// {
|
|
// "MESSAGE": "foo",
|
|
// "_SYSTEMD_UNIT": "foo",
|
|
// ...
|
|
// }
|
|
// - message-content:
|
|
// {
|
|
// "message": "foo",
|
|
// "journald": {
|
|
// "_SYSTEMD_UNIT": "foo",
|
|
// ...
|
|
// }
|
|
// }
|
|
func (t *Tailer) getContent(entry *sdjournal.JournalEntry) []byte {
|
|
payload := make(map[string]interface{})
|
|
fields := entry.Fields
|
|
if message, exists := fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]; exists {
|
|
payload["message"] = message
|
|
delete(fields, sdjournal.SD_JOURNAL_FIELD_MESSAGE)
|
|
}
|
|
payload["journald"] = fields
|
|
|
|
content, err := json.Marshal(payload)
|
|
if err != nil {
|
|
// ensure the message has some content if the json encoding failed
|
|
value, _ := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
|
|
content = []byte(value)
|
|
}
|
|
|
|
return content
|
|
}
|
|
|
|
// getOrigin returns the message origin computed from the journal entry
|
|
func (t *Tailer) getOrigin(entry *sdjournal.JournalEntry) *message.Origin {
|
|
origin := message.NewOrigin(t.source)
|
|
origin.Identifier = t.Identifier()
|
|
origin.Offset, _ = t.journal.GetCursor()
|
|
// set the service and the source attributes of the message,
|
|
// those values are still overridden by the integration logsconfig when defined
|
|
tags := t.getTags(entry)
|
|
applicationName := t.getApplicationName(entry, tags)
|
|
origin.SetSource(applicationName)
|
|
origin.SetService(applicationName)
|
|
origin.SetTags(tags)
|
|
return origin
|
|
}
|
|
|
|
// applicationKeys represents all the valid attributes used to extract the value of the application name of a journal entry.
|
|
var applicationKeys = []string{
|
|
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER, // "SYSLOG_IDENTIFIER"
|
|
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT, // "_SYSTEMD_UNIT"
|
|
sdjournal.SD_JOURNAL_FIELD_COMM, // "_COMM"
|
|
}
|
|
|
|
// getApplicationName returns the name of the application from where the entry is from.
|
|
func (t *Tailer) getApplicationName(entry *sdjournal.JournalEntry, tags []string) string {
|
|
if t.isContainerEntry(entry) {
|
|
if t.source.Config.ContainerMode {
|
|
if shortName, found := getDockerImageShortName(t.getContainerID(entry), tags); found {
|
|
return shortName
|
|
}
|
|
}
|
|
|
|
return defaultApplicationName
|
|
}
|
|
|
|
for _, key := range applicationKeys {
|
|
if value, exists := entry.Fields[key]; exists {
|
|
return value
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// getTags returns a list of tags matching with the journal entry.
|
|
func (t *Tailer) getTags(entry *sdjournal.JournalEntry) []string {
|
|
var tags []string
|
|
if t.isContainerEntry(entry) {
|
|
tags = t.getContainerTags(t.getContainerID(entry))
|
|
}
|
|
return tags
|
|
}
|
|
|
|
// priorityStatusMapping represents the 1:1 mapping between journal entry priorities and statuses.
|
|
var priorityStatusMapping = map[string]string{
|
|
"0": message.StatusEmergency,
|
|
"1": message.StatusAlert,
|
|
"2": message.StatusCritical,
|
|
"3": message.StatusError,
|
|
"4": message.StatusWarning,
|
|
"5": message.StatusNotice,
|
|
"6": message.StatusInfo,
|
|
"7": message.StatusDebug,
|
|
}
|
|
|
|
// getStatus returns the status of the journal entry,
|
|
// returns "info" by default if no valid value is found.
|
|
func (t *Tailer) getStatus(entry *sdjournal.JournalEntry) string {
|
|
priority, exists := entry.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]
|
|
if !exists {
|
|
return message.StatusInfo
|
|
}
|
|
status, exists := priorityStatusMapping[priority]
|
|
if !exists {
|
|
return message.StatusInfo
|
|
}
|
|
return status
|
|
}
|
|
|
|
// journaldIntegration represents the name of the integration,
|
|
// it's used to override the source of the message and as a fingerprint to store the journal cursor.
|
|
const journaldIntegration = "journald"
|
|
|
|
// Identifier returns the unique identifier of the current journal being tailed.
|
|
func (t *Tailer) Identifier() string {
|
|
return journaldIntegration + ":" + t.journalPath()
|
|
}
|
|
|
|
// journalPath returns the path of the journal
|
|
func (t *Tailer) journalPath() string {
|
|
if t.source.Config.Path != "" {
|
|
return t.source.Config.Path
|
|
}
|
|
return "default"
|
|
}
|