diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index b6a4e24a8..f2f3651e7 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster // Postgres CustomResourceDefinition object i.e. Spilo import ( + "context" "database/sql" "encoding/json" "fmt" @@ -70,7 +71,7 @@ type kubeResources struct { CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget LogicalBackupJob *batchv1.CronJob Streams map[string]*zalandov1.FabricEventStream - //Pods are treated separately + // Pods are treated separately } // Cluster describes postgresql cluster @@ -95,7 +96,7 @@ type Cluster struct { teamsAPIClient teams.Interface oauthTokenGetter OAuthTokenGetter - KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place? + KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place? currentProcess Process processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex @@ -149,7 +150,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres PatroniEndpoints: make(map[string]*v1.Endpoints), PatroniConfigMaps: make(map[string]*v1.ConfigMap), VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim), - Streams: make(map[string]*zalandov1.FabricEventStream)}, + Streams: make(map[string]*zalandov1.FabricEventStream), + }, userSyncStrategy: users.DefaultUserSyncStrategy{ PasswordEncryption: passwordEncryption, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, @@ -276,7 +278,7 @@ func (c *Cluster) Create() (err error) { errStatus error ) if err == nil { - pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running? + pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running? } else { c.logger.Warningf("cluster created failed: %v", err) pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed) @@ -440,7 +442,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa var match, needsRollUpdate, needsReplace bool match = true - //TODO: improve me + // TODO: improve me if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas { match = false reasons = append(reasons, "new statefulset's number of replicas does not match the current one") @@ -672,7 +674,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc } } return true - } func compareEnv(a, b []v1.EnvVar) bool { @@ -707,9 +708,7 @@ func compareEnv(a, b []v1.EnvVar) bool { } func compareSpiloConfiguration(configa, configb string) bool { - var ( - oa, ob spiloConfiguration - ) + var oa, ob spiloConfiguration var err error err = json.Unmarshal([]byte(configa), &oa) @@ -818,7 +817,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[] } return reason != "", reason - } func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) { @@ -895,7 +893,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog } func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) { - //TODO: improve comparison + // TODO: improve comparison if !reflect.DeepEqual(new.Spec, cur.Spec) { return false, "new PDB's spec does not match the current one" } @@ -944,8 +942,17 @@ func (c *Cluster) removeFinalizer() error { } c.logger.Infof("removing finalizer %s", finalizerName) - finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName) - newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers) + + // Fetch the latest version of the object to avoid resourceVersion conflicts + clusterName := c.clusterName() + latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get( + context.TODO(), clusterName.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err) + } + + finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName) + newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers) if err != nil { return fmt.Errorf("error removing finalizer: %v", err) } @@ -1063,7 +1070,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing secrets") - //TODO: mind the secrets of the deleted/new users + // TODO: mind the secrets of the deleted/new users if err := c.syncSecrets(); err != nil { c.logger.Errorf("could not sync secrets: %v", err) updateFailed = true @@ -1101,7 +1108,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { // logical backup job func() { - // create if it did not exist if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup { c.logger.Debug("creating backup cron job") @@ -1129,7 +1135,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } } - }() // Roles and Databases @@ -1206,7 +1211,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool { // before the pods, it will be re-created by the current master pod and will remain, obstructing the // creation of the new cluster with the same name. Therefore, the endpoints should be deleted last. func (c *Cluster) Delete() error { - var anyErrors = false + anyErrors := false c.mu.Lock() defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") @@ -1297,7 +1302,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) { c.specMu.RLock() defer c.specMu.RUnlock() return !c.Status.Success(), c.Status - } // ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue. @@ -1406,7 +1410,6 @@ func (c *Cluster) initSystemUsers() { } func (c *Cluster) initPreparedDatabaseRoles() error { - if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}} } @@ -1472,10 +1475,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error { } func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error { - for defaultRole, inherits := range defaultRoles { namespace := c.Namespace - //if namespaced secrets are allowed + // if namespaced secrets are allowed if secretNamespace != "" { if c.Config.OpConfig.EnableCrossNamespaceSecret { namespace = secretNamespace @@ -1543,7 +1545,7 @@ func (c *Cluster) initRobotUsers() error { } } - //if namespaced secrets are allowed + // if namespaced secrets are allowed if c.Config.OpConfig.EnableCrossNamespaceSecret { if strings.Contains(username, ".") { splits := strings.Split(username, ".") @@ -1594,7 +1596,6 @@ func (c *Cluster) initAdditionalOwnerRoles() { func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error { teamMembers, err := c.getTeamMembers(teamID) - if err != nil { return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err) } @@ -1633,7 +1634,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e } func (c *Cluster) initHumanUsers() error { - var clusterIsOwnedBySuperuserTeam bool superuserTeams := []string{} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e46b9ee44..aa6262264 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() { }`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name) c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue) } - c.logger.Info("Parse role bindings") + // re-uses k8s internal parsing. See k8s client-go issue #193 for explanation decode := scheme.Codecs.UniversalDeserializer().Decode obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 824a030f4..22042377e 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -258,13 +258,26 @@ func (c *Controller) processEvent(event ClusterEvent) { lg.Infoln("cluster has been created") case EventUpdate: - lg.Infoln("update of the cluster started") - if !clusterFound { lg.Warningln("cluster does not exist") return } c.curWorkerCluster.Store(event.WorkerID, cl) + + // Check if this cluster has been marked for deletion + if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() { + lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + if err = cl.Delete(); err != nil { + cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err) + c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error) + lg.Error(cl.Error) + return + } + lg.Infoln("cluster has been deleted via update event") + return + } + + lg.Infoln("update of the cluster started") err = cl.Update(event.OldSpec, event.NewSpec) if err != nil { cl.Error = fmt.Sprintf("could not update cluster: %v", err) @@ -379,7 +392,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, } func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) { - deprecate := func(deprecated, replacement string) { c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement) } @@ -425,7 +437,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. clusterError string ) - if informerOldSpec != nil { //update, delete + if informerOldSpec != nil { // update, delete uid = informerOldSpec.GetUID() clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta) @@ -440,7 +452,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1. } else { clusterError = informerOldSpec.Error } - } else { //add, sync + } else { // add, sync uid = informerNewSpec.GetUID() clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta) clusterError = informerNewSpec.Error @@ -539,7 +551,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { pgOld := c.postgresqlCheck(prev) pgNew := c.postgresqlCheck(cur) if pgOld != nil && pgNew != nil { - // Avoid the inifinite recursion for status updates + clusterName := util.NameFromMeta(pgNew.ObjectMeta) + + // Check if DeletionTimestamp was set (resource marked for deletion) + deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero() + if deletionTimestampChanged { + c.logger.WithField("cluster-name", clusterName).Infof( + "UPDATE event: DeletionTimestamp set to %s, queueing event", + pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339)) + c.queueClusterEvent(pgOld, pgNew, EventUpdate) + return + } + + // Avoid the infinite recursion for status updates if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { return @@ -578,7 +602,6 @@ or config maps. The operator does not sync accounts/role bindings after creation. */ func (c *Controller) submitRBACCredentials(event ClusterEvent) error { - namespace := event.NewSpec.GetNamespace() if err := c.createPodServiceAccount(namespace); err != nil { @@ -592,7 +615,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error { } func (c *Controller) createPodServiceAccount(namespace string) error { - podServiceAccountName := c.opConfig.PodServiceAccountName _, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{}) if k8sutil.ResourceNotFound(err) { @@ -615,7 +637,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error { } func (c *Controller) createRoleBindings(namespace string) error { - podServiceAccountName := c.opConfig.PodServiceAccountName podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name