上一篇文章我们从整体架构上认识了 HAMi知道 HAMi 主要由 MutatingWebhook、Scheduler Extender、Device Plugin 和 HAMi-Core 几部分组成。这一篇开始进入源码层面重点分析cmd/scheduler/main.go。HAMi scheduler 进程启动后会先通过 Cobra 解析命令行参数然后初始化 Kubernetes client、加载设备配置、创建 Scheduler 对象、启动后台设备信息同步逻辑、启动 informer 缓存、启动 metrics 服务最后通过httprouter注册/filter、/bind、/webhook、/healthz、/readyz等 HTTP 路由并通过 HTTP 或 HTTPS 对外提供服务。更准确地说HAMi scheduler 不是完全替代 kube-scheduler 的独立调度器。它更像是一个同时承担两类职责的 HTTP 服务1. 对 kube-apiserver 提供 /webhook 用于 Pod 创建 admission 阶段的 mutation。 2. 对 kube-scheduler extender 提供 /filter 和 /bind 用于 GPU / NPU 等异构设备的调度过滤和绑定。一、整体启动流程从cmd/scheduler/main.go看HAMi scheduler 的整体启动流程可以概括为程序启动 | |-- Go 自动执行 init() | | | |-- 给 rootCmd 注册 flags | |-- 注册 device 配置相关 flags | |-- 注册 klog flags | |-- 注册 version 子命令 | |-- main() | | | |-- rootCmd.Execute() | | | |-- Cobra 解析命令行参数 | |-- 匹配当前要执行的 command | |-- 如果执行的是 rootCmd 本身则进入 RunE | | | |-- flag.PrintPFlags(cmd.Flags()) | |-- start() | |-- start() | |-- 设置 node lock timeout |-- 初始化 Kubernetes client |-- 加载 device config 并初始化设备处理逻辑 |-- 获取当前 hostname |-- 创建 Scheduler 对象 |-- 后台启动 RegisterFromNodeAnnotations() |-- 启动 informer |-- 启动 metrics 服务 |-- 创建 httprouter.Router |-- 注册 /filter、/bind、/webhook、/healthz、/readyz |-- 启动 HTTP 或 HTTPS Server这里最容易混淆的是init()注册参数不是真正解析参数 rootCmd.Execute()真正解析命令行参数并决定执行哪个 command RunE当前 command 真正要执行的业务逻辑 start()HAMi scheduler 的正式启动入口。二、rootCmdHAMi scheduler 的命令行入口在cmd/scheduler/main.go中核心全局变量是rootCmdrootCmd cobra.Command{ Use: scheduler, Short: kubernetes vgpu scheduler, RunE: func(cmd *cobra.Command, args []string) error { flag.PrintPFlags(cmd.Flags()) return start() }, }这里的cobra.Command可以理解为 Cobra 框架里的“命令对象”。几个字段含义如下Use命令名称这里是 scheduler。 Short命令的简短描述。 RunE当前命令真正执行的函数。 RunE 和 Run 类似但 RunE 可以返回 error。所以当用户执行的是 HAMi scheduler 主命令时最终会执行RunE而RunE里面又调用了start()。rootCmd.Execute() - 匹配到 rootCmd - 执行 rootCmd.RunE() - 调用 start()但是要注意源码里还注册了一个子命令rootCmd.AddCommand(version.VersionCmd)所以如果用户执行的是scheduler version那么 Cobra 会优先匹配version子命令通常不会进入 rootCmd 的RunE也就不会启动 scheduler 服务。三、init()注册 flagsinit()函数中注册了大量启动参数例如rootCmd.Flags().StringVar(config.HTTPBind, http_bind, 127.0.0.1:8080, http server bind address) rootCmd.Flags().StringVar(tlsCertFile, cert_file, , tls cert file) rootCmd.Flags().StringVar(tlsKeyFile, key_file, , tls key file) rootCmd.Flags().StringVar(config.SchedulerName, scheduler-name, , the name to be added to pod.spec.schedulerName if not empty) rootCmd.Flags().Int32Var(config.DefaultMem, default-mem, 0, default gpu device memory to allocate) rootCmd.Flags().Int32Var(config.DefaultCores, default-cores, 0, default gpu core percentage to allocate) rootCmd.Flags().Int32Var(config.DefaultResourceNum, default-gpu, 1, default gpu to allocate)这类StringVar、Int32Var、BoolVar、DurationVar的作用是把一个命令行参数绑定到一个 Go 变量上。比如rootCmd.Flags().StringVar(config.HTTPBind, http_bind, 127.0.0.1:8080, http server bind address)意思是注册一个命令行参数--http_bind 默认值127.0.0.1:8080 绑定变量config.HTTPBind如果启动时不传scheduler那么config.HTTPBind 127.0.0.1:8080如果启动时传scheduler --http_bind0.0.0.0:443那么 Cobra 解析参数后config.HTTPBind 0.0.0.0:443但是这个赋值不是init()完成的而是在rootCmd.Execute()解析命令行参数时完成的。四、main()真正触发 Cobra 执行func main() { if err : rootCmd.Execute(); err ! nil { klog.Fatal(err) } }这里的关键是rootCmd.Execute()。Cobra 的Execute()会根据命令行参数查找 command tree 中匹配的命令并解析对应 flags。因此 HAMi scheduler 的启动可以理解为main() | |-- rootCmd.Execute() | |-- 解析命令行参数 |-- 找到要执行的 command |-- 执行 command.RunE() | |-- start()五、start()HAMi scheduler 真正启动入口start()是 HAMi scheduler 进程真正干活的地方。核心流程如下start() | |-- nodelock.NodeLockTimeout config.NodeLockTimeout | |-- client.InitGlobalClient(...) | |-- config.InitDevices() | |-- os.Hostname() | |-- scheduler.NewScheduler() | |-- go sher.RegisterFromNodeAnnotations() | |-- sher.Start() | |-- defer sher.Stop() | |-- go initMetrics(...) | |-- router : httprouter.New() | |-- router.POST(/filter, routes.PredicateRoute(sher)) |-- router.POST(/bind, routes.Bind(sher)) |-- router.POST(/webhook, routes.WebHookRoute()) |-- router.GET(/healthz, routes.HealthzRoute()) |-- router.GET(/readyz, routes.ReadyzRoute(sher)) | |-- http.ListenAndServe(...) 或 server.ListenAndServeTLS(...)接下来按顺序分析。5.1、设置 NodeLockTimeout源码中首先执行nodelock.NodeLockTimeout config.NodeLockTimeout klog.InfoS(Set node lock timeout, timeout, nodelock.NodeLockTimeout)这里是把命令行参数或者配置中解析出来的config.NodeLockTimeout赋值给nodelock.NodeLockTimeout。HAMi 在调度设备时需要避免多个调度请求同时修改同一个节点的设备分配状态所以会有节点级别的锁。NodeLockTimeout就是这个节点锁的超时时间。5.2、初始化 Kubernetes client接下来是client.InitGlobalClient( client.WithBurst(config.Burst), client.WithQPS(config.QPS), client.WithTimeout(config.Timeout), )HAMi scheduler 后续需要访问 kube-apiserver例如1. 读取 Pod 2. 读取 Node 3. 读取 ResourceQuota 4. 创建 Binding 5. Patch Pod / Node 相关状态所以启动初期必须先初始化 Kubernetes client。参数含义QPS访问 kube-apiserver 的平均请求速率Burst瞬时突发请求上限Timeout请求 kube-apiserver 的超时时间后续sher.Start()中会通过client.GetClient()获取这个 client然后创建 informer。5.3、初始化异构设备处理逻辑config.InitDevices()接下来是config.InitDevices()这一步很重要。HAMi 支持 NVIDIA GPU、Ascend NPU、Cambricon MLU、Hygon DCU、Metax、Mthreads、Iluvatar、Kunlun、AWS Neuron、AMD、VastAI 等多类异构设备。config.InitDevices()的逻辑可以概括为1. 如果 device.DevicesMap 已经有内容说明设备处理逻辑已经初始化过直接返回 2. 否则读取 --device-config-file 指定的设备配置文件 3. 调用 LoadConfig(configFile) 解析 YAML 4. 调用 InitDevicesWithConfig(config) 初始化各类设备 5. 把各类设备处理器注册到 device.DevicesMap 和 device.DevicesToHandle 中。也就是说HAMi scheduler 不是写死只处理 NVIDIA GPU而是通过设备配置和各厂商 device handler 实现异构设备支持。这一步完成后后续/webhook、/filter、/bind才知道哪些资源名属于 HAMi 管理 不同设备类型如何解析资源请求 不同设备类型如何参与打分和分配 不同设备类型如何写入 Pod annotation。5.4、获取 HostNameconfig.HostName, err os.Hostname() if err ! nil { return fmt.Errorf(unable to get hostname: %v, err) } if config.HostName { return fmt.Errorf(empty hostname returned) }这里获取的是当前 scheduler 进程所在容器或者宿主机的 hostname。这个 hostname 后续会用于 leader election 相关逻辑。例如创建 Scheduler 时如果开启了 leader election会用config.HostName作为当前 scheduler 实例的身份标识。5.5、创建 Scheduler 对象sher scheduler.NewScheduler()这一步创建 HAMi 的 Scheduler 对象并初始化它的内部状态。可以把Scheduler对象理解成 HAMi scheduler 的核心内存状态管理器它维护的信息包括1. Kubernetes client 2. Pod lister 3. Node lister 4. ResourceQuota lister 5. PodManager 6. NodeManager 7. QuotaManager 8. 节点设备状态缓存 9. 调度缓存 10. leader election 状态 11. informer stop channel 12. synced 状态所以这个Scheduler对象不是只处理 HTTP 请求它还维护 HAMi 调度判断所需的本地缓存和状态。后续/filter和/bind路由最终都会调用这个对象的方法5.6、后台同步 Node annotationregisterFromNodeAnnotations()创建 Scheduler 对象后执行go sher.RegisterFromNodeAnnotations()注意这里有一个go说明它是后台 goroutine。RegisterFromNodeAnnotations()的作用是持续从 Kubernetes Node annotation 中同步设备信息并更新 HAMi scheduler 内部的节点设备视图。因为 HAMi device-plugin 会运行在各个节点上它负责发现本机 GPU / NPU 等设备然后把设备信息上报到 Node annotation。HAMi scheduler 再从 Node annotation 中读取这些信息构建自己的全局设备视图。这个方法内部不是只执行一次而是循环运行。它会被几类事件触发1. nodeNotify 2. leaderNotify 3. 15 秒 ticker 4. stopCh可以理解成Node 信息变了触发同步 leader 状态变了触发同步 即使没有事件每 15 秒也会周期性检查 收到 stopCh 后退出。5.7、启动 informersher.Start()接下来执行err sher.Start() if err ! nil { return err } defer sher.Stop()sher.Start()是 HAMi scheduler 运行前非常关键的一步。它主要做几件事1. 获取 Kubernetes client 2. 创建 SharedInformerFactory 3. 创建 Pod lister 4. 创建 Node lister 5. 创建 ResourceQuota lister 6. 给 Pod informer 注册 Add / Update / Delete 事件处理函数 7. 给 Node informer 注册 Add / Delete 事件处理函数 8. 给 ResourceQuota informer 注册 Add / Update / Delete 事件处理函数 9. 启动 informer 10. 等待 informer cache 同步 11. 如果开启 leader election则启动 Lease informer 12. 标记 scheduler started。所以 HAMi 使用 informer 维护本地缓存当前有哪些 Pod Pod 分别占用了哪些设备 当前有哪些 Node Node 上有哪些设备 namespace 下有哪些 ResourceQuota 设备资源使用量是多少。后续 kube-scheduler 调用/filter时HAMi 就可以基于本地缓存快速判断这个 Pod 能不能放到这个 Node 上 这个 Node 上有没有满足要求的 GPU / NPU 显存是否足够 算力 core 是否足够 ResourceQuota 是否允许5.8、启动 metrics 服务sher.Start()成功后执行go initMetrics(config.MetricsBindAddress, legacyMetrics)这里同样是后台 goroutine。它的作用是启动 Prometheus metrics 服务。默认监听地址来自参数--metrics-bind-address默认值是:9395这说明 metrics 服务和/filter、/bind、/webhook主 HTTP 服务不是同一个入口。5.9、创建 HTTP Routerhttprouter.New()router : httprouter.New()这行代码创建了一个httprouter.Router。httprouter是 Go 里的一个高性能 HTTP 路由器也可以叫 mux。它的作用是根据 HTTP method 和 URL path把请求分发给对应的 handler。比如POST /filter - routes.PredicateRoute(sher) POST /bind - routes.Bind(sher) POST /webhook - routes.WebHookRoute() GET /healthz - routes.HealthzRoute() GET /readyz - routes.ReadyzRoute(sher)这里要注意httprouter.New()只是创建路由器不是启动 HTTP 服务。真正开始监听端口的是后面的http.ListenAndServe(config.HTTPBind, router)或者 HTTPS 分支中的server.ListenAndServeTLS(, )所以可以把这一段理解成httprouter.New() 只是创建“请求分发表”。 router.POST(...) 是往请求分发表里注册规则。 http.ListenAndServe(...) 才是真正监听端口并处理请求。5.10、注册 /filter 路由router.POST(/filter, routes.PredicateRoute(sher))/filter是 kube-scheduler extender 调用的接口。当 kube-scheduler 发现某个 Pod 请求了 HAMi 管理的扩展资源时会根据 extender 配置把候选节点和 Pod 信息通过 HTTP POST 发送给 HAMi scheduler 的/filter接口。routes.PredicateRoute(sher)返回的是一个httprouter.Handle。它内部大致做这些事1. 检查 request body 2. 限制 request body 最大 1MB 3. 把请求体反序列化成 ExtenderArgs 4. 等待 HAMi scheduler cache 同步 5. 调用 s.Filter(extenderArgs) 6. 把 ExtenderFilterResult 序列化成 JSON 返回给 kube-scheduler。5.12、注册 /bind 路由router.POST(/bind, routes.Bind(sher))/bind也是 kube-scheduler extender 调用的接口。它的 handler 内部大致做这些事1. 限制 request body 最大 1MB 2. 把请求体反序列化成 ExtenderBindingArgs 3. 调用 s.Bind(extenderBindingArgs) 4. 返回 ExtenderBindingResult。5.13、注册 /webhook 路由router.POST(/webhook, routes.WebHookRoute())/webhook和/filter、/bind的调用方不一样。/filter、/bind 调用方是 kube-scheduler extender。 /webhook 调用方是 kube-apiserver 的 admission webhook 机制。当用户创建 Pod 时请求会先到 kube-apiserver。kube-apiserver 在认证、授权之后持久化 Pod 之前会进入 admission 阶段。如果集群中配置了 HAMi 的MutatingWebhookConfiguration并且这个 Pod 没有被 selector 排除那么 kube-apiserver 就会调用 HAMi scheduler 的/webhook。/webhook的触发时机是Pod 创建时kube-apiserver admission 阶段。它主要负责对 Pod 做 mutation例如根据 Pod 请求的 HAMi 资源补充或调整调度相关字段。其中一个很关键的点是如果 Pod 请求了 HAMi 管理的设备资源webhook 可以把 pod.spec.schedulerName 改成 HAMi 配置的 schedulerName。5.14、注册 /healthz 和 /readyzrouter.GET(/healthz, routes.HealthzRoute()) router.GET(/readyz, routes.ReadyzRoute(sher))/healthz比较简单请求进来后直接返回 HTTP 200。用于存活检查。/readyz会检查当前 scheduler 是否是 leader如果是 leader打印 Scheduler extender is leader 如果不是 leader打印 Scheduler extender has not become leader yet。5.15、HTTP 和 HTTPS 两种启动方式HAMi 注册完路由后会判断是否配置了 TLS 证书if len(tlsCertFile) 0 || len(tlsKeyFile) 0 { http.ListenAndServe(config.HTTPBind, router) } else { ... server.ListenAndServeTLS(, ) }这里有两个分支。5.15.1. 没有配置证书启动普通 HTTPhttp.ListenAndServe(config.HTTPBind, router)这表示监听 config.HTTPBind收到请求后交给 router 处理。例如config.HTTPBind 127.0.0.1:8080那么 HAMi scheduler 就监听127.0.0.1:80805.15.2. 配置了证书启动 HTTPS如果传入了--cert_file --key_file则进入 HTTPS 分支。源码中创建了证书 watchercertWatcher, err : certwatcher.New(tlsCertFile, tlsKeyFile)然后创建 TLS 配置tlsCfg : tls.Config{ GetCertificate: certWatcher.GetCertificate, }再创建 HTTP Serverserver : http.Server{ Addr: addr, Handler: handler, TLSConfig: tlsCfg, }最后启动server.ListenAndServeTLS(, )这里ListenAndServeTLS(, )传空字符串是因为证书不是通过参数文件直接交给ListenAndServeTLS加载而是通过TLSConfig.GetCertificate动态提供。所以这一段可以总结为普通模式 http.ListenAndServe(config.HTTPBind, router) HTTPS 模式 certwatcher 监听证书文件变化 tls.Config.GetCertificate 动态获取证书 http.Server 使用 TLSConfig 启动 HTTPS Server六、/webhook、/filter、/bind 的调用关系总结最后把三个核心接口和 Kubernetes 调度链路串起来。6.1 Pod 创建时kube-apiserver 调用 /webhook用户创建 Pod | |-- kubectl apply / API 请求 | |-- kube-apiserver | |-- 认证 |-- 授权 |-- Admission 阶段 | |-- 调用 HAMi /webhook | |-- 检查 Pod 是否请求 HAMi 管理的设备资源 |-- 根据需要修改 Pod |-- 可能设置 pod.spec.schedulerName/webhook的核心作用是在 Pod 进入调度前对 Pod 做 mutation。6.2 Pod 调度时kube-scheduler 调用 /filterPod 已经进入调度队列 | |-- kube-scheduler 发现 Pod 使用了 HAMi 管理的扩展资源 | |-- 根据 extender 配置调用 HAMi /filter | |-- HAMi 读取本地缓存 |-- 检查候选 Node 的 GPU / NPU 资源 |-- 返回哪些节点可用哪些节点不可用/filter的核心作用是在 kube-scheduler 的候选节点基础上进一步按照 HAMi 的设备资源视图过滤节点。6.3 Pod 绑定时kube-scheduler 调用 /bind/filter 过滤完成 | |-- kube-scheduler 选择目标节点 | |-- 调用 HAMi /bind | |-- HAMi 执行绑定逻辑 |-- 写入设备分配信息 |-- 创建 Binding |-- Pod 最终绑定到目标 Node/bind的核心作用是完成最终节点绑定并让 Pod 带上 HAMi 需要的设备分配结果。七、本文总结本文从cmd/scheduler/main.go出发梳理了 HAMi scheduler 的启动流程。核心链路如下main() | |-- rootCmd.Execute() | |-- 解析命令行参数 |-- 执行 RunE | |-- start() | |-- 初始化 Kubernetes client |-- 初始化设备处理逻辑 |-- 创建 Scheduler 对象 |-- 启动 RegisterFromNodeAnnotations() |-- 启动 informer |-- 启动 metrics |-- 创建 httprouter |-- 注册 /webhook、/filter、/bind、/healthz、/readyz |-- 启动 HTTP / HTTPS Server几个重点结论1. init() 只是注册参数不是解析参数。 2. rootCmd.Execute() 才会解析命令行参数并执行 RunE。 3. start() 是 HAMi scheduler 真正启动入口。 4. config.InitDevices() 负责加载设备配置并初始化各类异构设备处理逻辑。 5. scheduler.NewScheduler() 只是创建 Scheduler 对象。 6. RegisterFromNodeAnnotations() 是后台同步 Node annotation 设备信息的 goroutine。 7. sher.Start() 负责启动 informer 并等待 cache 同步。 8. httprouter.New() 只是创建路由器不是启动 HTTP Server。 9. /webhook 由 kube-apiserver admission webhook 调用。 10. /filter 和 /bind 由 kube-scheduler extender 调用。 11. HAMi scheduler 不是完全替代 kube-scheduler而是通过 extender 机制与 kube-scheduler 配合。下面为start()的完整源码func start() error { // Initialize node lock timeout from config nodelock.NodeLockTimeout config.NodeLockTimeout klog.InfoS(Set node lock timeout, timeout, nodelock.NodeLockTimeout) client.InitGlobalClient( client.WithBurst(config.Burst), client.WithQPS(config.QPS), client.WithTimeout(config.Timeout), ) config.InitDevices() var err error config.HostName, err os.Hostname() if err ! nil { return fmt.Errorf(unable to get hostname: %v, err) } if config.HostName { return fmt.Errorf(empty hostname returned) } sher scheduler.NewScheduler() go sher.RegisterFromNodeAnnotations() err sher.Start() if err ! nil { return err } defer sher.Stop() // start monitor metrics go initMetrics(config.MetricsBindAddress, legacyMetrics) // start http server router : httprouter.New() router.POST(/filter, routes.PredicateRoute(sher)) router.POST(/bind, routes.Bind(sher)) router.POST(/webhook, routes.WebHookRoute()) router.GET(/healthz, routes.HealthzRoute()) router.GET(/readyz, routes.ReadyzRoute(sher)) klog.Info(listen on , config.HTTPBind) if enableProfiling { injectProfilingRoute(router) klog.Infof(Profiling enabled, visit %s/debug/pprof/ to view profiles, config.HTTPBind) } if len(tlsCertFile) 0 || len(tlsKeyFile) 0 { if err : http.ListenAndServe(config.HTTPBind, router); err ! nil { return fmt.Errorf(listen and Serve error, %v, err) } } else { certWatcher, err : certwatcher.New(tlsCertFile, tlsKeyFile) if err ! nil { return fmt.Errorf(failed to create cert watcher: %w, err) } tlsCfg : tls.Config{ GetCertificate: certWatcher.GetCertificate, } ctx, cancel : context.WithCancel(context.Background()) defer cancel() go func() { if err : certWatcher.Start(ctx); err ! nil err ! context.Canceled { klog.ErrorS(err, cert watcher error) } }() addr : config.HTTPBind handler : router server : http.Server{ Addr: addr, Handler: handler, TLSConfig: tlsCfg, } klog.InfoS(Starting HTTPS server, address, addr) if err : server.ListenAndServeTLS(, ); err ! nil { return fmt.Errorf(HTTPS server error: %w, err) } } return nil }
HAMi 源码阅读笔记 02:HAMi Scheduler-extender 启动流程,main.go、路由注册与配置加载
上一篇文章我们从整体架构上认识了 HAMi知道 HAMi 主要由 MutatingWebhook、Scheduler Extender、Device Plugin 和 HAMi-Core 几部分组成。这一篇开始进入源码层面重点分析cmd/scheduler/main.go。HAMi scheduler 进程启动后会先通过 Cobra 解析命令行参数然后初始化 Kubernetes client、加载设备配置、创建 Scheduler 对象、启动后台设备信息同步逻辑、启动 informer 缓存、启动 metrics 服务最后通过httprouter注册/filter、/bind、/webhook、/healthz、/readyz等 HTTP 路由并通过 HTTP 或 HTTPS 对外提供服务。更准确地说HAMi scheduler 不是完全替代 kube-scheduler 的独立调度器。它更像是一个同时承担两类职责的 HTTP 服务1. 对 kube-apiserver 提供 /webhook 用于 Pod 创建 admission 阶段的 mutation。 2. 对 kube-scheduler extender 提供 /filter 和 /bind 用于 GPU / NPU 等异构设备的调度过滤和绑定。一、整体启动流程从cmd/scheduler/main.go看HAMi scheduler 的整体启动流程可以概括为程序启动 | |-- Go 自动执行 init() | | | |-- 给 rootCmd 注册 flags | |-- 注册 device 配置相关 flags | |-- 注册 klog flags | |-- 注册 version 子命令 | |-- main() | | | |-- rootCmd.Execute() | | | |-- Cobra 解析命令行参数 | |-- 匹配当前要执行的 command | |-- 如果执行的是 rootCmd 本身则进入 RunE | | | |-- flag.PrintPFlags(cmd.Flags()) | |-- start() | |-- start() | |-- 设置 node lock timeout |-- 初始化 Kubernetes client |-- 加载 device config 并初始化设备处理逻辑 |-- 获取当前 hostname |-- 创建 Scheduler 对象 |-- 后台启动 RegisterFromNodeAnnotations() |-- 启动 informer |-- 启动 metrics 服务 |-- 创建 httprouter.Router |-- 注册 /filter、/bind、/webhook、/healthz、/readyz |-- 启动 HTTP 或 HTTPS Server这里最容易混淆的是init()注册参数不是真正解析参数 rootCmd.Execute()真正解析命令行参数并决定执行哪个 command RunE当前 command 真正要执行的业务逻辑 start()HAMi scheduler 的正式启动入口。二、rootCmdHAMi scheduler 的命令行入口在cmd/scheduler/main.go中核心全局变量是rootCmdrootCmd cobra.Command{ Use: scheduler, Short: kubernetes vgpu scheduler, RunE: func(cmd *cobra.Command, args []string) error { flag.PrintPFlags(cmd.Flags()) return start() }, }这里的cobra.Command可以理解为 Cobra 框架里的“命令对象”。几个字段含义如下Use命令名称这里是 scheduler。 Short命令的简短描述。 RunE当前命令真正执行的函数。 RunE 和 Run 类似但 RunE 可以返回 error。所以当用户执行的是 HAMi scheduler 主命令时最终会执行RunE而RunE里面又调用了start()。rootCmd.Execute() - 匹配到 rootCmd - 执行 rootCmd.RunE() - 调用 start()但是要注意源码里还注册了一个子命令rootCmd.AddCommand(version.VersionCmd)所以如果用户执行的是scheduler version那么 Cobra 会优先匹配version子命令通常不会进入 rootCmd 的RunE也就不会启动 scheduler 服务。三、init()注册 flagsinit()函数中注册了大量启动参数例如rootCmd.Flags().StringVar(config.HTTPBind, http_bind, 127.0.0.1:8080, http server bind address) rootCmd.Flags().StringVar(tlsCertFile, cert_file, , tls cert file) rootCmd.Flags().StringVar(tlsKeyFile, key_file, , tls key file) rootCmd.Flags().StringVar(config.SchedulerName, scheduler-name, , the name to be added to pod.spec.schedulerName if not empty) rootCmd.Flags().Int32Var(config.DefaultMem, default-mem, 0, default gpu device memory to allocate) rootCmd.Flags().Int32Var(config.DefaultCores, default-cores, 0, default gpu core percentage to allocate) rootCmd.Flags().Int32Var(config.DefaultResourceNum, default-gpu, 1, default gpu to allocate)这类StringVar、Int32Var、BoolVar、DurationVar的作用是把一个命令行参数绑定到一个 Go 变量上。比如rootCmd.Flags().StringVar(config.HTTPBind, http_bind, 127.0.0.1:8080, http server bind address)意思是注册一个命令行参数--http_bind 默认值127.0.0.1:8080 绑定变量config.HTTPBind如果启动时不传scheduler那么config.HTTPBind 127.0.0.1:8080如果启动时传scheduler --http_bind0.0.0.0:443那么 Cobra 解析参数后config.HTTPBind 0.0.0.0:443但是这个赋值不是init()完成的而是在rootCmd.Execute()解析命令行参数时完成的。四、main()真正触发 Cobra 执行func main() { if err : rootCmd.Execute(); err ! nil { klog.Fatal(err) } }这里的关键是rootCmd.Execute()。Cobra 的Execute()会根据命令行参数查找 command tree 中匹配的命令并解析对应 flags。因此 HAMi scheduler 的启动可以理解为main() | |-- rootCmd.Execute() | |-- 解析命令行参数 |-- 找到要执行的 command |-- 执行 command.RunE() | |-- start()五、start()HAMi scheduler 真正启动入口start()是 HAMi scheduler 进程真正干活的地方。核心流程如下start() | |-- nodelock.NodeLockTimeout config.NodeLockTimeout | |-- client.InitGlobalClient(...) | |-- config.InitDevices() | |-- os.Hostname() | |-- scheduler.NewScheduler() | |-- go sher.RegisterFromNodeAnnotations() | |-- sher.Start() | |-- defer sher.Stop() | |-- go initMetrics(...) | |-- router : httprouter.New() | |-- router.POST(/filter, routes.PredicateRoute(sher)) |-- router.POST(/bind, routes.Bind(sher)) |-- router.POST(/webhook, routes.WebHookRoute()) |-- router.GET(/healthz, routes.HealthzRoute()) |-- router.GET(/readyz, routes.ReadyzRoute(sher)) | |-- http.ListenAndServe(...) 或 server.ListenAndServeTLS(...)接下来按顺序分析。5.1、设置 NodeLockTimeout源码中首先执行nodelock.NodeLockTimeout config.NodeLockTimeout klog.InfoS(Set node lock timeout, timeout, nodelock.NodeLockTimeout)这里是把命令行参数或者配置中解析出来的config.NodeLockTimeout赋值给nodelock.NodeLockTimeout。HAMi 在调度设备时需要避免多个调度请求同时修改同一个节点的设备分配状态所以会有节点级别的锁。NodeLockTimeout就是这个节点锁的超时时间。5.2、初始化 Kubernetes client接下来是client.InitGlobalClient( client.WithBurst(config.Burst), client.WithQPS(config.QPS), client.WithTimeout(config.Timeout), )HAMi scheduler 后续需要访问 kube-apiserver例如1. 读取 Pod 2. 读取 Node 3. 读取 ResourceQuota 4. 创建 Binding 5. Patch Pod / Node 相关状态所以启动初期必须先初始化 Kubernetes client。参数含义QPS访问 kube-apiserver 的平均请求速率Burst瞬时突发请求上限Timeout请求 kube-apiserver 的超时时间后续sher.Start()中会通过client.GetClient()获取这个 client然后创建 informer。5.3、初始化异构设备处理逻辑config.InitDevices()接下来是config.InitDevices()这一步很重要。HAMi 支持 NVIDIA GPU、Ascend NPU、Cambricon MLU、Hygon DCU、Metax、Mthreads、Iluvatar、Kunlun、AWS Neuron、AMD、VastAI 等多类异构设备。config.InitDevices()的逻辑可以概括为1. 如果 device.DevicesMap 已经有内容说明设备处理逻辑已经初始化过直接返回 2. 否则读取 --device-config-file 指定的设备配置文件 3. 调用 LoadConfig(configFile) 解析 YAML 4. 调用 InitDevicesWithConfig(config) 初始化各类设备 5. 把各类设备处理器注册到 device.DevicesMap 和 device.DevicesToHandle 中。也就是说HAMi scheduler 不是写死只处理 NVIDIA GPU而是通过设备配置和各厂商 device handler 实现异构设备支持。这一步完成后后续/webhook、/filter、/bind才知道哪些资源名属于 HAMi 管理 不同设备类型如何解析资源请求 不同设备类型如何参与打分和分配 不同设备类型如何写入 Pod annotation。5.4、获取 HostNameconfig.HostName, err os.Hostname() if err ! nil { return fmt.Errorf(unable to get hostname: %v, err) } if config.HostName { return fmt.Errorf(empty hostname returned) }这里获取的是当前 scheduler 进程所在容器或者宿主机的 hostname。这个 hostname 后续会用于 leader election 相关逻辑。例如创建 Scheduler 时如果开启了 leader election会用config.HostName作为当前 scheduler 实例的身份标识。5.5、创建 Scheduler 对象sher scheduler.NewScheduler()这一步创建 HAMi 的 Scheduler 对象并初始化它的内部状态。可以把Scheduler对象理解成 HAMi scheduler 的核心内存状态管理器它维护的信息包括1. Kubernetes client 2. Pod lister 3. Node lister 4. ResourceQuota lister 5. PodManager 6. NodeManager 7. QuotaManager 8. 节点设备状态缓存 9. 调度缓存 10. leader election 状态 11. informer stop channel 12. synced 状态所以这个Scheduler对象不是只处理 HTTP 请求它还维护 HAMi 调度判断所需的本地缓存和状态。后续/filter和/bind路由最终都会调用这个对象的方法5.6、后台同步 Node annotationregisterFromNodeAnnotations()创建 Scheduler 对象后执行go sher.RegisterFromNodeAnnotations()注意这里有一个go说明它是后台 goroutine。RegisterFromNodeAnnotations()的作用是持续从 Kubernetes Node annotation 中同步设备信息并更新 HAMi scheduler 内部的节点设备视图。因为 HAMi device-plugin 会运行在各个节点上它负责发现本机 GPU / NPU 等设备然后把设备信息上报到 Node annotation。HAMi scheduler 再从 Node annotation 中读取这些信息构建自己的全局设备视图。这个方法内部不是只执行一次而是循环运行。它会被几类事件触发1. nodeNotify 2. leaderNotify 3. 15 秒 ticker 4. stopCh可以理解成Node 信息变了触发同步 leader 状态变了触发同步 即使没有事件每 15 秒也会周期性检查 收到 stopCh 后退出。5.7、启动 informersher.Start()接下来执行err sher.Start() if err ! nil { return err } defer sher.Stop()sher.Start()是 HAMi scheduler 运行前非常关键的一步。它主要做几件事1. 获取 Kubernetes client 2. 创建 SharedInformerFactory 3. 创建 Pod lister 4. 创建 Node lister 5. 创建 ResourceQuota lister 6. 给 Pod informer 注册 Add / Update / Delete 事件处理函数 7. 给 Node informer 注册 Add / Delete 事件处理函数 8. 给 ResourceQuota informer 注册 Add / Update / Delete 事件处理函数 9. 启动 informer 10. 等待 informer cache 同步 11. 如果开启 leader election则启动 Lease informer 12. 标记 scheduler started。所以 HAMi 使用 informer 维护本地缓存当前有哪些 Pod Pod 分别占用了哪些设备 当前有哪些 Node Node 上有哪些设备 namespace 下有哪些 ResourceQuota 设备资源使用量是多少。后续 kube-scheduler 调用/filter时HAMi 就可以基于本地缓存快速判断这个 Pod 能不能放到这个 Node 上 这个 Node 上有没有满足要求的 GPU / NPU 显存是否足够 算力 core 是否足够 ResourceQuota 是否允许5.8、启动 metrics 服务sher.Start()成功后执行go initMetrics(config.MetricsBindAddress, legacyMetrics)这里同样是后台 goroutine。它的作用是启动 Prometheus metrics 服务。默认监听地址来自参数--metrics-bind-address默认值是:9395这说明 metrics 服务和/filter、/bind、/webhook主 HTTP 服务不是同一个入口。5.9、创建 HTTP Routerhttprouter.New()router : httprouter.New()这行代码创建了一个httprouter.Router。httprouter是 Go 里的一个高性能 HTTP 路由器也可以叫 mux。它的作用是根据 HTTP method 和 URL path把请求分发给对应的 handler。比如POST /filter - routes.PredicateRoute(sher) POST /bind - routes.Bind(sher) POST /webhook - routes.WebHookRoute() GET /healthz - routes.HealthzRoute() GET /readyz - routes.ReadyzRoute(sher)这里要注意httprouter.New()只是创建路由器不是启动 HTTP 服务。真正开始监听端口的是后面的http.ListenAndServe(config.HTTPBind, router)或者 HTTPS 分支中的server.ListenAndServeTLS(, )所以可以把这一段理解成httprouter.New() 只是创建“请求分发表”。 router.POST(...) 是往请求分发表里注册规则。 http.ListenAndServe(...) 才是真正监听端口并处理请求。5.10、注册 /filter 路由router.POST(/filter, routes.PredicateRoute(sher))/filter是 kube-scheduler extender 调用的接口。当 kube-scheduler 发现某个 Pod 请求了 HAMi 管理的扩展资源时会根据 extender 配置把候选节点和 Pod 信息通过 HTTP POST 发送给 HAMi scheduler 的/filter接口。routes.PredicateRoute(sher)返回的是一个httprouter.Handle。它内部大致做这些事1. 检查 request body 2. 限制 request body 最大 1MB 3. 把请求体反序列化成 ExtenderArgs 4. 等待 HAMi scheduler cache 同步 5. 调用 s.Filter(extenderArgs) 6. 把 ExtenderFilterResult 序列化成 JSON 返回给 kube-scheduler。5.12、注册 /bind 路由router.POST(/bind, routes.Bind(sher))/bind也是 kube-scheduler extender 调用的接口。它的 handler 内部大致做这些事1. 限制 request body 最大 1MB 2. 把请求体反序列化成 ExtenderBindingArgs 3. 调用 s.Bind(extenderBindingArgs) 4. 返回 ExtenderBindingResult。5.13、注册 /webhook 路由router.POST(/webhook, routes.WebHookRoute())/webhook和/filter、/bind的调用方不一样。/filter、/bind 调用方是 kube-scheduler extender。 /webhook 调用方是 kube-apiserver 的 admission webhook 机制。当用户创建 Pod 时请求会先到 kube-apiserver。kube-apiserver 在认证、授权之后持久化 Pod 之前会进入 admission 阶段。如果集群中配置了 HAMi 的MutatingWebhookConfiguration并且这个 Pod 没有被 selector 排除那么 kube-apiserver 就会调用 HAMi scheduler 的/webhook。/webhook的触发时机是Pod 创建时kube-apiserver admission 阶段。它主要负责对 Pod 做 mutation例如根据 Pod 请求的 HAMi 资源补充或调整调度相关字段。其中一个很关键的点是如果 Pod 请求了 HAMi 管理的设备资源webhook 可以把 pod.spec.schedulerName 改成 HAMi 配置的 schedulerName。5.14、注册 /healthz 和 /readyzrouter.GET(/healthz, routes.HealthzRoute()) router.GET(/readyz, routes.ReadyzRoute(sher))/healthz比较简单请求进来后直接返回 HTTP 200。用于存活检查。/readyz会检查当前 scheduler 是否是 leader如果是 leader打印 Scheduler extender is leader 如果不是 leader打印 Scheduler extender has not become leader yet。5.15、HTTP 和 HTTPS 两种启动方式HAMi 注册完路由后会判断是否配置了 TLS 证书if len(tlsCertFile) 0 || len(tlsKeyFile) 0 { http.ListenAndServe(config.HTTPBind, router) } else { ... server.ListenAndServeTLS(, ) }这里有两个分支。5.15.1. 没有配置证书启动普通 HTTPhttp.ListenAndServe(config.HTTPBind, router)这表示监听 config.HTTPBind收到请求后交给 router 处理。例如config.HTTPBind 127.0.0.1:8080那么 HAMi scheduler 就监听127.0.0.1:80805.15.2. 配置了证书启动 HTTPS如果传入了--cert_file --key_file则进入 HTTPS 分支。源码中创建了证书 watchercertWatcher, err : certwatcher.New(tlsCertFile, tlsKeyFile)然后创建 TLS 配置tlsCfg : tls.Config{ GetCertificate: certWatcher.GetCertificate, }再创建 HTTP Serverserver : http.Server{ Addr: addr, Handler: handler, TLSConfig: tlsCfg, }最后启动server.ListenAndServeTLS(, )这里ListenAndServeTLS(, )传空字符串是因为证书不是通过参数文件直接交给ListenAndServeTLS加载而是通过TLSConfig.GetCertificate动态提供。所以这一段可以总结为普通模式 http.ListenAndServe(config.HTTPBind, router) HTTPS 模式 certwatcher 监听证书文件变化 tls.Config.GetCertificate 动态获取证书 http.Server 使用 TLSConfig 启动 HTTPS Server六、/webhook、/filter、/bind 的调用关系总结最后把三个核心接口和 Kubernetes 调度链路串起来。6.1 Pod 创建时kube-apiserver 调用 /webhook用户创建 Pod | |-- kubectl apply / API 请求 | |-- kube-apiserver | |-- 认证 |-- 授权 |-- Admission 阶段 | |-- 调用 HAMi /webhook | |-- 检查 Pod 是否请求 HAMi 管理的设备资源 |-- 根据需要修改 Pod |-- 可能设置 pod.spec.schedulerName/webhook的核心作用是在 Pod 进入调度前对 Pod 做 mutation。6.2 Pod 调度时kube-scheduler 调用 /filterPod 已经进入调度队列 | |-- kube-scheduler 发现 Pod 使用了 HAMi 管理的扩展资源 | |-- 根据 extender 配置调用 HAMi /filter | |-- HAMi 读取本地缓存 |-- 检查候选 Node 的 GPU / NPU 资源 |-- 返回哪些节点可用哪些节点不可用/filter的核心作用是在 kube-scheduler 的候选节点基础上进一步按照 HAMi 的设备资源视图过滤节点。6.3 Pod 绑定时kube-scheduler 调用 /bind/filter 过滤完成 | |-- kube-scheduler 选择目标节点 | |-- 调用 HAMi /bind | |-- HAMi 执行绑定逻辑 |-- 写入设备分配信息 |-- 创建 Binding |-- Pod 最终绑定到目标 Node/bind的核心作用是完成最终节点绑定并让 Pod 带上 HAMi 需要的设备分配结果。七、本文总结本文从cmd/scheduler/main.go出发梳理了 HAMi scheduler 的启动流程。核心链路如下main() | |-- rootCmd.Execute() | |-- 解析命令行参数 |-- 执行 RunE | |-- start() | |-- 初始化 Kubernetes client |-- 初始化设备处理逻辑 |-- 创建 Scheduler 对象 |-- 启动 RegisterFromNodeAnnotations() |-- 启动 informer |-- 启动 metrics |-- 创建 httprouter |-- 注册 /webhook、/filter、/bind、/healthz、/readyz |-- 启动 HTTP / HTTPS Server几个重点结论1. init() 只是注册参数不是解析参数。 2. rootCmd.Execute() 才会解析命令行参数并执行 RunE。 3. start() 是 HAMi scheduler 真正启动入口。 4. config.InitDevices() 负责加载设备配置并初始化各类异构设备处理逻辑。 5. scheduler.NewScheduler() 只是创建 Scheduler 对象。 6. RegisterFromNodeAnnotations() 是后台同步 Node annotation 设备信息的 goroutine。 7. sher.Start() 负责启动 informer 并等待 cache 同步。 8. httprouter.New() 只是创建路由器不是启动 HTTP Server。 9. /webhook 由 kube-apiserver admission webhook 调用。 10. /filter 和 /bind 由 kube-scheduler extender 调用。 11. HAMi scheduler 不是完全替代 kube-scheduler而是通过 extender 机制与 kube-scheduler 配合。下面为start()的完整源码func start() error { // Initialize node lock timeout from config nodelock.NodeLockTimeout config.NodeLockTimeout klog.InfoS(Set node lock timeout, timeout, nodelock.NodeLockTimeout) client.InitGlobalClient( client.WithBurst(config.Burst), client.WithQPS(config.QPS), client.WithTimeout(config.Timeout), ) config.InitDevices() var err error config.HostName, err os.Hostname() if err ! nil { return fmt.Errorf(unable to get hostname: %v, err) } if config.HostName { return fmt.Errorf(empty hostname returned) } sher scheduler.NewScheduler() go sher.RegisterFromNodeAnnotations() err sher.Start() if err ! nil { return err } defer sher.Stop() // start monitor metrics go initMetrics(config.MetricsBindAddress, legacyMetrics) // start http server router : httprouter.New() router.POST(/filter, routes.PredicateRoute(sher)) router.POST(/bind, routes.Bind(sher)) router.POST(/webhook, routes.WebHookRoute()) router.GET(/healthz, routes.HealthzRoute()) router.GET(/readyz, routes.ReadyzRoute(sher)) klog.Info(listen on , config.HTTPBind) if enableProfiling { injectProfilingRoute(router) klog.Infof(Profiling enabled, visit %s/debug/pprof/ to view profiles, config.HTTPBind) } if len(tlsCertFile) 0 || len(tlsKeyFile) 0 { if err : http.ListenAndServe(config.HTTPBind, router); err ! nil { return fmt.Errorf(listen and Serve error, %v, err) } } else { certWatcher, err : certwatcher.New(tlsCertFile, tlsKeyFile) if err ! nil { return fmt.Errorf(failed to create cert watcher: %w, err) } tlsCfg : tls.Config{ GetCertificate: certWatcher.GetCertificate, } ctx, cancel : context.WithCancel(context.Background()) defer cancel() go func() { if err : certWatcher.Start(ctx); err ! nil err ! context.Canceled { klog.ErrorS(err, cert watcher error) } }() addr : config.HTTPBind handler : router server : http.Server{ Addr: addr, Handler: handler, TLSConfig: tlsCfg, } klog.InfoS(Starting HTTPS server, address, addr) if err : server.ListenAndServeTLS(, ); err ! nil { return fmt.Errorf(HTTPS server error: %w, err) } } return nil }