categraf/inputs/aliyun/cloud.go

415 lines
10 KiB
Go

package aliyun
import (
"context"
"fmt"
"log"
"sync"
"time"
cms20190101 "github.com/alibabacloud-go/cms-20190101/v8/client"
"github.com/alibabacloud-go/tea/tea"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/inputs/aliyun/internal/manager"
internalTypes "flashcat.cloud/categraf/inputs/aliyun/internal/types"
"flashcat.cloud/categraf/pkg/cache"
"flashcat.cloud/categraf/pkg/limiter"
"flashcat.cloud/categraf/pkg/stringx"
"flashcat.cloud/categraf/types"
)
const (
inputName = "aliyun"
timefmt = "2006-01-02 15:04:05"
)
type (
Aliyun struct {
config.PluginConfig
Instances []*Instance `toml:"instances"`
}
Instance struct {
config.InstanceConfig
// credentials.Config
Credential
client *manager.Manager `toml:"-"`
windowStart time.Time `toml:"-"`
windowEnd time.Time `toml:"-"`
Delay config.Duration `toml:"delay"`
Period config.Duration `toml:"period"`
Namespaces []string `json:"namespaces"`
Filters []MetricFilter `toml:"metric_filters"`
// 最大请求次数 限流用
RateLimit int `toml:"ratelimit"`
CacheTTL config.Duration `toml:"cache_ttl"`
BatchSize int `toml:"batch_size"`
RecentlyActive string `toml:"recently_active"`
// 请求超时设置
Timeout config.Duration `toml:"timeout"`
// 企业云监控配置项
// batchSize int `toml:"batchSize"`
metricCache *metricCache `toml:"-"`
metaCache *cache.BasicCache `toml:"-"`
}
Credential struct {
AccessKeyID *string `toml:"access_key_id"`
AccessKeySecret *string `toml:"access_key_secret"`
Region *string `toml:"region"`
Endpoint *string `toml:"endpoint"`
}
MetricFilter struct {
MetricNames []string `toml:"metric_names"`
Dimensions string `toml:"dimensions"`
Namespace string `toml:"namespace"`
}
filteredMetric struct {
metrics []internalTypes.Metric
}
// metricCache caches metrics, their filters, and generated queries.
metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
}
)
func init() {
inputs.Add(inputName, func() inputs.Input {
return &Aliyun{}
})
}
func (a *Aliyun) Clone() inputs.Input {
return &Aliyun{}
}
func (a *Aliyun) Name() string {
return inputName
}
var _ inputs.SampleGatherer = new(Instance)
var _ inputs.Input = new(Aliyun)
var _ inputs.InstancesGetter = new(Aliyun)
func (ins *Instance) Init() error {
if ins == nil ||
ins.AccessKeySecret == nil ||
ins.AccessKeyID == nil ||
ins.Region == nil ||
ins.Endpoint == nil {
return types.ErrInstancesEmpty
}
if ins.BatchSize == 0 {
ins.BatchSize = 500
}
if ins.RateLimit == 0 {
ins.RateLimit = 25
}
if ins.CacheTTL == 0 {
ins.CacheTTL = config.Duration(time.Hour)
}
if ins.Timeout == 0 {
ins.Timeout = config.Duration(time.Second * 5)
}
if len(ins.Namespaces) == 0 {
ins.Namespaces = append(ins.Namespaces, "")
}
ins.metaCache = cache.NewBasicCache()
err := ins.initialize()
if err != nil {
return err
}
return nil
}
func (s *Aliyun) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(s.Instances))
for i := 0; i < len(s.Instances); i++ {
ret[i] = s.Instances[i]
}
return ret
}
func (ins *Instance) initialize() error {
if len(*ins.AccessKeyID) == 0 {
return fmt.Errorf("%s", "access_key_id is required")
}
if len(*ins.AccessKeySecret) == 0 {
return fmt.Errorf("%s", "E! access_key_secret is required")
}
if len(*ins.Region) == 0 {
return fmt.Errorf("%s", "region is required")
}
if len(*ins.Endpoint) == 0 {
return fmt.Errorf("%s", "endpoint is required")
}
if ins.client == nil {
cms := manager.NewCmsClient(*ins.AccessKeyID, *ins.AccessKeySecret, *ins.Region, *ins.Endpoint)
m, err := manager.New(cms)
if err != nil {
return fmt.Errorf("connect to aliyun error, %s", err)
}
ins.client = m
}
if ins.metaCache.Size() == 0 {
hosts, err := ins.client.GetEcsHosts()
if err != nil {
log.Println(err)
return err
}
for _, host := range hosts {
k := ins.client.EcsKey(*host.InstanceId)
ins.metaCache.Add(k, host)
}
}
return nil
}
func (f *metricCache) isValid() bool {
return f != nil && f.metrics != nil && time.Since(f.built) < f.ttl
}
// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch.
func (ins *Instance) getFilteredMetrics() ([]filteredMetric, error) {
if ins.metricCache != nil && ins.metricCache.isValid() {
return ins.metricCache.metrics, nil
}
fMetrics := []filteredMetric{}
allMetrics, err := ins.fetchNamespaceMetrics(ins.Namespaces)
if err != nil {
return nil, err
}
metrics := make([]internalTypes.Metric, 0, len(allMetrics))
if len(ins.Filters) != 0 {
for _, metric := range allMetrics {
for _, f := range ins.Filters {
if len(f.MetricNames) != 0 {
for _, name := range f.MetricNames {
if len(name) == 0 {
name = metric.MetricName
}
if isSelected(metric, name, f.Dimensions, f.Namespace) {
metrics = append(metrics, internalTypes.Metric{
MetricName: name,
Namespace: metric.Namespace,
Dimensions: metric.Dimensions,
LabelStr: metric.LabelStr,
})
}
}
} else {
if isSelected(metric, "", f.Dimensions, f.Namespace) {
metrics = append(metrics, internalTypes.Metric{
MetricName: metric.MetricName,
Namespace: metric.Namespace,
Dimensions: metric.Dimensions,
LabelStr: metric.LabelStr,
})
}
}
}
}
} else {
metrics = allMetrics
}
fMetrics = append(fMetrics, filteredMetric{
metrics: metrics,
})
if config.Config.DebugMode {
for _, m := range metrics {
log.Println("D!", m.Namespace, m.MetricName, m.Dimensions)
}
}
ins.metricCache = &metricCache{
metrics: fMetrics,
built: time.Now(),
ttl: time.Duration(ins.CacheTTL),
}
return fMetrics, nil
}
func (ins *Instance) Gather(slist *types.SampleList) {
ins.updateWindow(time.Now())
lmtr := limiter.NewRateLimiter(ins.RateLimit, time.Second)
defer lmtr.Stop()
wg := sync.WaitGroup{}
if ins.metricCache.isValid() {
for _, filtered := range ins.metricCache.metrics {
for j := range filtered.metrics {
<-lmtr.C
wg.Add(1)
go ins.sendMetrics(filtered.metrics[j], &wg, slist)
}
}
} else {
filteredMetrics, err := ins.getFilteredMetrics()
if err != nil {
log.Println("E!", err)
return
}
for _, filtered := range filteredMetrics {
for j := range filtered.metrics {
<-lmtr.C
wg.Add(1)
go ins.sendMetrics(filtered.metrics[j], &wg, slist)
}
}
}
wg.Wait()
}
func (ins *Instance) sendMetrics(metric internalTypes.Metric, wg *sync.WaitGroup, slist *types.SampleList) {
defer wg.Done()
ctx := context.Background()
req := new(cms20190101.DescribeMetricListRequest)
if len(metric.MetricName) != 0 {
req.MetricName = tea.String(metric.MetricName)
}
if len(metric.Namespace) != 0 {
req.Namespace = tea.String(metric.Namespace)
}
if len(metric.Dimensions) != 0 {
req.Dimensions = tea.String(metric.Dimensions)
}
if !ins.windowEnd.IsZero() {
req.EndTime = tea.String(ins.windowEnd.Format(timefmt))
}
if !ins.windowStart.IsZero() {
req.StartTime = tea.String(ins.windowStart.Format(timefmt))
}
points, err := ins.client.GetMetric(ctx, req)
if err != nil {
log.Println("E! get metrics error,", err)
return
}
for _, point := range points {
if point.Value != nil {
tags := ins.makeLabels(point)
mName := fmt.Sprintf("%s_%s", stringx.SnakeCase(point.Namespace), stringx.SnakeCase(point.MetricName))
slist.PushFront(types.NewSample(inputName, mName, *point.Value, tags, map[string]string{"namespace": metric.Namespace, "metric_name": metric.MetricName}).SetTime(point.GetMetricTime()))
}
}
}
func (ins *Instance) makeLabels(point internalTypes.Point, labels ...map[string]string) map[string]string {
result := map[string]string{}
for key, value := range ins.Labels {
result[key] = value
}
for _, lv := range labels {
for k, v := range lv {
result[k] = v
}
}
addLabel := func(instance interface{}) {
if meta, ok := instance.(*cms20190101.DescribeMonitoringAgentHostsResponseBodyHostsHost); ok {
result["ident"] = stringx.SnakeCase(*meta.HostName)
}
}
if instance, ok := ins.metaCache.Get(ins.client.EcsKey(point.InstanceID)); ok {
addLabel(instance)
}
result["user_id"] = point.UserID
if len(point.InstanceID) != 0 {
result["instance_id"] = point.InstanceID
}
if len(point.ClusterID) != 0 {
result["cluster_id"] = point.ClusterID
}
if len(point.NodeID) != 0 {
result["node_id"] = point.NodeID
}
return result
}
func (ins *Instance) updateWindow(relativeTo time.Time) {
windowEnd := relativeTo.Add(-time.Duration(ins.Delay))
if ins.windowEnd.IsZero() {
// this is the first run, no window info, so just get a single period
ins.windowStart = windowEnd.Add(-time.Duration(ins.Period))
} else {
// subsequent window, start where last window left off
ins.windowStart = ins.windowEnd
}
ins.windowEnd = windowEnd
}
// fetchNamespaceMetrics retrieves available metrics for a given aliyun namespace.
func (ins *Instance) fetchNamespaceMetrics(namespaces []string) ([]internalTypes.Metric, error) {
// func (ins *Instance) fetchNamespaceMetrics() ([]*cms20190101.DescribeMetricMetaListResponseBodyResourcesResource, error) {
var params *cms20190101.DescribeMetricMetaListRequest
// namespaces := ins.Namespaces
if len(namespaces) == 0 {
namespaces = append(namespaces, "")
}
// result := make([]*cms20190101.DescribeMetricMetaListResponseBodyResourcesResource, 0, 100)
result := make([]internalTypes.Metric, 0, 100)
for i, namespace := range namespaces {
params = &cms20190101.DescribeMetricMetaListRequest{
Namespace: tea.String(namespaces[i]),
}
resp, err := ins.client.ListMetrics(context.Background(), params)
if err != nil {
log.Printf("E! failed to list metrics with namespace %s: %v", namespace, err)
// skip problem namespace on error and continue to next namespace
return nil, err
}
for _, m := range resp {
point := internalTypes.Metric{
LabelStr: *m.Labels,
Namespace: *m.Namespace,
MetricName: *m.MetricName,
}
result = append(result, point)
}
}
return result, nil
}
func isSelected(metric internalTypes.Metric, name, dimensions, namespace string) bool {
if len(name) != 0 && name != metric.MetricName {
return false
}
if len(dimensions) != 0 && metric.Dimensions != dimensions {
return false
}
if len(namespace) != 0 && metric.Namespace != namespace {
return false
}
return true
}