// Forked from flashcat/categraf. Go source, 272 lines, 7.1 KiB.
// Copyright 2011 Google Inc. All Rights Reserved.
|
|
// This file is available under the Apache license.
|
|
|
|
package tailer
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/logline"
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/testutil"
|
|
"flashcat.cloud/categraf/inputs/mtail/internal/waker"
|
|
)
|
|
|
|
func makeTestTail(t *testing.T, options ...Option) (*Tailer, chan *logline.LogLine, func(int), string, func()) {
|
|
t.Helper()
|
|
tmpDir := testutil.TestTempDir(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
lines := make(chan *logline.LogLine, 5) // 5 loglines ought to be enough for any test
|
|
var wg sync.WaitGroup
|
|
waker, awaken := waker.NewTest(ctx, 1)
|
|
options = append(options, LogPatterns([]string{tmpDir}), LogstreamPollWaker(waker))
|
|
ta, err := New(ctx, &wg, lines, options...)
|
|
testutil.FatalIfErr(t, err)
|
|
return ta, lines, awaken, tmpDir, func() { cancel(); wg.Wait() }
|
|
}
|
|
|
|
func TestTail(t *testing.T) {
|
|
ta, _, _, dir, stop := makeTestTail(t)
|
|
|
|
logfile := filepath.Join(dir, "log")
|
|
f := testutil.TestOpenFile(t, logfile)
|
|
defer f.Close()
|
|
|
|
err := ta.TailPath(logfile)
|
|
testutil.FatalIfErr(t, err)
|
|
|
|
if _, ok := ta.logstreams[logfile]; !ok {
|
|
t.Errorf("path not found in files map: %+#v", ta.logstreams)
|
|
}
|
|
|
|
stop()
|
|
}
|
|
|
|
func TestHandleLogUpdate(t *testing.T) {
|
|
ta, lines, awaken, dir, stop := makeTestTail(t)
|
|
|
|
logfile := filepath.Join(dir, "log")
|
|
f := testutil.TestOpenFile(t, logfile)
|
|
defer f.Close()
|
|
|
|
testutil.FatalIfErr(t, ta.TailPath(logfile))
|
|
awaken(1)
|
|
|
|
testutil.WriteString(t, f, "a\nb\nc\nd\n")
|
|
awaken(1)
|
|
|
|
stop()
|
|
|
|
received := testutil.LinesReceived(lines)
|
|
expected := []*logline.LogLine{
|
|
{context.Background(), logfile, "a"},
|
|
{context.Background(), logfile, "b"},
|
|
{context.Background(), logfile, "c"},
|
|
{context.Background(), logfile, "d"},
|
|
}
|
|
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
|
|
}
|
|
|
|
// TestHandleLogTruncate writes to a file, waits for those
|
|
// writes to be seen, then truncates the file and writes some more.
|
|
// At the end all lines written must be reported by the tailer.
|
|
func TestHandleLogTruncate(t *testing.T) {
|
|
ta, lines, awaken, dir, stop := makeTestTail(t)
|
|
|
|
logfile := filepath.Join(dir, "log")
|
|
f := testutil.OpenLogFile(t, logfile)
|
|
defer f.Close()
|
|
|
|
testutil.FatalIfErr(t, ta.TailPath(logfile))
|
|
// Expect to wake 1 wakee, the logstream reading `logfile`.
|
|
awaken(1)
|
|
|
|
testutil.WriteString(t, f, "a\nb\nc\n")
|
|
awaken(1)
|
|
|
|
if err := f.Truncate(0); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// "File.Truncate" does not change the file offset, force a seek to start.
|
|
_, err := f.Seek(0, 0)
|
|
testutil.FatalIfErr(t, err)
|
|
awaken(1)
|
|
|
|
testutil.WriteString(t, f, "d\ne\n")
|
|
awaken(1)
|
|
|
|
stop()
|
|
|
|
received := testutil.LinesReceived(lines)
|
|
expected := []*logline.LogLine{
|
|
{context.Background(), logfile, "a"},
|
|
{context.Background(), logfile, "b"},
|
|
{context.Background(), logfile, "c"},
|
|
{context.Background(), logfile, "d"},
|
|
{context.Background(), logfile, "e"},
|
|
}
|
|
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
|
|
}
|
|
|
|
func TestHandleLogUpdatePartialLine(t *testing.T) {
|
|
ta, lines, awaken, dir, stop := makeTestTail(t)
|
|
|
|
logfile := filepath.Join(dir, "log")
|
|
f := testutil.TestOpenFile(t, logfile)
|
|
defer f.Close()
|
|
|
|
testutil.FatalIfErr(t, ta.TailPath(logfile))
|
|
awaken(1) // ensure we've hit an EOF before writing starts
|
|
|
|
testutil.WriteString(t, f, "a")
|
|
awaken(1)
|
|
|
|
testutil.WriteString(t, f, "b")
|
|
awaken(1)
|
|
|
|
testutil.WriteString(t, f, "\n")
|
|
awaken(1)
|
|
|
|
stop()
|
|
|
|
received := testutil.LinesReceived(lines)
|
|
expected := []*logline.LogLine{
|
|
{context.Background(), logfile, "ab"},
|
|
}
|
|
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
|
|
}
|
|
|
|
func TestTailerUnreadableFile(t *testing.T) {
|
|
// Test broken files are skipped.
|
|
ta, lines, awaken, dir, stop := makeTestTail(t)
|
|
|
|
brokenfile := filepath.Join(dir, "brokenlog")
|
|
logfile := filepath.Join(dir, "log")
|
|
testutil.FatalIfErr(t, ta.AddPattern(brokenfile))
|
|
testutil.FatalIfErr(t, ta.AddPattern(logfile))
|
|
|
|
log.Println("create logs")
|
|
testutil.FatalIfErr(t, os.Symlink("/nonexistent", brokenfile))
|
|
f := testutil.TestOpenFile(t, logfile)
|
|
defer f.Close()
|
|
|
|
testutil.FatalIfErr(t, ta.PollLogPatterns())
|
|
testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion())
|
|
|
|
log.Println("write string")
|
|
testutil.WriteString(t, f, "\n")
|
|
awaken(1)
|
|
|
|
stop()
|
|
|
|
received := testutil.LinesReceived(lines)
|
|
expected := []*logline.LogLine{
|
|
{context.Background(), logfile, ""},
|
|
}
|
|
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
|
|
}
|
|
|
|
func TestTailerInitErrors(t *testing.T) {
|
|
var wg sync.WaitGroup
|
|
_, err := New(context.TODO(), &wg, nil)
|
|
if err == nil {
|
|
t.Error("expected error")
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
_, err = New(ctx, &wg, nil, nil)
|
|
if err == nil {
|
|
t.Error("expected error")
|
|
}
|
|
lines := make(chan *logline.LogLine, 1)
|
|
_, err = New(ctx, &wg, lines, nil)
|
|
if err == nil {
|
|
t.Error("expected error")
|
|
}
|
|
cancel()
|
|
wg.Wait()
|
|
lines = make(chan *logline.LogLine, 1)
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
_, err = New(ctx, &wg, lines)
|
|
if err != nil {
|
|
t.Errorf("unexpected error %s", err)
|
|
}
|
|
cancel()
|
|
wg.Wait()
|
|
lines = make(chan *logline.LogLine, 1)
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
_, err = New(ctx, &wg, lines, OneShot)
|
|
if err != nil {
|
|
t.Errorf("unexpected error %s", err)
|
|
}
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestTailExpireStaleHandles(t *testing.T) {
|
|
t.Skip("need to set lastRead on logstream to inject condition")
|
|
ta, lines, awaken, dir, stop := makeTestTail(t)
|
|
|
|
log1 := filepath.Join(dir, "log1")
|
|
f1 := testutil.TestOpenFile(t, log1)
|
|
log2 := filepath.Join(dir, "log2")
|
|
f2 := testutil.TestOpenFile(t, log2)
|
|
|
|
if err := ta.TailPath(log1); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := ta.TailPath(log2); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
testutil.WriteString(t, f1, "1\n")
|
|
testutil.WriteString(t, f2, "2\n")
|
|
|
|
awaken(1)
|
|
|
|
stop()
|
|
|
|
received := testutil.LinesReceived(lines)
|
|
expected := []*logline.LogLine{
|
|
{context.Background(), log1, "1"},
|
|
{context.Background(), log2, "2"},
|
|
}
|
|
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
|
|
|
|
if err := ta.ExpireStaleLogstreams(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
ta.logstreamsMu.RLock()
|
|
if len(ta.logstreams) != 2 {
|
|
t.Errorf("expecting 2 handles, got %v", ta.logstreams)
|
|
}
|
|
ta.logstreamsMu.RUnlock()
|
|
// ta.logstreamsMu.Lock()
|
|
// ta.logstreams[log1].(*File).lastRead = time.Now().Add(-time.Hour*24 + time.Minute)
|
|
// ta.logstreamsMu.Unlock()
|
|
if err := ta.ExpireStaleLogstreams(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
ta.logstreamsMu.RLock()
|
|
if len(ta.logstreams) != 2 {
|
|
t.Errorf("expecting 2 handles, got %v", ta.logstreams)
|
|
}
|
|
ta.logstreamsMu.RUnlock()
|
|
// ta.logstreamsMu.Lock()
|
|
// ta.logstreams[log1].(*File).lastRead = time.Now().Add(-time.Hour*24 - time.Minute)
|
|
// ta.logstreamsMu.Unlock()
|
|
if err := ta.ExpireStaleLogstreams(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
ta.logstreamsMu.RLock()
|
|
if len(ta.logstreams) != 1 {
|
|
t.Errorf("expecting 1 logstreams, got %v", ta.logstreams)
|
|
}
|
|
ta.logstreamsMu.RUnlock()
|
|
log.Println("good")
|
|
}
|