Files
zitadel-k8s-operator/internal/controller/cluster_controller.go
HaimKortovich dc90b67880
All checks were successful
Build and Publish / build-release (push) Successful in 8m58s
AppProtocol is a pointer
2026-04-10 12:36:22 -05:00

473 lines
17 KiB
Go

/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"time"
zitadelv1alpha1 "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/api/v1alpha1"
builder "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/builder"
condition "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/condition"
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/configuration"
configmap "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/configmap"
secret "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/secret"
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/service"
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/deployment"
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/masterkey"
systemapiaccount "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/systemapi"
"github.com/hashicorp/go-multierror"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// reconcilePhase is one named step of the Cluster reconciliation pipeline.
type reconcilePhase struct {
// Name identifies the phase; it is interpolated into error messages.
Name string
// Reconcile executes the phase against the given Cluster.
Reconcile func(context.Context, *zitadelv1alpha1.Cluster) (ctrl.Result, error)
}
// patcher mutates a ClusterStatus in place. patchStatus aborts without
// sending a patch when the patcher returns a non-nil error.
type patcher func(*zitadelv1alpha1.ClusterStatus) error
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
// Client is the embedded Kubernetes client used for Get/Create/Delete/Patch.
client.Client
Scheme *runtime.Scheme
// ConditionReady supplies the patcher that records a failure message on the status.
ConditionReady *condition.Ready
// Builder produces the desired init/setup Jobs, Deployment and Service.
Builder *builder.Builder
// SecretReconciler reconciles the master key and system API account secrets.
SecretReconciler *secret.SecretReconciler
// ConfigMapReconciler reconciles the Zitadel configuration.
ConfigMapReconciler *configmap.ConfigMapReconciler
// ServiceReconciler reconciles the Service built for the cluster.
ServiceReconciler *service.ServiceReconciler
// RefResolver resolves the PostgreSQL cluster reference and secret key references.
RefResolver *zitadelv1alpha1.RefResolver
}
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=create;patch;get;list;watch
// +kubebuilder:rbac:groups="",resources=endpoints/restricted,verbs=create;patch;get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings;clusterrolebindings,verbs=list;watch;create;patch
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters/finalizers,verbs=update
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances/finalizers,verbs=update
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/finalizers,verbs=update
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;list;watch;create;patch;delete
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests/approval,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete

