categraf/inputs/mtail/internal/metrics/store.go

220 lines
5.4 KiB
Go

// Copyright 2011 Google Inc. All Rights Reserved.
// This file is available under the Apache license.
package metrics
import (
"context"
"encoding/json"
"io"
"log"
"reflect"
"sync"
"time"
"github.com/pkg/errors"
)
// Store contains Metrics.
type Store struct {
searchMu sync.RWMutex // read for iterate and insert, write for delete
insertMu sync.Mutex // locked for insert and delete, unlocked for iterate
Metrics map[string][]*Metric
}
// NewStore returns a new metric Store.
func NewStore() (s *Store) {
s = &Store{}
s.ClearMetrics()
return
}
// Add is used to add one metric to the Store.
func (s *Store) Add(m *Metric) error {
s.insertMu.Lock()
defer s.insertMu.Unlock()
s.searchMu.RLock()
// glog.V(1).Infof("Adding a new metric %v", m)
dupeIndex := -1
if len(s.Metrics[m.Name]) > 0 {
t := s.Metrics[m.Name][0].Kind
if m.Kind != t {
s.searchMu.RUnlock()
return errors.Errorf("Metric %s has different kind %v to existing %v.", m.Name, m.Kind, t)
}
// To avoid duplicate metrics:
// - copy old LabelValues into new metric;
// - discard old metric.
for i, v := range s.Metrics[m.Name] {
if v.Program != m.Program {
continue
}
if v.Type != m.Type {
continue
}
if v.Source != m.Source {
continue
}
dupeIndex = i
// glog.V(2).Infof("v keys: %v m.keys: %v", v.Keys, m.Keys)
// If a set of label keys has changed, discard
// old metric completely, w/o even copying old
// data, as they are now incompatible.
if len(v.Keys) != len(m.Keys) || !reflect.DeepEqual(v.Keys, m.Keys) {
break
}
// glog.V(2).Infof("v buckets: %v m.buckets: %v", v.Buckets, m.Buckets)
// Otherwise, copy everything into the new metric
// glog.V(2).Infof("Found duped metric: %d", dupeIndex)
for _, oldLabel := range v.LabelValues {
// glog.V(2).Infof("Labels: %d %s", j, oldLabel.Labels)
d, err := v.GetDatum(oldLabel.Labels...)
if err != nil {
return err
}
if err = m.RemoveDatum(oldLabel.Labels...); err != nil {
return err
}
lv := &LabelValue{Labels: oldLabel.Labels, Value: d}
if err := m.AppendLabelValue(lv); err != nil {
return err
}
}
}
}
s.searchMu.RUnlock()
// We're in modify mode now so lock out search
s.searchMu.Lock()
s.Metrics[m.Name] = append(s.Metrics[m.Name], m)
if dupeIndex >= 0 {
// glog.V(2).Infof("removing original, keeping its clone")
s.Metrics[m.Name] = append(s.Metrics[m.Name][0:dupeIndex], s.Metrics[m.Name][dupeIndex+1:]...)
}
s.searchMu.Unlock()
return nil
}
// FindMetricOrNil returns a metric in a store, or returns nil if not found.
func (s *Store) FindMetricOrNil(name, prog string) *Metric {
s.searchMu.RLock()
defer s.searchMu.RUnlock()
ml, ok := s.Metrics[name]
if !ok {
return nil
}
for _, m := range ml {
if m.Program != prog {
continue
}
return m
}
return nil
}
// ClearMetrics empties the store of all metrics.
func (s *Store) ClearMetrics() {
s.insertMu.Lock()
defer s.insertMu.Unlock()
s.searchMu.Lock()
defer s.searchMu.Unlock()
s.Metrics = make(map[string][]*Metric)
}
// MarshalJSON returns a JSON byte string representing the Store.
func (s *Store) MarshalJSON() (b []byte, err error) {
s.searchMu.RLock()
defer s.searchMu.RUnlock()
ms := make([]*Metric, 0)
for _, ml := range s.Metrics {
ms = append(ms, ml...)
}
return json.Marshal(ms)
}
// Range calls f sequentially for each Metric present in the store.
// The Metric is not locked when f is called.
// If f returns non nil error, Range stops the iteration.
// This looks a lot like sync.Map, ay.
func (s *Store) Range(f func(*Metric) error) error {
s.searchMu.RLock()
defer s.searchMu.RUnlock()
for _, ml := range s.Metrics {
for _, m := range ml {
if err := f(m); err != nil {
return err
}
}
}
return nil
}
// Gc iterates through the Store looking for metrics that can be tidied up,
// if they are passed their expiry or sized greater than their limit.
func (s *Store) Gc() error {
// log.Println("D! Running Store.Expire()")
now := time.Now()
return s.Range(func(m *Metric) error {
if m.Limit > 0 && len(m.LabelValues) >= m.Limit {
for i := len(m.LabelValues); i > m.Limit; i-- {
m.RemoveOldestDatum()
}
}
for i := 0; i < len(m.LabelValues); i++ {
lv := m.LabelValues[i]
if lv.Expiry <= 0 {
continue
}
if now.Sub(lv.Value.TimeUTC()) > lv.Expiry {
err := m.RemoveDatum(lv.Labels...)
if err != nil {
return err
}
i--
}
}
return nil
})
}
// StartGcLoop runs a permanent goroutine to expire metrics every duration.
func (s *Store) StartGcLoop(ctx context.Context, duration time.Duration) {
if duration <= 0 {
log.Println("Metric store expiration disabled")
return
}
go func() {
log.Printf("Starting metric store expiry loop every %s", duration.String())
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := s.Gc(); err != nil {
log.Println(err)
}
case <-ctx.Done():
return
}
}
}()
}
// WriteMetrics dumps the current state of the metrics store in JSON format to
// the io.Writer.
func (s *Store) WriteMetrics(w io.Writer) error {
s.searchMu.RLock()
b, err := json.MarshalIndent(s.Metrics, "", " ")
s.searchMu.RUnlock()
if err != nil {
return errors.Wrap(err, "failed to marshal metrics into json")
}
_, err = w.Write(b)
if err != nil {
return errors.Wrap(err, "failed to write metrics")
}
return nil
}