WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions controllers/parameters/policy_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import (

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
cfgcm "github.com/apecloud/kubeblocks/pkg/configuration/config_manager"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
cfgproto "github.com/apecloud/kubeblocks/pkg/configuration/proto"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

Expand Down Expand Up @@ -202,54 +204,42 @@ func validIPv6Address(ip net.IP) bool {
return ip != nil && ip.To16() != nil
}

func getComponentSpecPtrByName(cli client.Client, ctx intctrlutil.RequestCtx, cluster *appsv1.Cluster, compName string) (*appsv1.ClusterComponentSpec, error) {
for i := range cluster.Spec.ComponentSpecs {
componentSpec := &cluster.Spec.ComponentSpecs[i]
if componentSpec.Name == compName {
return componentSpec, nil
}
func restartWorkload[T generics.Object, PT generics.PObject[T], L generics.ObjList[T], PL generics.PObjList[T, L]](cli client.Client, ctx context.Context, annotationKey, annotationValue string, obj PT, _ func(T, PT, L, PL)) error {

template := transformPodTemplate(obj)
if template.Annotations != nil && template.Annotations[annotationKey] == annotationValue {
return nil
}
// check if the component is a sharding component
compObjList := &appsv1.ComponentList{}
if err := cli.List(ctx.Ctx, compObjList, client.MatchingLabels{
constant.AppInstanceLabelKey: cluster.Name,
constant.KBAppComponentLabelKey: compName,
}); err != nil {
return nil, err

patch := client.MergeFrom(PT(obj.DeepCopy()))
if template.Annotations == nil {
template.Annotations = map[string]string{}
}
if len(compObjList.Items) > 0 {
shardingName := compObjList.Items[0].Labels[constant.KBAppShardingNameLabelKey]
if shardingName != "" {
for i := range cluster.Spec.Shardings {
shardSpec := &cluster.Spec.Shardings[i]
if shardSpec.Name == shardingName {
return &shardSpec.Template, nil
}
}
}
template.Annotations[annotationKey] = annotationValue
if err := cli.Patch(ctx, obj, patch); err != nil {
return err
}
return nil, fmt.Errorf("component %s not found", compName)
return nil
}

func restartComponent(cli client.Client, ctx intctrlutil.RequestCtx, configKey string, newVersion string, cluster *appsv1.Cluster, compName string) error {
func restartComponent(cli client.Client, ctx intctrlutil.RequestCtx, configKey string, newVersion string, objs []client.Object, recordEvent func(obj client.Object)) (client.Object, error) {
var err error
cfgAnnotationKey := core.GenerateUniqKeyWithConfig(constant.UpgradeRestartAnnotationKey, configKey)

compSpec, err := getComponentSpecPtrByName(cli, ctx, cluster, compName)
if err != nil {
return err
}

if compSpec.Annotations == nil {
compSpec.Annotations = map[string]string{}
}

if compSpec.Annotations[cfgAnnotationKey] == newVersion {
return nil
for _, obj := range objs {
switch w := obj.(type) {
case *workloads.InstanceSet:
err = restartWorkload(cli, ctx.Ctx, cfgAnnotationKey, newVersion, w, generics.InstanceSetSignature)
default:
// ignore other types workload
}
if err != nil {
return obj, err
}
if recordEvent != nil {
recordEvent(obj)
}
}

compSpec.Annotations[cfgAnnotationKey] = newVersion

return cli.Update(ctx.Ctx, cluster)
return nil, nil
}

type ReloadAction interface {
Expand Down
21 changes: 20 additions & 1 deletion controllers/parameters/restart_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package parameters

import (
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
Expand Down Expand Up @@ -54,7 +57,23 @@ func restartAndVerifyComponent(rctx reconfigureContext, funcs RollingUpgradeFunc
progress = core.NotStarted
)

if err := funcs.RestartComponent(rctx.Client, rctx.RequestCtx, configKey, newVersion, rctx.Cluster, rctx.ClusterComponent.Name); err != nil {
recordEvent := func(obj client.Object) {
rctx.Recorder.Eventf(obj,
corev1.EventTypeNormal, "ReconfigureRestarted",
"restarting component[%s] in cluster[%s], version: %s", rctx.ClusterComponent.Name, rctx.Cluster.Name, newVersion)
}

objs := make([]client.Object, 0)
for _, unit := range rctx.InstanceSetUnits {
objs = append(objs, &unit)
}

if obj, err := funcs.RestartComponent(rctx.Client, rctx.RequestCtx, configKey, newVersion, objs, recordEvent); err != nil {
if obj != nil {
rctx.Recorder.Eventf(obj,
corev1.EventTypeWarning, "ReconfigureFailed",
"failed to restart component[%s] in cluster[%s], version: %s", client.ObjectKeyFromObject(obj), rctx.Cluster.Name, newVersion)
}
return makeReturnedStatus(ESFailedAndRetry), err
}

Expand Down
6 changes: 3 additions & 3 deletions controllers/parameters/restart_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var _ = Describe("Reconfigure restartPolicy", func() {

// mock client update caller
updateErr := core.MakeError("update failed!")
k8sMockClient.MockUpdateMethod(
k8sMockClient.MockPatchMethod(
testutil.WithFailed(updateErr, testutil.WithTimes(1)),
testutil.WithSucceed(testutil.WithAnyTimes()))

Expand Down Expand Up @@ -139,7 +139,7 @@ var _ = Describe("Reconfigure restartPolicy", func() {
}),
withClusterComponent(2))

k8sMockClient.MockUpdateMethod(testutil.WithSucceed(testutil.WithAnyTimes()))
k8sMockClient.MockPatchMethod(testutil.WithSucceed(testutil.WithAnyTimes()))
k8sMockClient.MockListMethod(testutil.WithListReturned(
testutil.WithConstructListSequenceResult([][]runtime.Object{
fromPodObjectList(newMockPodsWithInstanceSet(&mockParam.InstanceSetUnits[0], 2)),
Expand Down Expand Up @@ -187,7 +187,7 @@ var _ = Describe("Reconfigure restartPolicy", func() {
withClusterComponent(2))

updateErr := core.MakeError("update failed!")
k8sMockClient.MockUpdateMethod(
k8sMockClient.MockPatchMethod(
testutil.WithFailed(updateErr, testutil.WithTimes(1)),
testutil.WithSucceed(testutil.WithAnyTimes()))

Expand Down
3 changes: 1 addition & 2 deletions controllers/parameters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ import (
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
cfgproto "github.com/apecloud/kubeblocks/pkg/configuration/proto"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type createReconfigureClient func(addr string) (cfgproto.ReconfigureClient, error)

type GetPodsFunc func(params reconfigureContext) ([]corev1.Pod, error)
type RestartComponent func(client client.Client, ctx intctrlutil.RequestCtx, key string, version string, cluster *appsv1.Cluster, compName string) error
type RestartComponent func(client client.Client, ctx intctrlutil.RequestCtx, key string, version string, objs []client.Object, recordEvent func(obj client.Object)) (client.Object, error)

type RestartContainerFunc func(pod *corev1.Pod, ctx context.Context, containerName []string, createConnFn createReconfigureClient) error
type OnlineUpdatePodFunc func(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, configFile string, updatedParams map[string]string) error
Expand Down