// Reconcile fetches the Cluster named by req and runs the reconciliation
// phases in a fixed order.
//
// A phase error that is a Kubernetes NotFound is skipped and the next phase
// runs. Any other phase error is recorded on the Cluster status through
// ConditionReady.PatcherFailed and returned; a failure to write that status
// is appended to the returned error unless the Cluster itself is gone.
// A phase that returns a non-zero Result stops the run and that Result is
// returned. Once every phase has run, the status is refreshed from the
// Deployment and the Cluster is requeued after 15 minutes.
func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Starting Reconcile")

	var zitadel zitadelv1alpha1.Cluster
	if err := r.Get(ctx, req.NamespacedName, &zitadel); err != nil {
		// A missing Cluster means there is nothing left to reconcile.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	phases := []reconcilePhase{
		{
			Name:      "Spec",
			Reconcile: r.setSpecDefaults,
		},
		{
			Name:      "Status",
			Reconcile: r.setStatusDefaults,
		},
		{
			Name:      "MasterkeySecret",
			Reconcile: r.reconcileMasterKeySecret,
		},
		{
			Name:      "ServiceAccount",
			Reconcile: r.reconcileSystemAPIUser,
		},
		{
			Name:      "Configuration",
			Reconcile: r.reconcileConfig,
		},
		{
			Name:      "InitJob",
			Reconcile: r.reconcileInitJob,
		},
		{
			Name:      "SetupJob",
			Reconcile: r.reconcileSetupJob,
		},
		{
			Name:      "Deployment",
			Reconcile: r.reconcileDeployment,
		},
		{
			Name:      "Service",
			Reconcile: r.reconcileService,
		},
	}

	for _, p := range phases {
		result, err := p.Reconcile(ctx, &zitadel)
		if err != nil {
			if errors.IsNotFound(err) {
				continue
			}

			var errBundle *multierror.Error
			errBundle = multierror.Append(errBundle, err)

			msg := fmt.Sprintf("Error reconciling %s: %v", p.Name, err)
			patchErr := r.patchStatus(ctx, &zitadel, func(s *zitadelv1alpha1.ClusterStatus) error {
				markFailed := r.ConditionReady.PatcherFailed(msg)
				markFailed(s)
				return nil
			})
			// Report genuine status-patch failures alongside the phase error.
			// NotFound is tolerated because the Cluster may have been deleted
			// while the phase was running.
			if patchErr != nil && !errors.IsNotFound(patchErr) {
				errBundle = multierror.Append(errBundle, patchErr)
			}

			if err := errBundle.ErrorOrNil(); err != nil {
				return ctrl.Result{}, fmt.Errorf("error reconciling %s: %v", p.Name, err)
			}
		}
		if !result.IsZero() {
			// err is always nil here: every non-skipped error returned above.
			return result, nil
		}
	}

	if err := r.patchStatus(ctx, &zitadel, r.patcher(ctx, &zitadel)); err != nil && !errors.IsNotFound(err) {
		return ctrl.Result{}, err
	}
	return ctrl.Result{RequeueAfter: 15 * time.Minute}, nil
}
// setSpecDefaults applies the Cluster's SetDefaults and persists the result
// through r.patch. The returned Result is always zero.
func (r *ClusterReconciler) setSpecDefaults(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	applyDefaults := func(c *zitadelv1alpha1.Cluster) {
		c.SetDefaults()
	}
	err := r.patch(ctx, zitadel, applyDefaults)
	return ctrl.Result{}, err
}
// setStatusDefaults fills the Cluster status with defaults derived from the
// Cluster and persists it through r.patchStatus. The returned Result is
// always zero.
func (r *ClusterReconciler) setStatusDefaults(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	fillDefaults := func(st *zitadelv1alpha1.ClusterStatus) error {
		st.FillWithDefaults(zitadel)
		return nil
	}
	err := r.patchStatus(ctx, zitadel, fillDefaults)
	return ctrl.Result{}, err
}
// reconcileMasterKeySecret asks the SecretReconciler to reconcile a random
// password under masterkey.Key in the master key Secret, which lives in the
// Cluster's namespace.
func (r *ClusterReconciler) reconcileMasterKeySecret(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	secretKey := types.NamespacedName{
		Namespace: zitadel.Namespace,
		Name:      masterkey.MasterKeyName(zitadel),
	}
	// The reconciled value itself is not needed here, only the outcome.
	if _, err := r.SecretReconciler.ReconcileRandomPassword(ctx, secretKey, masterkey.Key, zitadel); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
// reconcileSystemAPIUser asks the SecretReconciler to reconcile a random RSA
// private key under systemapiaccount.Key in the system API account Secret,
// which lives in the Cluster's namespace.
func (r *ClusterReconciler) reconcileSystemAPIUser(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	secretKey := types.NamespacedName{
		Namespace: zitadel.Namespace,
		Name:      systemapiaccount.SystemAPIAccountName(zitadel),
	}
	// The reconciled value itself is not needed here, only the outcome.
	if _, err := r.SecretReconciler.ReconcileRandomPrivateRSA(ctx, secretKey, systemapiaccount.Key, zitadel); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
// reconcileConfig reconciles the Zitadel configuration for the cluster.
//
// It resolves the referenced PostgreSQL cluster, reads the system API account
// private key from its Secret, derives the matching public key and passes the
// base64-encoded PEM of that public key to the ConfigMap reconciler.
//
// An error is returned when a reference cannot be resolved, when the secret
// does not contain a PEM block, when the block is not a PKCS#1 RSA private
// key, or when the configuration cannot be reconciled.
func (r *ClusterReconciler) reconcileConfig(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	postgres, err := r.RefResolver.PostgreSQLClusterRef(ctx, &zitadel.Spec.PostgreSQLClusterRef, zitadel.Namespace)
	if err != nil {
		return ctrl.Result{}, err
	}

	configName := configuration.ConfigurationName(zitadel)
	key := types.NamespacedName{
		Name:      configName,
		Namespace: zitadel.Namespace,
	}

	privateKeyData, err := r.RefResolver.SecretKeyRef(ctx, corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: systemapiaccount.SystemAPIAccountName(zitadel)}, Key: systemapiaccount.Key}, zitadel.Namespace)
	if err != nil {
		return ctrl.Result{}, err
	}

	pemBlock, _ := pem.Decode([]byte(privateKeyData))
	if pemBlock == nil {
		return ctrl.Result{}, fmt.Errorf("failed to decode PEM block")
	}

	// The parse error must be checked before privateKey is used: on failure
	// privateKey is nil and dereferencing it would panic.
	privateKey, err := x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("error parsing system API account private key: %w", err)
	}

	publicKeyBytes, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		return ctrl.Result{}, err
	}

	// NOTE(review): the bytes are PKIX-encoded while the PEM type says
	// "RSA PUBLIC KEY"; kept as-is because the consumer's expectations are
	// not visible from here - confirm before changing.
	publicKeyPem := pem.EncodeToMemory(
		&pem.Block{
			Type:  "RSA PUBLIC KEY",
			Bytes: publicKeyBytes,
		},
	)
	base64key := base64.StdEncoding.EncodeToString(publicKeyPem)

	err = r.ConfigMapReconciler.ReconcileZitadelConfiguration(ctx, key, zitadel, postgres, base64key)
	if err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
