categraf/logs/input/kubernetes/parser.go

83 lines
2.2 KiB
Go

//go:build !no_logs
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package kubernetes
import (
"bytes"
"errors"
"flashcat.cloud/categraf/logs/message"
)
const (
// Kubernetes log stream types
stdout = "stdout"
stderr = "stderr"
numberOfComponents = 4 // timestamp stream flag message
)
var (
// log line timestamp/stream/flag/content delimiter
delimiter = []byte{' '}
)
// Parser parses Kubernetes log lines
var Parser *parser
type parser struct{}
// Parse parses a Kubernetes log line.
// Kubernetes log lines follow this pattern '<timestamp> <stream> <flag> <content>',
// see https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kuberuntime/logs/logs.go
// Example:
// 2018-09-20T11:54:11.753589172Z stdout F This is my message
func (p *parser) Parse(msg []byte) ([]byte, string, string, bool, error) {
content, status, timestamp, flag, err := parse(msg)
return content, status, timestamp, isPartial(flag), err
}
func (p *parser) SupportsPartialLine() bool {
return true
}
func parse(msg []byte) ([]byte, string, string, string, error) {
var status = message.StatusInfo
var flag string
var timestamp string
components := bytes.SplitN(msg, delimiter, numberOfComponents)
if len(components) < numberOfComponents-1 {
return msg, status, timestamp, flag, errors.New("cannot parse the log line")
}
var content []byte
if len(components) > numberOfComponents-1 {
content = components[3]
}
status = getStatus(components[1])
timestamp = string(components[0])
flag = string(components[2])
return content, status, timestamp, flag, nil
}
func isPartial(flag string) bool {
return flag == "P"
}
// getStatus returns the status of the message based on
// the value of the STREAM_TYPE field in the header,
// returns the status INFO by default
func getStatus(streamType []byte) string {
switch string(streamType) {
case stdout:
return message.StatusInfo
case stderr:
return message.StatusError
default:
return message.StatusInfo
}
}