54 changes: 33 additions & 21 deletions cmd/memberagent/main.go
@@ -77,21 +77,19 @@ var (
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
// TODO(weiweng): only keep enableV1Alpha1APIs for backward compatibility with helm charts. Remove soon.
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
region = flag.String("region", "", "The region where the member cluster resides.")
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud config file.")
watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates")
watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile")
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
region = flag.String("region", "", "The region where the member cluster resides.")
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud config file.")
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")

// Work applier requeue rate limiter settings.
workApplierRequeueRateLimiterAttemptsWithFixedDelay = flag.Int("work-applier-requeue-rate-limiter-attempts-with-fixed-delay", 1, "If set, the work applier will requeue work objects with a fixed delay for the specified number of attempts before switching to exponential backoff.")
@@ -102,6 +100,12 @@ var (
workApplierRequeueRateLimiterExponentialBaseForFastBackoff = flag.Float64("work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff", 1.5, "If set, the work applier will start to back off fast at this factor after it completes the slow backoff stage, until it reaches the fast backoff delay cap. Its value should be larger than the base value for the slow backoff stage.")
workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds = flag.Float64("work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds", 900, "If set, the work applier will not back off longer than this value in seconds when it is in the fast backoff stage.")
workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs = flag.Bool("work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs", true, "If set, the rate limiter will skip the slow backoff stage and start fast backoff immediately for work objects that are available or have diff reported.")

// Work applier priority queue settings.
enableWorkApplierPriorityQueue = flag.Bool("enable-work-applier-priority-queue", false, "If set, the work applier will use a priority queue to process work objects.")
workApplierPriorityLinearEquationCoeffA = flag.Int("work-applier-priority-linear-equation-coeff-a", -3, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient A in the equation.")
workApplierPriorityLinearEquationCoeffB = flag.Int("work-applier-priority-linear-equation-coeff-b", 100, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient B in the equation.")

// Azure property provider feature gates.
isAzProviderCostPropertiesEnabled = flag.Bool("use-cost-properties-in-azure-provider", true, "If set, the Azure property provider will expose cost properties in the member cluster.")
isAzProviderAvailableResPropertiesEnabled = flag.Bool("use-available-res-properties-in-azure-provider", true, "If set, the Azure property provider will expose available resources properties in the member cluster.")
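Note on the new priority-queue flags above: they drive a simple linear mapping from Work object age to queue priority, priority = A * (age in minutes) + B, where the validation later in main() requires A to be negative and B to be positive, so fresh objects start near the top and older ones decay. The following is a minimal sketch of that mapping under those flag semantics; the helper name and the example program are illustrative only, not the workapplier package's actual implementation.

package main

import (
	"fmt"
	"time"
)

// workPriority applies the documented equation: priority = A*(age in minutes) + B.
func workPriority(creationTime time.Time, coeffA, coeffB int) int {
	ageMinutes := int(time.Since(creationTime).Minutes())
	return coeffA*ageMinutes + coeffB
}

func main() {
	// With the flag defaults A = -3 and B = 100, a new Work object starts at
	// priority 100 and loses 3 points per minute of age; a 30-minute-old
	// object is therefore enqueued at priority 10.
	created := time.Now().Add(-30 * time.Minute)
	fmt.Println(workPriority(created, -3, 100))
}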
@@ -133,6 +137,13 @@ func main() {
klog.ErrorS(errors.New("either enable-v1alpha1-apis or enable-v1beta1-apis is required"), "Invalid APIs flags")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// TODO (chenyu1): refactor the validation logic.
if workApplierPriorityLinearEquationCoeffA == nil || *workApplierPriorityLinearEquationCoeffA >= 0 {
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffA is set incorrectly; must use a value less than 0"), "Invalid flag value", "flag", "work-applier-priority-linear-equation-coeff-a")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if workApplierPriorityLinearEquationCoeffB == nil || *workApplierPriorityLinearEquationCoeffB <= 0 {
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffB is set incorrectly; must use a value greater than 0"), "Invalid flag value", "flag", "work-applier-priority-linear-equation-coeff-b")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

hubURL := os.Getenv("HUB_SERVER_URL")

@@ -373,7 +384,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
// create the work controller, so we can pass it to the internal member cluster reconciler
// Set up the work applier. Note that it is referenced by the InternalMemberCluster controller.

// Set up the requeue rate limiter for the work applier.
//
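For context on the rate limiter being set up here: per the flag descriptions above, a Work object's requeue delay moves through a fixed-delay stage, a slow exponential backoff stage, and a fast exponential backoff stage, each with its own cap. The sketch below only illustrates that schedule; the slow-stage flags are collapsed in this diff, so the fixed delay, slow base, and slow cap used here are placeholder assumptions, and the code is not the controller's actual rate limiter.

package main

import (
	"fmt"
	"math"
)

// requeueDelaySeconds walks the staged backoff: a fixed delay for the first
// attempts, then slow exponential growth capped at maxSlowDelay, then fast
// exponential growth capped at maxFastDelay.
func requeueDelaySeconds(attempt, attemptsWithFixedDelay int, fixedDelay, slowBase, maxSlowDelay, fastBase, maxFastDelay float64) float64 {
	delay := fixedDelay
	for i := attemptsWithFixedDelay; i < attempt; i++ {
		if delay < maxSlowDelay {
			delay = math.Min(delay*slowBase, maxSlowDelay) // slow backoff stage
		} else {
			delay = math.Min(delay*fastBase, maxFastDelay) // fast backoff stage
		}
	}
	return delay
}

func main() {
	// Placeholder parameters: one fixed-delay attempt of 5s, slow base 1.2 with
	// a 60s cap (assumed), then the fast base 1.5 and 900s cap from the flag
	// defaults shown above.
	for attempt := 1; attempt <= 20; attempt++ {
		fmt.Printf("attempt %d: %.1fs\n", attempt, requeueDelaySeconds(attempt, 1, 5, 1.2, 60, 1.5, 900))
	}
}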
@@ -413,7 +424,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
)

workController := workapplier.NewReconciler(
workApplier := workapplier.NewReconciler(
hubMgr.GetClient(),
targetNS,
spokeDynamicClient,
@@ -426,12 +437,13 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
// Use the default worker count (4) for parallelized manifest processing.
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
time.Minute*time.Duration(*deletionWaitTime),
*watchWorkWithPriorityQueue,
*watchWorkReconcileAgeMinutes,
requeueRateLimiter,
*enableWorkApplierPriorityQueue,
workApplierPriorityLinearEquationCoeffA,
workApplierPriorityLinearEquationCoeffB,
)

if err = workController.SetupWithManager(hubMgr); err != nil {
if err = workApplier.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")
return err
}
@@ -459,7 +471,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
ctx,
hubMgr.GetClient(),
memberMgr.GetConfig(), memberMgr.GetClient(),
workController,
workApplier,
pp)
if err != nil {
klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler")
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)

propertyProvider1 = &manuallyUpdatedProvider{}
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)

member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
Expect(err).NotTo(HaveOccurred())
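Both test stubs above pass nil for the requeue rate limiter and for the two priority coefficients while leaving the priority queue disabled, which suggests the reconciler is expected to tolerate nil coefficient pointers in that mode. One way to support such wiring is a defaulting helper along the lines below; this is an assumption for illustration, not the reconciler's confirmed behavior.

// coeffOrDefault is a hypothetical helper: it falls back to a default value
// when a coefficient flag pointer is nil, as in the nil, nil arguments above.
func coeffOrDefault(p *int, def int) int {
	if p == nil {
		return def
	}
	return *p
}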