categraf/pkg/otel/pipelines/pipelines.go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipelines // import "flashcat.cloud/categraf/pkg/otel/pipelines"
import (
"context"
"fmt"
"net/http"
"sort"
"go.uber.org/multierr"
"go.uber.org/zap"
"flashcat.cloud/categraf/pkg/otel/components"
"flashcat.cloud/categraf/pkg/otel/fanoutconsumer"
"flashcat.cloud/categraf/pkg/otel/zpages"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)
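// Query parameter keys recognized by HandleZPages.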
const (
zPipelineName = "zpipelinename"
zComponentName = "zcomponentname"
zComponentKind = "zcomponentkind"
)
// baseConsumer is redeclared here since it is not public in the consumer package. We may consider making it public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
}
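// builtComponent pairs a component instance with its configured ID.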
type builtComponent struct {
id config.ComponentID
comp component.Component
}
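// builtPipeline holds the built components of a single pipeline; lastConsumer is the entry-point consumer
// that the pipeline's receivers feed into.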
type builtPipeline struct {
lastConsumer baseConsumer
receivers []builtComponent
processors []builtComponent
exporters []builtComponent
}
// Pipelines is the set of all pipelines created from the service's pipeline configs.
type Pipelines struct {
telemetry component.TelemetrySettings
allReceivers map[config.DataType]map[config.ComponentID]component.Receiver
allExporters map[config.DataType]map[config.ComponentID]component.Exporter
pipelines map[config.ComponentID]*builtPipeline
}
// StartAll starts all pipelines.
//
// Start with exporters, then processors (in reverse of the configured order), then receivers.
// This ordering matters because components earlier in the pipeline reference components later in the
// pipeline, and must not start sending data to components that have not yet been started.
func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error {
bps.telemetry.Logger.Info("Starting exporters...")
for dt, expByID := range bps.allExporters {
for expID, exp := range expByID {
expLogger := exporterLogger(bps.telemetry.Logger, expID, dt)
expLogger.Info("Exporter is starting...")
if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil {
return err
}
expLogger.Info("Exporter started.")
}
}
bps.telemetry.Logger.Info("Starting processors...")
for pipelineID, bp := range bps.pipelines {
for i := len(bp.processors) - 1; i >= 0; i-- {
procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID)
procLogger.Info("Processor is starting...")
if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil {
return err
}
procLogger.Info("Processor started.")
}
}
bps.telemetry.Logger.Info("Starting receivers...")
for dt, recvByID := range bps.allReceivers {
for recvID, recv := range recvByID {
recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt)
recvLogger.Info("Exporter is starting...")
if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil {
return err
}
recvLogger.Info("Exporter started.")
}
}
return nil
}
// ShutdownAll stops all pipelines.
//
// Shutdown order is the reverse of starting: receivers, processors, then exporters.
// This gives senders a chance to flush all their data to components that have not yet been shut down.
func (bps *Pipelines) ShutdownAll(ctx context.Context) error {
var errs error
bps.telemetry.Logger.Info("Stopping receivers...")
for _, recvByID := range bps.allReceivers {
for _, recv := range recvByID {
errs = multierr.Append(errs, recv.Shutdown(ctx))
}
}
bps.telemetry.Logger.Info("Stopping processors...")
for _, bp := range bps.pipelines {
for _, p := range bp.processors {
errs = multierr.Append(errs, p.comp.Shutdown(ctx))
}
}
bps.telemetry.Logger.Info("Stopping exporters...")
for _, expByID := range bps.allExporters {
for _, exp := range expByID {
errs = multierr.Append(errs, exp.Shutdown(ctx))
}
}
return errs
}
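// GetExporters returns a fresh map of all active exporters, keyed by data type and component ID.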
func (bps *Pipelines) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter)
exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.TracesDataType]))
exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.MetricsDataType]))
exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.LogsDataType]))
for dt, expByID := range bps.allExporters {
for expID, exp := range expByID {
exportersMap[dt][expID] = exp
}
}
return exportersMap
}
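// HandleZPages writes an HTML summary of all pipelines and, when a component is selected via the query
// parameters, a header section for that component.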
func (bps *Pipelines) HandleZPages(w http.ResponseWriter, r *http.Request) {
qValues := r.URL.Query()
pipelineName := qValues.Get(zPipelineName)
componentName := qValues.Get(zComponentName)
componentKind := qValues.Get(zComponentKind)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Pipelines"})
zpages.WriteHTMLPipelinesSummaryTable(w, bps.getPipelinesSummaryTableData())
if pipelineName != "" && componentName != "" && componentKind != "" {
fullName := componentName
if componentKind == "processor" {
fullName = pipelineName + "/" + componentName
}
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: componentKind + ": " + fullName,
})
// TODO: Add config + status info.
}
zpages.WriteHTMLPageFooter(w)
}
// Build builds all pipelines from config.
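//
// A typical lifecycle looks roughly like the following (an illustrative sketch; ctx, set, buildInfo, cfg,
// factories and host are assumed to be provided by the caller):
//
//	pls, err := pipelines.Build(ctx, set, buildInfo, cfg, factories)
//	if err != nil {
//		return err
//	}
//	if err := pls.StartAll(ctx, host); err != nil {
//		return err
//	}
//	defer func() { _ = pls.ShutdownAll(ctx) }()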
func Build(ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, cfg *config.Config, factories component.Factories) (*Pipelines, error) {
exps := &Pipelines{
telemetry: settings,
allReceivers: make(map[config.DataType]map[config.ComponentID]component.Receiver),
allExporters: make(map[config.DataType]map[config.ComponentID]component.Exporter),
pipelines: make(map[config.ComponentID]*builtPipeline, len(cfg.Service.Pipelines)),
}
receiversConsumers := make(map[config.DataType]map[config.ComponentID][]baseConsumer)
// Iterate over all pipelines, and create exporters, then processors.
// Receivers cannot be created yet since we need to know all of their consumers, i.e. every pipeline must be
// built up to its first processor.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// The data type of the pipeline defines what data type each exporter is expected to receive.
if _, ok := exps.allExporters[pipelineID.Type()]; !ok {
exps.allExporters[pipelineID.Type()] = make(map[config.ComponentID]component.Exporter)
}
expByID := exps.allExporters[pipelineID.Type()]
bp := &builtPipeline{
receivers: make([]builtComponent, len(pipeline.Receivers)),
processors: make([]builtComponent, len(pipeline.Processors)),
exporters: make([]builtComponent, len(pipeline.Exporters)),
}
exps.pipelines[pipelineID] = bp
// Iterate over all Exporters for this pipeline.
for i, expID := range pipeline.Exporters {
// If an exporter was already created for this [DataType, ComponentID], there is nothing to do; reuse that instance.
if exp, ok := expByID[expID]; ok {
bp.exporters[i] = builtComponent{id: expID, comp: exp}
continue
}
exp, err := buildExporter(ctx, settings, buildInfo, cfg.Exporters, factories.Exporters, expID, pipelineID)
if err != nil {
return nil, err
}
bp.exporters[i] = builtComponent{id: expID, comp: exp}
expByID[expID] = exp
}
// Build a fan out consumer to all exporters.
switch pipelineID.Type() {
case config.TracesDataType:
bp.lastConsumer = buildFanOutExportersTracesConsumer(bp.exporters)
case config.MetricsDataType:
bp.lastConsumer = buildFanOutExportersMetricsConsumer(bp.exporters)
case config.LogsDataType:
bp.lastConsumer = buildFanOutExportersLogsConsumer(bp.exporters)
default:
return nil, fmt.Errorf("create fan-out exporter in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type())
}
mutatesConsumedData := bp.lastConsumer.Capabilities().MutatesData
// Build the processors backwards, starting from the last one.
// The last processor points to the fan-out consumer over all exporters; each processor then becomes the
// consumer for the one that precedes it in the pipeline, and so on.
for i := len(pipeline.Processors) - 1; i >= 0; i-- {
procID := pipeline.Processors[i]
proc, err := buildProcessor(ctx, settings, buildInfo, cfg.Processors, factories.Processors, procID, pipelineID, bp.lastConsumer)
if err != nil {
return nil, err
}
bp.processors[i] = builtComponent{id: procID, comp: proc}
bp.lastConsumer = proc.(baseConsumer)
mutatesConsumedData = mutatesConsumedData || bp.lastConsumer.Capabilities().MutatesData
}
// Some consumers may not correctly implement Capabilities and ignore the next consumer when calculating them.
// Because of this, wrap the first consumer if any consumer in the pipeline mutates the data while the first
// one reports that it does not.
switch pipelineID.Type() {
case config.TracesDataType:
bp.lastConsumer = capTraces{Traces: bp.lastConsumer.(consumer.Traces), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
case config.MetricsDataType:
bp.lastConsumer = capMetrics{Metrics: bp.lastConsumer.(consumer.Metrics), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
case config.LogsDataType:
bp.lastConsumer = capLogs{Logs: bp.lastConsumer.(consumer.Logs), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}}
default:
return nil, fmt.Errorf("create cap consumer in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type())
}
// The data type of the pipeline defines what data type each receiver in it is expected to produce.
if _, ok := receiversConsumers[pipelineID.Type()]; !ok {
receiversConsumers[pipelineID.Type()] = make(map[config.ComponentID][]baseConsumer)
}
recvConsByID := receiversConsumers[pipelineID.Type()]
// Iterate over all Receivers for this pipeline and just append the lastConsumer as a consumer for the receiver.
for _, recvID := range pipeline.Receivers {
recvConsByID[recvID] = append(recvConsByID[recvID], bp.lastConsumer)
}
}
// Now that we built the `receiversConsumers` map, we can build the receivers as well.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// The data type of the pipeline defines what data type each receiver in it is expected to produce.
if _, ok := exps.allReceivers[pipelineID.Type()]; !ok {
exps.allReceivers[pipelineID.Type()] = make(map[config.ComponentID]component.Receiver)
}
recvByID := exps.allReceivers[pipelineID.Type()]
bp := exps.pipelines[pipelineID]
// Iterate over all Receivers for this pipeline.
for i, recvID := range pipeline.Receivers {
// If a receiver was already created for this [DataType, ComponentID], there is nothing to do; reuse that instance.
if recv, ok := recvByID[recvID]; ok {
bp.receivers[i] = builtComponent{id: recvID, comp: recv}
continue
}
recv, err := buildReceiver(ctx, settings, buildInfo, cfg.Receivers, factories.Receivers, recvID, pipelineID, receiversConsumers[pipelineID.Type()][recvID])
if err != nil {
return nil, err
}
bp.receivers[i] = builtComponent{id: recvID, comp: recv}
recvByID[recvID] = recv
}
}
return exps, nil
}
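// buildExporter creates the exporter identified by id for the data type of the given pipeline.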
func buildExporter(
ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
cfgs map[config.ComponentID]config.Exporter,
factories map[config.Type]component.ExporterFactory,
id config.ComponentID,
pipelineID config.ComponentID,
) (component.Exporter, error) {
cfg, existsCfg := cfgs[id]
if !existsCfg {
return nil, fmt.Errorf("exporter %q is not configured", id)
}
factory, existsFactory := factories[id.Type()]
if !existsFactory {
return nil, fmt.Errorf("exporter factory not available for: %q", id)
}
set := component.ExporterCreateSettings{
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = exporterLogger(settings.Logger, id, pipelineID.Type())
exp, err := createExporter(ctx, set, cfg, id, pipelineID, factory)
if err != nil {
return nil, fmt.Errorf("failt to create %q exporter, in pipeline %q: %w", id, pipelineID, err)
}
return exp, nil
}
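// createExporter calls the factory method matching the pipeline's data type.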
func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg config.Exporter, id config.ComponentID, pipelineID config.ComponentID, factory component.ExporterFactory) (component.Exporter, error) {
switch pipelineID.Type() {
case config.TracesDataType:
return factory.CreateTracesExporter(ctx, set, cfg)
case config.MetricsDataType:
return factory.CreateMetricsExporter(ctx, set, cfg)
case config.LogsDataType:
return factory.CreateLogsExporter(ctx, set, cfg)
}
return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
}
func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces {
consumers := make([]consumer.Traces, 0, len(exporters))
for _, exp := range exporters {
consumers = append(consumers, exp.comp.(consumer.Traces))
}
// Create a junction point that fans out to all exporters of this pipeline.
return fanoutconsumer.NewTraces(consumers)
}
func buildFanOutExportersMetricsConsumer(exporters []builtComponent) consumer.Metrics {
consumers := make([]consumer.Metrics, 0, len(exporters))
for _, exp := range exporters {
consumers = append(consumers, exp.comp.(consumer.Metrics))
}
// Create a junction point that fans out to all exporters of this pipeline.
return fanoutconsumer.NewMetrics(consumers)
}
func buildFanOutExportersLogsConsumer(exporters []builtComponent) consumer.Logs {
consumers := make([]consumer.Logs, 0, len(exporters))
for _, exp := range exporters {
consumers = append(consumers, exp.comp.(consumer.Logs))
}
// Create a junction point that fans out to all exporters of this pipeline.
return fanoutconsumer.NewLogs(consumers)
}
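// exporterLogger returns a logger annotated with the exporter kind, data type, and component name.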
func exporterLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger {
return logger.With(
zap.String(components.ZapKindKey, components.ZapKindExporter),
zap.String(components.ZapDataTypeKey, string(dt)),
zap.String(components.ZapNameKey, id.String()))
}
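// buildProcessor creates the processor identified by id for the given pipeline, wired to send to next.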
func buildProcessor(ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
cfgs map[config.ComponentID]config.Processor,
factories map[config.Type]component.ProcessorFactory,
id config.ComponentID,
pipelineID config.ComponentID,
next baseConsumer,
) (component.Processor, error) {
procCfg, existsCfg := cfgs[id]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", id)
}
factory, existsFactory := factories[id.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", id)
}
set := component.ProcessorCreateSettings{
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = processorLogger(settings.Logger, id, pipelineID)
proc, err := createProcessor(ctx, set, procCfg, id, pipelineID, next, factory)
if err != nil {
return nil, fmt.Errorf("failt to create %q processor, in pipeline %q: %w", id, pipelineID, err)
}
return proc, nil
}
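// createProcessor calls the factory method matching the pipeline's data type, with next as the next consumer.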
func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg config.Processor, id config.ComponentID, pipelineID config.ComponentID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) {
switch pipelineID.Type() {
case config.TracesDataType:
return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces))
case config.MetricsDataType:
return factory.CreateMetricsProcessor(ctx, set, cfg, next.(consumer.Metrics))
case config.LogsDataType:
return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs))
}
return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
}
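// processorLogger returns a logger annotated with the processor kind, component name, and pipeline.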
func processorLogger(logger *zap.Logger, procID config.ComponentID, pipelineID config.ComponentID) *zap.Logger {
return logger.With(
zap.String(components.ZapKindKey, components.ZapKindProcessor),
zap.String(components.ZapNameKey, procID.String()),
zap.String(components.ZapKindPipeline, pipelineID.String()))
}
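// buildReceiver creates the receiver identified by id for the data type of the given pipeline, fanning out
// to the consumers in nexts.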
func buildReceiver(ctx context.Context,
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
cfgs map[config.ComponentID]config.Receiver,
factories map[config.Type]component.ReceiverFactory,
id config.ComponentID,
pipelineID config.ComponentID,
nexts []baseConsumer,
) (component.Receiver, error) {
cfg, existsCfg := cfgs[id]
if !existsCfg {
return nil, fmt.Errorf("receiver %q is not configured", id)
}
factory, existsFactory := factories[id.Type()]
if !existsFactory {
return nil, fmt.Errorf("receiver factory not available for: %q", id)
}
set := component.ReceiverCreateSettings{
TelemetrySettings: settings,
BuildInfo: buildInfo,
}
set.TelemetrySettings.Logger = receiverLogger(settings.Logger, id, pipelineID.Type())
recv, err := createReceiver(ctx, set, cfg, id, pipelineID, nexts, factory)
if err != nil {
return nil, fmt.Errorf("failt to create %q receiver, in pipeline %q: %w", id, pipelineID, err)
}
return recv, nil
}
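// createReceiver calls the factory method matching the pipeline's data type, fanning out to all consumers in nexts.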
func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, id config.ComponentID, pipelineID config.ComponentID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) {
switch pipelineID.Type() {
case config.TracesDataType:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers))
case config.MetricsDataType:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers))
case config.LogsDataType:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers))
}
return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
}
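// receiverLogger returns a logger annotated with the receiver kind, component name, and data type.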
func receiverLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger {
return logger.With(
zap.String(components.ZapKindKey, components.ZapKindReceiver),
zap.String(components.ZapNameKey, id.String()),
zap.String(components.ZapKindPipeline, string(dt)))
}
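// getPipelinesSummaryTableData assembles the per-pipeline rows rendered by HandleZPages, sorted by pipeline name.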
func (bps *Pipelines) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
sumData := zpages.SummaryPipelinesTableData{}
sumData.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(bps.pipelines))
for c, p := range bps.pipelines {
// TODO: Change the template to use ID.
var recvs []string
for _, bRecv := range p.receivers {
recvs = append(recvs, bRecv.id.String())
}
var procs []string
for _, bProc := range p.processors {
procs = append(procs, bProc.id.String())
}
var exps []string
for _, bExp := range p.exporters {
exps = append(exps, bExp.id.String())
}
row := zpages.SummaryPipelinesTableRowData{
FullName: c.String(),
InputType: string(c.Type()),
MutatesData: p.lastConsumer.Capabilities().MutatesData,
Receivers: recvs,
Processors: procs,
Exporters: exps,
}
sumData.Rows = append(sumData.Rows, row)
}
sort.Slice(sumData.Rows, func(i, j int) bool {
return sumData.Rows[i].FullName < sumData.Rows[j].FullName
})
return sumData
}