// categraf/inputs/kafka/exporter/exporter.go

package exporter

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/krallistic/kazoo-go"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "kafka"
clientID = "kafka_exporter"
)
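// PromDesc holds the Prometheus descriptors for every metric exposed by the exporter.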
type PromDesc struct {
up *prometheus.Desc
clusterBrokers *prometheus.Desc
clusterBrokerInfo *prometheus.Desc
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
topicOldestOffset *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
consumergroupCurrentOffset *prometheus.Desc
consumergroupCurrentOffsetSum *prometheus.Desc
consumergroupUncomittedOffsets *prometheus.Desc
consumergroupUncommittedOffsetsSum *prometheus.Desc
consumergroupUncommittedOffsetsZookeeper *prometheus.Desc
consumergroupMembers *prometheus.Desc
topicPartitionLagMillis *prometheus.Desc
lagDatapointUsedInterpolation *prometheus.Desc
lagDatapointUsedExtrapolation *prometheus.Desc
}
// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
groupFilter *regexp.Regexp
mu sync.Mutex
useZooKeeperLag bool
zookeeperClient *kazoo.Kazoo
nextMetadataRefresh time.Time
metadataRefreshInterval time.Duration
offsetShowAll bool
allowConcurrent bool
sgMutex sync.Mutex
sgWaitCh chan struct{}
sgChans []chan<- prometheus.Metric
consumerGroupFetchAll bool
consumerGroupLagTable interpolationMap
kafkaOpts Options
saramaConfig *sarama.Config
logger log.Logger
promDesc *PromDesc
disableCalculateLagRate bool
renameUncommitOffsetsToLag bool
quitPruneCh chan struct{}
}
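// Options holds the exporter configuration: broker addresses, SASL/TLS settings,
// ZooKeeper lag options, the metadata refresh interval and lag-calculation knobs.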
type Options struct {
Uri []string
UseSASL bool
UseSASLHandshake bool
SaslUsername string
SaslPassword string
SaslMechanism string
UseTLS bool
TlsCAFile string
TlsCertFile string
TlsKeyFile string
TlsInsecureSkipTLSVerify bool
KafkaVersion string
UseZooKeeperLag bool
UriZookeeper []string
Labels string
MetadataRefreshInterval string
OffsetShowAll bool
AllowConcurrent bool
MaxOffsets int
PruneIntervalSeconds int
DisableCalculateLagRate bool
RenameUncommitOffsetsToLag bool
}
// CanReadCertAndKey returns true if both the certificate and key files exist and
// are readable, and false if neither can be read. If only one of the two is
// readable, it returns an error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
certReadable := canReadFile(certPath)
keyReadable := canReadFile(keyPath)
if !certReadable && !keyReadable {
return false, nil
}
if !certReadable {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", certPath)
}
if !keyReadable {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", keyPath)
}
return true, nil
}
// canReadFile returns true if the file at path exists and is readable,
// and false otherwise.
func canReadFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
defer f.Close()
return true
}
// New returns an initialized Exporter.
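//
// A minimal usage sketch (values are illustrative, not project defaults): since
// Exporter implements prometheus.Collector, a caller with a go-kit logger can
// construct one and register it with a Prometheus registry:
//
//	opts := Options{
//		Uri:                     []string{"localhost:9092"},
//		KafkaVersion:            "2.0.0",
//		MetadataRefreshInterval: "1m",
//		PruneIntervalSeconds:    30,
//	}
//	e, err := New(logger, opts, ".*", ".*")
//	if err != nil {
//		// handle error
//	}
//	prometheus.MustRegister(e)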
func New(logger log.Logger, opts Options, topicFilter, groupFilter string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
kafkaVersion, err := sarama.ParseKafkaVersion(opts.KafkaVersion)
if err != nil {
return nil, err
}
config.Version = kafkaVersion
if opts.UseSASL {
// Convert to lowercase so that SHA512 and SHA256 are still valid
opts.SaslMechanism = strings.ToLower(opts.SaslMechanism)
switch opts.SaslMechanism {
case "scram-sha512":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
case "scram-sha256":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
case "plain":
default:
level.Error(logger).Log("msg", "invalid sasl mechanism. can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", "SaslMechanism", opts.SaslMechanism)
return nil, fmt.Errorf("invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", opts.SaslMechanism)
}
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = opts.UseSASLHandshake
if opts.SaslUsername != "" {
config.Net.SASL.User = opts.SaslUsername
}
if opts.SaslPassword != "" {
config.Net.SASL.Password = opts.SaslPassword
}
}
if opts.UseTLS {
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
RootCAs: x509.NewCertPool(),
InsecureSkipVerify: opts.TlsInsecureSkipTLSVerify,
}
if opts.TlsCAFile != "" {
if ca, err := os.ReadFile(opts.TlsCAFile); err == nil {
config.Net.TLS.Config.RootCAs.AppendCertsFromPEM(ca)
} else {
level.Error(logger).Log("msg", "unable to open TlsCAFile", "TlsCAFile", opts.TlsCAFile)
return nil, fmt.Errorf("UseTLS is true but unable to open TlsCAFile: %s", opts.TlsCAFile)
}
}
canReadCertAndKey, err := CanReadCertAndKey(opts.TlsCertFile, opts.TlsKeyFile)
if err != nil {
level.Error(logger).Log("msg", "Error attempting to read TlsCertFile or TlsKeyFile", "err", err.Error())
return nil, err
}
if canReadCertAndKey {
cert, err := tls.LoadX509KeyPair(opts.TlsCertFile, opts.TlsKeyFile)
if err == nil {
config.Net.TLS.Config.Certificates = []tls.Certificate{cert}
} else {
level.Error(logger).Log("msg", "Error attempting to load X509KeyPair", "err", err.Error())
return nil, err
}
}
}
if opts.UseZooKeeperLag {
zookeeperClient, err = kazoo.NewKazoo(opts.UriZookeeper, nil)
if err != nil {
level.Error(logger).Log("msg", "Error connecting to ZooKeeper", "err", err.Error())
return nil, err
}
}
interval, err := time.ParseDuration(opts.MetadataRefreshInterval)
if err != nil {
level.Error(logger).Log("msg", "Error parsing metadata refresh interval", "err", err.Error())
return nil, err
}
config.Metadata.RefreshFrequency = interval
client, err := sarama.NewClient(opts.Uri, config)
if err != nil {
level.Error(logger).Log("msg", "Error initiating kafka client: %s", "err", err.Error())
return nil, err
}
level.Debug(logger).Log("msg", "Done with kafka client initialization")
// Init our exporter.
newExporter := &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
groupFilter: regexp.MustCompile(groupFilter),
useZooKeeperLag: opts.UseZooKeeperLag,
zookeeperClient: zookeeperClient,
nextMetadataRefresh: time.Now(),
metadataRefreshInterval: interval,
offsetShowAll: opts.OffsetShowAll,
allowConcurrent: opts.AllowConcurrent,
sgMutex: sync.Mutex{},
sgWaitCh: nil,
sgChans: []chan<- prometheus.Metric{},
consumerGroupFetchAll: config.Version.IsAtLeast(sarama.V2_0_0_0),
consumerGroupLagTable: interpolationMap{mu: sync.Mutex{}},
kafkaOpts: opts,
saramaConfig: config,
logger: logger,
promDesc: nil, // initialized in func initializeMetrics
disableCalculateLagRate: opts.DisableCalculateLagRate,
renameUncommitOffsetsToLag: opts.RenameUncommitOffsetsToLag,
}
level.Debug(logger).Log("msg", "Initializing metrics")
newExporter.initializeMetrics()
if !newExporter.disableCalculateLagRate {
newExporter.quitPruneCh = make(chan struct{})
go newExporter.RunPruner()
}
return newExporter, nil
}
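// fetchOffsetVersion returns the OffsetFetch API version to use for the
// configured Kafka protocol version.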
func (e *Exporter) fetchOffsetVersion() int16 {
version := e.client.Config().Version
if version.IsAtLeast(sarama.V2_0_0_0) {
return 4
} else if version.IsAtLeast(sarama.V0_10_2_0) {
return 2
} else if version.IsAtLeast(sarama.V0_8_2_2) {
return 1
}
return 0
}
// Describe describes all the metrics ever exported by the Kafka exporter. It
// implements prometheus.Collector.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- e.promDesc.up
ch <- e.promDesc.clusterBrokers
ch <- e.promDesc.clusterBrokerInfo
ch <- e.promDesc.topicCurrentOffset
ch <- e.promDesc.topicOldestOffset
ch <- e.promDesc.topicPartitions
ch <- e.promDesc.topicPartitionLeader
ch <- e.promDesc.topicPartitionReplicas
ch <- e.promDesc.topicPartitionInSyncReplicas
ch <- e.promDesc.topicPartitionUsesPreferredReplica
ch <- e.promDesc.topicUnderReplicatedPartition
ch <- e.promDesc.consumergroupCurrentOffset
ch <- e.promDesc.consumergroupCurrentOffsetSum
ch <- e.promDesc.consumergroupUncomittedOffsets
ch <- e.promDesc.consumergroupUncommittedOffsetsZookeeper
ch <- e.promDesc.consumergroupUncommittedOffsetsSum
ch <- e.promDesc.topicPartitionLagMillis
ch <- e.promDesc.lagDatapointUsedInterpolation
ch <- e.promDesc.lagDatapointUsedExtrapolation
}
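// Collect implements prometheus.Collector. Unless allowConcurrent is set,
// concurrent scrapes share the result of a single collection run.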
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
if e.allowConcurrent {
e.collect(ch)
return
}
// Lock to avoid racing appends to sgChans
e.sgMutex.Lock()
e.sgChans = append(e.sgChans, ch)
// Safe to compare length since we own the Lock
if len(e.sgChans) == 1 {
e.sgWaitCh = make(chan struct{})
go e.collectChans(e.sgWaitCh)
} else {
level.Info(e.logger).Log("msg", "concurrent calls detected, waiting for first to finish")
}
// Copy to a local variable so a later Collect cannot overwrite the channel we wait on
waiter := e.sgWaitCh
e.sgMutex.Unlock()
// The lock is released; our channel is guaranteed to be part of the sgChans slice
<-waiter
// collectChans finished
}
// collectChans runs a single collection and fans the gathered metrics out to every
// channel registered in sgChans, then signals waiting Collect calls via quit.
func (e *Exporter) collectChans(quit chan struct{}) {
original := make(chan prometheus.Metric)
container := make([]prometheus.Metric, 0, 100)
go func() {
for metric := range original {
container = append(container, metric)
}
}()
e.collect(original)
close(original)
// Lock to avoid modification on the channel slice
e.sgMutex.Lock()
for _, ch := range e.sgChans {
for _, metric := range container {
ch <- metric
}
}
// Reset the slice
e.sgChans = e.sgChans[:0]
// Notify remaining waiting Collect they can return
close(quit)
// Release the lock so Collect can append to the slice again
e.sgMutex.Unlock()
}
func (e *Exporter) collect(ch chan<- prometheus.Metric) {
var wg = sync.WaitGroup{}
ch <- prometheus.MustNewConstMetric(
e.promDesc.clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
)
for _, b := range e.client.Brokers() {
ch <- prometheus.MustNewConstMetric(
e.promDesc.clusterBrokerInfo, prometheus.GaugeValue, 1, strconv.Itoa(int(b.ID())), b.Addr(),
)
}
offsetMap := make(map[string]map[int32]int64)
now := time.Now()
if now.After(e.nextMetadataRefresh) {
level.Info(e.logger).Log("msg", "Refreshing client metadata")
if err := e.client.RefreshMetadata(); err != nil {
level.Error(e.logger).Log("msg", "Error refreshing topics. Using cached topic data", "err", err.Error())
}
e.nextMetadataRefresh = now.Add(e.metadataRefreshInterval)
}
var value float64
defer func() {
ch <- prometheus.MustNewConstMetric(e.promDesc.up, prometheus.GaugeValue, value)
}()
topics, err := e.client.Topics()
if err != nil {
level.Error(e.logger).Log("msg", "Error getting topics: %s. Skipping metric generation", "err", err.Error())
return
}
value = 1
level.Info(e.logger).Log("msg", "Generating topic metrics")
for _, topic := range topics {
wg.Add(1)
topic := topic
go func() {
defer wg.Done()
e.metricsForTopic(topic, offsetMap, ch)
}()
}
level.Debug(e.logger).Log("msg", "waiting for topic metric generation to complete")
wg.Wait()
level.Info(e.logger).Log("msg", "Generating consumergroup metrics")
if len(e.client.Brokers()) > 0 {
for _, broker := range e.client.Brokers() {
wg.Add(1)
broker := broker
go func() {
defer wg.Done()
e.metricsForConsumerGroup(broker, offsetMap, ch)
}()
}
level.Debug(e.logger).Log("msg", "waiting for consumergroup metric generation to complete")
wg.Wait()
} else {
level.Error(e.logger).Log("msg", "No brokers found. Unable to generate topic metrics")
}
if !e.disableCalculateLagRate {
level.Info(e.logger).Log("msg", "Calculating consumergroup lag")
wg.Add(1)
go func() {
defer wg.Done()
e.metricsForLag(ch)
}()
level.Debug(e.logger).Log("msg", "waiting for consumergroup lag estimation metric generation to complete")
wg.Wait()
}
}
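// metricsForTopic emits partition count, offset, leader, replica and ISR metrics
// for a single topic, provided it matches the topic filter.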
func (e *Exporter) metricsForTopic(topic string, offsetMap map[string]map[int32]int64, ch chan<- prometheus.Metric) {
level.Debug(e.logger).Log("msg", "Fetching topic metrics", "topic", topic)
if e.topicFilter.MatchString(topic) {
partitions, err := e.client.Partitions(topic)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting partitions for topic", "topic", topic, "err", err.Error())
return
}
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
)
e.mu.Lock()
offset := make(map[int32]int64, len(partitions))
e.mu.Unlock()
for _, partition := range partitions {
broker, err := e.client.Leader(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting leader for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
)
}
currentOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
e.mu.Lock()
offset[partition] = currentOffset
offsetMap[topic] = offset
e.mu.Unlock()
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}
oldestOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetOldest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting oldest offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}
replicas, err := e.client.Replicas(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}
inSyncReplicas, err := e.client.InSyncReplicas(topic, partition)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting in-sync replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}
if broker != nil && replicas != nil && len(replicas) > 0 && broker.ID() == replicas[0] {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}
if replicas != nil && inSyncReplicas != nil && len(inSyncReplicas) < len(replicas) {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
e.promDesc.topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}
if e.useZooKeeperLag {
ConsumerGroups, err := e.zookeeperClient.Consumergroups()
if err != nil {
level.Error(e.logger).Log("msg", "Error getting consumergroups from ZooKeeper", "err", err.Error())
}
for _, group := range ConsumerGroups {
offset, _ := group.FetchOffset(topic, partition)
if offset > 0 {
consumerGroupLag := currentOffset - offset
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupUncommittedOffsetsZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
)
}
}
}
}
}
}
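// metricsForConsumerGroup lists and describes the consumer groups handled by the
// given broker (filtered by groupFilter) and emits their member, offset and
// uncommitted-offset metrics.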
func (e *Exporter) metricsForConsumerGroup(broker *sarama.Broker, offsetMap map[string]map[int32]int64, ch chan<- prometheus.Metric) {
level.Debug(e.logger).Log("msg", "Fetching consumer group metrics for broker", "broker", broker.ID(), "broker_addr", broker.Addr())
if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
level.Error(e.logger).Log("msg", "Error connecting to broker", "broker", broker.ID(), "broker_addr", broker.Addr(), "err", err.Error())
return
}
defer broker.Close()
level.Debug(e.logger).Log("msg", "listing consumergroups for broker", "broker", broker.ID(), "broker_addr", broker.Addr())
groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
if err != nil {
level.Error(e.logger).Log("msg", "Error listing consumergroups for broker", "broker", broker.ID(), "broker_addr", broker.Addr(), "err", err.Error())
return
}
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
groupIds = append(groupIds, groupId)
}
}
level.Debug(e.logger).Log("msg", "describing consumergroups for broker", "broker", broker.ID())
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})
if err != nil {
level.Error(e.logger).Log("msg", "Error from broker.DescribeGroups()", "err", err.Error())
return
}
for _, group := range describeGroups.Groups {
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
if e.offsetShowAll {
for topic, partitions := range offsetMap {
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
} else {
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil {
level.Error(e.logger).Log("msg", "Cannot get GetMemberAssignment of group member", "member", member, "err", err.Error())
return
}
for topic, partitions := range assignment.Topics {
for _, partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
}
}
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
)
level.Debug(e.logger).Log("msg", "fetching offsets for broker/group", "broker", broker.ID(), "group", group.GroupId)
if offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil {
level.Error(e.logger).Log("msg", "Error fetching offset for consumergroup", "group", group.GroupId, "err", err.Error())
} else {
for topic, partitions := range offsetFetchResponse.Blocks {
if !e.topicFilter.MatchString(topic) {
continue
}
// If the topic is not consumed by that consumer group, skip it
topicConsumed := false
for _, offsetFetchResponseBlock := range partitions {
// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset != -1 {
topicConsumed = true
break
}
}
if topicConsumed {
var currentOffsetSum int64
var lagSum int64
for partition, offsetFetchResponseBlock := range partitions {
kerr := offsetFetchResponseBlock.Err
if kerr != sarama.ErrNoError {
level.Error(e.logger).Log("msg", "Error in response block for topic/partition", "topic", topic, "partition", partition, "err", kerr.Error())
continue
}
currentOffset := offsetFetchResponseBlock.Offset
currentOffsetSum += currentOffset
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
// Get and insert the next offset to be produced into the interpolation map
nextOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
level.Error(e.logger).Log("msg", "Error getting next offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
}
if !e.disableCalculateLagRate {
e.consumerGroupLagTable.createOrUpdate(group.GroupId, topic, partition, nextOffset)
}
// If the topic is consumed by this consumer group but no offset is associated with
// the partition, force lag to -1 so that it can be alerted on
var lag int64
if currentOffset == -1 {
lag = -1
} else {
lag = nextOffset - currentOffset
lagSum += lag
}
e.mu.Unlock()
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupUncomittedOffsets, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
}
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
)
ch <- prometheus.MustNewConstMetric(
e.promDesc.consumergroupUncommittedOffsetsSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
)
}
}
}
}
}
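// metricsForLag estimates consumer lag in milliseconds for every tracked
// group/topic/partition, comparing the latest committed offset (fetched via
// ListConsumerGroupOffsets) with the (offset, timestamp) datapoints accumulated
// in consumerGroupLagTable.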
func (e *Exporter) metricsForLag(ch chan<- prometheus.Metric) {
admin, err := sarama.NewClusterAdminFromClient(e.client)
if err != nil {
level.Error(e.logger).Log("msg", "Error creating cluster admin", "err", err.Error())
return
}
if admin == nil {
level.Error(e.logger).Log("msg", "Failed to create cluster admin")
return
}
// Iterate over all consumergroup/topic/partitions
e.consumerGroupLagTable.mu.Lock()
for group, topics := range e.consumerGroupLagTable.iMap {
for topic, partitionMap := range topics {
var partitionKeys []int32
// Collect partitions to create ListConsumerGroupOffsets request
for key := range partitionMap {
partitionKeys = append(partitionKeys, key)
}
// response.Blocks is a map of topic to partition to offset
response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
topic: partitionKeys,
})
if err != nil {
level.Error(e.logger).Log("msg", "Error listing offsets for", "group", group, "err", err.Error())
}
if response == nil {
level.Error(e.logger).Log("msg", "Got nil response from ListConsumerGroupOffsets for group", "group", group)
continue
}
for partition, offsets := range partitionMap {
if len(offsets) < 2 {
level.Debug(e.logger).Log("msg", "Insufficient data for lag calculation for group: continuing", "group", group)
continue
}
if latestConsumedOffset, ok := response.Blocks[topic][partition]; ok {
/*
Sort the offset keys so we know whether we have an offset to use as a left bound in our calculation:
if latestConsumedOffset < smallestMappedOffset then extrapolate,
otherwise find two offsets that bound latestConsumedOffset.
*/
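// The estimate is linear: from two observed (produced offset, timestamp)
// datapoints, px approximates the wall-clock time at which the latest consumed
// offset was produced, and the reported lag is the difference between now and px.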
var producedOffsets []int64
for offsetKey := range offsets {
producedOffsets = append(producedOffsets, offsetKey)
}
sort.Slice(producedOffsets, func(i, j int) bool { return producedOffsets[i] < producedOffsets[j] })
if latestConsumedOffset.Offset < producedOffsets[0] {
level.Debug(e.logger).Log("msg", "estimating lag for group/topic/partition", "group", group, "topic", topic, "partition", partition, "method", "extrapolation")
// Because we do not have data points that bound the latestConsumedOffset we must use extrapolation
highestOffset := producedOffsets[len(producedOffsets)-1]
lowestOffset := producedOffsets[0]
px := float64(offsets[highestOffset].UnixNano()/1000000) -
float64(highestOffset-latestConsumedOffset.Offset)*
float64((offsets[highestOffset].Sub(offsets[lowestOffset])).Milliseconds())/float64(highestOffset-lowestOffset)
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)
ch <- prometheus.MustNewConstMetric(e.promDesc.lagDatapointUsedExtrapolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
} else {
level.Debug(e.logger).Log("msg", "estimating lag for group/topic/partition", "group", group, "topic", topic, "partition", partition, "method", "interpolation")
nextHigherOffset := getNextHigherOffset(producedOffsets, latestConsumedOffset.Offset)
nextLowerOffset := getNextLowerOffset(producedOffsets, latestConsumedOffset.Offset)
px := float64(offsets[nextHigherOffset].UnixNano()/1000000) -
float64(nextHigherOffset-latestConsumedOffset.Offset)*
float64((offsets[nextHigherOffset].Sub(offsets[nextLowerOffset])).Milliseconds())/float64(nextHigherOffset-nextLowerOffset)
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)
ch <- prometheus.MustNewConstMetric(e.promDesc.lagDatapointUsedInterpolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
}
} else {
level.Error(e.logger).Log("msg", "Could not get latest latest consumed offset", "group", group, "topic", topic, "partition", partition)
}
}
}
}
e.consumerGroupLagTable.mu.Unlock()
}
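// getNextHigherOffset returns the smallest offset in the (ascending) offsets
// slice that is >= k, or the largest offset if none is higher. For example,
// getNextHigherOffset([]int64{5, 10, 15}, 12) returns 15.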
func getNextHigherOffset(offsets []int64, k int64) int64 {
index := len(offsets) - 1
max := offsets[index]
for max >= k && index > 0 {
if offsets[index-1] < k {
return max
}
// step toward lower offsets before re-reading max so the nearest bounding offset is returned
index--
max = offsets[index]
}
return max
}
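// getNextLowerOffset returns the largest offset in the (ascending) offsets
// slice that is <= k, or the smallest offset if none is lower. For example,
// getNextLowerOffset([]int64{5, 10, 15}, 12) returns 10.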
func getNextLowerOffset(offsets []int64, k int64) int64 {
index := 0
min := offsets[index]
for min <= k && index < len(offsets)-1 {
if offsets[index+1] > k {
return min
}
// step toward higher offsets before re-reading min so the nearest bounding offset is returned
index++
min = offsets[index]
}
return min
}
// RunPruner runs iMap.Prune() on an interval (default 30 seconds). A new client is
// created for each run to avoid an issue where the client may be closed before
// Prune attempts to use it.
func (e *Exporter) RunPruner() {
ticker := time.NewTicker(time.Duration(e.kafkaOpts.PruneIntervalSeconds) * time.Second)
for {
select {
case <-ticker.C:
client, err := sarama.NewClient(e.kafkaOpts.Uri, e.saramaConfig)
if err != nil {
level.Error(e.logger).Log("msg", "Error initializing kafka client for RunPruner", "err", err.Error())
return
}
e.consumerGroupLagTable.Prune(e.logger, client, e.kafkaOpts.MaxOffsets)
client.Close()
case <-e.quitPruneCh:
ticker.Stop()
return
}
}
}
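// Close stops the pruner goroutine, if it was started, and closes the Kafka client.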
func (e *Exporter) Close() {
// quitPruneCh is only created when lag-rate calculation is enabled
if e.quitPruneCh != nil {
close(e.quitPruneCh)
}
e.client.Close()
}
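// initializeMetrics builds the Prometheus descriptors, attaching any constant
// labels parsed from kafkaOpts.Labels, a comma-separated list of key=value pairs
// (e.g. "env=prod,dc=bj1" yields the constant labels env and dc).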
func (e *Exporter) initializeMetrics() {
labels := make(map[string]string)
// Protect against empty labels
if e.kafkaOpts.Labels != "" {
for _, label := range strings.Split(e.kafkaOpts.Labels, ",") {
splitLabels := strings.Split(label, "=")
if len(splitLabels) >= 2 {
labels[splitLabels[0]] = splitLabels[1]
}
}
}
up := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "up"),
"Whether Kafka is up.",
nil, labels,
)
clusterBrokers := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, labels,
)
clusterBrokerInfo := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "broker_info"),
"Information about the Kafka Broker.",
[]string{"id", "address"}, labels,
)
topicPartitions := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, labels,
)
topicCurrentOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicOldestOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionLeader := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionReplicas := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionInSyncReplicas := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionUsesPreferredReplica := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, labels,
)
topicUnderReplicatedPartition := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, labels,
)
consumergroupCurrentOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)
consumergroupCurrentOffsetSum := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
"Current Offset of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)
var consumergroupUncomittedOffsets, consumergroupUncommittedOffsetsZookeeper, consumergroupUncommittedOffsetsSum *prometheus.Desc
if e.renameUncommitOffsetsToLag {
consumergroupUncomittedOffsets = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)
consumergroupUncommittedOffsetsZookeeper = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "lag_zookeeper"),
"Current Approximate Lag(zookeeper) of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)
consumergroupUncommittedOffsetsSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag_sum"),
"Current Approximate Lag of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)
} else {
consumergroupUncomittedOffsets = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)
consumergroupUncommittedOffsetsZookeeper = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "uncommitted_offsets_zookeeper"),
"Current Approximate count of uncommitted offsets(zookeeper) for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)
consumergroupUncommittedOffsetsSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets_sum"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)
}
consumergroupMembers := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "members"),
"Amount of members in a consumer group",
[]string{"consumergroup"}, labels,
)
topicPartitionLagMillis := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumer_lag", "millis"),
"Current approximation of consumer lag for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"},
labels,
)
lagDatapointUsedInterpolation := prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "interpolation"),
"Indicates that a consumer group lag estimation used interpolation",
[]string{"consumergroup", "topic", "partition"},
labels,
)
lagDatapointUsedExtrapolation := prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "extrapolation"),
"Indicates that a consumer group lag estimation used extrapolation",
[]string{"consumergroup", "topic", "partition"},
labels,
)
e.promDesc = &PromDesc{
up: up,
clusterBrokers: clusterBrokers,
clusterBrokerInfo: clusterBrokerInfo,
topicPartitions: topicPartitions,
topicCurrentOffset: topicCurrentOffset,
topicOldestOffset: topicOldestOffset,
topicPartitionLeader: topicPartitionLeader,
topicPartitionReplicas: topicPartitionReplicas,
topicPartitionInSyncReplicas: topicPartitionInSyncReplicas,
topicPartitionUsesPreferredReplica: topicPartitionUsesPreferredReplica,
topicUnderReplicatedPartition: topicUnderReplicatedPartition,
consumergroupCurrentOffset: consumergroupCurrentOffset,
consumergroupCurrentOffsetSum: consumergroupCurrentOffsetSum,
consumergroupUncomittedOffsets: consumergroupUncomittedOffsets,
consumergroupUncommittedOffsetsSum: consumergroupUncommittedOffsetsSum,
consumergroupUncommittedOffsetsZookeeper: consumergroupUncommittedOffsetsZookeeper,
consumergroupMembers: consumergroupMembers,
topicPartitionLagMillis: topicPartitionLagMillis,
lagDatapointUsedInterpolation: lagDatapointUsedInterpolation,
lagDatapointUsedExtrapolation: lagDatapointUsedExtrapolation,
}
}