katalyst-core/pkg/custom-metric/store/data/cache.go

684 lines
17 KiB
Go

/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package data
import (
"encoding/json"
"fmt"
"sort"
"sync"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)
const (
metricsNameKCMASStoreDataLatencySet = "kcmas_store_data_latency_set"
metricsNameKCMASStoreDataLatencyGet = "kcmas_store_data_latency_get"
)
type InternalValue struct {
Value int64 `json:"value,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
}
func NewInternalValue(value int64, timestamp int64) *InternalValue {
return &InternalValue{
Value: value,
Timestamp: timestamp,
}
}
func (i *InternalValue) DeepCopy() *InternalValue {
return &InternalValue{
Value: i.Value,
Timestamp: i.Timestamp,
}
}
func (i *InternalValue) GetValue() int64 { return i.Value }
func (i *InternalValue) GetTimestamp() int64 { return i.Timestamp }
type MetricMeta struct {
Name string `json:"name,omitempty"`
Namespaced bool `json:"namespaced,omitempty"`
ObjectKind string `json:"objectKind,omitempty"`
}
func (m *MetricMeta) GetName() string { return m.Name }
func (m *MetricMeta) GetNamespaced() bool { return m.Namespaced }
func (m *MetricMeta) GetObjectKind() string { return m.ObjectKind }
func (m *MetricMeta) SetObjectKind(objectKind string) { m.ObjectKind = objectKind }
func (m *MetricMeta) SetNamespaced(namespaced bool) { m.Namespaced = namespaced }
func (m *MetricMeta) SetName(name string) { m.Name = name }
type ObjectMeta struct {
ObjectNamespace string `json:"objectNamespace,omitempty"`
ObjectName string `json:"objectName,omitempty"`
}
func (m *ObjectMeta) GetObjectNamespace() string { return m.ObjectNamespace }
func (m *ObjectMeta) GetObjectName() string { return m.ObjectName }
func (m *ObjectMeta) SetObjectNamespace(objectNamespace string) { m.ObjectNamespace = objectNamespace }
func (m *ObjectMeta) SetObjectName(objectName string) { m.ObjectName = objectName }
// InternalMetric is used as an internal version of metricItem
type InternalMetric struct {
MetricMeta `json:",inline"`
ObjectMeta `json:",inline"`
Labels map[string]string `json:"labels,omitempty"`
InternalValue []*InternalValue `json:"internalValue,omitempty"`
}
func NewInternalMetric(name string) *InternalMetric {
return &InternalMetric{
MetricMeta: MetricMeta{
Name: name,
},
Labels: make(map[string]string),
}
}
func (a *InternalMetric) String() string {
return fmt.Sprintf("{ObjectNamespace: %v, Name: %v, ObjectKind: %v, ObjectName: %v}",
a.ObjectNamespace, a.Name, a.ObjectKind, a.ObjectName)
}
func (a *InternalMetric) DeepCopy() *InternalMetric {
b := &InternalMetric{
MetricMeta: a.MetricMeta,
ObjectMeta: a.ObjectMeta,
Labels: general.DeepCopyMap(a.Labels),
}
for _, i := range a.InternalValue {
b.InternalValue = append(b.InternalValue, i.DeepCopy())
}
return b
}
func (a *InternalMetric) DeepCopyWithLimit(limit int) *InternalMetric {
b := a.DeepCopy()
total := len(b.InternalValue)
if limit > 0 && total > limit {
b.InternalValue = b.InternalValue[total-limit:]
}
return b
}
func (a *InternalMetric) GetLabels() map[string]string { return a.Labels }
func (a *InternalMetric) GetValues() []*InternalValue { return a.InternalValue }
func (a *InternalMetric) SetObjectNamespace(objectNamespace string) {
a.ObjectMeta.SetObjectNamespace(objectNamespace)
a.SetNamespaced(objectNamespace != "")
}
func (a *InternalMetric) SetLabel(key, value string) { a.Labels[key] = value }
func (a *InternalMetric) AppendMetric(i *InternalValue) { a.InternalValue = append(a.InternalValue, i) }
func (a *InternalMetric) Len() int { return len(a.InternalValue) }
func (a *InternalMetric) Less(i, j int) bool {
return a.InternalValue[i].Timestamp < a.InternalValue[j].Timestamp
}
func (a *InternalMetric) Swap(i, j int) {
a.InternalValue[i], a.InternalValue[j] = a.InternalValue[j], a.InternalValue[i]
}
func (a *InternalMetric) GenerateTags() []metrics.MetricTag {
return []metrics.MetricTag{
{Key: "metric_name", Val: a.Name},
{Key: "object_name", Val: a.ObjectName},
}
}
func Marshal(internalList []*InternalMetric) ([]byte, error) {
return json.Marshal(internalList)
}
func Unmarshal(bytes []byte) ([]*InternalMetric, error) {
var res []*InternalMetric
err := json.Unmarshal(bytes, &res)
return res, err
}
// PackInternalMetricList merges internal metric lists and sort them, if the same
// timestamp appears in different list (which should happen actually), we will
// randomly choose one item.
func PackInternalMetricList(internalLists ...[]*InternalMetric) []*InternalMetric {
if len(internalLists) == 0 {
return []*InternalMetric{}
} else if len(internalLists) == 1 {
return internalLists[0]
}
c := NewCachedMetric(metrics.DummyMetrics{})
for _, internalList := range internalLists {
c.Add(internalList...)
}
return c.ListAllMetric()
}
// PackMetricMetaList merges MetricMeta lists and removes duplicates
func PackMetricMetaList(metricMetaLists ...[]MetricMeta) []MetricMeta {
metricTypeMap := make(map[MetricMeta]interface{})
for _, metricsTypeList := range metricMetaLists {
for _, metricsType := range metricsTypeList {
if _, ok := metricTypeMap[metricsType]; !ok {
metricTypeMap[metricsType] = struct{}{}
}
}
}
var res []MetricMeta
for metricType := range metricTypeMap {
res = append(res, metricType)
}
return res
}
// GenerateMetaTags returns tag based on the given meta-info
func GenerateMetaTags(m *MetricMeta, d *ObjectMeta) []metrics.MetricTag {
return []metrics.MetricTag{
{Key: "metric_name", Val: m.Name},
{Key: "object_name", Val: d.ObjectName},
}
}
// CachedMetric stores all metricItems in an organized way;
type CachedMetric struct {
sync.RWMutex
metricsEmitter metrics.MetricEmitter
// namespaced is used as a normal storage format, while
// metricMap flatten the metrics as a reversed index to make refer faster
namespaced map[string]*namespacedMetric
metricMap map[MetricMeta]map[ObjectMeta]*InternalMetric
}
func NewCachedMetric(metricsEmitter metrics.MetricEmitter) *CachedMetric {
return &CachedMetric{
metricsEmitter: metricsEmitter,
namespaced: make(map[string]*namespacedMetric),
metricMap: make(map[MetricMeta]map[ObjectMeta]*InternalMetric),
}
}
func (c *CachedMetric) Add(dList ...*InternalMetric) {
now := time.Now()
c.Lock()
defer c.Unlock()
for _, d := range dList {
if d == nil || len(d.InternalValue) == 0 || d.Name == "" {
continue
}
objectNamespace := d.ObjectNamespace
if _, ok := c.namespaced[objectNamespace]; !ok {
c.namespaced[objectNamespace] = newNamespacedMetric()
}
addedValues := c.namespaced[objectNamespace].add(d)
if len(addedValues) == 0 {
continue
}
if _, ok := c.metricMap[d.MetricMeta]; !ok {
c.metricMap[d.MetricMeta] = make(map[ObjectMeta]*InternalMetric)
}
if _, ok := c.metricMap[d.MetricMeta][d.ObjectMeta]; !ok {
c.metricMap[d.MetricMeta][d.ObjectMeta] = &InternalMetric{
MetricMeta: d.MetricMeta,
ObjectMeta: d.ObjectMeta,
}
}
c.metricMap[d.MetricMeta][d.ObjectMeta].Labels = d.Labels
c.metricMap[d.MetricMeta][d.ObjectMeta].InternalValue = append(c.metricMap[d.MetricMeta][d.ObjectMeta].InternalValue, addedValues...)
if len(addedValues) > 0 {
sort.Sort(c.metricMap[d.MetricMeta][d.ObjectMeta])
// todo, support to emit metrics only when the functionality switched on
index := c.metricMap[d.MetricMeta][d.ObjectMeta].Len() - 1
costs := now.Sub(time.UnixMilli(c.metricMap[d.MetricMeta][d.ObjectMeta].InternalValue[index].Timestamp)).Microseconds()
_ = c.metricsEmitter.StoreInt64(metricsNameKCMASStoreDataLatencySet, costs, metrics.MetricTypeNameRaw,
GenerateMetaTags(&d.MetricMeta, &d.ObjectMeta)...)
}
}
}
// ListAllMetric returns all metric with a flattened slice
func (c *CachedMetric) ListAllMetric() []*InternalMetric {
c.RLock()
defer c.RUnlock()
var res []*InternalMetric
for _, internalMap := range c.metricMap {
for _, internal := range internalMap {
res = append(res, internal.DeepCopy())
}
}
return res
}
// ListAllMetricMeta returns all metric meta with a flattened slice
func (c *CachedMetric) ListAllMetricMeta(withObject bool) []MetricMeta {
c.RLock()
defer c.RUnlock()
var res []MetricMeta
for metricMeta := range c.metricMap {
if (withObject && metricMeta.GetObjectKind() == "") ||
(!withObject && metricMeta.GetObjectKind() != "") {
continue
}
res = append(res, metricMeta)
}
return res
}
// ListAllMetricNames returns all metric with a flattened slice, but only contain names
func (c *CachedMetric) ListAllMetricNames() []string {
c.RLock()
defer c.RUnlock()
var res []string
for metricMeta, internalMap := range c.metricMap {
if len(internalMap) == 0 {
continue
}
res = append(res, metricMeta.Name)
}
return res
}
// GetMetric returns the metric matched with the given key
func (c *CachedMetric) GetMetric(namespace, metricName string, gr *schema.GroupResource) ([]*InternalMetric, bool) {
return c.GetMetricWithLimit(namespace, metricName, gr, -1)
}
func (c *CachedMetric) GetMetricWithLimit(namespace, metricName string, gr *schema.GroupResource, limit int) ([]*InternalMetric, bool) {
now := time.Now()
c.RLock()
defer c.RUnlock()
var res []*InternalMetric
metricMeta := MetricMeta{
Name: metricName,
Namespaced: namespace != "",
}
if gr != nil {
metricMeta.SetObjectKind(gr.String())
}
if internalMap, ok := c.metricMap[metricMeta]; ok {
for _, internal := range internalMap {
res = append(res, internal.DeepCopyWithLimit(limit))
if internal.Len() > 0 {
costs := now.Sub(time.UnixMilli(internal.InternalValue[internal.Len()-1].Timestamp)).Microseconds()
_ = c.metricsEmitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.GenerateTags()...)
}
}
return res, true
}
return nil, false
}
// GetMetricInNamespace returns the metric matched with the given Namespace
func (c *CachedMetric) GetMetricInNamespace(namespace string) []*InternalMetric {
return c.GetMetricInNamespaceWithLimit(namespace, -1)
}
func (c *CachedMetric) GetMetricInNamespaceWithLimit(namespace string, limit int) []*InternalMetric {
now := time.Now()
c.RLock()
defer c.RUnlock()
var res []*InternalMetric
if _, ok := c.namespaced[namespace]; !ok {
return res
}
for _, internal := range c.namespaced[namespace].listAllMetric() {
internal.SetObjectNamespace(namespace)
res = append(res, internal.DeepCopyWithLimit(limit))
if internal.Len() > 0 {
costs := now.Sub(time.UnixMilli(internal.InternalValue[internal.Len()-1].Timestamp)).Microseconds()
_ = c.metricsEmitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.GenerateTags()...)
}
}
return res
}
func (c *CachedMetric) GC(expiredTime time.Time) {
c.gcWithTimestamp(expiredTime.UnixMilli())
}
func (c *CachedMetric) gcWithTimestamp(expiredTimestamp int64) {
c.Lock()
defer c.Unlock()
for name, n := range c.namespaced {
n.gc(expiredTimestamp)
if n.empty() {
delete(c.namespaced, name)
}
}
for metricMeta, internalMap := range c.metricMap {
for objectMeta, internal := range internalMap {
var valueList []*InternalValue
for _, value := range internal.InternalValue {
if value.Timestamp > expiredTimestamp {
valueList = append(valueList, value)
}
}
internal.InternalValue = valueList
if len(internal.InternalValue) == 0 {
delete(internalMap, objectMeta)
}
}
if len(internalMap) == 0 {
delete(c.metricMap, metricMeta)
}
}
}
// namespacedMetric stores all metricItems in a certain Namespace in an organized way
type namespacedMetric struct {
sync.RWMutex
resourced map[string]*resourcedMetric
}
func newNamespacedMetric() *namespacedMetric {
return &namespacedMetric{
resourced: make(map[string]*resourcedMetric),
}
}
func (n *namespacedMetric) listAllMetric() []*InternalMetric {
n.RLock()
defer n.RUnlock()
var res []*InternalMetric
for objectKind, resourced := range n.resourced {
for _, internal := range resourced.listAllMetric() {
internal.SetObjectKind(objectKind)
res = append(res, internal)
}
}
return res
}
func (n *namespacedMetric) add(d *InternalMetric) []*InternalValue {
objectKind := d.ObjectKind
n.Lock()
defer n.Unlock()
if _, ok := n.resourced[objectKind]; !ok {
n.resourced[objectKind] = newResourcedMetric()
}
return n.resourced[objectKind].add(d)
}
func (n *namespacedMetric) gc(expiredTimestamp int64) {
n.Lock()
defer n.Unlock()
for name, r := range n.resourced {
r.gc(expiredTimestamp)
if r.empty() {
delete(n.resourced, name)
}
}
}
func (n *namespacedMetric) empty() bool {
return len(n.resourced) == 0
}
// resourcedMetric stores all metricItems for a certain Object kind in an organized way
type resourcedMetric struct {
sync.RWMutex
objected map[string]*objectedMetric
}
func newResourcedMetric() *resourcedMetric {
return &resourcedMetric{
objected: make(map[string]*objectedMetric),
}
}
func (r *resourcedMetric) listAllMetric() []*InternalMetric {
r.RLock()
defer r.RUnlock()
var res []*InternalMetric
for objectName, objected := range r.objected {
for _, internal := range objected.listAllMetric() {
internal.SetObjectName(objectName)
res = append(res, internal)
}
}
return res
}
func (r *resourcedMetric) add(d *InternalMetric) []*InternalValue {
objName := d.ObjectName
r.Lock()
defer r.Unlock()
if _, ok := r.objected[objName]; !ok {
r.objected[objName] = newObjectedMetric()
}
return r.objected[objName].add(d)
}
func (r *resourcedMetric) gc(expiredTimestamp int64) {
r.Lock()
defer r.Unlock()
for name, o := range r.objected {
o.gc(expiredTimestamp)
if o.empty() {
delete(r.objected, name)
}
}
}
func (r *resourcedMetric) empty() bool {
return len(r.objected) == 0
}
// objectedMetric stores all metricItems for a certain Object in an organized way
type objectedMetric struct {
sync.RWMutex
series map[string]*seriesMetric
}
func newObjectedMetric() *objectedMetric {
return &objectedMetric{
series: make(map[string]*seriesMetric),
}
}
func (o *objectedMetric) listAllMetric() []*InternalMetric {
o.RLock()
defer o.RUnlock()
var res []*InternalMetric
for name, series := range o.series {
internal := series.listAllMetric()
internal.SetName(name)
res = append(res, internal)
}
return res
}
func (o *objectedMetric) add(d *InternalMetric) []*InternalValue {
// not support for empty metric Name
if d.Name == "" {
return []*InternalValue{}
}
o.Lock()
defer o.Unlock()
if _, ok := o.series[d.Name]; !ok {
o.series[d.Name] = newSeriesMetric(d.Labels)
}
// always overwrite Labels if we have changed it
o.series[d.Name].labels = d.Labels
return o.series[d.Name].add(d.InternalValue)
}
func (o *objectedMetric) gc(expiredTimestamp int64) {
o.Lock()
defer o.Unlock()
for name, s := range o.series {
s.gc(expiredTimestamp)
if s.empty() {
delete(o.series, name)
}
}
}
func (o *objectedMetric) empty() bool {
return len(o.series) == 0
}
// seriesMetric stores all metricItems for a certain Object in an organized way
type seriesMetric struct {
sync.RWMutex
// Timestamp will be used as a unique key to avoid
// duplicated metric to be written more than once
timestampSets map[int64]interface{}
expiredTime int64
labels map[string]string
ordered []metric
}
func newSeriesMetric(labels map[string]string) *seriesMetric {
return &seriesMetric{
labels: labels,
timestampSets: make(map[int64]interface{}),
}
}
func (s *seriesMetric) listAllMetric() *InternalMetric {
s.RLock()
defer s.RUnlock()
res := &InternalMetric{
Labels: s.labels,
}
for _, ordered := range s.ordered {
res.InternalValue = append(res.InternalValue, &InternalValue{
Value: ordered.data,
Timestamp: ordered.timestamp,
})
}
return res
}
// add returns those valid InternalMetric slice
func (s *seriesMetric) add(valueList []*InternalValue) []*InternalValue {
s.Lock()
defer s.Unlock()
var res []*InternalValue
for _, v := range valueList {
// Timestamp must be none-empty and valid
if v.Timestamp == 0 || v.Timestamp < s.expiredTime {
continue
}
// Timestamp must be unique
if _, ok := s.timestampSets[v.Timestamp]; ok {
continue
}
s.timestampSets[v.Timestamp] = struct{}{}
// always make the Value list as ordered
i := sort.Search(len(s.ordered), func(i int) bool {
return v.Timestamp < s.ordered[i].timestamp
})
s.ordered = append(s.ordered, metric{})
copy(s.ordered[i+1:], s.ordered[i:])
s.ordered[i] = metric{
data: v.Value,
timestamp: v.Timestamp,
}
res = append(res, v)
}
return res
}
func (s *seriesMetric) gc(expiredTimestamp int64) {
s.Lock()
defer s.Unlock()
s.expiredTime = expiredTimestamp
var ordered []metric
for _, m := range s.ordered {
if m.timestamp > expiredTimestamp {
ordered = append(ordered, m)
} else {
delete(s.timestampSets, m.timestamp)
}
}
s.ordered = ordered
}
func (s *seriesMetric) empty() bool {
return len(s.ordered) == 0
}
type metric struct {
data int64
timestamp int64
}