categraf/inputs/mtail/internal/tailer/logstream/logstream.go

95 lines
3.5 KiB
Go

// Copyright 2020 Google Inc. All Rights Reserved.
// This file is available under the Apache license.
// Package logstream provides an interface and implementations of log source
// streaming. Each log streaming implementation provides an abstraction that
// makes one pathname look like one perpetual source of logs, even though the
// underlying file objects might be truncated or rotated, or in the case of
// pipes have different open/close semantics.
package logstream
import (
"context"
"errors"
"expvar"
"fmt"
"net/url"
"os"
"sync"
"time"
"flashcat.cloud/categraf/inputs/mtail/internal/logline"
"flashcat.cloud/categraf/inputs/mtail/internal/waker"
// "github.com/golang/glog"
)
var (
// logErrors counts the IO errors encountered per log.
logErrors = expvar.NewMap("log_errors_total")
// logOpens counts the opens of new log file descriptors/sockets.
logOpens = expvar.NewMap("log_opens_total")
// logCloses counts the closes of old log file descriptors/sockets.
logCloses = expvar.NewMap("log_closes_total")
)
// LogStream.
type LogStream interface {
LastReadTime() time.Time // Return the time when the last log line was read from the source
Stop() // Ask to gracefully stop the stream; e.g. stream keeps reading until EOF and then completes work.
IsComplete() bool // True if the logstream has completed work and cannot recover. The caller should clean up this logstream, creating a new logstream on a pathname if necessary.
}
// defaultReadBufferSize the size of the buffer for reading bytes into.
const defaultReadBufferSize = 4096
var (
ErrUnsupportedURLScheme = errors.New("unsupported URL scheme")
ErrUnsupportedFileType = errors.New("unsupported file type")
ErrEmptySocketAddress = errors.New("socket address cannot be empty, please provide a unix domain socket filename or host:port")
)
// New creates a LogStream from the file object located at the absolute path
// `pathname`. The LogStream will watch `ctx` for a cancellation signal, and
// notify the `wg` when it is Done. Log lines will be sent to the `lines`
// channel. `seekToStart` is only used for testing and only works for regular
// files that can be seeked.
func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, lines chan<- *logline.LogLine, oneShot bool) (LogStream, error) {
u, err := url.Parse(pathname)
if err != nil {
return nil, err
}
// glog.Infof("Parsed url as %v", u)
path := pathname
switch u.Scheme {
default:
// glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pathname)
case "unixgram":
return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines)
case "unix":
return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot)
case "tcp":
return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot)
case "udp":
return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines)
case "", "file":
path = u.Path
}
fi, err := os.Stat(path)
if err != nil {
logErrors.Add(path, 1)
return nil, err
}
switch m := fi.Mode(); {
case m.IsRegular():
return newFileStream(ctx, wg, waker, path, fi, lines, oneShot)
case m&os.ModeType == os.ModeNamedPipe:
return newPipeStream(ctx, wg, waker, path, fi, lines)
// TODO(jaq): in order to listen on an existing socket filepath, we must unlink and recreate it
// case m&os.ModeType == os.ModeSocket:
// return newSocketStream(ctx, wg, waker, pathname, lines)
default:
return nil, fmt.Errorf("%w: %q", ErrUnsupportedFileType, pathname)
}
}