Some checks failed
Build and Publish / build-release (push) Failing after 8m7s
472 lines
17 KiB
Go
472 lines
17 KiB
Go
/*
|
|
Copyright 2024.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"crypto/x509"
|
|
"encoding/base64"
|
|
"encoding/pem"
|
|
"fmt"
|
|
"time"
|
|
|
|
zitadelv1alpha1 "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/api/v1alpha1"
|
|
builder "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/builder"
|
|
condition "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/condition"
|
|
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/configuration"
|
|
configmap "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/configmap"
|
|
secret "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/secret"
|
|
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/controller/service"
|
|
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/deployment"
|
|
"gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/masterkey"
|
|
systemapiaccount "gitea.corredorconect.com/software-engineering/zitadel-k8s-operator/pkg/systemapi"
|
|
"github.com/hashicorp/go-multierror"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/util/workqueue"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
)
|
|
|
|
type reconcilePhase struct {
|
|
Name string
|
|
Reconcile func(context.Context, *zitadelv1alpha1.Cluster) (ctrl.Result, error)
|
|
}
|
|
|
|
type patcher func(*zitadelv1alpha1.ClusterStatus) error
|
|
|
|
// ClusterReconciler reconciles a Cluster object
|
|
type ClusterReconciler struct {
|
|
client.Client
|
|
Scheme *runtime.Scheme
|
|
ConditionReady *condition.Ready
|
|
Builder *builder.Builder
|
|
SecretReconciler *secret.SecretReconciler
|
|
ConfigMapReconciler *configmap.ConfigMapReconciler
|
|
ServiceReconciler *service.ServiceReconciler
|
|
RefResolver *zitadelv1alpha1.RefResolver
|
|
}
|
|
|
|
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;patch
|
|
// +kubebuilder:rbac:groups="",resources=services,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups="",resources=secrets,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=create;patch;get;list;watch
|
|
// +kubebuilder:rbac:groups="",resources=endpoints/restricted,verbs=create;patch;get;list;watch
|
|
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;delete
|
|
// +kubebuilder:rbac:groups="",resources=events,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings;clusterrolebindings,verbs=list;watch;create;patch
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=clusters/finalizers,verbs=update
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=zitadel.github.com,resources=instances/finalizers,verbs=update
|
|
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
|
|
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/finalizers,verbs=update
|
|
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;list;watch;create;patch;delete
|
|
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests/status,verbs=get;update;patch
|
|
// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests/approval,verbs=update
|
|
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
|
|
|
|
func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
logger := log.FromContext(ctx)
|
|
logger.Info("Starting Reconcile")
|
|
|
|
var zitadel zitadelv1alpha1.Cluster
|
|
|
|
if err := r.Get(ctx, req.NamespacedName, &zitadel); err != nil {
|
|
return ctrl.Result{}, client.IgnoreNotFound(err)
|
|
}
|
|
|
|
phases := []reconcilePhase{
|
|
{
|
|
Name: "Spec",
|
|
Reconcile: r.setSpecDefaults,
|
|
},
|
|
{
|
|
Name: "Status",
|
|
Reconcile: r.setStatusDefaults,
|
|
},
|
|
{
|
|
Name: "MasterkeySecret",
|
|
Reconcile: r.reconcileMasterKeySecret,
|
|
},
|
|
{
|
|
Name: "ServiceAccount",
|
|
Reconcile: r.reconcileSystemAPIUser,
|
|
},
|
|
{
|
|
Name: "Configuration",
|
|
Reconcile: r.reconcileConfig,
|
|
},
|
|
{
|
|
Name: "InitJob",
|
|
Reconcile: r.reconcileInitJob,
|
|
},
|
|
{
|
|
Name: "SetupJob",
|
|
Reconcile: r.reconcileSetupJob,
|
|
},
|
|
{
|
|
Name: "Deployment",
|
|
Reconcile: r.reconcileDeployment,
|
|
},
|
|
{
|
|
Name: "Service",
|
|
Reconcile: r.reconcileService,
|
|
},
|
|
}
|
|
|
|
for _, p := range phases {
|
|
result, err := p.Reconcile(ctx, &zitadel)
|
|
if err != nil {
|
|
if errors.IsNotFound(err) {
|
|
continue
|
|
}
|
|
|
|
var errBundle *multierror.Error
|
|
errBundle = multierror.Append(errBundle, err)
|
|
|
|
msg := fmt.Sprintf("Error reconciling %s: %v", p.Name, err)
|
|
patchErr := r.patchStatus(ctx, &zitadel, func(s *zitadelv1alpha1.ClusterStatus) error {
|
|
patcher := r.ConditionReady.PatcherFailed(msg)
|
|
patcher(s)
|
|
return nil
|
|
})
|
|
if errors.IsNotFound(patchErr) {
|
|
errBundle = multierror.Append(errBundle, patchErr)
|
|
}
|
|
|
|
if err := errBundle.ErrorOrNil(); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error reconciling %s: %v", p.Name, err)
|
|
}
|
|
}
|
|
if !result.IsZero() {
|
|
return result, err
|
|
}
|
|
}
|
|
|
|
if err := r.patchStatus(ctx, &zitadel, r.patcher(ctx, &zitadel)); err != nil && !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, err
|
|
}
|
|
return ctrl.Result{RequeueAfter: 15 * time.Minute}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) setSpecDefaults(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
return ctrl.Result{}, r.patch(ctx, zitadel, func(zit *zitadelv1alpha1.Cluster) {
|
|
zit.SetDefaults()
|
|
})
|
|
}
|
|
|
|
func (r *ClusterReconciler) setStatusDefaults(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
return ctrl.Result{}, r.patchStatus(ctx, zitadel, func(status *zitadelv1alpha1.ClusterStatus) error {
|
|
status.FillWithDefaults(zitadel)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileMasterKeySecret(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
secretName := masterkey.MasterKeyName(zitadel)
|
|
key := types.NamespacedName{
|
|
Name: secretName,
|
|
Namespace: zitadel.Namespace,
|
|
}
|
|
_, err := r.SecretReconciler.ReconcileRandomPassword(ctx, key, masterkey.Key, zitadel)
|
|
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileSystemAPIUser(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
secretName := systemapiaccount.SystemAPIAccountName(zitadel)
|
|
key := types.NamespacedName{
|
|
Name: secretName,
|
|
Namespace: zitadel.Namespace,
|
|
}
|
|
_, err := r.SecretReconciler.ReconcileRandomPrivateRSA(ctx, key, systemapiaccount.Key, zitadel)
|
|
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileConfig(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
postgres, err := r.RefResolver.PostgreSQLClusterRef(ctx, &zitadel.Spec.PostgreSQLClusterRef, zitadel.Namespace)
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
configName := configuration.ConfigurationName(zitadel)
|
|
key := types.NamespacedName{
|
|
Name: configName,
|
|
Namespace: zitadel.Namespace,
|
|
}
|
|
privateKeyData, err := r.RefResolver.SecretKeyRef(ctx, corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: systemapiaccount.SystemAPIAccountName(zitadel)}, Key: systemapiaccount.Key}, zitadel.Namespace)
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
pemBlock, _ := pem.Decode([]byte(privateKeyData))
|
|
if pemBlock == nil {
|
|
return ctrl.Result{}, fmt.Errorf("failed to decode PEM block")
|
|
}
|
|
privateKey, err := x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
|
|
publicKeyBytes, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
publicKeyPem := pem.EncodeToMemory(
|
|
&pem.Block{
|
|
Type: "RSA PUBLIC KEY",
|
|
Bytes: publicKeyBytes,
|
|
},
|
|
)
|
|
base64key := base64.StdEncoding.EncodeToString(publicKeyPem)
|
|
err = r.ConfigMapReconciler.ReconcileZitadelConfiguration(ctx, key, zitadel, postgres, base64key)
|
|
|
|
if err != nil {
|
|
return ctrl.Result{}, err
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileInitJob(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
key := client.ObjectKeyFromObject(zitadel)
|
|
key.Name = "init-job-" + key.Name
|
|
|
|
// Build the desired job
|
|
desiredInitJob, err := r.Builder.BuildInitJob(zitadel, key)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error building InitJob: %v", err)
|
|
}
|
|
|
|
var existingJob batchv1.Job
|
|
err = r.Get(ctx, key, &existingJob)
|
|
if err != nil {
|
|
if !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, fmt.Errorf("error getting InitJob: %v", err)
|
|
}
|
|
// If job is not found, create the job
|
|
if err := r.Create(ctx, desiredInitJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating InitJob: %v", err)
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Compare the image in the existing job with the desired image
|
|
existingImage := existingJob.Spec.Template.Spec.Containers[0].Image
|
|
desiredImage := desiredInitJob.Spec.Template.Spec.Containers[0].Image
|
|
|
|
// If the images don't match, delete the existing job and wait for deletion
|
|
if existingImage != desiredImage {
|
|
|
|
if err := r.Delete(ctx, &existingJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error deleting existing InitJob: %v", err)
|
|
}
|
|
|
|
// Wait for the job to be fully deleted before creating a new one
|
|
for {
|
|
err := r.Get(ctx, key, &existingJob)
|
|
if errors.IsNotFound(err) {
|
|
break // Job has been deleted, we can proceed
|
|
}
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error checking if InitJob is deleted: %v", err)
|
|
}
|
|
// Sleep for a short interval to avoid tight loop
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
// Now create the new job
|
|
if err := r.Create(ctx, desiredInitJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating new InitJob: %v", err)
|
|
}
|
|
}
|
|
if err := r.Get(ctx, key, &existingJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error fetching existing InitJob status: %v", err)
|
|
}
|
|
|
|
if existingJob.Status.Succeeded != 1 { // Replace with actual success condition
|
|
return ctrl.Result{}, nil
|
|
}
|
|
// If the job exists and the image matches, no action is needed
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileSetupJob(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
key := client.ObjectKeyFromObject(zitadel)
|
|
key.Name = "setup-job-" + key.Name
|
|
|
|
// Build the desired job
|
|
desiredSetupJob, err := r.Builder.BuildSetupJob(zitadel, key)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error building SetupJob: %v", err)
|
|
}
|
|
|
|
var existingJob batchv1.Job
|
|
err = r.Get(ctx, key, &existingJob)
|
|
if err != nil {
|
|
if !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, fmt.Errorf("error getting SetupJob: %v", err)
|
|
}
|
|
// If job is not found, create the job
|
|
if err := r.Create(ctx, desiredSetupJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating SetupJob: %v", err)
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
// Compare the image in the existing job with the desired image
|
|
existingImage := existingJob.Spec.Template.Spec.Containers[0].Image
|
|
desiredImage := desiredSetupJob.Spec.Template.Spec.Containers[0].Image
|
|
|
|
// If the images don't match, delete the existing job and wait for deletion
|
|
if existingImage != desiredImage {
|
|
|
|
if err := r.Delete(ctx, &existingJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error deleting existing SetupJob: %v", err)
|
|
}
|
|
|
|
// Wait for the job to be fully deleted before creating a new one
|
|
for {
|
|
err := r.Get(ctx, key, &existingJob)
|
|
if errors.IsNotFound(err) {
|
|
break // Job has been deleted, we can proceed
|
|
}
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error checking if SetupJob is deleted: %v", err)
|
|
}
|
|
// Sleep for a short interval to avoid tight loop
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
// Now create the new job
|
|
if err := r.Create(ctx, desiredSetupJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating new SetupJob: %v", err)
|
|
}
|
|
}
|
|
if err := r.Get(ctx, key, &existingJob); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error fetching existing SetupJob status: %v", err)
|
|
}
|
|
|
|
if existingJob.Status.Succeeded != 1 { // Replace with actual success condition
|
|
return ctrl.Result{}, nil
|
|
}
|
|
// If the job exists and the image matches, no action is needed
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileDeployment(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
// TODO: Reload on config changed
|
|
key := client.ObjectKeyFromObject(zitadel)
|
|
desiredSts, err := r.Builder.BuildDeployment(zitadel, key)
|
|
if err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error building Deployment: %v", err)
|
|
}
|
|
var existingDep appsv1.Deployment
|
|
if err := r.Get(ctx, key, &existingDep); err != nil {
|
|
if !errors.IsNotFound(err) {
|
|
return ctrl.Result{}, fmt.Errorf("error getting Deployment: %v", err)
|
|
}
|
|
if err := r.Create(ctx, desiredSts); err != nil {
|
|
return ctrl.Result{}, fmt.Errorf("error creating Deployment: %v", err)
|
|
}
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
patch := client.MergeFrom(existingDep.DeepCopy())
|
|
existingDep.Spec.Template = desiredSts.Spec.Template
|
|
existingDep.Spec.Replicas = desiredSts.Spec.Replicas
|
|
return ctrl.Result{}, r.Patch(ctx, &existingDep, patch)
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileService(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) (ctrl.Result, error) {
|
|
return ctrl.Result{}, r.reconcileDefaultService(ctx, zitadel)
|
|
}
|
|
|
|
func (r *ClusterReconciler) reconcileDefaultService(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) error {
|
|
key := client.ObjectKeyFromObject(zitadel)
|
|
opts := builder.ServiceOpts{
|
|
Ports: []corev1.ServicePort{
|
|
{
|
|
Name: deployment.ZitadelName,
|
|
Port: deployment.ZitadelPort,
|
|
AppProtocol: "h2c",
|
|
},
|
|
},
|
|
}
|
|
desiredSvc, err := r.Builder.BuildService(zitadel, key, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("error building Service: %v", err)
|
|
}
|
|
return r.ServiceReconciler.Reconcile(ctx, desiredSvc)
|
|
}
|
|
|
|
func (r *ClusterReconciler) patchStatus(ctx context.Context, zitadel *zitadelv1alpha1.Cluster,
|
|
patcher patcher) error {
|
|
patch := client.MergeFrom(zitadel.DeepCopy())
|
|
if err := patcher(&zitadel.Status); err != nil {
|
|
return err
|
|
}
|
|
return r.Status().Patch(ctx, zitadel, patch)
|
|
}
|
|
|
|
func (r *ClusterReconciler) patcher(ctx context.Context, zitadel *zitadelv1alpha1.Cluster) patcher {
|
|
return func(s *zitadelv1alpha1.ClusterStatus) error {
|
|
var sts appsv1.Deployment
|
|
if err := r.Get(ctx, client.ObjectKeyFromObject(zitadel), &sts); err != nil {
|
|
return err
|
|
}
|
|
zitadel.Status.Replicas = sts.Status.ReadyReplicas
|
|
|
|
condition.SetReadyWithDeployment(&zitadel.Status, &sts)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (r *ClusterReconciler) patch(ctx context.Context, zitadel *zitadelv1alpha1.Cluster,
|
|
patcher func(*zitadelv1alpha1.Cluster)) error {
|
|
patch := client.MergeFrom(zitadel.DeepCopy())
|
|
patcher(zitadel)
|
|
return r.Patch(ctx, zitadel, patch)
|
|
}
|
|
|
|
// SetupWithManager sets up the controller with the Manager.
|
|
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
|
return ctrl.NewControllerManagedBy(mgr).
|
|
For(&zitadelv1alpha1.Cluster{}).
|
|
Owns(&appsv1.Deployment{}).
|
|
Owns(&corev1.Service{}).
|
|
Owns(&corev1.ConfigMap{}).
|
|
Owns(&corev1.Secret{}).
|
|
WithOptions(controller.Options{RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Millisecond*500, time.Minute*3)}).
|
|
Complete(r)
|
|
}
|