48 changes: 24 additions & 24 deletions pkg/cluster/cluster.go
@@ -3,6 +3,7 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo

import (
"context"
"database/sql"
"encoding/json"
"fmt"
@@ -70,7 +71,7 @@ type kubeResources struct {
CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
// Pods are treated separately
}

// Cluster describes postgresql cluster
@@ -95,7 +96,7 @@ type Cluster struct {

teamsAPIClient teams.Interface
oauthTokenGetter OAuthTokenGetter
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
KubeClient k8sutil.KubernetesClient // TODO: move clients to the better place?
currentProcess Process
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
@@ -149,7 +150,8 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)},
Streams: make(map[string]*zalandov1.FabricEventStream),
},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -276,7 +278,7 @@ func (c *Cluster) Create() (err error) {
errStatus error
)
if err == nil {
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) // TODO: are you sure it's running?
} else {
c.logger.Warningf("cluster created failed: %v", err)
pgUpdatedStatus, errStatus = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
@@ -440,7 +442,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
var match, needsRollUpdate, needsReplace bool

match = true
//TODO: improve me
// TODO: improve me
if *c.Statefulset.Spec.Replicas != *statefulSet.Spec.Replicas {
match = false
reasons = append(reasons, "new statefulset's number of replicas does not match the current one")
@@ -672,7 +674,6 @@ func compareResourcesAssumeFirstNotNil(a *v1.ResourceRequirements, b *v1.Resourc
}
}
return true

}

func compareEnv(a, b []v1.EnvVar) bool {
@@ -707,9 +708,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
}

func compareSpiloConfiguration(configa, configb string) bool {
var (
oa, ob spiloConfiguration
)
var oa, ob spiloConfiguration

var err error
err = json.Unmarshal([]byte(configa), &oa)
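
Note for readers: compareSpiloConfiguration compares the two JSON documents semantically rather than as strings, so key order and whitespace do not count as drift. A minimal sketch of that idea, with a plain map standing in for the operator's spiloConfiguration struct:

```go
package main

import (
	"encoding/json"
	"reflect"
)

// sameJSON reports whether two JSON documents are semantically equal.
// A generic map stands in for the operator's spiloConfiguration type.
func sameJSON(a, b string) bool {
	var oa, ob map[string]interface{}
	if err := json.Unmarshal([]byte(a), &oa); err != nil {
		return false // unparsable input counts as a mismatch
	}
	if err := json.Unmarshal([]byte(b), &ob); err != nil {
		return false
	}
	return reflect.DeepEqual(oa, ob)
}
```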
@@ -818,7 +817,6 @@ func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]
}

return reason != "", reason

}

func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
@@ -895,7 +893,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLog
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
// TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one"
}
@@ -944,8 +942,17 @@ func (c *Cluster) removeFinalizer() error {
}

c.logger.Infof("removing finalizer %s", finalizerName)
finalizers := util.RemoveString(c.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(c.clusterName(), c.DeepCopy(), finalizers)

// Fetch the latest version of the object to avoid resourceVersion conflicts
clusterName := c.clusterName()
latestPg, err := c.KubeClient.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Get(
context.TODO(), clusterName.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error fetching latest postgresql for finalizer removal: %v", err)
}

finalizers := util.RemoveString(latestPg.ObjectMeta.Finalizers, finalizerName)
newSpec, err := c.KubeClient.SetFinalizer(clusterName, latestPg, finalizers)
if err != nil {
return fmt.Errorf("error removing finalizer: %v", err)
}
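
The hunk above is the heart of the fix: the finalizer is now removed from a freshly fetched object rather than the possibly stale in-memory copy, so the write carries the server's current resourceVersion. For illustration, the same fetch-then-update idiom can be wrapped in client-go's conflict-retry helper; a minimal sketch using the dynamic client (the GVR matches the operator's CRD, but this helper itself is not part of the PR):

```go
package main

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/util/retry"
)

var pgGVR = schema.GroupVersionResource{
	Group: "acid.zalan.do", Version: "v1", Resource: "postgresqls",
}

// removeFinalizer re-reads the object on every attempt so the update always
// carries the current resourceVersion; RetryOnConflict re-runs the closure
// if another writer slipped in between the Get and the Update.
func removeFinalizer(ctx context.Context, dc dynamic.Interface, ns, name, finalizer string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		pg, err := dc.Resource(pgGVR).Namespace(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		kept := []string{}
		for _, f := range pg.GetFinalizers() {
			if f != finalizer {
				kept = append(kept, f)
			}
		}
		pg.SetFinalizers(kept)
		_, err = dc.Resource(pgGVR).Namespace(ns).Update(ctx, pg, metav1.UpdateOptions{})
		return err
	})
}
```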
@@ -1063,7 +1070,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

c.logger.Debug("syncing secrets")
//TODO: mind the secrets of the deleted/new users
// TODO: mind the secrets of the deleted/new users
if err := c.syncSecrets(); err != nil {
c.logger.Errorf("could not sync secrets: %v", err)
updateFailed = true
@@ -1101,7 +1108,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {

// logical backup job
func() {

// create if it did not exist
if !oldSpec.Spec.EnableLogicalBackup && newSpec.Spec.EnableLogicalBackup {
c.logger.Debug("creating backup cron job")
Expand Down Expand Up @@ -1129,7 +1135,6 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}
}

}()

// Roles and Databases
@@ -1206,7 +1211,7 @@ func syncResources(a, b *v1.ResourceRequirements) bool {
// before the pods, it will be re-created by the current master pod and will remain, obstructing the
// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last.
func (c *Cluster) Delete() error {
var anyErrors = false
anyErrors := false
c.mu.Lock()
defer c.mu.Unlock()
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources")
@@ -1297,7 +1302,6 @@ func (c *Cluster) NeedsRepair() (bool, acidv1.PostgresStatus) {
c.specMu.RLock()
defer c.specMu.RUnlock()
return !c.Status.Success(), c.Status

}

// ReceivePodEvent is called back by the controller in order to add the cluster's pod event to the queue.
@@ -1406,7 +1410,6 @@ func (c *Cluster) initSystemUsers() {
}

func (c *Cluster) initPreparedDatabaseRoles() error {

if c.Spec.PreparedDatabases != nil && len(c.Spec.PreparedDatabases) == 0 { // TODO: add option to disable creating such a default DB
c.Spec.PreparedDatabases = map[string]acidv1.PreparedDatabase{strings.Replace(c.Name, "-", "_", -1): {}}
}
@@ -1472,10 +1475,9 @@ func (c *Cluster) initPreparedDatabaseRoles() error {
}

func (c *Cluster) initDefaultRoles(defaultRoles map[string]string, admin, prefix, searchPath, secretNamespace string) error {

for defaultRole, inherits := range defaultRoles {
namespace := c.Namespace
//if namespaced secrets are allowed
// if namespaced secrets are allowed
if secretNamespace != "" {
if c.Config.OpConfig.EnableCrossNamespaceSecret {
namespace = secretNamespace
@@ -1543,7 +1545,7 @@ func (c *Cluster) initRobotUsers() error {
}
}

//if namespaced secrets are allowed
// if namespaced secrets are allowed
if c.Config.OpConfig.EnableCrossNamespaceSecret {
if strings.Contains(username, ".") {
splits := strings.Split(username, ".")
@@ -1594,7 +1596,6 @@ func (c *Cluster) initAdditionalOwnerRoles() {

func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) error {
teamMembers, err := c.getTeamMembers(teamID)

if err != nil {
return fmt.Errorf("could not get list of team members for team %q: %v", teamID, err)
}
@@ -1633,7 +1634,6 @@ func (c *Cluster) initTeamMembers(teamID string, isPostgresSuperuserTeam bool) e
}

func (c *Cluster) initHumanUsers() error {

var clusterIsOwnedBySuperuserTeam bool
superuserTeams := []string{}

2 changes: 1 addition & 1 deletion pkg/controller/controller.go
@@ -277,7 +277,7 @@ func (c *Controller) initRoleBinding() {
}`, c.PodServiceAccount.Name, c.PodServiceAccount.Name, c.PodServiceAccount.Name)
c.opConfig.PodServiceAccountRoleBindingDefinition = compactValue(stringValue)
}
c.logger.Info("Parse role bindings")

// re-uses k8s internal parsing. See k8s client-go issue #193 for explanation
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, groupVersionKind, err := decode([]byte(c.opConfig.PodServiceAccountRoleBindingDefinition), nil, nil)
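
Context for the hunk above: the role-binding definition is parsed with client-go's scheme codecs, as the surviving comment notes. A self-contained sketch of that mechanism (the manifest and names here are illustrative, not taken from the operator):

```go
package main

import (
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

func main() {
	manifest := `{"apiVersion":"rbac.authorization.k8s.io/v1","kind":"RoleBinding","metadata":{"name":"example-binding"}}`
	// Decode re-uses the codecs registered for all built-in API types,
	// which is the "k8s internal parsing" the comment refers to.
	decode := scheme.Codecs.UniversalDeserializer().Decode
	obj, gvk, err := decode([]byte(manifest), nil, nil)
	if err != nil {
		panic(err)
	}
	rb := obj.(*rbacv1.RoleBinding) // safe here: the manifest declares its kind
	fmt.Println(gvk.Kind, rb.Name)  // "RoleBinding example-binding"
}
```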
39 changes: 30 additions & 9 deletions pkg/controller/postgresql.go
@@ -258,13 +258,26 @@ func (c *Controller) processEvent(event ClusterEvent) {

lg.Infoln("cluster has been created")
case EventUpdate:
lg.Infoln("update of the cluster started")

if !clusterFound {
lg.Warningln("cluster does not exist")
return
}
c.curWorkerCluster.Store(event.WorkerID, cl)

// Check if this cluster has been marked for deletion
if !event.NewSpec.ObjectMeta.DeletionTimestamp.IsZero() {
lg.Infof("cluster has a DeletionTimestamp of %s, starting deletion now.", event.NewSpec.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
if err = cl.Delete(); err != nil {
cl.Error = fmt.Sprintf("error deleting cluster and its resources: %v", err)
c.eventRecorder.Eventf(cl.GetReference(), v1.EventTypeWarning, "Delete", "%v", cl.Error)
lg.Error(cl.Error)
return
}
lg.Infoln("cluster has been deleted via update event")
return
}

lg.Infoln("update of the cluster started")
err = cl.Update(event.OldSpec, event.NewSpec)
if err != nil {
cl.Error = fmt.Sprintf("could not update cluster: %v", err)
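
The new branch above is what makes deletion work for clusters pinned by a finalizer: the DELETE API call only sets metadata.deletionTimestamp, and that change surfaces as an update event. A compressed sketch of the guard (deleteClusterResources and applySpecChanges are hypothetical stand-ins for the operator's Delete and Update paths):

```go
// Sketch of the guard the hunk adds; helper names are hypothetical.
func handleUpdate(pg *acidv1.Postgresql) error {
	// A non-zero DeletionTimestamp means a delete was requested but the
	// object is still pinned by a finalizer; run cleanup, not a spec sync.
	if !pg.ObjectMeta.DeletionTimestamp.IsZero() {
		return deleteClusterResources(pg)
	}
	return applySpecChanges(pg)
}
```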
@@ -379,7 +392,6 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
}

func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.PostgresSpec) {

deprecate := func(deprecated, replacement string) {
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
}
@@ -425,7 +437,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
clusterError string
)

if informerOldSpec != nil { //update, delete
if informerOldSpec != nil { // update, delete
uid = informerOldSpec.GetUID()
clusterName = util.NameFromMeta(informerOldSpec.ObjectMeta)

@@ -440,7 +452,7 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
} else {
clusterError = informerOldSpec.Error
}
} else { //add, sync
} else { // add, sync
uid = informerNewSpec.GetUID()
clusterName = util.NameFromMeta(informerNewSpec.ObjectMeta)
clusterError = informerNewSpec.Error
@@ -539,7 +551,19 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) {
pgOld := c.postgresqlCheck(prev)
pgNew := c.postgresqlCheck(cur)
if pgOld != nil && pgNew != nil {
// Avoid the inifinite recursion for status updates
clusterName := util.NameFromMeta(pgNew.ObjectMeta)

// Check if DeletionTimestamp was set (resource marked for deletion)
deletionTimestampChanged := pgOld.ObjectMeta.DeletionTimestamp.IsZero() && !pgNew.ObjectMeta.DeletionTimestamp.IsZero()
if deletionTimestampChanged {
c.logger.WithField("cluster-name", clusterName).Infof(
"UPDATE event: DeletionTimestamp set to %s, queueing event",
pgNew.ObjectMeta.DeletionTimestamp.Format(time.RFC3339))
c.queueClusterEvent(pgOld, pgNew, EventUpdate)
return
}

// Avoid the infinite recursion for status updates
if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) {
if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) {
return
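
The counterpart above lives in the informer update callback: the zero to non-zero DeletionTimestamp transition is the reliable "marked for deletion" signal, since no watch DELETE event arrives while finalizers remain. Roughly, in plain client-go terms (informer is an assumed cache.SharedIndexInformer over postgresql objects, and enqueueDeletion is a hypothetical hook):

```go
// Sketch: detect the deletion mark inside an informer UpdateFunc.
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	UpdateFunc: func(oldObj, newObj interface{}) {
		oldPg, okOld := oldObj.(*acidv1.Postgresql)
		newPg, okNew := newObj.(*acidv1.Postgresql)
		if !okOld || !okNew {
			return
		}
		if oldPg.DeletionTimestamp.IsZero() && !newPg.DeletionTimestamp.IsZero() {
			enqueueDeletion(newPg) // deletion requested; resource still finalized
		}
	},
})
```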
@@ -578,7 +602,6 @@
or config maps.
The operator does not sync accounts/role bindings after creation.
*/
func (c *Controller) submitRBACCredentials(event ClusterEvent) error {

namespace := event.NewSpec.GetNamespace()

if err := c.createPodServiceAccount(namespace); err != nil {
@@ -592,7 +615,6 @@ func (c *Controller) submitRBACCredentials(event ClusterEvent) error {
}

func (c *Controller) createPodServiceAccount(namespace string) error {

podServiceAccountName := c.opConfig.PodServiceAccountName
_, err := c.KubeClient.ServiceAccounts(namespace).Get(context.TODO(), podServiceAccountName, metav1.GetOptions{})
if k8sutil.ResourceNotFound(err) {
@@ -615,7 +637,6 @@ func (c *Controller) createPodServiceAccount(namespace string) error {
}

func (c *Controller) createRoleBindings(namespace string) error {

podServiceAccountName := c.opConfig.PodServiceAccountName
podServiceAccountRoleBindingName := c.PodServiceAccountRoleBinding.Name

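
For reference, the get-then-create idiom visible in the createPodServiceAccount context can be written against a plain clientset with apimachinery's error helpers instead of the operator's k8sutil wrappers; a minimal sketch with illustrative names:

```go
package main

import (
	"context"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// ensureServiceAccount creates the service account only if it is absent,
// mirroring the get-then-create flow in createPodServiceAccount.
func ensureServiceAccount(ctx context.Context, cs kubernetes.Interface, ns, name string) error {
	_, err := cs.CoreV1().ServiceAccounts(ns).Get(ctx, name, metav1.GetOptions{})
	if err == nil {
		return nil // already present, nothing to do
	}
	if !apierrors.IsNotFound(err) {
		return err // a real error, not just "missing"
	}
	sa := &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: name}}
	_, err = cs.CoreV1().ServiceAccounts(ns).Create(ctx, sa, metav1.CreateOptions{})
	return err
}
```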