categraf/inputs/mtail/internal/tailer/tail.go

414 lines
11 KiB
Go

// Copyright 2011 Google Inc. All Rights Reserved.
// This file is available under the Apache license.
// Package tailer provides a class that is responsible for tailing log files
// and extracting new log lines to be passed into the virtual machines.
package tailer
import (
"context"
"errors"
"expvar"
"fmt"
"log"
"net/url"
"os"
"path/filepath"
"regexp"
"sync"
"time"
"flashcat.cloud/categraf/inputs/mtail/internal/logline"
"flashcat.cloud/categraf/inputs/mtail/internal/tailer/logstream"
"flashcat.cloud/categraf/inputs/mtail/internal/waker"
// "github.com/golang/glog"
)
// logCount records the number of logs that are being tailed.
var logCount = expvar.NewInt("log_count")
// Tailer polls the filesystem for log sources that match given
// `LogPathPatterns` and creates `LogStream`s to tail them.
type Tailer struct {
ctx context.Context
wg sync.WaitGroup // Wait for our subroutines to finish
lines chan<- *logline.LogLine
globPatternsMu sync.RWMutex // protects `globPatterns'
globPatterns map[string]struct{} // glob patterns to match newly created logs in dir paths against
ignoreRegexPattern *regexp.Regexp
socketPaths []string
oneShot bool
pollMu sync.Mutex // protects Poll()
logstreamPollWaker waker.Waker // Used for waking idle logstreams
logstreamsMu sync.RWMutex // protects `logstreams`.
logstreams map[string]logstream.LogStream // Map absolte pathname to logstream reading that pathname.
initDone chan struct{}
}
// Option configures a new Tailer.
type Option interface {
apply(*Tailer) error
}
type niladicOption struct {
applyfunc func(*Tailer) error
}
func (n *niladicOption) apply(t *Tailer) error {
return n.applyfunc(t)
}
// OneShot puts the tailer in one-shot mode, where sources are read once from the start and then closed.
var OneShot = &niladicOption{func(t *Tailer) error { t.oneShot = true; return nil }}
// LogPatterns sets the glob patterns to use to match pathnames.
type LogPatterns []string
func (opt LogPatterns) apply(t *Tailer) error {
for _, p := range opt {
if err := t.AddPattern(p); err != nil {
return err
}
}
return nil
}
// IgnoreRegex sets the regular expression to use to filter away pathnames that match the LogPatterns glob.
type IgnoreRegex string
func (opt IgnoreRegex) apply(t *Tailer) error {
return t.SetIgnorePattern(string(opt))
}
// StaleLogGcWaker triggers garbage collection runs for stale logs in the tailer.
func StaleLogGcWaker(w waker.Waker) Option {
return &staleLogGcWaker{w}
}
type staleLogGcWaker struct {
waker.Waker
}
func (opt staleLogGcWaker) apply(t *Tailer) error {
t.StartStaleLogstreamExpirationLoop(opt.Waker)
return nil
}
// LogPatternPollWaker triggers polls on the filesystem for new logs that match the log glob patterns.
func LogPatternPollWaker(w waker.Waker) Option {
return &logPatternPollWaker{w}
}
type logPatternPollWaker struct {
waker.Waker
}
func (opt logPatternPollWaker) apply(t *Tailer) error {
t.StartLogPatternPollLoop(opt.Waker)
return nil
}
// LogstreamPollWaker wakes idle logstreams.
func LogstreamPollWaker(w waker.Waker) Option {
return &logstreamPollWaker{w}
}
type logstreamPollWaker struct {
waker.Waker
}
func (opt logstreamPollWaker) apply(t *Tailer) error {
t.logstreamPollWaker = opt.Waker
return nil
}
var ErrNoLinesChannel = errors.New("Tailer needs a lines channel")
// New creates a new Tailer.
func New(ctx context.Context, wg *sync.WaitGroup, lines chan<- *logline.LogLine, options ...Option) (*Tailer, error) {
if lines == nil {
return nil, ErrNoLinesChannel
}
t := &Tailer{
ctx: ctx,
lines: lines,
initDone: make(chan struct{}),
globPatterns: make(map[string]struct{}),
logstreams: make(map[string]logstream.LogStream),
}
defer close(t.initDone)
if err := t.SetOption(options...); err != nil {
return nil, err
}
if len(t.globPatterns) == 0 && len(t.socketPaths) == 0 {
log.Println("No patterns or sockets to tail, tailer done.")
close(t.lines)
return t, nil
}
// Set up listeners on every socket.
for _, pattern := range t.socketPaths {
if err := t.TailPath(pattern); err != nil {
return nil, err
}
}
// Guarantee all existing logs get tailed before we leave. Also necessary
// in case oneshot mode is active, the logs get read!
if err := t.PollLogPatterns(); err != nil {
return nil, err
}
// Setup for shutdown, once all routines are finished.
wg.Add(1)
go func() {
defer wg.Done()
<-t.initDone
// We need to wait for context.Done() before we wait for the subbies
// because we don't know how many are running at any point -- as soon
// as t.wg.Wait begins the number of waited-on goroutines is fixed, and
// we may end up leaking a LogStream goroutine and it'll try to send on
// a closed channel as a result. But in tests and oneshot, we want to
// make sure the whole log gets read so we can't wait on context.Done
// here.
if !t.oneShot {
<-t.ctx.Done()
}
t.wg.Wait()
close(t.lines)
}()
return t, nil
}
var ErrNilOption = errors.New("nil option supplied")
// SetOption takes one or more option functions and applies them in order to Tailer.
func (t *Tailer) SetOption(options ...Option) error {
for _, option := range options {
if option == nil {
return ErrNilOption
}
if err := option.apply(t); err != nil {
return err
}
}
return nil
}
var ErrUnsupportedURLScheme = errors.New("unsupported URL scheme")
// AddPattern adds a pattern to the list of patterns to filter filenames against.
func (t *Tailer) AddPattern(pattern string) error {
u, err := url.Parse(pattern)
if err != nil {
return err
}
path := pattern
switch u.Scheme {
default:
// glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pattern)
case "unix", "unixgram", "tcp", "udp":
// Keep the scheme.
// glog.V(2).Infof("AddPattern: socket %q", pattern)
t.socketPaths = append(t.socketPaths, pattern)
return nil
case "", "file":
path = u.Path
}
absPath, err := filepath.Abs(path)
if err != nil {
// glog.V(2).Infof("Couldn't canonicalize path %q: %s", u.Path, err)
return err
}
// glog.V(2).Infof("AddPattern: file %q", absPath)
t.globPatternsMu.Lock()
t.globPatterns[absPath] = struct{}{}
t.globPatternsMu.Unlock()
return nil
}
func (t *Tailer) Ignore(pathname string) bool {
absPath, err := filepath.Abs(pathname)
if err != nil {
// glog.V(2).Infof("Couldn't get absolute path for %q: %s", pathname, err)
return true
}
fi, err := os.Stat(absPath)
if err != nil {
// glog.V(2).Infof("Couldn't stat path %q: %s", pathname, err)
return true
}
if fi.Mode().IsDir() {
// glog.V(2).Infof("ignore path %q because it is a folder", pathname)
return true
}
return t.ignoreRegexPattern != nil && t.ignoreRegexPattern.MatchString(fi.Name())
}
func (t *Tailer) SetIgnorePattern(pattern string) error {
if len(pattern) == 0 {
return nil
}
// glog.V(2).Infof("Set filename ignore regex pattern %q", pattern)
ignoreRegexPattern, err := regexp.Compile(pattern)
if err != nil {
// glog.V(2).Infof("Couldn't compile regex %q: %s", pattern, err)
fmt.Printf("error: %v\n", err)
return err
}
t.ignoreRegexPattern = ignoreRegexPattern
return nil
}
// TailPath registers a filesystem pathname to be tailed.
func (t *Tailer) TailPath(pathname string) error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
if l, ok := t.logstreams[pathname]; ok {
if !l.IsComplete() {
// glog.V(2).Infof("already got a logstream on %q", pathname)
return nil
}
logCount.Add(-1) // Removing the current entry before re-adding.
// glog.V(2).Infof("Existing logstream is finished, creating a new one.")
}
l, err := logstream.New(t.ctx, &t.wg, t.logstreamPollWaker, pathname, t.lines, t.oneShot)
if err != nil {
return err
}
if t.oneShot {
// glog.V(2).Infof("Starting oneshot read at startup of %q", pathname)
l.Stop()
}
t.logstreams[pathname] = l
log.Printf("Tailing %s", pathname)
logCount.Add(1)
return nil
}
// ExpireStaleLogstreams removes logstreams that have had no reads for 1h or more.
func (t *Tailer) ExpireStaleLogstreams() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for _, v := range t.logstreams {
if time.Since(v.LastReadTime()) > (time.Hour * 24) {
v.Stop()
}
}
return nil
}
// StartStaleLogstreamExpirationLoop runs a permanent goroutine to expire stale logstreams.
func (t *Tailer) StartStaleLogstreamExpirationLoop(waker waker.Waker) {
if waker == nil {
log.Printf("Log handle expiration disabled")
return
}
t.wg.Add(1)
go func() {
defer t.wg.Done()
<-t.initDone
if t.oneShot {
log.Println("No gc loop in oneshot mode.")
return
}
// glog.Infof("Starting log handle expiry loop every %s", duration.String())
for {
select {
case <-t.ctx.Done():
return
case <-waker.Wake():
if err := t.ExpireStaleLogstreams(); err != nil {
log.Println(err)
}
}
}
}()
}
// StartLogPatternPollLoop runs a permanent goroutine to poll for new log files.
func (t *Tailer) StartLogPatternPollLoop(waker waker.Waker) {
if waker == nil {
log.Println("Log pattern polling disabled")
return
}
t.wg.Add(1)
go func() {
defer t.wg.Done()
<-t.initDone
if t.oneShot {
log.Println("No polling loop in oneshot mode.")
return
}
// glog.Infof("Starting log pattern poll loop every %s", duration.String())
for {
select {
case <-t.ctx.Done():
return
case <-waker.Wake():
if err := t.Poll(); err != nil {
log.Println(err)
}
}
}
}()
}
func (t *Tailer) PollLogPatterns() error {
t.globPatternsMu.RLock()
defer t.globPatternsMu.RUnlock()
for pattern := range t.globPatterns {
matches, err := filepath.Glob(pattern)
if err != nil {
return err
}
// log.Printf("glob matches: %v", matches)
for _, pathname := range matches {
if t.Ignore(pathname) {
continue
}
absPath, err := filepath.Abs(pathname)
if err != nil {
// glog.V(2).Infof("Couldn't get absolute path for %q: %s", pathname, err)
continue
}
// glog.V(2).Infof("watched path is %q", absPath)
if err := t.TailPath(absPath); err != nil {
log.Println(err)
}
}
}
return nil
}
// PollLogStreamsForCompletion looks at the existing paths and checks if they're already
// complete, removing it from the map if so.
func (t *Tailer) PollLogStreamsForCompletion() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for name, l := range t.logstreams {
if l.IsComplete() {
log.Printf("%s is complete", name)
delete(t.logstreams, name)
logCount.Add(-1)
continue
}
}
return nil
}
func (t *Tailer) Poll() error {
t.pollMu.Lock()
defer t.pollMu.Unlock()
for _, f := range []func() error{t.PollLogPatterns, t.PollLogStreamsForCompletion} {
if err := f(); err != nil {
return err
}
}
return nil
}