自定义 POD 调度 Scheduling Framework
kube-scheduler 是 kubernetes 的核心组件之一,主要负责整个集群资源的调度功能,根据特定的调度算法和策略,将 Pod 调度到最优的工作节点上面去,从而更加合理、更加充分的利用集群的资源
Kubernetes v1.15 版本中引入了可插拔架构的调度框架,调库框架向现有的调度器中添加了一组插件化的 API,该 API 在保持调度程序“核心”简单且易于维护的同时,使得大部分的调度功能以插件的形式存在,参考文档:调度框架
调度框架
调度框架定义了一组扩展点,用户可以实现扩展点定义的接口来定义自己的调度逻辑(我们称之为扩展),并将扩展注册到扩展点上,调度框架在执行调度工作流时,遇到对应的扩展点时,将调用用户注册的扩展。调度框架在预留扩展点时,都是有特定的目的,有些扩展点上的扩展可以改变调度程序的决策方法,有些扩展点上的扩展只是发送一个通知
我们知道每当调度一个 Pod 时,都会按照两个过程来执行:调度过程和绑定过程
调度过程为 Pod 选择一个合适的节点,绑定过程则将调度过程的决策应用到集群中(也就是在被选定的节点上运行 Pod),将调度过程和绑定过程合在一起,称之为调度上下文(scheduling context)。需要注意的是调度过程是同步
运行的(同一时间点只为一个 Pod 进行调度),绑定过程可异步运行(同一时间点可并发为多个 Pod 执行绑定)
调度过程和绑定过程遇到如下情况时会中途退出:
- 调度程序认为当前没有该 Pod 的可选节点
- 内部错误
这个时候,该 Pod 将被放回到 待调度队列,并等待下次重试
扩展点
下图展示了调度框架中的调度上下文及其中的扩展点,一个扩展可以注册多个扩展点,以便可以执行更复杂的有状态的任务,参考:调度器配置
QueueSort
扩展用于对 Pod 的待调度队列进行排序,以决定先调度哪个 Pod,QueueSort
扩展本质上只需要实现一个方法Less(Pod1, Pod2)
用于比较两个 Pod 谁更优先获得调度即可,同一时间点只能有一个QueueSort
插件生效Pre-filter
扩展用于对 Pod 的信息进行预处理,或者检查一些集群或 Pod 必须满足的前提条件,如果pre-filter
返回了 error,则调度过程终止Filter
扩展用于排除那些不能运行该 Pod 的节点,对于每一个节点,调度器将按顺序执行filter
扩展;如果任何一个filter
将节点标记为不可选,则余下的filter
扩展将不会被执行。调度器可以同时对多个节点执行filter
扩展Post-filter
是一个通知类型的扩展点,调用该扩展的参数是filter
阶段结束后被筛选为可选节点的节点列表,可以在扩展中使用这些信息更新内部状态,或者产生日志或 metrics 信息PreScore
扩展用于执行 “前置评分(pre-scoring)” 工作,即生成一个可共享状态供 Score 插件使用。 如果 PreScore 插件返回错误,则调度周期将终止Score
评分插件用于对通过过滤阶段的节点进行排名。调度器为每个节点调用每个评分插件。 将有一个定义明确的整数范围,代表最小和最大分数。 在标准化评分阶段之后,调度器将根据配置的插件权重 合并所有插件的节点分数NormalizeScore
扩展在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中的scoring
扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个normalize scoring
扩展一次Reserve
是一个通知性质的扩展点,有状态的插件可以使用该扩展点来获得节点上为 Pod 预留的资源,该事件发生在调度器将 Pod 绑定到节点之前,目的是避免调度器在等待 Pod 与节点绑定的过程中调度新的 Pod 到节点上时,发生实际使用资源超出可用资源的情况。(因为绑定 Pod 到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod 进入 reserved 状态以后,要么在绑定失败时触发 Unreserve 扩展,要么在绑定成功时,由 Post-bind 扩展结束绑定过程Permit
扩展在每个 Pod 调度周期的最后调用,用于阻止或者延迟 Pod 与节点的绑定。Permit 扩展可以做下面三件事中的一项:- approve(批准):当所有的 permit 扩展都 approve 了 Pod 与节点的绑定,调度器将继续执行绑定过程
- deny(拒绝):如果任何一个 permit 扩展 deny 了 Pod 与节点的绑定,Pod 将被放回到待调度队列,此时将触发
Unreserve
扩展 - wait(等待):如果一个 permit 扩展返回了 wait,则 Pod 将保持在 permit 阶段,同时该 Pod 的绑定周期启动时即直接阻塞直到得到批准,如果超时事件发生,wait 状态变成 deny,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展
Pre-bind
扩展用于在 Pod 绑定之前执行某些逻辑。例如,pre-bind 扩展可以将一个基于网络的数据卷挂载到节点上,以便 Pod 可以使用。如果任何一个pre-bind
扩展返回错误,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展Bind
扩展用于将 Pod 绑定到节点上:- 只有所有的 pre-bind 扩展都成功执行了,bind 扩展才会执行
- 调度框架按照 bind 扩展注册的顺序逐个调用 bind 扩展
- 具体某个 bind 扩展可以选择处理或者不处理该 Pod
- 如果某个 bind 扩展处理了该 Pod 与节点的绑定,余下的 bind 扩展将被忽略
- 如果失败,则执行 Unreverse 扩展点将预先消费的资源释放掉(如 PVC 和 PV),并将 Pod 从调度队列中删除
Post-bind
是一个通知性质的扩展,是整个调度的最后一步:- Post-bind 扩展在 Pod 成功绑定到节点上之后被动调用
- Post-bind 扩展是绑定过程的最后一个步骤,可以用来执行资源清理的动作
Unreserve
是一个通知性质的扩展,如果为 Pod 预留了资源,Pod 又在被绑定过程中被拒绝绑定,则 unreserve 扩展将被调用。Unreserve 扩展应该释放已经为 Pod 预留的节点上的计算资源。在一个插件中,reserve 扩展和 unreserve 扩展应该成对出现
官方实现了一些扩展点插件:kubernetes/pkg/scheduler/framework/plugins
如果我们要实现自己的插件,必须向调度框架注册插件并完成配置,另外还必须实现扩展点接口,参考项目:kubernetes-sigs/scheduler-plugins
扩展的调用顺序如下:
- 如果某个扩展点没有配置对应的扩展,调度框架将使用默认插件中的扩展
- 如果为某个扩展点配置且激活了扩展,则调度框架将先调用默认插件的扩展,再调用配置中的扩展
- 默认插件的扩展始终被最先调用,然后按照
KubeSchedulerConfiguration
中扩展的激活enabled
顺序逐个调用扩展点的扩展 - 可以先禁用默认插件的扩展,然后在
enabled
列表中的某个位置激活默认插件的扩展,这种做法可以改变默认插件的扩展被调用时的顺序
基本示例
下面是一个实现 PreFilter 扩展点示例,用来限制命名空间内 POD 最大调度数量
// go.mod
require (
k8s.io/api v0.22.3
k8s.io/apimachinery v0.22.3
k8s.io/component-base v0.22.3
k8s.io/klog/v2 v2.9.0
k8s.io/kube-scheduler v0.22.3 // indirect
k8s.io/kubernetes v1.22.3
)
replace (
k8s.io/api => k8s.io/api v0.22.3
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.22.3
k8s.io/apimachinery => k8s.io/apimachinery v0.22.3
k8s.io/apiserver => k8s.io/apiserver v0.22.3
k8s.io/cli-runtime => k8s.io/cli-runtime v0.22.3
k8s.io/client-go => k8s.io/client-go v0.22.3
k8s.io/cloud-provider => k8s.io/cloud-provider v0.22.3
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.22.3
k8s.io/code-generator => k8s.io/code-generator v0.22.3
k8s.io/component-base => k8s.io/component-base v0.22.3
k8s.io/component-helpers => k8s.io/component-helpers v0.22.3
k8s.io/controller-manager => k8s.io/controller-manager v0.22.3
k8s.io/cri-api => k8s.io/cri-api v0.22.3
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.22.3
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.22.3
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.22.3
k8s.io/kube-proxy => k8s.io/kube-proxy v0.22.3
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.22.3
k8s.io/kubectl => k8s.io/kubectl v0.22.3
k8s.io/kubelet => k8s.io/kubelet v0.22.3
k8s.io/kubernetes => k8s.io/kubernetes v1.22.3
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.22.3
k8s.io/metrics => k8s.io/metrics v0.22.3
k8s.io/mount-utils => k8s.io/mount-utils v0.22.3
k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.22.3
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.22.3
)
// main.go
import (
"fmt"
"k8s.io/component-base/logs"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
"os"
"pod-scheduler/lib"
)
func main() {
command := app.NewSchedulerCommand(
app.WithPlugin(lib.TestSchedulingName, lib.NewTestScheduler),
)
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
_, _ = fmt.Fprintf(os.Stdout, "%v\n", err)
os.Exit(1)
}
}
// lib/test-scheduling.go
import (
"context"
"fmt"
coreV1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkRuntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
const (
TestSchedulingName = "TestScheduling"
)
type TestScheduler struct {
fact informers.SharedInformerFactory
args *Args
}
type Args struct {
MaxPods int `json:"maxPods,omitempty"` // 最大调度数量
}
func (s *TestScheduler) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *coreV1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}
func (s *TestScheduler) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *coreV1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}
func (s *TestScheduler) PreFilter(ctx context.Context, state *framework.CycleState, p *coreV1.Pod) *framework.Status {
// 业务逻辑实现
klog.V(3).Infof("当前被preFilter的POD名称是:%s\n", p.Name)
pods, err := s.fact.Core().V1().Pods().Lister().Pods(p.Namespace).List(labels.Everything())
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if s.args.MaxPods > 0 && len(pods) > s.args.MaxPods {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("POD数量超过上限%d", s.args.MaxPods))
}
return framework.NewStatus(framework.Success)
}
func (s *TestScheduler) PreFilterExtensions() framework.PreFilterExtensions {
return s
}
func (*TestScheduler) Name() string {
return TestSchedulingName
}
var _ framework.PreFilterPlugin = &TestScheduler{}
func NewTestScheduler(configuration runtime.Object, handle framework.Handle) (framework.Plugin, error) {
args := &Args{}
// 得到调度插件自定义参数,反编码为struct
if err := frameworkRuntime.DecodeInto(configuration, args); err != nil {
return nil, err
}
return &TestScheduler{
fact: handle.SharedInformerFactory(),
args: args,
}, nil
}
然后可以当成普通的应用用一个 Deployment 控制器来部署即可,由于我们需要去获取集群中的一些资源对象,所以当然需要申请 RBAC 权限,然后同样通过 --config
参数来配置我们的调度器
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test-scheduling-clusterrole
rules:
- apiGroups:
- ""
resources:
- endpoints
- events
verbs:
- create
- get
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- watch
- update
- apiGroups:
- ""
resources:
- bindings
- pods/binding
verbs:
- create
- apiGroups:
- ""
resources:
- pods/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
- replicationcontrollers
- services
verbs:
- get
- list
- watch
- apiGroups:
- apps
- extensions
resources:
- replicasets
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- namespaces
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- "storage.k8s.io"
resources: ['*']
verbs:
- get
- list
- watch
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- create
- get
- list
- update
- apiGroups:
- "events.k8s.io"
resources:
- events
verbs:
- create
- patch
- update
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: test-scheduling-sa
namespace: kube-system
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: test-scheduling-clusterrolebinding
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: test-scheduling-clusterrole
subjects:
- kind: ServiceAccount
name: test-scheduling-sa
namespace: kube-system
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-scheduling-config
namespace: kube-system
data:
config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: test-scheduling
plugins:
preFilter:
enabled:
- name: "TestScheduling"
pluginConfig:
- name: TestScheduling
args:
maxPods: 17 # 调度插件自定义参数
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-scheduling
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: test-scheduling
template:
metadata:
labels:
app: test-scheduling
spec:
nodeName: lain1
serviceAccount: test-scheduling-sa
containers:
- name: tests-cheduling
image: alpine:3.12
imagePullPolicy: IfNotPresent
command: ["/app/test-scheduling"]
args:
- --config=/etc/kubernetes/config.yaml
- --v=3
volumeMounts:
- name: config
mountPath: /etc/kubernetes
- name: app
mountPath: /app
volumes:
- name: config
configMap:
name: test-scheduling-config
- name: app
hostPath:
path: /home/txl/pod-scheduler
部署一个测试 pod
apiVersion: apps/v1
kind: Deployment
metadata:
name: testngx
namespace: default
spec:
replicas: 2
selector:
matchLabels:
app: testngx
template:
metadata:
labels:
app: testngx
spec:
schedulerName: test-scheduling # 使用指定scheduler
containers:
- image: nginx:1.18-alpine
imagePullPolicy: IfNotPresent
name: testngx
ports:
- containerPort: 80
Filter 过滤节点
不允许调度到存在 noScheduling=true 标签的节点上
// lib/test-scheduling.go
var _ framework.FilterPlugin = &TestScheduler{}
func (s *TestScheduler) Filter(ctx context.Context, state *framework.CycleState, pod *coreV1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for k, v := range nodeInfo.Node().Labels {
if k == "noScheduling" && v == "true" {
return framework.NewStatus(framework.Unschedulable, "该节点不可调度")
}
}
return framework.NewStatus(framework.Success)
}
加入 filter 调度配置
apiVersion: v1
kind: ConfigMap
metadata:
name: test-scheduling-config
namespace: kube-system
data:
config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: test-scheduling
plugins:
preFilter:
enabled:
- name: "TestScheduling"
filter:
enabled:
- name: "TestScheduling"
pluginConfig:
- name: TestScheduling
args:
maxPods: 17
PreScore 预打分
该阶段能得到 Filter 阶段结束后的 Node 列表,在这里我们可以做些预处理并保存到 CycleState(主要负责调度流程中”一些数据”的保存, framework 中所有的插件均可进行数据增加和修改, 线程安全)给后续插件进行流转
// lib/test-scheduling.go
var _ framework.PreScorePlugin = &TestScheduler{}
type NodeMemory struct {
data map[string]float64 // 内存空闲
}
func (n *NodeMemory) Clone() framework.StateData { // 实现StateData接口
return &NodeMemory{data: n.data}
}
func (s *TestScheduler) PreScore(ctx context.Context, state *framework.CycleState, pod *coreV1.Pod, nodes []*coreV1.Node) *framework.Status {
memory := &NodeMemory{data: map[string]float64{}}
memory.data["node1"] = 0.3
memory.data["node2"] = 0.4
state.Write("nodeMemory", memory)
klog.Info("预打分阶段:保存数据成功")
return framework.NewStatus(framework.Success)
}
加入 filter 调度配置
preScore:
enabled:
- name: "TestScheduling"
然后就可以在后续扩展点通过 getNodeMemory, err := state.Read("nodeMemory")
获取值
Score 打分
最终的分数需要在 [MinNodeScore,MaxNodeScore]([0-100])之间,为了防止分数超出,需要做归一化(Min-Max Normalization )处理,这是最常见的 min-max 归一化公式 $$ X_{nom} = \frac{X - X_{min}}{X_{max} - X_{min}} $$ 也称为离差标准化,是对原始数据的线性变换,使结果值映射到 [min- max]之间
// lib/test-scheduling.go
var _ framework.ScorePlugin = &TestScheduler{}
func (s *TestScheduler) Score(ctx context.Context, state *framework.CycleState, p *coreV1.Pod, nodeName string) (int64, *framework.Status) {
if nodeName == "lain2" {
return 30, framework.NewStatus(framework.Success)
}
return 20, framework.NewStatus(framework.Success)
}
func (s *TestScheduler) ScoreExtensions() framework.ScoreExtensions {
return s
}
func (s *TestScheduler) NormalizeScore(ctx context.Context, state *framework.CycleState, p *coreV1.Pod, scores framework.NodeScoreList) *framework.Status {
var min, max int64 = 0, 0
// 求出最小分数和最大分数区间
for _, score := range scores {
if score.Score < min {
min = score.Score
}
if score.Score > max {
max = score.Score
}
}
if max == min {
min = min - 1
}
for i, score := range scores { // 每个节点的分数归一化处理
scores[i].Score = (score.Score - min) * framework.MaxNodeScore / (max - min)
klog.Infof("节点: %v, Score: %v Pod: %v", scores[i].Name, scores[i].Score, p.GetName())
}
return framework.NewStatus(framework.Success, "")
}
加入 filter 调度配置
score:
enabled:
- name: "TestScheduling"
Permit 阶段
判断前置 POD 如果不存在,则等待10s,超时后后重新入列
// lib/test-scheduling.go
var _ framework.PermitPlugin = &TestScheduler{}
func (s *TestScheduler) Permit(ctx context.Context, state *framework.CycleState, p *coreV1.Pod, nodeName string) (*framework.Status, time.Duration) {
_, err := s.fact.Core().V1().Pods().Lister().Pods("default").Get("prepose_pod")
if err != nil {
klog.Info("Permit:等待10秒")
return framework.NewStatus(framework.Wait), time.Second * 10
} else {
klog.Info("Permit:通过")
return framework.NewStatus(framework.Success), 0
}
}
加入 filter 调度配置
permit:
enabled:
- name: "TestScheduling"