Kubernetes Operator开发实践:构建自定义资源控制器

Kubernetes Operator开发实践:构建自定义资源控制器 Kubernetes Operator开发实践:构建自定义资源控制器 一、Operator概述 Kubernetes Operator是一种软件扩展模式，用于管理复杂的有状态应用。它基于Kubernetes的自定义资源定义(CRD)和控制器模式，将领域知识编码到软件中，实现自动化运维。Operator框架主要包括：CRD (Custom Resource Definition)：定义自定义资源类型；Controller：监听资源变化并协调状态；Reconcile Loop：持续比对期望状态和实际状态。 二、环境准备 2.1 安装Operator SDK # 下载Operator SDK curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.32.0/operator-sdk_darwin_amd64 # 安装到PATH chmod +x operator-sdk_darwin_amd64 sudo mv operator-sdk_darwin_amd64 /usr/local/bin/operator-sdk # 验证安装 operator-sdk version 2.2 安装kubebuilder # 下载kubebuilder curl -L https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH) | tar -xz -C /tmp/ sudo mv /tmp/kubebuilder/bin/* /usr/local/bin/ # 验证安装 kubebuilder version 2.3 初始化Go模块 mkdir my-operator cd my-operator go mod init github.com/my-org/my-operator 三、创建CRD 3.1 使用Operator SDK创建项目 # 初始化Operator项目 operator-sdk init --domain mydomain.com --repo github.com/my-org/my-operator # 创建API资源 operator-sdk create api \ --group apps \ --version v1alpha1 \ --kind MyApp \ --resource \ --controller 3.2 定义CRD结构 修改api/v1alpha1/myapp_types.go： package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type MyAppSpec struct { Replicas int32 `json:"replicas,omitempty"` Image string `json:"image,omitempty"` Port int32 `json:"port,omitempty"` } type MyAppStatus struct { ReadyReplicas int32 `json:"readyReplicas,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status type MyApp struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec MyAppSpec `json:"spec,omitempty"` Status MyAppStatus `json:"status,omitempty"` } //+kubebuilder:object:root=true type MyAppList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []MyApp `json:"items"` } func init() { SchemeBuilder.Register(&MyApp{}, &MyAppList{}) } 3.3 生成CRD清单 # 生成CRD make manifests # 安装CRD到集群 kubectl apply -f config/crd/bases/apps.mydomain.com_myapps.yaml 四、实现控制器 4.1 
修改Reconcile逻辑 修改controllers/myapp_controller.go： package controllers import ( "context" "fmt" "reflect" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" appsv1alpha1 "github.com/my-org/my-operator/api/v1alpha1" ) type MyAppReconciler struct { client.Client Scheme *runtime.Scheme } //+kubebuilder:rbac:groups=apps.mydomain.com,resources=myapps,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps.mydomain.com,resources=myapps/status,verbs=get;update;patch //+kubebuilder:rbac:groups=apps.mydomain.com,resources=myapps/finalizers,verbs=update func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) var myapp appsv1alpha1.MyApp if err := r.Get(ctx, req.NamespacedName, &myapp); err != nil { if errors.IsNotFound(err) { return ctrl.Result{}, nil } log.Error(err, "Unable to fetch MyApp") return ctrl.Result{}, err } if err := r.reconcileDeployment(ctx, &myapp); err != nil { return ctrl.Result{}, err } if err := r.reconcileService(ctx, &myapp); err != 
nil { return ctrl.Result{}, err } return ctrl.Result{RequeueAfter: time.Minute}, nil } func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myapp *appsv1alpha1.MyApp) error { log := log.FromContext(ctx) dep := &appsv1.Deployment{} dep.Name = myapp.Name dep.Namespace = myapp.Namespace or, err := ctrl.CreateOrUpdate(ctx, r.Client, dep, func() error { dep.Spec.Replicas = &myapp.Spec.Replicas dep.Spec.Selector = &metav1.LabelSelector{ MatchLabels: map[string]string{"app": myapp.Name}, } dep.Spec.Template.Spec.Containers = []corev1.Container{ { Name: "myapp", Image: myapp.Spec.Image, Ports: []corev1.ContainerPort{ {ContainerPort: myapp.Spec.Port}, }, }, } dep.Spec.Template.Labels = map[string]string{"app": myapp.Name} return ctrl.SetControllerReference(myapp, dep, r.Scheme) }) if err != nil { return err } if or != controllerutil.OperationResultNone { log.Info(fmt.Sprintf("Deployment %s %s", dep.Name, or)) } return nil } func (r *MyAppReconciler) reconcileService(ctx context.Context, myapp *appsv1alpha1.MyApp) error { log := log.FromContext(ctx) svc := &corev1.Service{} svc.Name = myapp.Name svc.Namespace = myapp.Namespace or, err := ctrl.CreateOrUpdate(ctx, r.Client, svc, func() error { svc.Spec.Selector = map[string]string{"app": myapp.Name} svc.Spec.Ports = []corev1.ServicePort{ {Port: myapp.Spec.Port, TargetPort: intstr.FromInt(int(myapp.Spec.Port))}, } svc.Spec.Type = corev1.ServiceTypeClusterIP return ctrl.SetControllerReference(myapp, svc, r.Scheme) }) if err != nil { return err } if or != controllerutil.OperationResultNone { log.Info(fmt.Sprintf("Service %s %s", svc.Name, or)) } return nil } func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&appsv1alpha1.MyApp{}). Owns(&appsv1.Deployment{}). Owns(&corev1.Service{}). 
Complete(r) } 4.2 更新状态 func (r *MyAppReconciler) updateStatus(ctx context.Context, myapp *appsv1alpha1.MyApp) error { dep := &appsv1.Deployment{} if err := r.Get(ctx, types.NamespacedName{Name: myapp.Name, Namespace: myapp.Namespace}, dep); err != nil { return err } if myapp.Status.ReadyReplicas != dep.Status.ReadyReplicas { myapp.Status.ReadyReplicas = dep.Status.ReadyReplicas if err := r.Status().Update(ctx, myapp); err != nil { return err } } return nil } 五、构建与部署 5.1 构建Operator镜像 # 构建镜像 make docker-build IMG=my-registry/my-operator:v1.0.0 # 推送镜像 make docker-push IMG=my-registry/my-operator:v1.0.0 5.2 部署Operator # 部署Operator make deploy IMG=my-registry/my-operator:v1.0.0 # 检查Operator Pod状态 kubectl get pods -n my-operator-system 5.3 创建自定义资源实例 apiVersion: apps.mydomain.com/v1alpha1 kind: MyApp metadata: name: my-app spec: replicas: 3 image: nginx:latest port: 80 应用配置： kubectl apply -f config/samples/apps_v1alpha1_myapp.yaml 六、高级功能 6.1 配置Finalizer func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) var myapp appsv1alpha1.MyApp if err := r.Get(ctx, req.NamespacedName, &myapp); err != nil { if errors.IsNotFound(err) { return ctrl.Result{}, nil } return ctrl.Result{}, err } if myapp.GetDeletionTimestamp() != nil { if controllerutil.ContainsFinalizer(&myapp, "myapp.finalizers.mydomain.com") { if err := r.cleanupResources(ctx, &myapp); err != nil { return ctrl.Result{}, err } controllerutil.RemoveFinalizer(&myapp, "myapp.finalizers.mydomain.com") if err := r.Update(ctx, &myapp); err != nil { return ctrl.Result{}, err } } return ctrl.Result{}, nil } if !controllerutil.ContainsFinalizer(&myapp, "myapp.finalizers.mydomain.com") { controllerutil.AddFinalizer(&myapp, "myapp.finalizers.mydomain.com") if err := r.Update(ctx, &myapp); err != nil { return ctrl.Result{}, err } } // ... 
其他reconcile逻辑 } 6.2 事件处理 func (r *MyAppReconciler) recordEvent(myapp *appsv1alpha1.MyApp, eventType, reason, message string) { event := &corev1.Event{ ObjectMeta: metav1.ObjectMeta{ GenerateName: myapp.Name + "-", Namespace: myapp.Namespace, }, InvolvedObject: corev1.ObjectReference{ Kind: "MyApp", Namespace: myapp.Namespace, Name: myapp.Name, UID: myapp.UID, APIVersion: myapp.APIVersion, ResourceVersion: myapp.ResourceVersion, }, Type: eventType, Reason: reason, Message: message, Source: corev1.EventSource{ Component: "myapp-controller", }, } r.Create(context.Background(), event) } 6.3 状态验证 func (r *MyAppReconciler) validateSpec(myapp *appsv1alpha1.MyApp) error { if myapp.Spec.Replicas < 1 { return fmt.Errorf("replicas must be at least 1") } if myapp.Spec.Image == "" { return fmt.Errorf("image is required") } if myapp.Spec.Port < 1 || myapp.Spec.Port > 65535 { return fmt.Errorf("port must be between 1 and 65535") } return nil } 七、测试与调试 7.1 单元测试 func TestReconcile(t *testing.T) { ctx := context.Background() scheme := runtime.NewScheme() _ = appsv1alpha1.AddToScheme(scheme) _ = appsv1.AddToScheme(scheme) _ = corev1.AddToScheme(scheme) k8sClient := fake.NewClientBuilder().WithScheme(scheme).Build() r := &MyAppReconciler{ Client: k8sClient, Scheme: scheme, } myapp := &appsv1alpha1.MyApp{ ObjectMeta: metav1.ObjectMeta{ Name: "test-app", Namespace: "default", }, Spec: appsv1alpha1.MyAppSpec{ Replicas: 2, Image: "nginx:latest", Port: 80, }, } k8sClient.Create(ctx, myapp) req := ctrl.Request{ NamespacedName: types.NamespacedName{ Name: "test-app", Namespace: "default", }, } res, err := r.Reconcile(ctx, req) if err != 
nil { t.Fatalf("Reconcile failed: %v", err) } if res.Requeue { t.Fatal("Expected no requeue") } } 7.2 运行本地调试 # 设置KUBECONFIG export KUBECONFIG=~/.kube/config # 运行Operator make run 八、最佳实践 8.1 CRD设计原则 保持简单：只定义必要的字段；使用标准类型：遵循Kubernetes API风格；版本控制：使用v1alpha1、v1beta1、v1版本策略；状态分离：Spec定义期望状态，Status记录实际状态。 8.2 控制器设计模式 幂等性：确保Reconcile可以安全重复执行；错误处理：区分可重试和不可重试错误；资源限制：使用RateLimiter防止过度重试；日志记录：记录关键操作和决策点。 8.3 部署建议 使用ServiceAccount：为Operator配置最小权限；资源请求和限制：合理配置CPU和内存；健康检查：配置livenessProbe和readinessProbe；高可用性：部署多个副本并使用Leader选举。 九、总结 Kubernetes Operator模式为管理复杂应用提供了强大的解决方案。通过本文的实践指南，您可以掌握Operator的核心开发流程，从CRD定义到控制器实现。建议从简单场景开始，逐步扩展功能，最终构建出生产级的Operator。 参考资料：Operator SDK官方文档、Kubebuilder官方文档、Kubernetes Operator模式