forked from kubewharf/katalyst-core
152 lines
3.8 KiB
Go
152 lines
3.8 KiB
Go
/*
|
|
Copyright 2022 The Katalyst Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package general
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
)
|
|
|
|
// SmoothWindow is used to smooth the resource
|
|
type SmoothWindow interface {
|
|
// GetWindowedResources receives a sample and returns the result after smoothing,
|
|
// it can return nil if there are not enough samples in this window
|
|
GetWindowedResources(value resource.Quantity) *resource.Quantity
|
|
}
|
|
|
|
type CappedSmoothWindow struct {
|
|
sync.Mutex
|
|
last *resource.Quantity
|
|
minStep resource.Quantity
|
|
maxStep resource.Quantity
|
|
SmoothWindow
|
|
}
|
|
|
|
// NewCappedSmoothWindow creates a capped SmoothWindow, which
|
|
func NewCappedSmoothWindow(minStep resource.Quantity, maxStep resource.Quantity, smoothWindow SmoothWindow) *CappedSmoothWindow {
|
|
return &CappedSmoothWindow{minStep: minStep, maxStep: maxStep, SmoothWindow: smoothWindow}
|
|
}
|
|
|
|
// GetWindowedResources cap the value return by smooth window min to max
|
|
func (m *CappedSmoothWindow) GetWindowedResources(value resource.Quantity) *resource.Quantity {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
cur := m.SmoothWindow.GetWindowedResources(value)
|
|
if cur == nil {
|
|
cur = m.last
|
|
} else if m.last == nil {
|
|
m.last = cur
|
|
} else if cur.Cmp(*m.last) > 0 {
|
|
step := cur.DeepCopy()
|
|
step.Sub(*m.last)
|
|
if step.Cmp(m.minStep) < 0 {
|
|
cur = m.last
|
|
} else if step.Cmp(m.maxStep) > 0 {
|
|
m.last.Add(m.maxStep)
|
|
cur = m.last
|
|
} else {
|
|
m.last = cur
|
|
}
|
|
} else {
|
|
step := m.last.DeepCopy()
|
|
step.Sub(*cur)
|
|
if step.Cmp(m.minStep) < 0 {
|
|
cur = m.last
|
|
} else if step.Cmp(m.maxStep) > 0 {
|
|
m.last.Sub(m.maxStep)
|
|
cur = m.last
|
|
} else {
|
|
m.last = cur
|
|
}
|
|
}
|
|
|
|
if cur == nil {
|
|
return nil
|
|
}
|
|
|
|
ret := cur.DeepCopy()
|
|
return &ret
|
|
}
|
|
|
|
type averageWithTTLSmoothWindow struct {
|
|
sync.Mutex
|
|
windowSize int
|
|
ttl time.Duration
|
|
usedMillValue bool
|
|
|
|
index int
|
|
samples []*sample
|
|
}
|
|
|
|
type sample struct {
|
|
value resource.Quantity
|
|
timestamp time.Time
|
|
}
|
|
|
|
// NewAverageWithTTLSmoothWindow create a smooth window with ttl and window size, and the window size
|
|
// is the sample count while the ttl is the valid lifetime of each sample, and the usedMillValue means
|
|
// whether calculate the result with milli-value.
|
|
func NewAverageWithTTLSmoothWindow(windowSize int, ttl time.Duration, usedMillValue bool) SmoothWindow {
|
|
return &averageWithTTLSmoothWindow{
|
|
windowSize: windowSize,
|
|
ttl: ttl,
|
|
usedMillValue: usedMillValue,
|
|
index: 0,
|
|
samples: make([]*sample, windowSize),
|
|
}
|
|
}
|
|
|
|
// GetWindowedResources inserts a sample, and returns the smoothed result by average all the valid samples.
|
|
func (w *averageWithTTLSmoothWindow) GetWindowedResources(value resource.Quantity) *resource.Quantity {
|
|
w.Mutex.Lock()
|
|
defer w.Mutex.Unlock()
|
|
|
|
timestamp := time.Now()
|
|
w.samples[w.index] = &sample{
|
|
value: value,
|
|
timestamp: timestamp,
|
|
}
|
|
|
|
w.index++
|
|
if w.index >= w.windowSize {
|
|
w.index = 0
|
|
}
|
|
|
|
total := resource.Quantity{}
|
|
count := int64(0)
|
|
for _, s := range w.samples {
|
|
if s != nil && s.timestamp.Add(w.ttl).After(timestamp) {
|
|
total.Add(s.value)
|
|
count++
|
|
}
|
|
}
|
|
|
|
// if count of valid sample is not enough just return nil
|
|
if count != int64(w.windowSize) {
|
|
return nil
|
|
}
|
|
|
|
if w.usedMillValue {
|
|
return resource.NewMilliQuantity(total.MilliValue()/count, value.Format)
|
|
}
|
|
|
|
return resource.NewQuantity(total.Value()/count, value.Format)
|
|
}
|