forked from flashcat/categraf
188 lines
5.1 KiB
Go
188 lines
5.1 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package telemetry // import "go.opentelemetry.io/collector/service/pkg/telemetry"
|
|
|
|
import (
|
|
"os"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/shirou/gopsutil/v3/process"
|
|
"go.opencensus.io/metric"
|
|
"go.opencensus.io/stats"
|
|
)
|
|
|
|
// processMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
|
|
type processMetrics struct {
|
|
startTimeUnixNano int64
|
|
ballastSizeBytes uint64
|
|
proc *process.Process
|
|
|
|
processUptime *metric.Float64DerivedCumulative
|
|
allocMem *metric.Int64DerivedGauge
|
|
totalAllocMem *metric.Int64DerivedCumulative
|
|
sysMem *metric.Int64DerivedGauge
|
|
cpuSeconds *metric.Float64DerivedCumulative
|
|
rssMemory *metric.Int64DerivedGauge
|
|
|
|
// mu protects everything bellow.
|
|
mu sync.Mutex
|
|
lastMsRead time.Time
|
|
ms *runtime.MemStats
|
|
}
|
|
|
|
// RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure
|
|
// basic information about this process.
|
|
func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error {
|
|
pm := &processMetrics{
|
|
startTimeUnixNano: time.Now().UnixNano(),
|
|
ballastSizeBytes: ballastSizeBytes,
|
|
ms: &runtime.MemStats{},
|
|
}
|
|
var err error
|
|
pm.proc, err = process.NewProcess(int32(os.Getpid()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.processUptime, err = registry.AddFloat64DerivedCumulative(
|
|
"process/uptime",
|
|
metric.WithDescription("Uptime of the process"),
|
|
metric.WithUnit(stats.UnitSeconds))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.processUptime.UpsertEntry(pm.updateProcessUptime); err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.allocMem, err = registry.AddInt64DerivedGauge(
|
|
"process/runtime/heap_alloc_bytes",
|
|
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
|
|
metric.WithUnit(stats.UnitBytes))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.allocMem.UpsertEntry(pm.updateAllocMem); err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.totalAllocMem, err = registry.AddInt64DerivedCumulative(
|
|
"process/runtime/total_alloc_bytes",
|
|
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
|
|
metric.WithUnit(stats.UnitBytes))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.totalAllocMem.UpsertEntry(pm.updateTotalAllocMem); err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.sysMem, err = registry.AddInt64DerivedGauge(
|
|
"process/runtime/total_sys_memory_bytes",
|
|
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
|
|
metric.WithUnit(stats.UnitBytes))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.sysMem.UpsertEntry(pm.updateSysMem); err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.cpuSeconds, err = registry.AddFloat64DerivedCumulative(
|
|
"process/cpu_seconds",
|
|
metric.WithDescription("Total CPU user and system time in seconds"),
|
|
metric.WithUnit(stats.UnitSeconds))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.cpuSeconds.UpsertEntry(pm.updateCPUSeconds); err != nil {
|
|
return err
|
|
}
|
|
|
|
pm.rssMemory, err = registry.AddInt64DerivedGauge(
|
|
"process/memory/rss",
|
|
metric.WithDescription("Total physical memory (resident set size)"),
|
|
metric.WithUnit(stats.UnitBytes))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = pm.rssMemory.UpsertEntry(pm.updateRSSMemory); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pm *processMetrics) updateProcessUptime() float64 {
|
|
now := time.Now().UnixNano()
|
|
return float64(now-pm.startTimeUnixNano) / 1e9
|
|
}
|
|
|
|
func (pm *processMetrics) updateAllocMem() int64 {
|
|
pm.mu.Lock()
|
|
defer pm.mu.Unlock()
|
|
pm.readMemStatsIfNeeded()
|
|
return int64(pm.ms.Alloc)
|
|
}
|
|
|
|
func (pm *processMetrics) updateTotalAllocMem() int64 {
|
|
pm.mu.Lock()
|
|
defer pm.mu.Unlock()
|
|
pm.readMemStatsIfNeeded()
|
|
return int64(pm.ms.TotalAlloc)
|
|
}
|
|
|
|
func (pm *processMetrics) updateSysMem() int64 {
|
|
pm.mu.Lock()
|
|
defer pm.mu.Unlock()
|
|
pm.readMemStatsIfNeeded()
|
|
return int64(pm.ms.Sys)
|
|
}
|
|
|
|
func (pm *processMetrics) updateCPUSeconds() float64 {
|
|
times, err := pm.proc.Times()
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
|
|
return times.Total()
|
|
}
|
|
|
|
func (pm *processMetrics) updateRSSMemory() int64 {
|
|
mem, err := pm.proc.MemoryInfo()
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return int64(mem.RSS)
|
|
}
|
|
|
|
func (pm *processMetrics) readMemStatsIfNeeded() {
|
|
now := time.Now()
|
|
// If last time we read was less than one second ago just reuse the values
|
|
if now.Sub(pm.lastMsRead) < time.Second {
|
|
return
|
|
}
|
|
pm.lastMsRead = now
|
|
runtime.ReadMemStats(pm.ms)
|
|
if pm.ballastSizeBytes > 0 {
|
|
pm.ms.Alloc -= pm.ballastSizeBytes
|
|
pm.ms.HeapAlloc -= pm.ballastSizeBytes
|
|
pm.ms.HeapSys -= pm.ballastSizeBytes
|
|
pm.ms.HeapInuse -= pm.ballastSizeBytes
|
|
}
|
|
}
|