diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 63d400934..634320d82 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -486,6 +486,25 @@ Note that `s3_wal_path` and `gs_wal_path` are mutually exclusive. from a remote primary. See the Patroni documentation [here](https://patroni.readthedocs.io/en/latest/standby_cluster.html) for more details. Optional. +## Lifecycle configuration + +Parameters to control cluster hibernate/wake-up behavior. + +* **phase** + Set to `"stopped"` to hibernate the cluster. When this field is set on a + running cluster, the operator will: + * Store the current number of instances in the status + * Scale down the StatefulSet to 0 replicas + * Scale down the connection pooler to 0 replicas + * Set the cluster status to "Stopping", then "Stopped" + + When this field is removed from a stopped cluster, the operator will: + * Restore the number of instances from the stored value + * Scale up the StatefulSet and connection pooler + * Set the cluster status to "Updating", then "Running" + + This field is optional. When not set, the cluster operates normally. + ## Volume properties Those parameters are grouped under the `volume` top-level key and define the @@ -714,3 +733,21 @@ can have the following properties: * **memory** memory requests to be set as an annotation on the stream resource. Optional. + +## Status fields + +The operator reports the cluster state through the `status` sub-resource. These +fields are managed by the operator and should not be set manually. + +* **PostgresClusterStatus** + Current state of the cluster. One of: Creating, Updating, Running, + UpdateFailed, SyncFailed, CreateFailed, Invalid, Stopping, Stopped. + +* **previousNumberOfInstances** + The number of instances the cluster had before hibernation. Used to restore + the cluster to its previous size when waking up. Cleared after wake-up. + +* **previousPoolerInstances** + A map of connection pooler role to its replica count before hibernation. + The keys are "master" and "replica". Used to restore the pooler when waking + up. Cleared after wake-up. diff --git a/docs/user.md b/docs/user.md index 1c530a48c..5cca700a3 100644 --- a/docs/user.md +++ b/docs/user.md @@ -930,6 +930,107 @@ When you apply this manifest, the operator will: The process is asynchronous. You can monitor the operator logs and the state of the `postgresql` resource to follow the progress. Once the new cluster is up and running, your applications can reconnect. +## Hibernate and Wake-up a Cluster + +The operator supports hibernating a PostgreSQL cluster to save resources when it's +not needed, and waking it up again when required. This feature: + +* Scales down the PostgreSQL StatefulSet to 0 replicas (stops all pods) +* Scales down the connection pooler to 0 replicas +* Preserves the cluster configuration and data (PVCs are retained) +* Stores the previous replica counts for automatic restoration + +### Initiating Hibernate + +To hibernate a running cluster, set the `lifecycle.phase` field to `"stopped"`: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-test-cluster +spec: + teamId: "test-team" + # ... other cluster parameters + numberOfInstances: 3 + lifecycle: + phase: "stopped" +``` + +When you apply this manifest, the operator will: + +* Store the current `numberOfInstances` in `status.previousNumberOfInstances` +* Store the connection pooler replica counts in `status.previousPoolerInstances` +* Set `spec.numberOfInstances` to 0 +* Scale down the StatefulSet to 0 replicas +* Scale down the connection pooler deployments to 0 replicas +* Suspend the logical backup CronJob (if enabled) +* Set `status.PostgresClusterStatus` to "Stopping", then "Stopped" + +### Waking up a Cluster + +To wake up a hibernated cluster, remove the `lifecycle.phase` field or set it to +an empty value: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-test-cluster +spec: + teamId: "test-team" + # ... other cluster parameters + # lifecycle.phase is not set or is removed +``` + +When you apply this manifest, the operator will: + +* Restore `numberOfInstances` from `status.previousNumberOfInstances` +* Restore the connection pooler replica counts from `status.previousPoolerInstances` +* Resume the logical backup CronJob (if enabled) +* Scale up the StatefulSet to the previous replica count +* Scale up the connection pooler deployments to the previous replica counts +* Set `status.PostgresClusterStatus` to "Updating", then "Running" +* Clear `status.previousNumberOfInstances` and `status.previousPoolerInstances` + +### Cluster Status During Lifecycle Transitions + +| Status | Meaning | +|--------|---------| +| Running | Cluster is running normally | +| Stopping | Cluster is transitioning to stopped state (pods terminating) | +| Stopped | All pods have been terminated, cluster is hibernated | + +### Restrictions During Hibernate + +* **During Stopping**: All spec changes are blocked. You must wait for the cluster + to reach the Stopped state before making changes. + +* **During Stopped**: Spec changes are blocked unless you remove `lifecycle.phase` + to wake up the cluster. This prevents accidental modifications to a hibernated + cluster. + +### Connection Pooler Behavior + +The connection pooler is automatically scaled alongside the cluster: + +* When the cluster hibernates, the pooler is scaled to 0 replicas +* When the cluster wakes up, the pooler is restored to its previous replica count +* The previous replica counts are stored in `status.previousPoolerInstances` + +Note: If the connection pooler was already at 0 replicas before hibernate, it +will remain at 0 after wake-up. + +### Logical Backup Behavior + +The logical backup CronJob is automatically suspended during hibernate: + +* When the cluster hibernates, the backup CronJob is suspended (`.spec.suspend: true`) +* When the cluster wakes up, the backup CronJob is automatically resumed +* The backup schedule is preserved and resumes from its normal schedule + +This prevents failed backup jobs from running when the database is unavailable. + ## Setting up a standby cluster Standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster) diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 39811824e..10e952cd2 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -3246,6 +3246,13 @@ spec: - name type: object type: array + lifecycle: + description: LifecycleSpec describes the lifecycle state of a Postgres + cluster. + properties: + phase: + type: string + type: object logicalBackupRetention: type: string logicalBackupSchedule: @@ -4197,6 +4204,14 @@ spec: properties: PostgresClusterStatus: type: string + previousNumberOfInstances: + format: int32 + type: integer + previousPoolerInstances: + type: object + additionalProperties: + format: int32 + type: integer required: - PostgresClusterStatus type: object diff --git a/pkg/apis/acid.zalan.do/v1/const.go b/pkg/apis/acid.zalan.do/v1/const.go index 4102ea3d3..69012427a 100644 --- a/pkg/apis/acid.zalan.do/v1/const.go +++ b/pkg/apis/acid.zalan.do/v1/const.go @@ -9,6 +9,8 @@ const ( ClusterStatusSyncFailed = "SyncFailed" ClusterStatusAddFailed = "CreateFailed" ClusterStatusRunning = "Running" + ClusterStatusStopping = "Stopping" + ClusterStatusStopped = "Stopped" ClusterStatusInvalid = "Invalid" ) diff --git a/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml b/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml index 39811824e..10e952cd2 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml +++ b/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml @@ -3246,6 +3246,13 @@ spec: - name type: object type: array + lifecycle: + description: LifecycleSpec describes the lifecycle state of a Postgres + cluster. + properties: + phase: + type: string + type: object logicalBackupRetention: type: string logicalBackupSchedule: @@ -4197,6 +4204,14 @@ spec: properties: PostgresClusterStatus: type: string + previousNumberOfInstances: + format: int32 + type: integer + previousPoolerInstances: + type: object + additionalProperties: + format: int32 + type: integer required: - PostgresClusterStatus type: object diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 1dadfd06c..23c8a286a 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -115,6 +115,7 @@ type PostgresSpec struct { TLS *TLSDescription `json:"tls,omitempty"` AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` Streams []Stream `json:"streams,omitempty"` + Lifecycle *LifecycleSpec `json:"lifecycle,omitempty"` Env []v1.EnvVar `json:"env,omitempty"` // deprecated @@ -257,6 +258,11 @@ type StandbyDescription struct { StandbyPrimarySlotName string `json:"standby_primary_slot_name,omitempty"` } +// LifecycleSpec describes the lifecycle state of a Postgres cluster. +type LifecycleSpec struct { + Phase string `json:"phase,omitempty"` +} + // TLSDescription specs TLS properties type TLSDescription struct { // +required @@ -302,7 +308,9 @@ type UserFlags []string // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.) type PostgresStatus struct { - PostgresClusterStatus string `json:"PostgresClusterStatus"` + PostgresClusterStatus string `json:"PostgresClusterStatus"` + PreviousNumberOfInstances int32 `json:"previousNumberOfInstances,omitempty"` + PreviousPoolerInstances map[string]int32 `json:"previousPoolerInstances,omitempty"` } // ConnectionPooler Options for connection pooler diff --git a/pkg/apis/acid.zalan.do/v1/util.go b/pkg/apis/acid.zalan.do/v1/util.go index 719defe93..7bbdc0bbf 100644 --- a/pkg/apis/acid.zalan.do/v1/util.go +++ b/pkg/apis/acid.zalan.do/v1/util.go @@ -101,6 +101,16 @@ func (postgresStatus PostgresStatus) Creating() bool { return postgresStatus.PostgresClusterStatus == ClusterStatusCreating } +// Stopping status of cluster +func (postgresStatus PostgresStatus) Stopping() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusStopping +} + +// Stopped status of cluster +func (postgresStatus PostgresStatus) Stopped() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusStopped +} + func (postgresStatus PostgresStatus) String() string { return postgresStatus.PostgresClusterStatus } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 159a87f35..692a6fe30 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -310,6 +310,22 @@ func (in *KubernetesMetaConfiguration) DeepCopy() *KubernetesMetaConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LifecycleSpec) DeepCopyInto(out *LifecycleSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifecycleSpec. +func (in *LifecycleSpec) DeepCopy() *LifecycleSpec { + if in == nil { + return nil + } + out := new(LifecycleSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LoadBalancerConfiguration) DeepCopyInto(out *LoadBalancerConfiguration) { *out = *in @@ -874,6 +890,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Lifecycle != nil { + in, out := &in.Lifecycle, &out.Lifecycle + *out = new(LifecycleSpec) + **out = **in + } if in.Env != nil { in, out := &in.Env, &out.Env *out = make([]corev1.EnvVar, len(*in)) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 04c974f4c..dfe328821 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1008,13 +1008,31 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + // Block all spec changes when cluster is stopped or stopping + blocked, err := c.blockLifecycleUpdate(newSpec) + if err != nil { + return err + } + if blocked { + return nil + } + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating - newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec) + newSpec, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec) if err != nil { return fmt.Errorf("could not set cluster status to updating: %w", err) } + // Handle lifecycle transitions (hibernate/wake-up) + handled, err := c.handleHibernateAndWakeUp(newSpec) + if err != nil { + return err + } + if handled { + return nil + } + if !c.isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { // do not apply any major version related changes yet newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion @@ -1220,6 +1238,134 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { return nil } +// blockLifecycleUpdate checks if an update should be blocked due to lifecycle state. +// Returns (blocked bool, err error): +// - (true, nil) if update is blocked and caller should return early +// - (false, nil) if update can proceed +// - (false, error) on error +func (c *Cluster) blockLifecycleUpdate(newSpec *acidv1.Postgresql) (bool, error) { + if !c.Status.Stopped() && !c.Status.Stopping() { + return false, nil + } + + lifecyclePhase := "" + if newSpec.Spec.Lifecycle != nil { + lifecyclePhase = newSpec.Spec.Lifecycle.Phase + } + + // During Stopping: block ALL spec changes (no cancellation allowed) + if c.Status.Stopping() { + return true, fmt.Errorf("cannot update cluster while it is stopping. Wait for it to fully stop first") + } + + // During Stopped: only block if keeping lifecycle.phase="stopped" + if lifecyclePhase == "stopped" { + return true, fmt.Errorf("cannot update cluster while stopped. Remove lifecycle.phase to wake up the cluster") + } + + return false, nil +} + +// handleHibernateAndWakeUp processes lifecycle hibernate/wake-up transitions. +// Returns (handled bool, err error): +// - (true, nil) if lifecycle transition was handled, Update() should return early +// - (false, nil) if no lifecycle transition, normal update continues +// - (false, error) on error +func (c *Cluster) handleHibernateAndWakeUp(newSpec *acidv1.Postgresql) (bool, error) { + // === INITIATE HIBERNATE: Running -> Stopping === + if c.Status.Running() && newSpec.Spec.Lifecycle != nil && newSpec.Spec.Lifecycle.Phase == "stopped" { + c.logger.Infof("[lifecycle] initiating hibernate for cluster %s: current numberOfInstances=%d", c.Name, c.Spec.NumberOfInstances) + + // Store previousNumberOfInstances BEFORE setting numberOfInstances to 0 + newSpec.Status.PreviousNumberOfInstances = c.Spec.NumberOfInstances + newSpec.Spec.NumberOfInstances = 0 + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopping + + // Scale down pooler deployments and store current replica counts + if err := c.scalePoolerDown(newSpec); err != nil { + return false, fmt.Errorf("could not scale pooler during hibernate: %w", err) + } + + // Suspend logical backup CronJob if enabled + if c.Spec.EnableLogicalBackup { + if err := c.suspendLogicalBackupJob(); err != nil { + return false, fmt.Errorf("could not suspend logical backup job during hibernate: %w", err) + } + c.logger.Info("[lifecycle] logical backup job suspended") + } + + c.logger.Infof("[lifecycle] hibernate initiated: setting numberOfInstances=0, previousNumberOfInstances=%d", newSpec.Status.PreviousNumberOfInstances) + + // Update spec first (Update only updates spec when CR has status subresource) + pgUpdated, err := c.KubeClient.UpdatePostgresCR(c.clusterName(), newSpec) + if err != nil { + return false, fmt.Errorf("could not update spec during hibernate: %w", err) + } + c.logger.Info("[lifecycle] hibernate: spec updated successfully") + + // Update status separately - preserve status values since UpdatePostgresCR returns object with status zeroed + pgUpdated.Status.PreviousNumberOfInstances = newSpec.Status.PreviousNumberOfInstances + pgUpdated.Status.PostgresClusterStatus = newSpec.Status.PostgresClusterStatus + pgUpdated.Status.PreviousPoolerInstances = newSpec.Status.PreviousPoolerInstances + + pgUpdated, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pgUpdated) + if err != nil { + return false, fmt.Errorf("could not update status during hibernate: %w", err) + } + c.logger.Infof("[lifecycle] hibernate: status updated successfully, previousNumberOfInstances=%d", pgUpdated.Status.PreviousNumberOfInstances) + + c.setSpec(pgUpdated) + return true, nil + } + + // === WAKE-UP: Stopped -> Running === + if c.Status.Stopped() && (newSpec.Spec.Lifecycle == nil || newSpec.Spec.Lifecycle.Phase != "stopped") { + if newSpec.Status.PreviousNumberOfInstances > 0 { + c.logger.Infof("[lifecycle] waking up cluster %s: restoring numberOfInstances=%d", c.Name, newSpec.Status.PreviousNumberOfInstances) + + // Restore pooler deployments to previous replica counts FIRST + if err := c.scalePoolerUp(newSpec); err != nil { + return false, fmt.Errorf("could not scale pooler during wake-up: %w", err) + } + + // Resume logical backup CronJob if enabled + if c.Spec.EnableLogicalBackup { + if err := c.unsuspendLogicalBackupJob(); err != nil { + return false, fmt.Errorf("could not resume logical backup job during wake-up: %w", err) + } + c.logger.Info("[lifecycle] logical backup job resumed") + } + + // Restore numberOfInstances from previousNumberOfInstances + newSpec.Spec.NumberOfInstances = newSpec.Status.PreviousNumberOfInstances + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating + + // Update spec first + pgUpdated, err := c.KubeClient.UpdatePostgresCR(c.clusterName(), newSpec) + if err != nil { + return false, fmt.Errorf("could not update spec during wake-up: %w", err) + } + c.logger.Info("[lifecycle] wake-up: spec updated successfully") + + // Update status separately, and clear previousNumberOfInstances and previousPoolerInstances after restore + pgUpdated.Status.PreviousNumberOfInstances = 0 + pgUpdated.Status.PostgresClusterStatus = newSpec.Status.PostgresClusterStatus + pgUpdated.Status.PreviousPoolerInstances = nil + + pgUpdated, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pgUpdated) + if err != nil { + return false, fmt.Errorf("could not update status during wake-up: %w", err) + } + c.logger.Info("[lifecycle] wake-up: status updated successfully, previousNumberOfInstances cleared") + + c.setSpec(pgUpdated) + return true, nil + } + } + + return false, nil +} + func syncResources(a, b *v1.ResourceRequirements) bool { for _, res := range []v1.ResourceName{ v1.ResourceCPU, diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index e38540d3e..2d54bf49c 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -2354,3 +2354,292 @@ func TestUpdatePITRResources(t *testing.T) { }) } } + +func TestUpdate_LifecycleBlocksDuringStopping(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopping", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.NumberOfInstances = 5 + + err := cluster.Update(oldSpec, newSpec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while it is stopping") +} + +func TestUpdate_LifecycleBlocksWhenStoppedWithPhase(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.NumberOfInstances = 5 + + err := cluster.Update(oldSpec, newSpec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while stopped") +} + +func TestUpdate_LifecycleAllowsWakeUp(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + updateCalled := false + statusUpdateCalled := false + + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + updateCalled = true + return true, pg, nil + }) + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + statusUpdateCalled = true + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + } + return false, nil, nil + }) + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 0, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + + err := cluster.Update(oldSpec, newSpec) + + assert.NoError(t, err) + assert.True(t, updateCalled, "Update should have been called for wake-up") + assert.True(t, statusUpdateCalled, "Status update should have been called for wake-up") +} + +func TestUpdate_LifecycleInitiatesHibernate(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + updateCalled := false + statusUpdateCalled := false + + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + updateCalled = true + return true, pg, nil + }) + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + statusUpdateCalled = true + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + } + return false, nil, nil + }) + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Running", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: "stopped"} + + err := cluster.Update(oldSpec, newSpec) + + assert.NoError(t, err) + assert.True(t, updateCalled, "Update should have been called for hibernate") + assert.True(t, statusUpdateCalled, "Status update should have been called for hibernate") +} + +func TestUpdate_LifecycleNormalUpdate(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Running", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + newSpec := pg.DeepCopy() + blocked, err := cluster.blockLifecycleUpdate(newSpec) + + assert.False(t, blocked) + assert.NoError(t, err) +} diff --git a/pkg/cluster/lifecycle.go b/pkg/cluster/lifecycle.go new file mode 100644 index 000000000..249e43ec3 --- /dev/null +++ b/pkg/cluster/lifecycle.go @@ -0,0 +1,163 @@ +package cluster + +import ( + "context" + "fmt" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" +) + +// manageHibernateState manages cluster hibernate/wake-up state transitions. +// Returns true if sync should continue, false if it should return early. +// +// This function handles the following state transitions: +// - Running -> Stopping: When user sets lifecycle.phase = "stopped" +// - Stopping -> Stopped: When StatefulSet replicas reach 0 +// - Stopped -> Updating: When user clears lifecycle.phase (wake-up) +// - Updating -> Running: Normal sync continues, defer sets final status +func (c *Cluster) manageHibernateState(oldSpec acidv1.Postgresql, newSpec *acidv1.Postgresql) bool { + // When Update() is called, it sets status=Updating before Sync() runs. + // So we need to check if oldSpec.Status was Stopped and newSpec is Updating + // with lifecycle cleared to properly detect wake-up. + isWakingUp := oldSpec.Status.Stopped() && + newSpec.Status.PostgresClusterStatus == acidv1.ClusterStatusUpdating && + (newSpec.Spec.Lifecycle == nil || newSpec.Spec.Lifecycle.Phase != "stopped") + + // If lifecycle was cleared and we have previousNumberOfInstances and numberOfInstances is 0 + isWakingUpSimple := newSpec.Spec.Lifecycle == nil || newSpec.Spec.Lifecycle.Phase != "stopped" + hasPreviousInstances := newSpec.Status.PreviousNumberOfInstances > 0 + needsRestore := newSpec.Spec.NumberOfInstances == 0 + + // double verification of waking up + isWakingUp = isWakingUp || (isWakingUpSimple && hasPreviousInstances && needsRestore) + + // === INITIATE HIBERNATE: Running -> Stopping === + // Only initiate if not already stopping or stopped, and lifecycle.phase = "stopped" + if newSpec.Spec.Lifecycle != nil && + newSpec.Spec.Lifecycle.Phase == "stopped" && + !newSpec.Status.Stopping() && + !newSpec.Status.Stopped() { + + newSpec.Status.PreviousNumberOfInstances = newSpec.Spec.NumberOfInstances + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopping + newSpec.Spec.NumberOfInstances = 0 + c.logger.Infof("[lifecycle] cluster is going to hibernate, stored previous number of instances: %d", + newSpec.Status.PreviousNumberOfInstances) + return true + } + + // === STOPPING -> STOPPED: Check actual StatefulSet replicas === + // Only transition to Stopped when StatefulSet replicas have actually reached 0 + if newSpec.Status.Stopping() { + if c.Statefulset != nil && *c.Statefulset.Spec.Replicas == 0 { + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopped + c.logger.Info("[lifecycle] cluster has stopped, all pods are terminated") + } + return true + } + + // === WAKE-UP: Stopped/Updating -> Running === + // Restore numberOfInstances from previousNumberOfInstances when waking up + if newSpec.Status.Stopped() || isWakingUp { + // Check if lifecycle.phase was cleared (user wants to wake up) + if isWakingUp || newSpec.Spec.Lifecycle == nil || newSpec.Spec.Lifecycle.Phase != "stopped" { + if newSpec.Status.PreviousNumberOfInstances > 0 { + newSpec.Spec.NumberOfInstances = newSpec.Status.PreviousNumberOfInstances + c.logger.Infof("[lifecycle] cluster is waking up, restoring number of instances: %d", + newSpec.Status.PreviousNumberOfInstances) + } else { + c.logger.Warningf("[lifecycle] cluster is waking up but previousNumberOfInstances is 0, cannot restore") + } + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating + return true + } + // Still stopped and lifecycle.phase = "stopped", skip further sync + return false + } + + return true +} + +// getPoolerReplicas returns the current replica count for a pooler role. +// Returns 0 if pooler doesn't exist or hasn't been synced yet. +func (c *Cluster) getPoolerReplicas(role PostgresRole) int32 { + if c.ConnectionPooler == nil || c.ConnectionPooler[role] == nil || + c.ConnectionPooler[role].Deployment == nil || + c.ConnectionPooler[role].Deployment.Spec.Replicas == nil { + return 0 + } + return *c.ConnectionPooler[role].Deployment.Spec.Replicas +} + +// scalePoolerDown scales pooler deployments to 0 and stores current replica count. +// Should be called during hibernate initiation. +func (c *Cluster) scalePoolerDown(newSpec *acidv1.Postgresql) error { + if c.ConnectionPooler == nil { + return nil + } + + for role := range c.ConnectionPooler { + replicas := c.getPoolerReplicas(role) + + // Store current replicas in status + if newSpec.Status.PreviousPoolerInstances == nil { + newSpec.Status.PreviousPoolerInstances = make(map[string]int32) + } + newSpec.Status.PreviousPoolerInstances[string(role)] = replicas + + // Scale to 0 if currently non-zero + if replicas > 0 { + if err := c.patchPoolerReplicas(role, 0); err != nil { + return err + } + c.logger.Infof("[lifecycle] pooler %s scaled to 0 (was %d)", role, replicas) + } + } + return nil +} + +// scalePoolerUp restores pooler deployments to previous replica counts. +// Should be called during wake-up. +func (c *Cluster) scalePoolerUp(newSpec *acidv1.Postgresql) error { + if newSpec.Status.PreviousPoolerInstances == nil { + return nil + } + + for roleStr, replicas := range newSpec.Status.PreviousPoolerInstances { + role := PostgresRole(roleStr) + + // Scale to stored value (could be 0) + if err := c.patchPoolerReplicas(role, replicas); err != nil { + return err + } + c.logger.Infof("[lifecycle] pooler %s scaled to %d", role, replicas) + } + return nil +} + +// patchPoolerReplicas patches the pooler deployment replica count. +func (c *Cluster) patchPoolerReplicas(role PostgresRole, replicas int32) error { + // Check if deployment exists first + _, err := c.KubeClient.Deployments(c.Namespace).Get( + context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil // Pooler doesn't exist, no-op + } + return fmt.Errorf("could not get pooler deployment for %s: %w", role, err) + } + + // Patch with merge patch + patchData := fmt.Sprintf(`{"spec":{"replicas":%d}}`, replicas) + _, err = c.KubeClient.Deployments(c.Namespace).Patch( + context.TODO(), c.connectionPoolerName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch pooler deployment %s replicas: %w", role, err) + } + return nil +} + diff --git a/pkg/cluster/lifecycle_test.go b/pkg/cluster/lifecycle_test.go new file mode 100644 index 000000000..b43f43cd0 --- /dev/null +++ b/pkg/cluster/lifecycle_test.go @@ -0,0 +1,1426 @@ +package cluster + +import ( + "context" + "fmt" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" +) + +var lifecycleLogger = logrus.New().WithField("test", "lifecycle") +var lifecycleEventRecorder = record.NewFakeRecorder(10) + +func int32Ptr(i int32) *int32 { return &i } + +func newTestLifecycleCluster(status string, numberOfInstances int32, lifecyclePhase string) *Cluster { + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: status, + }, + } + + if lifecyclePhase != "" { + pg.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: lifecyclePhase, + } + } + + return &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: pg, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } +} + +func newTestPoolerObjects(role PostgresRole, replicas int32) *ConnectionPoolerObjects { + return &ConnectionPoolerObjects{ + Deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%s-pooler", role), + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: int32Ptr(replicas), + }, + }, + Name: fmt.Sprintf("test-%s-pooler", role), + ClusterName: "test-cluster", + Namespace: "default", + Role: role, + } +} + +func newFakeK8sClientForLifecycle() (*k8sutil.KubernetesClient, *fake.Clientset, *fakeacidv1.Clientset) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + return client, clientSet, acidClientSet +} + +func createTestPostgresqlInClient(client *k8sutil.KubernetesClient, name, namespace string, spec *acidv1.PostgresSpec, status *acidv1.PostgresStatus) *acidv1.Postgresql { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + if spec != nil { + pg.Spec = *spec + } + if status != nil { + pg.Status = *status + } + + created, err := client.Postgresqls(namespace).Create(context.TODO(), pg, metav1.CreateOptions{}) + if err == nil { + return created + } + return pg +} + +func updatePostgresqlInClient(client *k8sutil.KubernetesClient, pg *acidv1.Postgresql) (*acidv1.Postgresql, error) { + return client.Postgresqls(pg.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{}) +} + +func updatePostgresqlStatusInClient(client *k8sutil.KubernetesClient, pg *acidv1.Postgresql) (*acidv1.Postgresql, error) { + return client.Postgresqls(pg.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{}) +} + +func TestGetPoolerReplicas(t *testing.T) { + tests := []struct { + name string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + role PostgresRole + want int32 + }{ + { + name: "nil ConnectionPooler map", + poolerObjs: nil, + role: Master, + want: 0, + }, + { + name: "ConnectionPooler for role is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: nil}, + role: Master, + want: 0, + }, + { + name: "Deployment is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: {Deployment: nil}}, + role: Master, + want: 0, + }, + { + name: "Replicas is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: {Deployment: &appsv1.Deployment{Spec: appsv1.DeploymentSpec{Replicas: nil}}}, + }, + role: Master, + want: 0, + }, + { + name: "Master with 2 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 2), + }, + role: Master, + want: 2, + }, + { + name: "Master with 0 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 0), + }, + role: Master, + want: 0, + }, + { + name: "Replica with 3 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Replica: newTestPoolerObjects(Replica, 3), + }, + role: Replica, + want: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + ConnectionPooler: tt.poolerObjs, + } + got := c.getPoolerReplicas(tt.role) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestPatchPoolerReplicas(t *testing.T) { + tests := []struct { + name string + replicas int32 + setupClient func(clientSet *fake.Clientset) + wantErr bool + errContains string + }{ + { + name: "deployment exists, patch succeeds", + replicas: 0, + setupClient: func(clientSet *fake.Clientset) { + // Pre-create the deployment in the fake clientset + clientSet.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-pooler"}, + Spec: appsv1.DeploymentSpec{Replicas: int32Ptr(2)}, + }, metav1.CreateOptions{}) + }, + wantErr: false, + }, + { + name: "deployment not found", + replicas: 2, + setupClient: func(clientSet *fake.Clientset) { + // Don't create anything - will trigger NotFound + }, + wantErr: false, // NotFound is handled gracefully + }, + { + name: "patch returns error", + replicas: 2, + setupClient: func(clientSet *fake.Clientset) { + // Pre-create deployment but make patch fail via reactor + clientSet.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-pooler"}, + Spec: appsv1.DeploymentSpec{Replicas: int32Ptr(2)}, + }, metav1.CreateOptions{}) + clientSet.PrependReactor("patch", "deployments", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + }, + wantErr: true, + errContains: "could not patch pooler deployment", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + if tt.setupClient != nil { + tt.setupClient(clientSet) + } + + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + err := c.patchPoolerReplicas(Master, tt.replicas) + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestBlockLifecycleUpdate(t *testing.T) { + tests := []struct { + name string + currentStatus string + lifecyclePhase string + wantBlocked bool + wantErr bool + errContains string + }{ + { + name: "Running cluster, allows update", + currentStatus: "Running", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopping state, blocks update", + currentStatus: "Stopping", + wantBlocked: true, + wantErr: true, + errContains: "cannot update cluster while it is stopping", + }, + { + name: "Stopped with lifecycle phase, blocks update", + currentStatus: "Stopped", + lifecyclePhase: "stopped", + wantBlocked: true, + wantErr: true, + errContains: "cannot update cluster while stopped", + }, + { + name: "Stopped without lifecycle phase, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopped with nil lifecycle, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopped with empty lifecycle phase, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "UpdateFailed state, allows update", + currentStatus: "UpdateFailed", + wantBlocked: false, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newTestLifecycleCluster(tt.currentStatus, 3, tt.lifecyclePhase) + + newSpec := c.DeepCopy() + if tt.lifecyclePhase != "" && tt.currentStatus != "Stopped" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: tt.lifecyclePhase} + } + + blocked, err := c.blockLifecycleUpdate(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantBlocked, blocked) + }) + } +} + +func TestManageHibernateState(t *testing.T) { + tests := []struct { + name string + oldSpecStatus string + newSpecStatus string + newSpecLifecyclePhase string + numberOfInstances int32 + previousNumberOfInst int32 + statefulsetReplicas *int32 + wantContinue bool + wantNumberOfInstances *int32 + wantStatus string + }{ + { + name: "Running to Stopping - initiates hibernate", + oldSpecStatus: "Running", + newSpecStatus: "Running", + newSpecLifecyclePhase: "stopped", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopping", + }, + { + name: "Stopping to Stopped - when replicas reach 0", + oldSpecStatus: "Stopping", + newSpecStatus: "Stopping", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(0), + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopped", + }, + { + name: "Stopping - replicas not yet 0", + oldSpecStatus: "Stopping", + newSpecStatus: "Stopping", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(2), // Still terminating + wantContinue: true, + wantNumberOfInstances: nil, // Should NOT change + wantStatus: "Stopping", // Should NOT change + }, + { + name: "Stopped to wake-up - restores numberOfInstances", + oldSpecStatus: "Stopped", + newSpecStatus: "Stopped", + newSpecLifecyclePhase: "", // Cleared + numberOfInstances: 0, + previousNumberOfInst: 3, + statefulsetReplicas: int32Ptr(0), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + { + name: "Stopped but lifecycle still 'stopped' - skip sync", + oldSpecStatus: "Stopped", + newSpecStatus: "Stopped", + newSpecLifecyclePhase: "stopped", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(0), + wantContinue: false, // Should skip sync + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running without lifecycle - continue normal", + oldSpecStatus: "Running", + newSpecStatus: "Running", + newSpecLifecyclePhase: "", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), // Unchanged + wantStatus: "Running", // Unchanged + }, + { + name: "Running to Updating - normal update", + oldSpecStatus: "Running", + newSpecStatus: "Updating", + newSpecLifecyclePhase: "", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + if tt.statefulsetReplicas != nil { + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{ + Replicas: tt.statefulsetReplicas, + }, + } + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.oldSpecStatus, + }, + } + + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: tt.numberOfInstances, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.newSpecStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + }, + } + + if tt.newSpecLifecyclePhase != "" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: tt.newSpecLifecyclePhase, + } + } + + gotContinue := c.manageHibernateState(oldSpec, newSpec) + + assert.Equal(t, tt.wantContinue, gotContinue) + + if tt.wantNumberOfInstances != nil { + assert.Equal(t, *tt.wantNumberOfInstances, newSpec.Spec.NumberOfInstances) + } + assert.Equal(t, tt.wantStatus, newSpec.Status.PostgresClusterStatus) + }) + } +} + +func TestHandleHibernateAndWakeUp_Hibernate(t *testing.T) { + tests := []struct { + name string + currentStatus string + numberOfInstances int32 + lifecyclePhase string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + k8sUpdateSucceeds bool + k8sStatusSucceeds bool + poolerPatchSucceeds bool + wantHandled bool + wantErr bool + errContains string + }{ + { + name: "Running + lifecycle.phase=stopped - initiates hibernate", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + poolerPatchSucceeds: true, + wantHandled: true, + wantErr: false, + }, + { + name: "Running + lifecycle.phase=stopped - K8s update fails", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update spec during hibernate", + }, + { + name: "Running + lifecycle.phase=stopped - K8s status update fails", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update status during hibernate", + }, + { + name: "Running + no lifecycle phase - no transition", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "", + wantHandled: false, + wantErr: false, + }, + { + name: "Stopped + lifecycle cleared - no-op (Update handles this)", + currentStatus: "Stopped", + numberOfInstances: 0, + lifecyclePhase: "", + wantHandled: false, // handleHibernateAndWakeUp doesn't handle wake-up, that's done via Update flow + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kubeClient, _, acidClientSet := newFakeK8sClientForLifecycle() + + if tt.k8sUpdateSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + }) + } + + if !tt.k8sStatusSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + return true, nil, fmt.Errorf("status update failed") + } + return false, nil, nil + }) + } + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: tt.numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + }, + }, + KubeClient: *kubeClient, + ConnectionPooler: tt.poolerObjs, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } + + newSpec := c.DeepCopy() + if tt.lifecyclePhase != "" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: tt.lifecyclePhase} + } + + handled, err := c.handleHibernateAndWakeUp(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantHandled, handled) + }) + } +} + +func TestHandleHibernateAndWakeUp_WakeUp(t *testing.T) { + tests := []struct { + name string + currentStatus string + numberOfInstances int32 + previousNumberOfInst int32 + previousPoolerInstances map[string]int32 + lifecyclePhase string + k8sUpdateSucceeds bool + k8sStatusSucceeds bool + poolerPatchSucceeds bool + wantHandled bool + wantErr bool + errContains string + }{ + { + name: "Stopped + lifecycle cleared - wake-up", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 3, + previousPoolerInstances: map[string]int32{"master": 2, "replica": 0}, + lifecyclePhase: "", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + wantHandled: true, + wantErr: false, + }, + { + name: "Stopped + previousNumberOfInstances is 0", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 0, + lifecyclePhase: "", + wantHandled: false, // No restore possible + wantErr: false, + }, + { + name: "Stopped + lifecycle cleared + K8s update fails", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 3, + lifecyclePhase: "", + k8sUpdateSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update spec during wake-up", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kubeClient, _, acidClientSet := newFakeK8sClientForLifecycle() + + if tt.k8sUpdateSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + }) + } + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: tt.numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + PreviousPoolerInstances: tt.previousPoolerInstances, + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } + + newSpec := c.DeepCopy() + + handled, err := c.handleHibernateAndWakeUp(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantHandled, handled) + }) + } +} + +func TestScalePoolerDown(t *testing.T) { + tests := []struct { + name string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + wantStored map[string]int32 + }{ + { + name: "nil ConnectionPooler - no-op", + poolerObjs: nil, + wantStored: nil, + }, + { + name: "Master at 2 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: newTestPoolerObjects(Master, 2)}, + wantStored: map[string]int32{"master": 2}, + }, + { + name: "Master already at 0 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: newTestPoolerObjects(Master, 0)}, + wantStored: map[string]int32{"master": 0}, + }, + { + name: "Both Master and Replica", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 2), + Replica: newTestPoolerObjects(Replica, 1), + }, + wantStored: map[string]int32{"master": 2, "replica": 1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + ConnectionPooler: tt.poolerObjs, + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + newSpec := &acidv1.Postgresql{} + err := c.scalePoolerDown(newSpec) + + assert.NoError(t, err) + assert.Equal(t, tt.wantStored, newSpec.Status.PreviousPoolerInstances) + }) + } +} + +func TestScalePoolerUp(t *testing.T) { + tests := []struct { + name string + prevInst map[string]int32 + wantErr bool + }{ + { + name: "nil PreviousPoolerInstances - no-op", + prevInst: nil, + }, + { + name: "Restore master to 2", + prevInst: map[string]int32{"master": 2}, + }, + { + name: "Restore both roles", + prevInst: map[string]int32{"master": 2, "replica": 1}, + }, + { + name: "Restore master to 0 (keep at 0)", + prevInst: map[string]int32{"master": 0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + newSpec := &acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PreviousPoolerInstances: tt.prevInst, + }, + } + err := c.scalePoolerUp(newSpec) + + assert.NoError(t, err) + }) + } +} + +func TestLifecycleStateTransitions(t *testing.T) { + t.Run("complete Hibernate flow: Running -> Stopping -> Stopped", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + // Initial: Running + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 3, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + + // Step 1: manageHibernateState should initiate hibernate + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(0), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + assert.Equal(t, int32(3), newSpec.Status.PreviousNumberOfInstances) + }) + + t.Run("complete Wake-up flow: Stopped -> Updating -> Running", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + // After hibernate: Stopped, replicas = 0 + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{Replicas: int32Ptr(0)}, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopped"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + // Lifecycle cleared by user + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + // Step 1: manageHibernateState should restore + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Updating", newSpec.Status.PostgresClusterStatus) + }) +} + +func TestLifecycleUpdateBlocksDuringStopping(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopping", + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.True(t, blocked) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while it is stopping") +} + +func TestLifecycleUpdateBlocksWhenStoppedWithPhase(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.True(t, blocked) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while stopped") +} + +func TestLifecycleUpdateAllowsWakeUp(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + // Lifecycle cleared by user + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.False(t, blocked) + assert.NoError(t, err) +} + +func TestManageHibernateState_EdgeCases(t *testing.T) { + tests := []struct { + name string + statefulsetReplicas *int32 + currentStatus string + newSpecLifecyclePhase string + newSpecNumberOfInst int32 + previousNumberOfInst int32 + wantContinue bool + wantNumberOfInstances *int32 + wantStatus string + }{ + { + name: "Stopping state with nil statefulset - should not transition to Stopped", + statefulsetReplicas: nil, + currentStatus: "Stopping", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: nil, + wantStatus: "Stopping", + }, + { + name: "Stopping with nil Lifecycle spec", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopping", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running to Stopping when numberOfInstances already 0", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Running", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopping", + }, + { + name: "Stopped with previousNumberOfInstances=0 - sets Updating but warns", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Updating", + }, + { + name: "Stopped with nil Lifecycle - wake-up", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 3, + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + { + name: "Stopped with empty Lifecycle phase - wake-up", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 2, + wantContinue: true, + wantNumberOfInstances: int32Ptr(2), + wantStatus: "Updating", + }, + { + name: "Stopped but lifecycle still 'stopped' - skip sync", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + previousNumberOfInst: 3, + wantContinue: false, + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running without lifecycle change - normal update", + statefulsetReplicas: int32Ptr(3), + currentStatus: "Running", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 3, + previousNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Running", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + if tt.statefulsetReplicas != nil { + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{ + Replicas: tt.statefulsetReplicas, + }, + } + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + }, + } + + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: tt.newSpecNumberOfInst, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + }, + } + + if tt.newSpecLifecyclePhase != "" || tt.newSpecLifecyclePhase == "" && tt.currentStatus == "Stopped" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: tt.newSpecLifecyclePhase, + } + } + + gotContinue := c.manageHibernateState(oldSpec, newSpec) + + assert.Equal(t, tt.wantContinue, gotContinue, "continue sync mismatch") + if tt.wantNumberOfInstances != nil { + assert.Equal(t, *tt.wantNumberOfInstances, newSpec.Spec.NumberOfInstances, "numberOfInstances mismatch") + } + assert.Equal(t, tt.wantStatus, newSpec.Status.PostgresClusterStatus, "status mismatch") + }) + } +} + +func TestManageHibernateState_StateTransitionSequence(t *testing.T) { + t.Run("Running -> Stopping -> Stopped sequence", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 3, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(0), newSpec.Spec.NumberOfInstances) + assert.Equal(t, int32(3), newSpec.Status.PreviousNumberOfInstances) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{Replicas: int32Ptr(2)}, + } + oldSpec = acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopping"}, + } + newSpec.Status.PostgresClusterStatus = "Stopping" + + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + + c.Statefulset.Spec.Replicas = int32Ptr(0) + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, "Stopped", newSpec.Status.PostgresClusterStatus) + }) + + t.Run("Stopped -> Updating -> Running sequence", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopped"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Updating", newSpec.Status.PostgresClusterStatus) + + oldSpec = acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Updating"}, + } + newSpec.Status.PostgresClusterStatus = "Updating" + + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + }) +} + +func TestSuspendLogicalBackupJob(t *testing.T) { + tests := []struct { + name string + jobExists bool + patchFails bool + wantErr bool + }{ + { + name: "job exists, suspend succeeds", + jobExists: true, + wantErr: false, + }, + { + name: "job does not exist - no-op", + jobExists: false, + wantErr: false, + }, + { + name: "job exists but patch fails", + jobExists: true, + patchFails: true, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + jobName := "logical-backup-test-cluster" + + if tt.jobExists { + clientSet.BatchV1().CronJobs("default").Create(context.TODO(), &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "30 00 * * *", + }, + }, metav1.CreateOptions{}) + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "cronjobs", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + } + + kubeClient := &k8sutil.KubernetesClient{ + CronJobsGetter: clientSet.BatchV1(), + } + + var job *batchv1.CronJob + if tt.jobExists { + job, _ = kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + } + + c := New( + Config{ + OpConfig: config.Config{ + LogicalBackup: config.LogicalBackup{ + LogicalBackupJobPrefix: "logical-backup-", + }, + }, + }, + *kubeClient, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + lifecycleLogger, + lifecycleEventRecorder, + ) + c.LogicalBackupJob = job + + err := c.suspendLogicalBackupJob() + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.jobExists && !tt.patchFails { + updatedJob, _ := kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + if updatedJob != nil { + assert.True(t, *updatedJob.Spec.Suspend, "job should be suspended") + } + } + } + }) + } +} + +func TestUnsuspendLogicalBackupJob(t *testing.T) { + tests := []struct { + name string + jobExists bool + patchFails bool + wantErr bool + }{ + { + name: "job exists, unsuspend succeeds", + jobExists: true, + wantErr: false, + }, + { + name: "job does not exist - no-op", + jobExists: false, + wantErr: false, + }, + { + name: "job exists but patch fails", + jobExists: true, + patchFails: true, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + jobName := "logical-backup-test-cluster" + + if tt.jobExists { + suspendTrue := true + clientSet.BatchV1().CronJobs("default").Create(context.TODO(), &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "30 00 * * *", + Suspend: &suspendTrue, + }, + }, metav1.CreateOptions{}) + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "cronjobs", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + } + + kubeClient := &k8sutil.KubernetesClient{ + CronJobsGetter: clientSet.BatchV1(), + } + + var job *batchv1.CronJob + if tt.jobExists { + job, _ = kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + } + + c := New( + Config{ + OpConfig: config.Config{ + LogicalBackup: config.LogicalBackup{ + LogicalBackupJobPrefix: "logical-backup-", + }, + }, + }, + *kubeClient, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + lifecycleLogger, + lifecycleEventRecorder, + ) + c.LogicalBackupJob = job + + err := c.unsuspendLogicalBackupJob() + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.jobExists && !tt.patchFails { + updatedJob, _ := kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + if updatedJob != nil { + assert.False(t, *updatedJob.Spec.Suspend, "job should be unsuspended") + } + } + } + }) + } +} \ No newline at end of file diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 1925733de..07dfae0c1 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -825,6 +825,58 @@ func (c *Cluster) deleteLogicalBackupJob() error { return nil } +func (c *Cluster) suspendLogicalBackupJob() error { + if c.LogicalBackupJob == nil { + c.logger.Debug("logical backup job is not loaded, skipping suspend") + return nil + } + + c.setProcessName("suspending logical backup job") + + patchData := fmt.Sprintf(`{"spec":{"suspend":true}}`) + cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + context.TODO(), + c.getLogicalBackupJobName(), + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "", + ) + if err != nil { + return fmt.Errorf("could not suspend logical backup job: %w", err) + } + c.LogicalBackupJob = cronJob + c.logger.Info("logical backup job suspended") + + return nil +} + +func (c *Cluster) unsuspendLogicalBackupJob() error { + if c.LogicalBackupJob == nil { + c.logger.Debug("logical backup job is not loaded, skipping unsuspend") + return nil + } + + c.setProcessName("resuming logical backup job") + + patchData := fmt.Sprintf(`{"spec":{"suspend":false}}`) + cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + context.TODO(), + c.getLogicalBackupJobName(), + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "", + ) + if err != nil { + return fmt.Errorf("could not resume logical backup job: %w", err) + } + c.LogicalBackupJob = cronJob + c.logger.Info("logical backup job resumed") + + return nil +} + // GetServiceMaster returns cluster's kubernetes master Service func (c *Cluster) GetServiceMaster() *v1.Service { return c.Services[Master] diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ffebd306c..13982900c 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -47,7 +47,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { if err != nil { c.logger.Warningf("error while syncing cluster state: %v", err) newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed - } else if !c.Status.Running() { + } else if !c.Status.Running() && !c.Status.Stopping() && !c.Status.Stopped() { newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning } @@ -65,6 +65,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.logger.Debugf("could not sync finalizers: %v", err) } + // Handle lifecycle hibernate/wake-up state transitions + if !c.manageHibernateState(oldSpec, newSpec) { + return nil + } + if err = c.initUsers(); err != nil { err = fmt.Errorf("could not init users: %v", err) return err diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c34faddd4..0e31112ad 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -200,6 +200,16 @@ func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.Namespaced return pg, nil } +// UpdatePostgresCR of Postgres cluster (updates full resource including spec) +func (client *KubernetesClient) UpdatePostgresCR(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) { + pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{}) + if err != nil { + return pg, fmt.Errorf("could not update PostgresCR: %v", err) + } + + return pg, nil +} + // SetFinalizer of Postgres cluster func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql, finalizers []string) (*apiacidv1.Postgresql, error) { var (