// reconcileInitJob ensures that the init Job, named "init-job-<cluster name>",
// exists and runs the image produced by the Builder.
//
// When the Job is missing it is created. When it exists with a different
// image in its first container, it is deleted, the deletion is awaited and a
// new Job is created. The wait polls once per second and stops as soon as ctx
// is cancelled, so a stuck deletion cannot block the worker forever.
//
// The returned Result is always zero. The Job's completion status is not
// acted upon.
func (r *ClusterReconciler) reconcileInitJob(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	key := client.ObjectKeyFromObject(zitadel)
	key.Name = "init-job-" + key.Name

	// Build the desired job
	desiredInitJob, err := r.Builder.BuildInitJob(zitadel, key)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("error building InitJob: %v", err)
	}

	var existingJob batchv1.Job
	if err := r.Get(ctx, key, &existingJob); err != nil {
		if !errors.IsNotFound(err) {
			return ctrl.Result{}, fmt.Errorf("error getting InitJob: %v", err)
		}
		// If job is not found, create the job
		if err := r.Create(ctx, desiredInitJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error creating InitJob: %v", err)
		}
		return ctrl.Result{}, nil
	}

	// Guard the index below: a job without containers would otherwise panic.
	existingContainers := existingJob.Spec.Template.Spec.Containers
	desiredContainers := desiredInitJob.Spec.Template.Spec.Containers
	if len(existingContainers) == 0 || len(desiredContainers) == 0 {
		return ctrl.Result{}, fmt.Errorf("error comparing InitJob images: job has no containers")
	}

	// If the images don't match, replace the existing job
	if existingContainers[0].Image != desiredContainers[0].Image {
		// NOTE(review): no propagation policy is passed to Delete, so the
		// pods of the old Job may be left behind - confirm whether that is
		// intended.
		if err := r.Delete(ctx, &existingJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error deleting existing InitJob: %v", err)
		}

		// Wait for the job to be fully deleted before creating a new one
		poll := time.NewTicker(1 * time.Second)
		defer poll.Stop()
		for {
			err := r.Get(ctx, key, &existingJob)
			if errors.IsNotFound(err) {
				break // Job has been deleted, we can proceed
			}
			if err != nil {
				return ctrl.Result{}, fmt.Errorf("error checking if InitJob is deleted: %v", err)
			}
			select {
			case <-ctx.Done():
				return ctrl.Result{}, fmt.Errorf("error waiting for InitJob deletion: %v", ctx.Err())
			case <-poll.C:
			}
		}

		// Now create the new job
		if err := r.Create(ctx, desiredInitJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error creating new InitJob: %v", err)
		}
	}

	// The job must be readable at this point. Its Succeeded count is not
	// evaluated: both outcomes previously led to the same return.
	if err := r.Get(ctx, key, &existingJob); err != nil {
		return ctrl.Result{}, fmt.Errorf("error fetching existing InitJob status: %v", err)
	}
	return ctrl.Result{}, nil
}
// reconcileSetupJob ensures that the setup Job, named
// "setup-job-<cluster name>", exists and runs the image produced by the
// Builder.
//
// When the Job is missing it is created. When it exists with a different
// image in its first container, it is deleted, the deletion is awaited and a
// new Job is created. The wait polls once per second and stops as soon as ctx
// is cancelled, so a stuck deletion cannot block the worker forever.
//
// The returned Result is always zero. The Job's completion status is not
// acted upon.
func (r *ClusterReconciler) reconcileSetupJob(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	key := client.ObjectKeyFromObject(zitadel)
	key.Name = "setup-job-" + key.Name

	// Build the desired job
	desiredSetupJob, err := r.Builder.BuildSetupJob(zitadel, key)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("error building SetupJob: %v", err)
	}

	var existingJob batchv1.Job
	if err := r.Get(ctx, key, &existingJob); err != nil {
		if !errors.IsNotFound(err) {
			return ctrl.Result{}, fmt.Errorf("error getting SetupJob: %v", err)
		}
		// If job is not found, create the job
		if err := r.Create(ctx, desiredSetupJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error creating SetupJob: %v", err)
		}
		return ctrl.Result{}, nil
	}

	// Guard the index below: a job without containers would otherwise panic.
	existingContainers := existingJob.Spec.Template.Spec.Containers
	desiredContainers := desiredSetupJob.Spec.Template.Spec.Containers
	if len(existingContainers) == 0 || len(desiredContainers) == 0 {
		return ctrl.Result{}, fmt.Errorf("error comparing SetupJob images: job has no containers")
	}

	// If the images don't match, replace the existing job
	if existingContainers[0].Image != desiredContainers[0].Image {
		// NOTE(review): no propagation policy is passed to Delete, so the
		// pods of the old Job may be left behind - confirm whether that is
		// intended.
		if err := r.Delete(ctx, &existingJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error deleting existing SetupJob: %v", err)
		}

		// Wait for the job to be fully deleted before creating a new one
		poll := time.NewTicker(1 * time.Second)
		defer poll.Stop()
		for {
			err := r.Get(ctx, key, &existingJob)
			if errors.IsNotFound(err) {
				break // Job has been deleted, we can proceed
			}
			if err != nil {
				return ctrl.Result{}, fmt.Errorf("error checking if SetupJob is deleted: %v", err)
			}
			select {
			case <-ctx.Done():
				return ctrl.Result{}, fmt.Errorf("error waiting for SetupJob deletion: %v", ctx.Err())
			case <-poll.C:
			}
		}

		// Now create the new job
		if err := r.Create(ctx, desiredSetupJob); err != nil {
			return ctrl.Result{}, fmt.Errorf("error creating new SetupJob: %v", err)
		}
	}

	// The job must be readable at this point. Its Succeeded count is not
	// evaluated: both outcomes previously led to the same return.
	if err := r.Get(ctx, key, &existingJob); err != nil {
		return ctrl.Result{}, fmt.Errorf("error fetching existing SetupJob status: %v", err)
	}
	return ctrl.Result{}, nil
}
// reconcileDeployment makes the Deployment named after the Cluster match the
// one produced by the Builder. A missing Deployment is created; an existing
// one has its pod template and replica count overwritten with the desired
// values and sent as a merge patch. The returned Result is always zero.
func (r *ClusterReconciler) reconcileDeployment(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	// TODO: Reload on config changed
	depKey := client.ObjectKeyFromObject(zitadel)

	desired, buildErr := r.Builder.BuildDeployment(zitadel, depKey)
	if buildErr != nil {
		return ctrl.Result{}, fmt.Errorf("error building Deployment: %v", buildErr)
	}

	var current appsv1.Deployment
	getErr := r.Get(ctx, depKey, &current)
	switch {
	case getErr == nil:
		// Deployment exists; it is patched below.
	case errors.IsNotFound(getErr):
		if createErr := r.Create(ctx, desired); createErr != nil {
			return ctrl.Result{}, fmt.Errorf("error creating Deployment: %v", createErr)
		}
		return ctrl.Result{}, nil
	default:
		return ctrl.Result{}, fmt.Errorf("error getting Deployment: %v", getErr)
	}

	// Snapshot the live object before mutating it so the merge patch only
	// carries the template and replica changes.
	base := current.DeepCopy()
	current.Spec.Template = desired.Spec.Template
	current.Spec.Replicas = desired.Spec.Replicas

	patchErr := r.Patch(ctx, &current, client.MergeFrom(base))
	return ctrl.Result{}, patchErr
}
// reconcileService is the Service phase: it delegates to
// reconcileDefaultService and always returns a zero Result.
func (r *ClusterReconciler) reconcileService(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
	err := r.reconcileDefaultService(ctx, zitadel)
	return ctrl.Result{}, err
}
// reconcileDefaultService builds the Service named after the Cluster, with a
// single Zitadel port whose application protocol is "h2c", and hands it to
// the ServiceReconciler.
func (r *ClusterReconciler) reconcileDefaultService(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) error {
	zitadelPort := corev1.ServicePort{
		Name:        deployment.ZitadelName,
		Port:        deployment.ZitadelPort,
		AppProtocol: ptr.To("h2c"),
	}
	svcOpts := builder.ServiceOpts{
		Ports: []corev1.ServicePort{zitadelPort},
	}

	svc, buildErr := r.Builder.BuildService(zitadel, client.ObjectKeyFromObject(zitadel), svcOpts)
	if buildErr != nil {
		return fmt.Errorf("error building Service: %v", buildErr)
	}
	return r.ServiceReconciler.Reconcile(ctx, svc)
}
// patchStatus runs patcher against the Cluster's status and sends the
// difference to the status subresource as a merge patch. If patcher fails,
// its error is returned and no patch is sent.
func (r *ClusterReconciler) patchStatus(ctx context.Context, zitadel *zitadelv1alpha1.Cluster,
	patcher patcher) error {
	// Snapshot taken before mutation; it is the base the patch is diffed against.
	snapshot := zitadel.DeepCopy()

	if mutateErr := patcher(&zitadel.Status); mutateErr != nil {
		return mutateErr
	}

	statusWriter := r.Status()
	return statusWriter.Patch(ctx, zitadel, client.MergeFrom(snapshot))
}
// patcher returns a status patcher that looks up the Deployment named after
// zitadel, copies its ready replica count into the status and updates the
// ready condition from that Deployment.
//
// The returned function writes to the status it is handed rather than to
// zitadel.Status, so it stays correct when the caller passes a different
// status object. It returns the lookup error when the Deployment cannot be
// fetched.
func (r *ClusterReconciler) patcher(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) patcher {
	return func(s *zitadelv1alpha1.ClusterStatus) error {
		var dep appsv1.Deployment
		if err := r.Get(ctx, client.ObjectKeyFromObject(zitadel), &dep); err != nil {
			return err
		}
		s.Replicas = dep.Status.ReadyReplicas
		condition.SetReadyWithDeployment(s, &dep)
		return nil
	}
}
// patch runs patcher against the Cluster and sends the difference between
// the pre-mutation copy and the mutated object as a merge patch.
func (r *ClusterReconciler) patch(ctx context.Context, zitadel *zitadelv1alpha1.Cluster,
	patcher func(*zitadelv1alpha1.Cluster)) error {
	// Snapshot taken before mutation; it is the base the patch is diffed against.
	original := zitadel.DeepCopy()
	patcher(zitadel)

	mergePatch := client.MergeFrom(original)
	return r.Patch(ctx, zitadel, mergePatch)
}
// SetupWithManager sets up the controller with the Manager.
//
// The controller reconciles Cluster objects, also watches the Deployments,
// Services, ConfigMaps and Secrets they own, and retries failed items with a
// per-item exponential backoff from 500ms up to 3 minutes.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	backoff := workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
		500*time.Millisecond,
		3*time.Minute,
	)
	opts := controller.Options{RateLimiter: backoff}

	b := ctrl.NewControllerManagedBy(mgr).For(&zitadelv1alpha1.Cluster{})

	// Owned kinds, registered in the same order as before.
	ownedKinds := []client.Object{
		&appsv1.Deployment{},
		&corev1.Service{},
		&corev1.ConfigMap{},
		&corev1.Secret{},
	}
	for _, owned := range ownedKinds {
		b = b.Owns(owned)
	}

	return b.WithOptions(opts).Complete(r)
}