diff --git a/cmd/main.go b/cmd/main.go index 9b960c2bc..332623f0d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -283,15 +283,17 @@ func main() { os.Exit(1) } if err := (&controller.PostgresDatabaseReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("postgresdatabase-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PostgresDatabase") os.Exit(1) } if err := (&controller.PostgresClusterReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("postgrescluster-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PostgresCluster") os.Exit(1) diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index 163a07f9d..70b11c9e6 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -41,7 +42,8 @@ const ( // PostgresClusterReconciler reconciles PostgresCluster resources. type PostgresClusterReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + Recorder record.EventRecorder } // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete @@ -52,9 +54,11 @@ type PostgresClusterReconciler struct { // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/status,verbs=get // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers/status,verbs=get +// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - return clustercore.PostgresClusterService(ctx, r.Client, r.Scheme, req) + rc := &clustercore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder} + return clustercore.PostgresClusterService(ctx, rc, req) } // SetupWithManager registers the controller and owned resource watches. diff --git a/internal/controller/postgresdatabase_controller.go b/internal/controller/postgresdatabase_controller.go index 40faa3eb3..0c6db9628 100644 --- a/internal/controller/postgresdatabase_controller.go +++ b/internal/controller/postgresdatabase_controller.go @@ -29,6 +29,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -41,7 +42,8 @@ import ( // PostgresDatabaseReconciler reconciles a PostgresDatabase object. type PostgresDatabaseReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + Recorder record.EventRecorder } const ( @@ -56,6 +58,7 @@ const ( //+kubebuilder:rbac:groups=postgresql.cnpg.io,resources=databases,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;delete //+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;delete +//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -68,7 +71,8 @@ func (r *PostgresDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Req } return ctrl.Result{}, err } - return dbcore.PostgresDatabaseService(ctx, r.Client, r.Scheme, postgresDB, dbadapter.NewDBRepository) + rc := &dbcore.ReconcileContext{Client: r.Client, Scheme: r.Scheme, Recorder: r.Recorder} + return dbcore.PostgresDatabaseService(ctx, rc, postgresDB, dbadapter.NewDBRepository) } // SetupWithManager sets up the controller with the Manager. @@ -104,9 +108,15 @@ func (r *PostgresDatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error { }, ), )). - Owns(&cnpgv1.Database{}). - Owns(&corev1.Secret{}). - Owns(&corev1.ConfigMap{}). + Owns(&cnpgv1.Database{}, builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(event.CreateEvent) bool { return false }, + })). + Owns(&corev1.Secret{}, builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(event.CreateEvent) bool { return false }, + })). + Owns(&corev1.ConfigMap{}, builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(event.CreateEvent) bool { return false }, + })). Named("postgresdatabase"). WithOptions(controller.Options{ MaxConcurrentReconciles: DatabaseTotalWorker, diff --git a/pkg/postgresql/cluster/core/cluster.go b/pkg/postgresql/cluster/core/cluster.go index d073b79cf..8642362fe 100644 --- a/pkg/postgresql/cluster/core/cluster.go +++ b/pkg/postgresql/cluster/core/cluster.go @@ -39,7 +39,8 @@ import ( ) // PostgresClusterService is the application service entry point called by the primary adapter (reconciler). -func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtime.Scheme, req ctrl.Request) (ctrl.Result, error) { +func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl.Request) (ctrl.Result, error) { + c := rc.Client logger := log.FromContext(ctx) logger.Info("Reconciling PostgresCluster", "name", req.Name, "namespace", req.Namespace) @@ -67,12 +68,13 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } // Finalizer handling must come before any other processing. - if err := handleFinalizer(ctx, c, scheme, postgresCluster, secret); err != nil { + if err := handleFinalizer(ctx, rc, postgresCluster, secret); err != nil { if apierrors.IsNotFound(err) { logger.Info("PostgresCluster already deleted, skipping finalizer update") return ctrl.Result{}, nil } logger.Error(err, "Failed to handle finalizer") + rc.emitWarning(postgresCluster, EventCleanupFailed, fmt.Sprintf("Cleanup failed: %v", err)) errs := []error{err} if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterDeleteFailed, fmt.Sprintf("Failed to delete resources during cleanup: %v", err), failedClusterPhase); statusErr != nil { @@ -105,6 +107,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim clusterClass := &enterprisev4.PostgresClusterClass{} if err := c.Get(ctx, client.ObjectKey{Name: postgresCluster.Spec.Class}, clusterClass); err != nil { logger.Error(err, "Unable to fetch referenced PostgresClusterClass", "className", postgresCluster.Spec.Class) + rc.emitWarning(postgresCluster, EventClusterClassNotFound, fmt.Sprintf("ClusterClass %s not found", postgresCluster.Spec.Class)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterClassNotFound, fmt.Sprintf("ClusterClass %s not found: %v", postgresCluster.Spec.Class, err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -116,6 +119,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim mergedConfig, err := getMergedConfig(clusterClass, postgresCluster) if err != nil { logger.Error(err, "Failed to merge PostgresCluster configuration") + rc.emitWarning(postgresCluster, EventConfigMergeFailed, fmt.Sprintf("Failed to merge configuration: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonInvalidConfiguration, fmt.Sprintf("Failed to merge configuration: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -135,6 +139,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim secretExists, secretErr := clusterSecretExists(ctx, c, postgresCluster.Namespace, postgresSecretName, secret) if secretErr != nil { logger.Error(secretErr, "Failed to check if PostgresCluster secret exists", "name", postgresSecretName) + rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to check secret existence: %v", secretErr)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonUserSecretFailed, fmt.Sprintf("Failed to check secret existence: %v", secretErr), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -143,8 +148,9 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } if !secretExists { logger.Info("Creating PostgresCluster secret", "name", postgresSecretName) - if err := ensureClusterSecret(ctx, c, scheme, postgresCluster, postgresSecretName, secret); err != nil { + if err := ensureClusterSecret(ctx, c, rc.Scheme, postgresCluster, postgresSecretName, secret); err != nil { logger.Error(err, "Failed to ensure PostgresCluster secret", "name", postgresSecretName) + rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to generate cluster secret: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonUserSecretFailed, fmt.Sprintf("Failed to generate PostgresCluster secret: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -159,23 +165,26 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim logger.Error(err, "Failed to update status after secret creation") return ctrl.Result{}, err } + rc.emitNormal(postgresCluster, EventSecretReady, fmt.Sprintf("Superuser secret %s created", postgresSecretName)) logger.Info("SuperUserSecretRef persisted to status") } // Re-attach ownerRef if it was stripped (e.g. by a Retain-policy deletion of a previous cluster). - hasOwnerRef, ownerRefErr := controllerutil.HasOwnerReference(secret.GetOwnerReferences(), postgresCluster, scheme) + hasOwnerRef, ownerRefErr := controllerutil.HasOwnerReference(secret.GetOwnerReferences(), postgresCluster, rc.Scheme) if ownerRefErr != nil { logger.Error(ownerRefErr, "Failed to check owner reference on Secret") return ctrl.Result{}, fmt.Errorf("failed to check owner reference on secret: %w", ownerRefErr) } if secretExists && !hasOwnerRef { logger.Info("Connecting existing secret to PostgresCluster by adding owner reference", "name", postgresSecretName) + rc.emitNormal(postgresCluster, EventClusterAdopted, fmt.Sprintf("Adopted existing CNPG cluster and secret %s", postgresSecretName)) originalSecret := secret.DeepCopy() - if err := ctrl.SetControllerReference(postgresCluster, secret, scheme); err != nil { + if err := ctrl.SetControllerReference(postgresCluster, secret, rc.Scheme); err != nil { return ctrl.Result{}, fmt.Errorf("failed to set controller reference on existing secret: %w", err) } if err := patchObject(ctx, c, originalSecret, secret, "Secret"); err != nil { logger.Error(err, "Failed to patch existing secret with controller reference") + rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to patch existing secret: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonSuperUserSecretFailed, fmt.Sprintf("Failed to patch existing secret: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -201,15 +210,17 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim switch { case apierrors.IsNotFound(err): logger.Info("CNPG Cluster not found, creating", "name", postgresCluster.Name) - newCluster := buildCNPGCluster(scheme, postgresCluster, mergedConfig, postgresSecretName) + newCluster := buildCNPGCluster(rc.Scheme, postgresCluster, mergedConfig, postgresSecretName) if err := c.Create(ctx, newCluster); err != nil { logger.Error(err, "Failed to create CNPG Cluster") + rc.emitWarning(postgresCluster, EventClusterCreateFailed, fmt.Sprintf("Failed to create CNPG cluster: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildFailed, fmt.Sprintf("Failed to create CNPG Cluster: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitNormal(postgresCluster, EventClusterCreationStarted, "CNPG cluster created, waiting for healthy state") if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, "CNPG Cluster created", pendingClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -241,12 +252,19 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim return ctrl.Result{Requeue: true}, nil case patchErr != nil: logger.Error(patchErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) + rc.emitWarning(postgresCluster, EventClusterUpdateFailed, fmt.Sprintf("Failed to patch CNPG cluster: %v", patchErr)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, fmt.Sprintf("Failed to patch CNPG Cluster: %v", patchErr), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, patchErr default: + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, + "CNPG Cluster spec updated, waiting for healthy state", provisioningClusterPhase); statusErr != nil { + logger.Error(statusErr, "Failed to update status after patch") + return ctrl.Result{Requeue: true}, nil + } + rc.emitNormal(postgresCluster, EventClusterUpdateStarted, "CNPG cluster spec updated, waiting for healthy state") logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) return ctrl.Result{RequeueAfter: retryDelay}, nil } @@ -255,6 +273,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim // Reconcile ManagedRoles. if err := reconcileManagedRoles(ctx, c, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") + rc.emitWarning(postgresCluster, EventManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -320,14 +339,16 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } return ctrl.Result{RequeueAfter: retryDelay}, nil } - if err := createOrUpdateConnectionPoolers(ctx, c, scheme, postgresCluster, mergedConfig, cnpgCluster); err != nil { + if err := createOrUpdateConnectionPoolers(ctx, c, rc.Scheme, postgresCluster, mergedConfig, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile connection pooler") + rc.emitWarning(postgresCluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to reconcile connection pooler: %v", err)) if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to reconcile connection pooler: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitNormal(postgresCluster, EventPoolerCreationStarted, "Connection poolers created, waiting for readiness") logger.Info("Connection Poolers created, requeueing to check readiness") if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, "Connection poolers are being provisioned", provisioningClusterPhase); statusErr != nil { @@ -359,22 +380,27 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim return ctrl.Result{RequeueAfter: retryDelay}, nil default: + oldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions)) + copy(oldConditions, postgresCluster.Status.Conditions) if err := syncPoolerStatus(ctx, c, postgresCluster); err != nil { logger.Error(err, "Failed to sync pooler status") + rc.emitWarning(postgresCluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to sync pooler status: %v", err)) if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to sync pooler status: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitPoolerReadyTransition(postgresCluster, oldConditions) } // Reconcile ConfigMap when CNPG cluster is healthy. if cnpgCluster.Status.Phase == cnpgv1.PhaseHealthy { logger.Info("CNPG Cluster is ready, reconciling ConfigMap for connection details") - desiredCM, err := generateConfigMap(ctx, c, scheme, postgresCluster, cnpgCluster, postgresSecretName) + desiredCM, err := generateConfigMap(ctx, c, rc.Scheme, postgresCluster, cnpgCluster, postgresSecretName) if err != nil { logger.Error(err, "Failed to generate ConfigMap") + rc.emitWarning(postgresCluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonConfigMapFailed, fmt.Sprintf("Failed to generate ConfigMap: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -387,7 +413,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim cm.Annotations = desiredCM.Annotations cm.Labels = desiredCM.Labels if !metav1.IsControlledBy(cm, postgresCluster) { - if err := ctrl.SetControllerReference(postgresCluster, cm, scheme); err != nil { + if err := ctrl.SetControllerReference(postgresCluster, cm, rc.Scheme); err != nil { return fmt.Errorf("set controller reference failed: %w", err) } } @@ -395,6 +421,7 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim }) if err != nil { logger.Error(err, "Failed to reconcile ConfigMap", "name", desiredCM.Name) + rc.emitWarning(postgresCluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonConfigMapFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err), failedClusterPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -403,8 +430,10 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } switch createOrUpdateResult { case controllerutil.OperationResultCreated: + rc.emitNormal(postgresCluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s created", desiredCM.Name)) logger.Info("ConfigMap created", "name", desiredCM.Name) case controllerutil.OperationResultUpdated: + rc.emitNormal(postgresCluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s updated", desiredCM.Name)) logger.Info("ConfigMap updated", "name", desiredCM.Name) default: logger.Info("ConfigMap unchanged", "name", desiredCM.Name) @@ -415,6 +444,10 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } // Final status sync. + var oldPhase string + if postgresCluster.Status.Phase != nil { + oldPhase = *postgresCluster.Status.Phase + } if err := syncStatus(ctx, c, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to sync status") if apierrors.IsConflict(err) { @@ -423,6 +456,11 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim } return ctrl.Result{}, fmt.Errorf("failed to sync status: %w", err) } + var newPhase string + if postgresCluster.Status.Phase != nil { + newPhase = *postgresCluster.Status.Phase + } + rc.emitClusterPhaseTransition(postgresCluster, oldPhase, newPhase) if cnpgCluster.Status.Phase == cnpgv1.PhaseHealthy { rwPooler := &cnpgv1.Pooler{} rwErr := c.Get(ctx, types.NamespacedName{ @@ -436,7 +474,10 @@ func PostgresClusterService(ctx context.Context, c client.Client, scheme *runtim }, roPooler) if rwErr == nil && roErr == nil && arePoolersReady(rwPooler, roPooler) { logger.Info("Poolers are ready, syncing pooler status") + poolerOldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions)) + copy(poolerOldConditions, postgresCluster.Status.Conditions) _ = syncPoolerStatus(ctx, c, postgresCluster) + rc.emitPoolerReadyTransition(postgresCluster, poolerOldConditions) } } logger.Info("Reconciliation complete") @@ -924,7 +965,9 @@ func deleteCNPGCluster(ctx context.Context, c client.Client, cnpgCluster *cnpgv1 // handleFinalizer processes deletion cleanup: removes poolers, then deletes or orphans the CNPG Cluster // based on ClusterDeletionPolicy, then removes the finalizer. -func handleFinalizer(ctx context.Context, c client.Client, scheme *runtime.Scheme, cluster *enterprisev4.PostgresCluster, secret *corev1.Secret) error { +func handleFinalizer(ctx context.Context, rc *ReconcileContext, cluster *enterprisev4.PostgresCluster, secret *corev1.Secret) error { + c := rc.Client + scheme := rc.Scheme logger := log.FromContext(ctx) if cluster.GetDeletionTimestamp() == nil { logger.Info("PostgresCluster not marked for deletion, skipping finalizer logic") @@ -947,17 +990,16 @@ func handleFinalizer(ctx context.Context, c client.Client, scheme *runtime.Schem } logger.Info("Processing finalizer cleanup for PostgresCluster") - if err := deleteConnectionPoolers(ctx, c, cluster); err != nil { - logger.Error(err, "Failed to delete connection poolers during cleanup") - return fmt.Errorf("failed to delete connection poolers: %w", err) - } - - // Dereference *string — empty string falls through to default (unknown policy). policy := "" if cluster.Spec.ClusterDeletionPolicy != nil { policy = *cluster.Spec.ClusterDeletionPolicy } + if err := deleteConnectionPoolers(ctx, c, cluster); err != nil { + logger.Error(err, "Failed to delete connection poolers during cleanup") + return fmt.Errorf("failed to delete connection poolers: %w", err) + } + switch policy { case clusterDeletionPolicyDelete: logger.Info("ClusterDeletionPolicy is 'Delete', deleting CNPG Cluster and associated resources") @@ -1024,6 +1066,7 @@ func handleFinalizer(ctx context.Context, c client.Client, scheme *runtime.Schem logger.Error(err, "Failed to remove finalizer from PostgresCluster") return fmt.Errorf("failed to remove finalizer: %w", err) } + rc.emitNormal(cluster, EventCleanupComplete, fmt.Sprintf("Cleanup complete (policy: %s)", policy)) logger.Info("Finalizer removed, cleanup complete") return nil } diff --git a/pkg/postgresql/cluster/core/events.go b/pkg/postgresql/cluster/core/events.go new file mode 100644 index 000000000..afcfd768e --- /dev/null +++ b/pkg/postgresql/cluster/core/events.go @@ -0,0 +1,62 @@ +package core + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + EventSecretReady = "SecretReady" + EventConfigMapReady = "ConfigMapReady" + EventClusterAdopted = "ClusterAdopted" + EventClusterCreationStarted = "ClusterCreationStarted" + EventClusterUpdateStarted = "ClusterUpdateStarted" + EventClusterReady = "ClusterReady" + EventPoolerCreationStarted = "PoolerCreationStarted" + EventPoolerReady = "PoolerReady" + EventCleanupComplete = "CleanupComplete" + EventClusterClassNotFound = "ClusterClassNotFound" + EventConfigMergeFailed = "ConfigMergeFailed" + EventSecretReconcileFailed = "SecretReconcileFailed" + EventClusterCreateFailed = "ClusterCreateFailed" + EventClusterUpdateFailed = "ClusterUpdateFailed" + EventManagedRolesFailed = "ManagedRolesFailed" + EventPoolerReconcileFailed = "PoolerReconcileFailed" + EventConfigMapReconcileFailed = "ConfigMapReconcileFailed" + EventClusterDegraded = "ClusterDegraded" + EventCleanupFailed = "CleanupFailed" +) + +func (rc *ReconcileContext) emitNormal(obj client.Object, reason, message string) { + rc.Recorder.Event(obj, corev1.EventTypeNormal, reason, message) +} + +func (rc *ReconcileContext) emitWarning(obj client.Object, reason, message string) { + rc.Recorder.Event(obj, corev1.EventTypeWarning, reason, message) +} + +// emitClusterPhaseTransition emits ClusterReady or ClusterDegraded only on +// actual phase transitions. Provisioning and Configuring are expected phases +// after our own create/update operations, so they don't emit ClusterDegraded. +func (rc *ReconcileContext) emitClusterPhaseTransition(obj client.Object, oldPhase, newPhase string) { + switch { + case oldPhase != string(readyClusterPhase) && newPhase == string(readyClusterPhase): + rc.emitNormal(obj, EventClusterReady, "Cluster is up and running") + // only when cluster degraded from ready but not to provisioning or configuring + case oldPhase == string(readyClusterPhase) && newPhase != string(readyClusterPhase) && + newPhase != string(provisioningClusterPhase) && newPhase != string(configuringClusterPhase): + rc.emitWarning(obj, EventClusterDegraded, fmt.Sprintf("Cluster entered phase: %s", newPhase)) + } +} + +// emitPoolerReadyTransition emits PoolerReady only when the condition was not +// previously True — prevents re-emission on every reconcile while already ready. +func (rc *ReconcileContext) emitPoolerReadyTransition(obj client.Object, conditions []metav1.Condition) { + if !meta.IsStatusConditionTrue(conditions, string(poolerReady)) { + rc.emitNormal(obj, EventPoolerReady, "Connection poolers are ready") + } +} diff --git a/pkg/postgresql/cluster/core/types.go b/pkg/postgresql/cluster/core/types.go index 19886fd73..042a5ae82 100644 --- a/pkg/postgresql/cluster/core/types.go +++ b/pkg/postgresql/cluster/core/types.go @@ -5,8 +5,20 @@ import ( enterprisev4 "github.com/splunk/splunk-operator/api/v4" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" ) +// ReconcileContext bundles infrastructure dependencies injected by the controller +// shell (primary adapter). The service layer declares what it needs via this struct +// rather than reaching into context — keeping ports explicit and testable. +type ReconcileContext struct { + Client client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder +} + // normalizedCNPGClusterSpec is a subset of cnpgv1.ClusterSpec fields used for drift detection. // Only fields we set in buildCNPGClusterSpec are included — CNPG-injected defaults are excluded // to avoid false-positive drift on every reconcile. diff --git a/pkg/postgresql/database/core/database.go b/pkg/postgresql/database/core/database.go index 483076774..19f9b7ced 100644 --- a/pkg/postgresql/database/core/database.go +++ b/pkg/postgresql/database/core/database.go @@ -32,11 +32,11 @@ type NewDBRepoFunc func(ctx context.Context, host, dbName, password string) (DBR // newDBRepo is injected to keep the core free of pgx imports. func PostgresDatabaseService( ctx context.Context, - c client.Client, - scheme *runtime.Scheme, + rc *ReconcileContext, postgresDB *enterprisev4.PostgresDatabase, newDBRepo NewDBRepoFunc, ) (ctrl.Result, error) { + c := rc.Client logger := log.FromContext(ctx) logger.Info("Reconciling PostgresDatabase", "name", postgresDB.Name, "namespace", postgresDB.Namespace) @@ -46,8 +46,9 @@ func PostgresDatabaseService( // Finalizer: cleanup on deletion, register on creation. if postgresDB.GetDeletionTimestamp() != nil { - if err := handleDeletion(ctx, c, postgresDB); err != nil { + if err := handleDeletion(ctx, rc, postgresDB); err != nil { logger.Error(err, "Cleanup failed for PostgresDatabase") + rc.emitWarning(postgresDB, EventCleanupFailed, fmt.Sprintf("Cleanup failed: %v", err)) return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -77,6 +78,7 @@ func PostgresDatabaseService( cluster, err := fetchCluster(ctx, c, postgresDB) if err != nil { if errors.IsNotFound(err) { + rc.emitWarning(postgresDB, EventClusterNotFound, fmt.Sprintf("PostgresCluster %s not found", postgresDB.Spec.ClusterRef.Name)) if err := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterNotFound, "Cluster CR not found", pendingDBPhase); err != nil { return ctrl.Result{}, err } @@ -93,12 +95,14 @@ func PostgresDatabaseService( switch clusterStatus { case ClusterNotReady, ClusterNoProvisionerRef: + rc.emitWarning(postgresDB, EventClusterNotReady, "Referenced PostgresCluster is not ready yet") if err := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterProvisioning, "Cluster is not in ready state yet", pendingDBPhase); err != nil { return ctrl.Result{}, err } return ctrl.Result{RequeueAfter: retryDelay}, nil case ClusterReady: + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, clusterReady, EventClusterValidated, "Referenced PostgresCluster is ready") if err := updateStatus(clusterReady, metav1.ConditionTrue, reasonClusterAvailable, "Cluster is operational", provisioningDBPhase); err != nil { return ctrl.Result{}, err } @@ -111,6 +115,7 @@ func PostgresDatabaseService( "If you deleted a previous PostgresDatabase, recreate it with the original name to re-adopt the orphaned resources.", strings.Join(roleConflicts, ", ")) logger.Error(nil, conflictMsg) + rc.emitWarning(postgresDB, EventRoleConflict, conflictMsg) errs := []error{fmt.Errorf("role conflict detected: %s", strings.Join(roleConflicts, ", "))} if statusErr := updateStatus(rolesReady, metav1.ConditionFalse, reasonRoleConflict, conflictMsg, failedDBPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -132,13 +137,15 @@ func PostgresDatabaseService( // Phase: CredentialProvisioning — secrets must exist before roles are patched. // CNPG rejects a PasswordSecretRef pointing at a missing secret. - if err := reconcileUserSecrets(ctx, c, scheme, postgresDB); err != nil { + if err := reconcileUserSecrets(ctx, c, rc.Scheme, postgresDB); err != nil { + rc.emitWarning(postgresDB, EventUserSecretsFailed, fmt.Sprintf("Failed to reconcile user secrets: %v", err)) if statusErr := updateStatus(secretsReady, metav1.ConditionFalse, reasonSecretsCreationFailed, fmt.Sprintf("Failed to reconcile user secrets: %v", err), provisioningDBPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, secretsReady, EventSecretsReady, fmt.Sprintf("All secrets provisioned for %d databases", len(postgresDB.Spec.Databases))) if err := updateStatus(secretsReady, metav1.ConditionTrue, reasonSecretsCreated, fmt.Sprintf("All secrets provisioned for %d databases", len(postgresDB.Spec.Databases)), provisioningDBPhase); err != nil { return ctrl.Result{}, err @@ -147,13 +154,15 @@ func PostgresDatabaseService( // Phase: ConnectionMetadata — ConfigMaps carry connection info consumers need as soon // as databases are ready, so they are created alongside secrets. endpoints := resolveClusterEndpoints(cluster, cnpgCluster, postgresDB.Namespace) - if err := reconcileRoleConfigMaps(ctx, c, scheme, postgresDB, endpoints); err != nil { + if err := reconcileRoleConfigMaps(ctx, c, rc.Scheme, postgresDB, endpoints); err != nil { + rc.emitWarning(postgresDB, EventAccessConfigFailed, fmt.Sprintf("Failed to reconcile ConfigMaps: %v", err)) if statusErr := updateStatus(configMapsReady, metav1.ConditionFalse, reasonConfigMapsCreationFailed, fmt.Sprintf("Failed to reconcile ConfigMaps: %v", err), provisioningDBPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, configMapsReady, EventConfigMapsReady, fmt.Sprintf("All ConfigMaps provisioned for %d databases", len(postgresDB.Spec.Databases))) if err := updateStatus(configMapsReady, metav1.ConditionTrue, reasonConfigMapsCreated, fmt.Sprintf("All ConfigMaps provisioned for %d databases", len(postgresDB.Spec.Databases)), provisioningDBPhase); err != nil { return ctrl.Result{}, err @@ -173,8 +182,10 @@ func PostgresDatabaseService( logger.Info("User spec changed, patching CNPG Cluster", "missing", missing) if err := patchManagedRoles(ctx, c, postgresDB, cluster); err != nil { logger.Error(err, "Failed to patch users in CNPG Cluster") + rc.emitWarning(postgresDB, EventManagedRolesPatchFailed, fmt.Sprintf("Failed to patch managed roles: %v", err)) return ctrl.Result{}, err } + rc.emitNormal(postgresDB, EventRoleReconciliationStarted, fmt.Sprintf("Patched managed roles, waiting for %d roles to reconcile", len(desiredUsers))) if err := updateStatus(rolesReady, metav1.ConditionFalse, reasonWaitingForCNPG, fmt.Sprintf("Waiting for %d roles to be reconciled", len(desiredUsers)), provisioningDBPhase); err != nil { return ctrl.Result{}, err @@ -184,6 +195,7 @@ func PostgresDatabaseService( notReadyRoles, err := verifyRolesReady(ctx, desiredUsers, cnpgCluster) if err != nil { + rc.emitWarning(postgresDB, EventRoleFailed, fmt.Sprintf("Role reconciliation failed: %v", err)) if statusErr := updateStatus(rolesReady, metav1.ConditionFalse, reasonUsersCreationFailed, fmt.Sprintf("Role creation failed: %v", err), failedDBPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") @@ -197,16 +209,22 @@ func PostgresDatabaseService( } return ctrl.Result{RequeueAfter: retryDelay}, nil } + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, rolesReady, EventRolesReady, fmt.Sprintf("All %d roles reconciled", len(desiredUsers))) if err := updateStatus(rolesReady, metav1.ConditionTrue, reasonUsersAvailable, fmt.Sprintf("All %d users in PostgreSQL", len(desiredUsers)), provisioningDBPhase); err != nil { return ctrl.Result{}, err } // Phase: DatabaseProvisioning - if err := reconcileCNPGDatabases(ctx, c, scheme, postgresDB, cluster); err != nil { + adopted, err := reconcileCNPGDatabases(ctx, c, rc.Scheme, postgresDB, cluster) + if err != nil { logger.Error(err, "Failed to reconcile CNPG Databases") + rc.emitWarning(postgresDB, EventDatabasesReconcileFailed, fmt.Sprintf("Failed to reconcile databases: %v", err)) return ctrl.Result{}, err } + if len(adopted) > 0 { + rc.emitNormal(postgresDB, EventResourcesAdopted, fmt.Sprintf("Adopted retained databases: %v", adopted)) + } notReadyDBs, err := verifyDatabasesReady(ctx, c, postgresDB) if err != nil { @@ -214,12 +232,14 @@ func PostgresDatabaseService( return ctrl.Result{}, err } if len(notReadyDBs) > 0 { + rc.emitOnceBeforeWait(postgresDB, postgresDB.Status.Conditions, databasesReady, EventDatabaseReconciliationStarted, fmt.Sprintf("Reconciling %d databases, waiting for readiness", len(postgresDB.Spec.Databases))) if err := updateStatus(databasesReady, metav1.ConditionFalse, reasonWaitingForCNPG, fmt.Sprintf("Waiting for databases to be ready: %v", notReadyDBs), provisioningDBPhase); err != nil { return ctrl.Result{}, err } return ctrl.Result{RequeueAfter: retryDelay}, nil } + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, databasesReady, EventDatabasesReady, fmt.Sprintf("All %d databases ready", len(postgresDB.Spec.Databases))) if err := updateStatus(databasesReady, metav1.ConditionTrue, reasonDatabasesAvailable, fmt.Sprintf("All %d databases ready", len(postgresDB.Spec.Databases)), readyDBPhase); err != nil { return ctrl.Result{}, err @@ -255,18 +275,21 @@ func PostgresDatabaseService( } if err := reconcileRWRolePrivileges(ctx, endpoints.RWHost, string(pw), dbNames, newDBRepo); err != nil { + rc.emitWarning(postgresDB, EventPrivilegesGrantFailed, fmt.Sprintf("Failed to grant RW role privileges: %v", err)) if statusErr := updateStatus(privilegesReady, metav1.ConditionFalse, reasonPrivilegesGrantFailed, fmt.Sprintf("Failed to grant RW role privileges: %v", err), provisioningDBPhase); statusErr != nil { logger.Error(statusErr, "Failed to update status") } return ctrl.Result{}, err } + rc.emitOnConditionTransition(postgresDB, postgresDB.Status.Conditions, privilegesReady, EventPrivilegesReady, fmt.Sprintf("RW role privileges granted for all %d databases", len(postgresDB.Spec.Databases))) if err := updateStatus(privilegesReady, metav1.ConditionTrue, reasonPrivilegesGranted, fmt.Sprintf("RW role privileges granted for all %d databases", len(postgresDB.Spec.Databases)), readyDBPhase); err != nil { return ctrl.Result{}, err } } + rc.emitNormal(postgresDB, EventPostgresDatabaseReady, fmt.Sprintf("PostgresDatabase %s is ready", postgresDB.Name)) postgresDB.Status.Databases = populateDatabaseStatus(postgresDB) postgresDB.Status.ObservedGeneration = &postgresDB.Generation @@ -429,8 +452,9 @@ func verifyRolesReady(ctx context.Context, expectedUsers []string, cnpgCluster * return notReady, nil } -func reconcileCNPGDatabases(ctx context.Context, c client.Client, scheme *runtime.Scheme, postgresDB *enterprisev4.PostgresDatabase, cluster *enterprisev4.PostgresCluster) error { +func reconcileCNPGDatabases(ctx context.Context, c client.Client, scheme *runtime.Scheme, postgresDB *enterprisev4.PostgresDatabase, cluster *enterprisev4.PostgresCluster) ([]string, error) { logger := log.FromContext(ctx) + var adopted []string for _, dbSpec := range postgresDB.Spec.Databases { cnpgDBName := cnpgDatabaseName(postgresDB.Name, dbSpec.Name) cnpgDB := &cnpgv1.Database{ @@ -442,6 +466,7 @@ func reconcileCNPGDatabases(ctx context.Context, c client.Client, scheme *runtim if reAdopting { logger.Info("Re-adopting orphaned CNPG Database", "name", cnpgDBName) delete(cnpgDB.Annotations, annotationRetainedFrom) + adopted = append(adopted, dbSpec.Name) } if cnpgDB.CreationTimestamp.IsZero() || reAdopting { return controllerutil.SetControllerReference(postgresDB, cnpgDB, scheme) @@ -449,10 +474,10 @@ func reconcileCNPGDatabases(ctx context.Context, c client.Client, scheme *runtim return nil }) if err != nil { - return fmt.Errorf("reconciling CNPG Database %s: %w", cnpgDBName, err) + return adopted, fmt.Errorf("reconciling CNPG Database %s: %w", cnpgDBName, err) } } - return nil + return adopted, nil } func verifyDatabasesReady(ctx context.Context, c client.Client, postgresDB *enterprisev4.PostgresDatabase) ([]string, error) { @@ -499,7 +524,8 @@ func buildDeletionPlan(databases []enterprisev4.DatabaseDefinition) deletionPlan return plan } -func handleDeletion(ctx context.Context, c client.Client, postgresDB *enterprisev4.PostgresDatabase) error { +func handleDeletion(ctx context.Context, rc *ReconcileContext, postgresDB *enterprisev4.PostgresDatabase) error { + c := rc.Client plan := buildDeletionPlan(postgresDB.Spec.Databases) if err := orphanRetainedResources(ctx, c, postgresDB, plan.retained); err != nil { return err @@ -517,6 +543,7 @@ func handleDeletion(ctx context.Context, c client.Client, postgresDB *enterprise } return fmt.Errorf("removing finalizer: %w", err) } + rc.emitNormal(postgresDB, EventCleanupComplete, fmt.Sprintf("Cleanup complete (%d retained, %d deleted)", len(plan.retained), len(plan.deleted))) log.FromContext(ctx).Info("Cleanup complete", "name", postgresDB.Name, "retained", len(plan.retained), "deleted", len(plan.deleted)) return nil } diff --git a/pkg/postgresql/database/core/events.go b/pkg/postgresql/database/core/events.go new file mode 100644 index 000000000..987b8bbfb --- /dev/null +++ b/pkg/postgresql/database/core/events.go @@ -0,0 +1,58 @@ +package core + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + EventPostgresDatabaseReady = "PostgresDatabaseReady" + EventResourcesAdopted = "ResourcesAdopted" + EventClusterValidated = "ClusterValidated" + EventSecretsReady = "SecretsReady" + EventConfigMapsReady = "ConfigMapsReady" + EventRoleReconciliationStarted = "RoleReconciliationStarted" + EventRolesReady = "RolesReady" + EventDatabaseReconciliationStarted = "DatabaseReconciliationStarted" + EventDatabasesReady = "DatabasesReady" + EventPrivilegesReady = "PrivilegesReady" + EventCleanupComplete = "CleanupComplete" + EventClusterNotFound = "ClusterNotFound" + EventClusterNotReady = "ClusterNotReady" + EventRoleConflict = "RoleConflict" + EventUserSecretsFailed = "UserSecretsFailed" + EventAccessConfigFailed = "AccessConfigFailed" + EventManagedRolesPatchFailed = "ManagedRolesPatchFailed" + EventRoleFailed = "RoleFailed" + EventDatabasesReconcileFailed = "DatabasesReconcileFailed" + EventPrivilegesGrantFailed = "PrivilegesGrantFailed" + EventCleanupFailed = "CleanupFailed" +) + +func (rc *ReconcileContext) emitNormal(obj client.Object, reason, message string) { + rc.Recorder.Event(obj, corev1.EventTypeNormal, reason, message) +} + +func (rc *ReconcileContext) emitWarning(obj client.Object, reason, message string) { + rc.Recorder.Event(obj, corev1.EventTypeWarning, reason, message) +} + +// emitOnConditionTransition emits a Normal event only when the condition is not +// already True — prevents duplicate events on repeated reconciles. +func (rc *ReconcileContext) emitOnConditionTransition(obj client.Object, conditions []metav1.Condition, condType conditionTypes, reason, message string) { + if !meta.IsStatusConditionTrue(conditions, string(condType)) { + rc.emitNormal(obj, reason, message) + } +} + +// emitOnceBeforeWait emits a Normal event when the condition is either absent +// or currently True — i.e. the first time we enter a wait cycle. On subsequent +// requeue polls the condition is already False, so no duplicate is emitted. +func (rc *ReconcileContext) emitOnceBeforeWait(obj client.Object, conditions []metav1.Condition, condType conditionTypes, reason, message string) { + cond := meta.FindStatusCondition(conditions, string(condType)) + if cond == nil || cond.Status == metav1.ConditionTrue { + rc.emitNormal(obj, reason, message) + } +} diff --git a/pkg/postgresql/database/core/types.go b/pkg/postgresql/database/core/types.go index 0d1fa116a..bf07fd19f 100644 --- a/pkg/postgresql/database/core/types.go +++ b/pkg/postgresql/database/core/types.go @@ -4,8 +4,18 @@ import ( "time" enterprisev4 "github.com/splunk/splunk-operator/api/v4" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" ) +// ReconcileContext bundles infrastructure dependencies injected by the controller +type ReconcileContext struct { + Client client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder +} + type reconcileDBPhases string type conditionTypes string type conditionReasons string