forked from flashcat/categraf
140 lines
3.8 KiB
Go
140 lines
3.8 KiB
Go
//go:build !no_logs
|
|
|
|
// Unless explicitly stated otherwise all files in this repository are licensed
|
|
// under the Apache License Version 2.0.
|
|
// This product includes software developed at Datadog (https://www.datadoghq.com/).
|
|
// Copyright 2016-present Datadog, Inc.
|
|
|
|
package docker
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"log"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/filters"
|
|
)
|
|
|
|
// // eventStreamState logic unit tested in event_stream_test.go
|
|
// // DockerUtil logic covered by the listeners/docker and dogstatsd/origin_detection integration tests.
|
|
// eventSendBuffer is the capacity of the buffered eventChan created for each
// subscriber in subscribe.
const eventSendBuffer = 5
|
|
|
|
// SubscribeToContainerEvents allows a package to subscribe to events from the event stream.
|
|
// A unique subscriber name should be provided.
|
|
func (d *DockerUtil) SubscribeToContainerEvents(name string) (<-chan *ContainerEvent, <-chan error, error) {
|
|
sub, err := d.eventState.subscribe(name)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
go d.dispatchEvents(sub)
|
|
return sub.eventChan, sub.errorChan, err
|
|
}
|
|
|
|
func (e *eventStreamState) subscribe(name string) (*eventSubscriber, error) {
|
|
e.RLock()
|
|
if _, found := e.subscribers[name]; found {
|
|
e.RUnlock()
|
|
return nil, ErrAlreadySubscribed
|
|
}
|
|
e.RUnlock()
|
|
|
|
sub := &eventSubscriber{
|
|
name: name,
|
|
eventChan: make(chan *ContainerEvent, eventSendBuffer),
|
|
errorChan: make(chan error, 1), // TODO: remove errorChan once design is stable
|
|
cancelChan: make(chan struct{}),
|
|
}
|
|
e.Lock()
|
|
e.subscribers[name] = sub
|
|
e.Unlock()
|
|
|
|
return sub, nil
|
|
}
|
|
|
|
// UnsubscribeFromContainerEvents allows a package to unsubscribe.
|
|
// The call is blocking until the request is processed.
|
|
func (d *DockerUtil) UnsubscribeFromContainerEvents(name string) error {
|
|
return d.eventState.unsubscribe(name)
|
|
}
|
|
|
|
func (e *eventStreamState) unsubscribe(name string) error {
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
|
|
sub, found := e.subscribers[name]
|
|
if !found {
|
|
return ErrNotSubscribed
|
|
}
|
|
|
|
// Stop dispatch and remove subscriber
|
|
close(sub.cancelChan)
|
|
delete(e.subscribers, name)
|
|
return nil
|
|
}
|
|
|
|
func (d *DockerUtil) dispatchEvents(sub *eventSubscriber) {
|
|
fltrs := filters.NewArgs()
|
|
fltrs.Add("type", "container")
|
|
fltrs.Add("event", "start")
|
|
fltrs.Add("event", "die")
|
|
fltrs.Add("event", "died")
|
|
fltrs.Add("event", "rename")
|
|
|
|
// On initial subscribe, don't go back in time. On reconnect, we'll
|
|
// resume at the latest timestamp we got.
|
|
latestTimestamp := time.Now().Unix()
|
|
var cancelFunc context.CancelFunc
|
|
|
|
CONNECT: // Outer loop handles re-connecting in case the docker daemon closes the connection
|
|
for {
|
|
eventOptions := types.EventsOptions{
|
|
Since: strconv.FormatInt(latestTimestamp, 10),
|
|
Filters: fltrs,
|
|
}
|
|
|
|
var ctx context.Context
|
|
ctx, cancelFunc = context.WithCancel(context.Background())
|
|
messages, errs := d.cli.Events(ctx, eventOptions)
|
|
|
|
// Inner loop iterates over elements in the channel
|
|
for {
|
|
select {
|
|
case <-sub.cancelChan:
|
|
break CONNECT
|
|
case err := <-errs:
|
|
if err == io.EOF {
|
|
// Silently ignore io.EOF that happens on http connection reset
|
|
log.Println("D! Got EOF, re-connecting")
|
|
} else {
|
|
// Else, let's wait 10 seconds and try reconnecting
|
|
log.Println("W! Got error from docker, waiting for 10 seconds: %s", err)
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
cancelFunc()
|
|
continue CONNECT // Re-connect to docker
|
|
case msg := <-messages:
|
|
latestTimestamp = msg.Time
|
|
event, err := d.processContainerEvent(ctx, msg)
|
|
if err != nil {
|
|
log.Println("D! Skipping event: %s", err)
|
|
continue
|
|
}
|
|
if event == nil {
|
|
continue
|
|
}
|
|
// Block if the buffered channel is full, pausing the http
|
|
// stream. If docker closes because of client timeout, we
|
|
// will reconnect later and stream from latestTimestamp.
|
|
sub.eventChan <- event
|
|
}
|
|
}
|
|
}
|
|
cancelFunc()
|
|
close(sub.errorChan)
|
|
close(sub.eventChan)
|
|
}
|