Building a Kubernetes Operator with Micronaut and the OCI Standard to Manage the Application Lifecycle


Our team has been pushing GitOps internally for a while, but a few scenarios always fall through the cracks. The most typical one is automated updates of base images. For example, we maintain a shared Java base image; when it receives a security patch and is pushed to the OCI registry, we want every application Pod that depends on it to be rebuilt automatically, instead of waiting for each product team to manually edit the Helm Chart or Kustomize files in their Git repositories. Manual updates are not only tedious, they are easy to miss, which is a security risk.

The initial idea was a cron-style script that polls the registry with tools like kubectl and skopeo, but that approach is too much glue code: state management, error handling, and extensibility all become problems. We need a more cloud-native solution: a Kubernetes Operator whose core responsibility is to watch the OCI image registry for changes and automatically update the associated Deployments in the cluster.

On technology choice, the community mainstream is Go with Kubebuilder or the Operator SDK. Our stack, however, is JVM-centric, and introducing Go would add maintenance cost, so we looked for a JVM-based option instead. Quarkus and Spring Boot were both candidates, but given that an Operator is a long-running controller that sits idle most of the time, Micronaut stood out for its very low memory footprint and near-instant startup (especially when compiled to a GraalVM Native Image). Both properties are extremely friendly to the controller's resource consumption and elasticity.

Our goal is an ImageWatcher Operator that uses a custom resource (CRD), ImagePolicy, to declare which images to watch and which update strategy to apply.

Step 1: Defining the CRD and the Domain Model

A robust Operator starts with a clear API definition. Our ImagePolicy CRD looks like this:

# crd/imagepolicy.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: imagepolicies.operator.technomad.dev
spec:
  group: operator.technomad.dev
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                repository:
                  type: string
                  description: "OCI image repository to watch, e.g., 'harbor.mycorp.com/library/base-java'"
                tagPolicy:
                  type: object
                  properties:
                    strategy:
                      type: string
                      enum: ["SemVer", "Latest", "Regex"]
                    pattern:
                      type: string
                      description: "Regex pattern if strategy is Regex"
                  required: ["strategy"]
                updateTarget:
                  type: object
                  properties:
                    kind:
                      type: string
                      enum: ["Deployment"]
                    name:
                      type: string
                    namespace:
                      type: string
                  required: ["kind", "name", "namespace"]
                pollIntervalMinutes:
                  type: integer
                  default: 60
              required: ["repository", "tagPolicy", "updateTarget"]
            status:
              type: object
              properties:
                lastCheckedTime:
                  type: string
                  format: date-time
                lastAppliedTag:
                  type: string
                observedGeneration:
                  type: integer
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      lastTransitionTime:
                        type: string
                        format: date-time
                      reason:
                        type: string
                      message:
                        type: string
  scope: Namespaced
  names:
    plural: imagepolicies
    singular: imagepolicy
    kind: ImagePolicy
    shortNames:
    - ipol

This CRD captures our core logic: which repository to watch (repository), how to select a tag (tagPolicy), and which target workload to update (updateTarget). The status section records the Operator's working state, which is the key to idempotency and to avoiding repeated updates.

Next we create the corresponding Java POJOs in the Micronaut project. They must use jackson-databind with complete annotations so that the Kubernetes Java client can serialize and deserialize them correctly.

// src/main/java/dev/technomad/operator/model/ImagePolicy.java
package dev.technomad.operator.model;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

// Implementing Namespaced matches the CRD's "scope: Namespaced";
// without it the fabric8 client treats the resource as cluster-scoped.
@Group("operator.technomad.dev")
@Version("v1alpha1")
public class ImagePolicy extends CustomResource<ImagePolicySpec, ImagePolicyStatus> implements Namespaced {
}

// src/main/java/dev/technomad/operator/model/ImagePolicySpec.java
package dev.technomad.operator.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
// ... other models for TagPolicy and UpdateTarget
public class ImagePolicySpec {
    @NotNull
    private final String repository;
    @NotNull
    private final TagPolicy tagPolicy;
    @NotNull
    private final UpdateTarget updateTarget;
    private final int pollIntervalMinutes;

    @JsonCreator
    public ImagePolicySpec(
            @JsonProperty("repository") String repository,
            @JsonProperty("tagPolicy") TagPolicy tagPolicy,
            @JsonProperty("updateTarget") UpdateTarget updateTarget,
            @JsonProperty("pollIntervalMinutes") Integer pollIntervalMinutes) {
        this.repository = repository;
        this.tagPolicy = tagPolicy;
        this.updateTarget = updateTarget;
        this.pollIntervalMinutes = pollIntervalMinutes == null ? 60 : pollIntervalMinutes;
    }
    // ... getters
}

// src/main/java/dev/technomad/operator/model/ImagePolicyStatus.java
package dev.technomad.operator.model;

import java.time.Instant;
import java.util.List;
// ... model for Condition
public class ImagePolicyStatus {
    private Instant lastCheckedTime;
    private String lastAppliedTag;
    private Long observedGeneration;
    private List<Condition> conditions;
    // ... getters and setters
}
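
The TagPolicy and UpdateTarget models are elided above. A minimal sketch of what they could look like, with fields mirroring the CRD schema and the getters the reconciler below relies on (a sketch, not the project's exact code):

// src/main/java/dev/technomad/operator/model/TagPolicy.java (sketch)
package dev.technomad.operator.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class TagPolicy {
    private final String strategy; // "SemVer", "Latest" or "Regex", as in the CRD enum
    private final String pattern;  // only used when strategy is "Regex"

    @JsonCreator
    public TagPolicy(@JsonProperty("strategy") String strategy,
                     @JsonProperty("pattern") String pattern) {
        this.strategy = strategy;
        this.pattern = pattern;
    }

    public String getStrategy() { return strategy; }
    public String getPattern() { return pattern; }
}

// src/main/java/dev/technomad/operator/model/UpdateTarget.java (sketch)
package dev.technomad.operator.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class UpdateTarget {
    private final String kind;      // currently only "Deployment" per the CRD enum
    private final String name;
    private final String namespace;

    @JsonCreator
    public UpdateTarget(@JsonProperty("kind") String kind,
                        @JsonProperty("name") String name,
                        @JsonProperty("namespace") String namespace) {
        this.kind = kind;
        this.name = name;
        this.namespace = namespace;
    }

    public String getKind() { return kind; }
    public String getName() { return name; }
    public String getNamespace() { return namespace; }
}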

Step 2: Implementing the Core Reconciliation Loop

The soul of an Operator is the reconciliation loop, a textbook application of the controller pattern. We use the Fabric8 Kubernetes Client, which integrates well with Micronaut.

Configuration comes first. In application.yml we prepare the settings for the Kubernetes client and for the OCI registry client.

# src/main/resources/application.yml
micronaut:
  application:
    name: image-watcher-operator

kubernetes:
  client:
    namespace: "default" # Default namespace, can be overridden

oci:
  registry:
    # We will fetch credentials from a Kubernetes secret
    credentials:
      secret-name: "oci-registry-creds"
      namespace: "operators" # Namespace where the operator runs

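The comment in the configuration says the registry credentials live in a Kubernetes Secret. How they are loaded is not shown in this post; one possible sketch using the fabric8 client and Micronaut's @Value injection (the class name OciCredentialsProvider and the per-key lookup are assumptions for illustration):

// src/main/java/dev/technomad/operator/OciCredentialsProvider.java (sketch)
package dev.technomad.operator;

import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

@Singleton
public class OciCredentialsProvider {

    private final KubernetesClient client;
    private final String secretName;
    private final String secretNamespace;

    public OciCredentialsProvider(KubernetesClient client,
                                  @Value("${oci.registry.credentials.secret-name}") String secretName,
                                  @Value("${oci.registry.credentials.namespace}") String secretNamespace) {
        this.client = client;
        this.secretName = secretName;
        this.secretNamespace = secretNamespace;
    }

    // Secret data values are base64-encoded by Kubernetes; decode them before use.
    public String decodeKey(String key) {
        Secret secret = client.secrets()
                .inNamespace(secretNamespace)
                .withName(secretName)
                .get();
        if (secret == null || secret.getData() == null || !secret.getData().containsKey(key)) {
            throw new IllegalStateException("OCI registry credentials secret is missing key: " + key);
        }
        return new String(Base64.getDecoder().decode(secret.getData().get(key)), StandardCharsets.UTF_8);
    }
}
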
The core Reconciler bean is shown below. Its reconcile method is invoked whenever an ImagePolicy resource changes.

// src/main/java/dev/technomad/operator/ImagePolicyReconciler.java
package dev.technomad.operator;

import dev.technomad.operator.model.ImagePolicy;
import dev.technomad.operator.model.ImagePolicyStatus;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;

@ControllerConfiguration
@Singleton
public class ImagePolicyReconciler implements Reconciler<ImagePolicy>, EventSourceInitializer<ImagePolicy> {

    private static final Logger LOG = LoggerFactory.getLogger(ImagePolicyReconciler.class);

    private final KubernetesClient k8sClient;
    private final OciRegistryClient ociRegistryClient; // Our custom client for talking to OCI registries

    // Constructor injection avoids reflective access to private fields,
    // which keeps the bean friendly to GraalVM Native Image.
    @Inject
    public ImagePolicyReconciler(KubernetesClient k8sClient, OciRegistryClient ociRegistryClient) {
        this.k8sClient = k8sClient;
        this.ociRegistryClient = ociRegistryClient;
    }

    @Override
    public Map<String, EventSource> prepareEventSources(EventSourceContext<ImagePolicy> context) {
        // Here we could set up event sources to watch Deployments, etc.
        // For simplicity in this example, we rely on periodic reconciliation,
        // so no additional event sources are registered.
        return Map.of();
    }

    @Override
    public UpdateControl<ImagePolicy> reconcile(ImagePolicy resource, Context<ImagePolicy> context) {
        LOG.info("Reconciling ImagePolicy {} in namespace {}", resource.getMetadata().getName(), resource.getMetadata().getNamespace());

        ImagePolicyStatus status = resource.getStatus();
        if (status == null) {
            status = new ImagePolicyStatus();
            resource.setStatus(status);
        }

        // 1. Throttle reconciliation based on poll interval
        final int pollInterval = resource.getSpec().getPollIntervalMinutes();
        if (status.getLastCheckedTime() != null &&
                status.getLastCheckedTime().plus(pollInterval, ChronoUnit.MINUTES).isAfter(Instant.now())) {
            LOG.debug("Skipping reconciliation for {} as poll interval has not passed.", resource.getMetadata().getName());
            // Reschedule reconciliation for when the interval is up
            return UpdateControl.noUpdate().rescheduleAfter(pollInterval * 60 * 1000L);
        }

        try {
            // 2. Fetch the latest tag from the OCI registry
            // This is a critical and complex part of the implementation.
            String latestTag = ociRegistryClient.getLatestTag(
                    resource.getSpec().getRepository(),
                    resource.getSpec().getTagPolicy()
            ).orElse(null);

            status.setLastCheckedTime(Instant.now());

            if (latestTag == null) {
                LOG.warn("No suitable tag found for repository {}", resource.getSpec().getRepository());
                // Update status and stop here for this cycle
                // You would typically set a Condition here to indicate the issue.
                return UpdateControl.updateStatus(resource);
            }

            // 3. Compare with the last applied tag
            if (latestTag.equals(status.getLastAppliedTag())) {
                LOG.info("Tag {} is already applied for {}. No update needed.", latestTag, resource.getMetadata().getName());
                return UpdateControl.updateStatus(resource).rescheduleAfter(pollInterval * 60 * 1000L);
            }

            // 4. Find and patch the target Deployment
            String targetNamespace = resource.getSpec().getUpdateTarget().getNamespace();
            String targetName = resource.getSpec().getUpdateTarget().getName();
            Deployment targetDeployment = k8sClient.apps().deployments()
                    .inNamespace(targetNamespace)
                    .withName(targetName)
                    .get();

            if (targetDeployment == null) {
                LOG.error("Target Deployment {}/{} not found.", targetNamespace, targetName);
                // Again, use Conditions in status to reflect this error
                return UpdateControl.updateStatus(resource);
            }

            // 5. Perform the update
            String newImageRef = resource.getSpec().getRepository() + ":" + latestTag;
            LOG.info("Updating Deployment {}/{} to image {}", targetNamespace, targetName, newImageRef);

            // A common mistake is to update the entire container list.
            // This can cause issues if other controllers (like sidecars) manage containers.
            // It's better to find the specific container and patch it.
            targetDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
                    .filter(c -> c.getImage().startsWith(resource.getSpec().getRepository()))
                    .findFirst()
                    .ifPresentOrElse(
                        container -> container.setImage(newImageRef),
                        () -> LOG.error("No container in Deployment {}/{} matches the repository {}",
                                targetNamespace, targetName, resource.getSpec().getRepository())
                    );

            k8sClient.apps().deployments().inNamespace(targetNamespace).replace(targetDeployment);

            // 6. Update the status with the new tag
            status.setLastAppliedTag(latestTag);
            status.setObservedGeneration(resource.getMetadata().getGeneration());
            LOG.info("Successfully applied tag {} and updated status for {}", latestTag, resource.getMetadata().getName());

            return UpdateControl.updateStatus(resource).rescheduleAfter(pollInterval * 60 * 1000L);

        } catch (Exception e) {
            LOG.error("Error during reconciliation for " + resource.getMetadata().getName(), e);
            // In a production scenario, you would implement exponential backoff
            // and update status conditions with the error message.
            return UpdateControl.noUpdate().rescheduleAfter(5 * 60 * 1000L); // Reschedule after 5 mins on error
        }
    }
}
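
The reconciler above is just a bean; something still has to register it with the Java Operator SDK and start the watch. A minimal bootstrap sketch, assuming a recent Java Operator SDK where Operator has a no-argument constructor (the class name OperatorBootstrap is an assumption):

// src/main/java/dev/technomad/operator/OperatorBootstrap.java (sketch)
package dev.technomad.operator;

import io.javaoperatorsdk.operator.Operator;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import jakarta.inject.Singleton;

@Singleton
public class OperatorBootstrap {

    private final ImagePolicyReconciler reconciler;

    public OperatorBootstrap(ImagePolicyReconciler reconciler) {
        this.reconciler = reconciler;
    }

    // Register the reconciler and start watching ImagePolicy resources
    // once the Micronaut application context is ready.
    @EventListener
    public void onStartup(StartupEvent event) {
        Operator operator = new Operator();
        operator.register(reconciler);
        operator.start();
    }
}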

Step 3: The Core Library for Interacting with the OCI Registry

Interaction with the Kubernetes API server is handled by the off-the-shelf fabric8 library, but interaction with the OCI image registry is something we have to implement ourselves. The OCI Registry API v2 is a standard HTTP API, but handling authentication (usually Bearer tokens) and pagination is fiddly.

In a real project we would bring in a robust HTTP client (Micronaut's declarative HTTP client is excellent) and handle the www-authenticate header to obtain a token. To demonstrate the core logic, below is a simplified OciRegistryClient that shows the skeleton of talking to the registry API.

// src/main/java/dev/technomad/operator/OciRegistryClient.java
package dev.technomad.operator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.technomad.operator.model.TagPolicy;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@Singleton
public class OciRegistryClient {

    private static final Logger LOG = LoggerFactory.getLogger(OciRegistryClient.class);
    private final HttpClient httpClient;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // NOTE: This is a simplified client. A production version would handle authentication,
    // pagination ("Link" header), and different registry providers.
    public OciRegistryClient(@Client("oci-registry") HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    public Optional<String> getLatestTag(String repository, TagPolicy policy) {
        // Example: repository is "library/ubuntu"
        // Harbor/DockerHub URL would be like https://registry-1.docker.io/v2/library/ubuntu/tags/list
        // This requires significant logic for URL construction and auth, omitted for brevity.
        String registryHost = "registry-1.docker.io"; // This should be dynamic
        String url = String.format("https://%s/v2/%s/tags/list", registryHost, repository);

        try {
            HttpRequest<?> request = HttpRequest.GET(url); // Auth headers would be added here
            HttpResponse<String> response = httpClient.toBlocking().exchange(request, String.class);

            if (response.getStatus().getCode() >= 300) {
                LOG.error("Failed to fetch tags for {}: HTTP {}", repository, response.getStatus());
                return Optional.empty();
            }

            JsonNode root = objectMapper.readTree(response.body());
            List<String> tags = new ArrayList<>();
            if (root.has("tags") && root.get("tags").isArray()) {
                for (JsonNode tagNode : root.get("tags")) {
                    tags.add(tagNode.asText());
                }
            } else {
                return Optional.empty();
            }
            
            return filterAndSortTags(tags, policy);

        } catch (Exception e) { // also covers HttpClientResponseException thrown by the blocking client
            LOG.error("Exception while fetching tags for " + repository, e);
            return Optional.empty();
        }
    }

    private Optional<String> filterAndSortTags(List<String> tags, TagPolicy policy) {
        // The real challenge is in implementing the sorting strategies correctly.
        // SemVer parsing can be tricky. "latest" is often just a floating tag.
        switch (policy.getStrategy()) {
            case "Latest":
                // This is often a bad idea in production. "latest" is mutable.
                // We're assuming it exists and is what's desired.
                return tags.contains("latest") ? Optional.of("latest") : Optional.empty();

            case "SemVer":
                // A proper implementation would use a SemVer library like 'com.github.zafarkhaja:java-semver'
                // to parse and sort versions correctly (e.g., 1.10.0 > 1.9.0).
                return tags.stream()
                        .filter(t -> t.matches("\\d+\\.\\d+\\.\\d+.*")) // Basic semver-like filter
                        .max(Comparator.naturalOrder()); // Naive string comparison, not true SemVer!

            case "Regex":
                Pattern pattern = Pattern.compile(policy.getPattern());
                return tags.stream()
                        .filter(t -> pattern.matcher(t).matches())
                        .sorted(Comparator.reverseOrder()) // Assuming lexical sort is sufficient
                        .findFirst();

            default:
                return Optional.empty();
        }
    }
}
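
The NOTE in the client above glosses over authentication. For registries that implement the standard token flow (Docker Hub, Harbor, and others), the missing piece roughly works like this: the first request is rejected with 401 and a Www-Authenticate: Bearer realm=...,service=...,scope=... header, the client fetches a token from that realm, then retries with Authorization: Bearer <token>. A rough sketch of that exchange using only the JDK's built-in HTTP client (the helper names and the hard-coded pull scope are illustrative assumptions, not code from the project above):

// Sketch of the Docker Registry v2 token exchange, using java.net.http only.
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class RegistryTokenSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final HttpClient HTTP = HttpClient.newHttpClient();

    // Example: fetchBearerToken("https://auth.docker.io/token", "registry.docker.io", "library/ubuntu")
    // In real code, realm and service come from parsing the Www-Authenticate header of the 401 response.
    static String fetchBearerToken(String realm, String service, String repository) throws Exception {
        String url = realm + "?service=" + service + "&scope=repository:" + repository + ":pull";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = HTTP.send(request, HttpResponse.BodyHandlers.ofString());
        // The token endpoint returns JSON like {"token": "...", "expires_in": 300, ...}
        return MAPPER.readTree(response.body()).get("token").asText();
    }

    // The tag listing request then carries the token.
    static HttpRequest tagListRequest(String registryHost, String repository, String token) {
        return HttpRequest.newBuilder(
                        URI.create("https://" + registryHost + "/v2/" + repository + "/tags/list"))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }
}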

The trap here is the SemVer strategy. Plain string comparison is wrong (for example, "1.10.0" would be considered smaller than "1.9.0"). Production-grade code must bring in a reliable SemVer parsing library for correct version comparison. Beyond that, registry authentication, differences between registry API endpoints, and pagination are the parts of this module that need the most polish.
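
For reference, a numeric-aware comparison does not strictly require a full SemVer library. A minimal sketch of a comparator that gets the "1.10.0" vs "1.9.0" case right, assuming plain numeric major.minor.patch tags (pre-release and build metadata are deliberately ignored here, which a real SemVer library would handle properly):

// Compares dotted numeric versions segment by segment, so "1.10.0" > "1.9.0".
import java.util.Comparator;

public final class NumericVersionComparator implements Comparator<String> {

    @Override
    public int compare(String a, String b) {
        // Strip anything after a pre-release/build separator, e.g. "1.2.3-rc1" -> "1.2.3".
        String[] left = a.split("[-+]", 2)[0].split("\\.");
        String[] right = b.split("[-+]", 2)[0].split("\\.");
        int length = Math.max(left.length, right.length);
        for (int i = 0; i < length; i++) {
            long l = parseSegment(left, i);
            long r = parseSegment(right, i);
            if (l != r) {
                return Long.compare(l, r);
            }
        }
        return 0;
    }

    private static long parseSegment(String[] parts, int i) {
        if (i >= parts.length) {
            return 0;
        }
        try {
            return Long.parseLong(parts[i]);
        } catch (NumberFormatException e) {
            return 0; // non-numeric segment; a real SemVer library would reject or order it explicitly
        }
    }
}

With something like this in place, the SemVer branch could become tags.stream().filter(...).max(new NumericVersionComparator()) instead of the naive natural-order comparison.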

Step 4: Packaging and Deployment

We use the Jib plugin to build the Micronaut application into an OCI-compliant container image without a Dockerfile, and compile it to a GraalVM Native Image for the best footprint and startup time.

A fragment of the build.gradle.kts configuration:

// build.gradle.kts
tasks {
    build {
        dependsOn(shadowJar)
    }

    // GraalVM Native Image configuration
    nativeImage {
        // ... GraalVM args, reflection configs for Kubernetes models
    }
}

// Jib configuration
jib {
    from {
        image = "gcr.io/distroless/base-nossl" // Use a minimal base image
    }
    to {
        image = "my-registry/image-watcher-operator:0.1.0"
    }
    container {
        mainClass.set("dev.technomad.operator.Application")
    }
}

Deploying to the Kubernetes cluster requires the following manifests:

  1. Namespace: kubectl create ns operators
  2. CRD: kubectl apply -f crd/imagepolicy.yaml
  3. RBAC: a ServiceAccount, a ClusterRole (or Role), and a ClusterRoleBinding (or RoleBinding) granting the Operator access to ImagePolicy CRs, Deployments, and Secrets (for the OCI credentials).
  4. Deployment: the Deployment manifest that runs the Operator Pod.
# deploy/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: image-watcher-operator
  namespace: operators
spec:
  replicas: 1
  selector:
    matchLabels:
      app: image-watcher-operator
  template:
    metadata:
      labels:
        app: image-watcher-operator
    spec:
      serviceAccountName: image-watcher-sa
      containers:
      - name: operator
        image: my-registry/image-watcher-operator:0.1.0 # The image we built
        env:
          # Pass the namespace where the operator is running
          - name: OPERATOR_NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Once the Operator is deployed, we can create an ImagePolicy resource to verify that it works.

# deploy/example-policy.yaml
apiVersion: operator.technomad.dev/v1alpha1
kind: ImagePolicy
metadata:
  name: base-java-updater
  namespace: default
spec:
  repository: "library/openjdk" # Using public docker hub for example
  tagPolicy:
    strategy: SemVer
  updateTarget:
    kind: Deployment
    name: my-java-app
    namespace: default
  pollIntervalMinutes: 5

When this ImagePolicy is applied, the Operator's reconciliation loop is triggered. It fetches all tags of library/openjdk from Docker Hub, finds the newest SemVer-compliant version, and then inspects the Deployment named my-java-app in the default namespace. If that Deployment is not running the latest openjdk tag, the Operator rewrites the container image and updates the Deployment, triggering a rolling update of the Pods.

sequenceDiagram
    participant User
    participant K8s API Server
    participant ImageWatcherOperator
    participant OciRegistry
    participant TargetDeployment

    User->>K8s API Server: APPLY ImagePolicy CR
    K8s API Server-->>ImageWatcherOperator: Watch Event: ADDED ImagePolicy
    ImageWatcherOperator->>ImageWatcherOperator: Start Reconciliation Loop
    ImageWatcherOperator->>OciRegistry: GET /v2/repo/tags/list
    OciRegistry-->>ImageWatcherOperator: List of tags
    ImageWatcherOperator->>ImageWatcherOperator: Apply TagPolicy (find latest)
    ImageWatcherOperator->>K8s API Server: GET Deployment 'my-java-app'
    K8s API Server-->>ImageWatcherOperator: Current Deployment Spec
    ImageWatcherOperator->>ImageWatcherOperator: Compare latest tag with current image
    alt Tags are different
        ImageWatcherOperator->>K8s API Server: PATCH Deployment with new image tag
        K8s API Server->>TargetDeployment: Trigger Rolling Update
        ImageWatcherOperator->>K8s API Server: UPDATE ImagePolicy Status
    else Tags are the same
        ImageWatcherOperator->>K8s API Server: UPDATE ImagePolicy Status (last checked time)
    end

Limitations and Future Iterations

The current implementation is polling-based, which wastes API requests and resources on images that rarely change. A more advanced architecture is event-driven: many modern OCI registries (Harbor, GCR, and others) support webhook notifications. The next iteration should expose an Ingress endpoint on the Operator that receives push events from the registry, making updates near real-time and far more efficient.
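
As a rough idea of the receiving side, a Micronaut controller that accepts a registry push event could look like the sketch below; the path, the generic payload shape, and the steps in the comments are assumptions, since each registry (Harbor, GCR, ...) has its own payload format and signature scheme that would have to be verified:

// Sketch of a webhook endpoint; payload handling is intentionally generic.
package dev.technomad.operator;

import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;

import java.util.Map;

@Controller("/webhooks/registry")
public class RegistryWebhookController {

    @Post
    public HttpResponse<Void> onPushEvent(@Body Map<String, Object> payload) {
        // A real implementation would: 1) verify the webhook secret/signature,
        // 2) extract the repository name from the registry-specific payload,
        // 3) trigger reconciliation of the matching ImagePolicy resources
        //    instead of waiting for the next poll interval.
        return HttpResponse.accepted();
    }
}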

The tagPolicy implementation is also fairly primitive. Production use needs richer rules, such as "follow the latest patch release on the 1.18.* line", which requires finer-grained SemVer range matching. Supporting Helm Chart updates, or updating other kinds of Kubernetes resources (StatefulSet, DaemonSet), are natural extensions as well.

