forked from flashcat/categraf
161 lines
4.3 KiB
Go
161 lines
4.3 KiB
Go
//go:build !no_logs
|
|
|
|
package logs
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type timeProvider func() int64
|
|
|
|
type taggedPoint struct {
|
|
timeStamp int64
|
|
value int64
|
|
count int64
|
|
}
|
|
|
|
// StatsTracker Keeps track of simple stats over its lifetime and a configurable time range.
|
|
// StatsTracker is designed to be memory efficient by aggregating data into buckets. For example
|
|
// a time frame of 24 hours with a bucketFrame of 1 hour will ensure that only 24 points are ever
|
|
// kept in memory. New data is considered in the stats immediately while old data is removed by
|
|
// dropping expired aggregated buckets.
|
|
type StatsTracker struct {
|
|
allTimeAvg int64
|
|
allTimePeak int64
|
|
totalPoints int64
|
|
timeFrame int64
|
|
bucketFrame int64
|
|
avgPointsHead *taggedPoint
|
|
peakPointsHead *taggedPoint
|
|
aggregatedAvgPoints []*taggedPoint
|
|
aggregatedPeakPoints []*taggedPoint
|
|
timeProvider timeProvider
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
// NewStatsTracker Creates a new StatsTracker instance
|
|
func NewStatsTracker(timeFrame time.Duration, bucketSize time.Duration) *StatsTracker {
|
|
return NewStatsTrackerWithTimeProvider(timeFrame, bucketSize, func() int64 {
|
|
return time.Now().UnixNano()
|
|
})
|
|
}
|
|
|
|
// NewStatsTrackerWithTimeProvider Creates a new StatsTracker instance with a time provider closure (mostly for testing)
|
|
func NewStatsTrackerWithTimeProvider(timeFrame time.Duration, bucketSize time.Duration, timeProvider timeProvider) *StatsTracker {
|
|
return &StatsTracker{
|
|
aggregatedAvgPoints: make([]*taggedPoint, 0),
|
|
aggregatedPeakPoints: make([]*taggedPoint, 0),
|
|
timeFrame: int64(timeFrame),
|
|
bucketFrame: int64(bucketSize),
|
|
timeProvider: timeProvider,
|
|
lock: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
// Add Records a new value to the stats tracker
|
|
func (s *StatsTracker) Add(value int64) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.allTimeAvg = (s.totalPoints*s.allTimeAvg + value) / (s.totalPoints + 1)
|
|
s.totalPoints++
|
|
|
|
if value > s.allTimePeak {
|
|
s.allTimePeak = value
|
|
}
|
|
|
|
now := s.timeProvider()
|
|
|
|
s.dropOldPoints(now)
|
|
|
|
if s.avgPointsHead == nil {
|
|
s.avgPointsHead = &taggedPoint{now, value, 0}
|
|
s.peakPointsHead = &taggedPoint{now, value, 0}
|
|
} else if s.peakPointsHead.value < value {
|
|
s.peakPointsHead.value = value
|
|
}
|
|
|
|
// We initialized avgPointsHead with the first value, don't count it twice
|
|
if s.avgPointsHead.count > 0 {
|
|
s.avgPointsHead.value = (s.avgPointsHead.count*s.avgPointsHead.value + value) / (s.avgPointsHead.count + 1)
|
|
}
|
|
s.avgPointsHead.count++
|
|
}
|
|
|
|
// AllTimeAvg Gets the all time average of values seen so far
|
|
func (s *StatsTracker) AllTimeAvg() int64 {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.allTimeAvg
|
|
}
|
|
|
|
// MovingAvg Gets the moving average of values within the time frame
|
|
func (s *StatsTracker) MovingAvg() int64 {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.dropOldPoints(s.timeProvider())
|
|
|
|
if s.avgPointsHead == nil {
|
|
return 0
|
|
}
|
|
sum := s.avgPointsHead.value * s.avgPointsHead.count
|
|
count := s.avgPointsHead.count
|
|
for _, v := range s.aggregatedAvgPoints {
|
|
sum += v.value * v.count
|
|
count += v.count
|
|
}
|
|
return sum / count
|
|
}
|
|
|
|
// AllTimePeak Gets the largest value seen so far
|
|
func (s *StatsTracker) AllTimePeak() int64 {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
return s.allTimePeak
|
|
}
|
|
|
|
// MovingPeak Gets the largest value seen within the time frame
|
|
func (s *StatsTracker) MovingPeak() int64 {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.dropOldPoints(s.timeProvider())
|
|
|
|
if s.peakPointsHead == nil {
|
|
return 0
|
|
}
|
|
largest := s.peakPointsHead.value
|
|
for _, v := range s.aggregatedPeakPoints {
|
|
if v.value > largest {
|
|
largest = v.value
|
|
}
|
|
}
|
|
return largest
|
|
}
|
|
|
|
func (s *StatsTracker) dropOldPoints(now int64) {
|
|
if s.avgPointsHead != nil && s.avgPointsHead.timeStamp < now-s.bucketFrame {
|
|
// Pop off the oldest values
|
|
if len(s.aggregatedAvgPoints) > 0 {
|
|
dropFromIndex := 0
|
|
for _, v := range s.aggregatedAvgPoints {
|
|
if v.timeStamp > now-s.timeFrame {
|
|
break
|
|
}
|
|
dropFromIndex++
|
|
}
|
|
|
|
s.aggregatedAvgPoints = s.aggregatedAvgPoints[dropFromIndex:]
|
|
s.aggregatedPeakPoints = s.aggregatedPeakPoints[dropFromIndex:]
|
|
}
|
|
|
|
// Add the new aggregated point to the slice
|
|
s.aggregatedAvgPoints = append(s.aggregatedAvgPoints, s.avgPointsHead)
|
|
s.aggregatedPeakPoints = append(s.aggregatedPeakPoints, s.peakPointsHead)
|
|
s.avgPointsHead = nil
|
|
s.peakPointsHead = nil
|
|
}
|
|
}
|