forked from flashcat/categraf
212 lines
5.9 KiB
Go
212 lines
5.9 KiB
Go
package mtail
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
dto "github.com/prometheus/client_model/go"
|
|
"golang.org/x/net/context"
|
|
|
|
"flashcat.cloud/categraf/config"
|
|
"flashcat.cloud/categraf/inputs"
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/metrics"
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/mtail"
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/waker"
|
|
util "flashcat.cloud/categraf/pkg/metrics"
|
|
"flashcat.cloud/categraf/types"
|
|
)
|
|
|
|
const (
	// inputName is the key under which this plugin is registered via inputs.Add.
	inputName = `mtail`
	// description is the one-sentence summary returned by (*MTail).Description.
	description = `extract internal monitoring data from application logs`
)
|
|
|
|
// MTail holds the configuration for the plugin.
|
|
type MTail struct {
|
|
config.PluginConfig
|
|
Instances []*Instance `toml:"instances"`
|
|
}
|
|
|
|
type Instance struct {
|
|
config.InstanceConfig
|
|
|
|
NamePrefix string `toml:"name_prefix"`
|
|
Progs string `toml:"progs"`
|
|
Logs []string `toml:"logs"`
|
|
IgnoreFileRegPattern string `toml:"ignore_filename_regex_pattern"`
|
|
OverrideTimeZone string `toml:"override_timezone"`
|
|
EmitProgLabel string `toml:"emit_prog_label"`
|
|
emitProgLabel bool `toml:"-"`
|
|
EmitMetricTimestamp string `toml:"emit_metric_timestamp"`
|
|
emitMetricTimestamp bool `toml:"-"`
|
|
PollInterval time.Duration `toml:"poll_interval"`
|
|
PollLogInterval time.Duration `toml:"poll_log_interval"`
|
|
MetricPushInterval time.Duration `toml:"metric_push_interval"`
|
|
MaxRegexpLen int `toml:"max_regexp_length"`
|
|
MaxRecursionDepth int `toml:"max_recursion_depth"`
|
|
|
|
SyslogUseCurrentYear string `toml:"syslog_use_current_year"` // true
|
|
sysLogUseCurrentYear bool `toml:"-"`
|
|
LogRuntimeErrors string `toml:"vm_logs_runtime_errors"` // true
|
|
logRuntimeErrors bool `toml:"-"`
|
|
//
|
|
ctx context.Context `toml:"-"`
|
|
cancel context.CancelFunc `toml:"-"`
|
|
m *mtail.Server
|
|
}
|
|
|
|
func (ins *Instance) Init() error {
|
|
|
|
if len(ins.Progs) == 0 || len(ins.Logs) == 0 {
|
|
return types.ErrInstancesEmpty
|
|
}
|
|
|
|
// set default value
|
|
ins.sysLogUseCurrentYear = ins.SyslogUseCurrentYear == "true"
|
|
ins.logRuntimeErrors = ins.LogRuntimeErrors == "true"
|
|
ins.emitProgLabel = ins.EmitProgLabel == "true"
|
|
ins.emitMetricTimestamp = ins.EmitMetricTimestamp == "true"
|
|
|
|
if ins.PollLogInterval == 0 {
|
|
ins.PollLogInterval = 250 * time.Millisecond
|
|
}
|
|
if ins.PollInterval == 0 {
|
|
ins.PollInterval = 250 * time.Millisecond
|
|
}
|
|
if ins.MetricPushInterval == 0 {
|
|
ins.MetricPushInterval = 1 * time.Minute
|
|
}
|
|
if ins.MaxRegexpLen == 0 {
|
|
ins.MaxRegexpLen = 1024
|
|
}
|
|
if ins.MaxRecursionDepth == 0 {
|
|
ins.MaxRecursionDepth = 100
|
|
}
|
|
buildInfo := mtail.BuildInfo{
|
|
Version: config.Version,
|
|
}
|
|
loc, err := time.LoadLocation(ins.OverrideTimeZone)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Couldn't parse timezone %q: %s", ins.OverrideTimeZone, err)
|
|
return err
|
|
}
|
|
|
|
opts := []mtail.Option{
|
|
mtail.ProgramPath(ins.Progs),
|
|
mtail.LogPathPatterns(ins.Logs...),
|
|
mtail.IgnoreRegexPattern(ins.IgnoreFileRegPattern),
|
|
mtail.SetBuildInfo(buildInfo),
|
|
mtail.OverrideLocation(loc),
|
|
mtail.MetricPushInterval(ins.MetricPushInterval), // keep it here ?
|
|
mtail.MaxRegexpLength(ins.MaxRegexpLen),
|
|
mtail.MaxRecursionDepth(ins.MaxRecursionDepth),
|
|
mtail.LogRuntimeErrors,
|
|
}
|
|
if ins.cancel != nil {
|
|
ins.cancel()
|
|
} else {
|
|
ins.ctx, ins.cancel = context.WithCancel(context.Background())
|
|
}
|
|
staleLogGcWaker := waker.NewTimed(ins.ctx, time.Hour)
|
|
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
|
|
|
|
if ins.PollInterval > 0 {
|
|
logStreamPollWaker := waker.NewTimed(ins.ctx, ins.PollInterval)
|
|
logPatternPollWaker := waker.NewTimed(ins.ctx, ins.PollLogInterval)
|
|
opts = append(opts, mtail.LogPatternPollWaker(logPatternPollWaker), mtail.LogstreamPollWaker(logStreamPollWaker))
|
|
}
|
|
if ins.sysLogUseCurrentYear {
|
|
opts = append(opts, mtail.SyslogUseCurrentYear)
|
|
}
|
|
if !ins.emitProgLabel {
|
|
opts = append(opts, mtail.OmitProgLabel)
|
|
}
|
|
if ins.emitMetricTimestamp {
|
|
opts = append(opts, mtail.EmitMetricTimestamp)
|
|
}
|
|
|
|
store := metrics.NewStore()
|
|
store.StartGcLoop(ins.ctx, time.Hour)
|
|
|
|
m, err := mtail.New(ins.ctx, store, opts...)
|
|
if err != nil {
|
|
log.Println(err)
|
|
ins.cancel()
|
|
return err
|
|
}
|
|
ins.m = m
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ins *Instance) Drop() {
|
|
ins.cancel()
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add(inputName, func() inputs.Input {
|
|
return &MTail{}
|
|
})
|
|
}
|
|
|
|
func (s *MTail) Clone() inputs.Input {
|
|
return &MTail{}
|
|
}
|
|
|
|
func (s *MTail) Name() string {
|
|
return inputName
|
|
}
|
|
|
|
func (s *MTail) GetInstances() []inputs.Instance {
|
|
ret := make([]inputs.Instance, len(s.Instances))
|
|
for i := 0; i < len(s.Instances); i++ {
|
|
ret[i] = s.Instances[i]
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// Description returns a one-sentence description on the input.
|
|
func (s *MTail) Description() string {
|
|
return description
|
|
}
|
|
|
|
// Gather retrieves all the configured fields and tables.
|
|
// Any error encountered does not halt the process. The errors are accumulated
|
|
// and returned at the end.
|
|
// func (s *Instance) Gather(acc telegraf.Accumulator) error {
|
|
func (ins *Instance) Gather(slist *types.SampleList) {
|
|
reg := ins.m.GetRegistry()
|
|
mfs, done, err := prometheus.ToTransactionalGatherer(reg).Gather()
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
defer done()
|
|
for _, mf := range mfs {
|
|
metricName := mf.GetName()
|
|
for _, m := range mf.Metric {
|
|
tags := util.MakeLabels(m, ins.GetLabels())
|
|
|
|
if mf.GetType() == dto.MetricType_SUMMARY {
|
|
util.HandleSummary(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
|
|
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
|
|
util.HandleHistogram(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
|
|
} else {
|
|
util.HandleGaugeCounter(inputName, m, tags, metricName, ins.GetLogMetricTime, slist)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Instance) GetLogMetricTime(ts int64) time.Time {
|
|
var tm time.Time
|
|
if ts <= 0 || !p.emitMetricTimestamp {
|
|
return tm
|
|
}
|
|
sec := ts / 1000
|
|
ms := ts % 1000 * 1e6
|
|
tm = time.Unix(sec, ms)
|
|
return tm
|
|
}
|