fix: dynamic cluster distribution issue 20965, update the shard… (argoproj#21042)

Signed-off-by: caijing <[email protected]>
Co-authored-by: Ishita Sequeira <[email protected]>
ivan-cai and ishitasequeira authored Feb 10, 2025
1 parent b600c5e commit d183d9c
Showing 5 changed files with 85 additions and 2 deletions.
25 changes: 24 additions & 1 deletion controller/appcontroller.go
@@ -288,9 +288,32 @@ func NewApplicationController(
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
shard, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard)
if err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %w", err)
}

// update the shard number in the clusterSharding, and resync all applications if the shard number is updated
if ctrl.clusterSharding.UpdateShard(shard) {
// update shard number in stateCache
ctrl.stateCache.UpdateShard(shard)

// resync all applications
apps, err := ctrl.appLister.List(labels.Everything())
if err != nil {
return err
}
for _, app := range apps {
if !ctrl.canProcessApp(app) {
continue
}
key, err := cache.MetaNamespaceKeyFunc(app)
if err == nil {
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.clusterSharding.AddApp(app)
}
}
}
}
}
return nil
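For context, the appcontroller.go change above keeps the shard value returned by GetOrUpdateShardFromConfigMap and, when ClusterSharding.UpdateShard reports a change, re-enqueues every application for refresh. The following standalone sketch (not Argo CD code) illustrates that detect-change-then-resync pattern; shardHolder, refreshQueue, and the app keys are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"sync"
)

// shardHolder is a stand-in for ClusterSharding: it only tracks the current shard.
type shardHolder struct {
	mu    sync.Mutex
	shard int
}

// UpdateShard stores the new shard and reports whether it actually changed.
func (s *shardHolder) UpdateShard(shard int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if shard == s.shard {
		return false
	}
	s.shard = shard
	return true
}

func main() {
	holder := &shardHolder{shard: 0}
	refreshQueue := make(chan string, 10) // stand-in for the rate-limited refresh queue
	apps := []string{"argocd/app-a", "argocd/app-b"}

	// Simulate the controller learning a new shard from the shard-mapping ConfigMap.
	newShard := 2
	if holder.UpdateShard(newShard) {
		// Shard changed: re-enqueue every application so it is re-evaluated
		// against the new shard assignment.
		for _, key := range apps {
			refreshQueue <- key
		}
	}
	close(refreshQueue)
	for key := range refreshQueue {
		fmt.Println("resync:", key)
	}
}
```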
7 changes: 7 additions & 0 deletions controller/cache/cache.go
@@ -153,6 +153,8 @@ type LiveStateCache interface {
GetClustersInfo() []clustercache.ClusterInfo
// Init must be executed before cache can be used
Init() error
// UpdateShard will update the shard of ClusterSharding when the shard has changed.
UpdateShard(shard int) bool
}

type ObjectUpdatedHandler = func(managedByApp map[string]bool, ref corev1.ObjectReference)
@@ -906,3 +908,8 @@ func (c *liveStateCache) GetClustersInfo() []clustercache.ClusterInfo {
func (c *liveStateCache) GetClusterCache(server *appv1.Cluster) (clustercache.ClusterCache, error) {
return c.getSyncedCluster(server)
}

// UpdateShard will update the shard of ClusterSharding when the shard has changed.
func (c *liveStateCache) UpdateShard(shard int) bool {
return c.clusterSharding.UpdateShard(shard)
}
18 changes: 18 additions & 0 deletions controller/cache/mocks/LiveStateCache.go

Some generated files are not rendered by default.

32 changes: 32 additions & 0 deletions controller/sharding/cache.go
@@ -19,6 +19,8 @@ type ClusterShardingCache interface {
UpdateApp(a *v1alpha1.Application)
IsManagedCluster(c *v1alpha1.Cluster) bool
GetDistribution() map[string]int
GetAppDistribution() map[string]int
UpdateShard(shard int) bool
}

type ClusterSharding struct {
@@ -244,3 +246,33 @@ func (sharding *ClusterSharding) UpdateApp(a *v1alpha1.Application) {
log.Debugf("Skipping sharding distribution update. No relevant changes")
}
}

// GetAppDistribution should not be called from a DistributionFunction because
// it could cause a deadlock when updateDistribution is called.
func (sharding *ClusterSharding) GetAppDistribution() map[string]int {
sharding.lock.RLock()
clusters := sharding.Clusters
apps := sharding.Apps
sharding.lock.RUnlock()

appDistribution := make(map[string]int, len(clusters))

for _, a := range apps {
if _, ok := appDistribution[a.Spec.Destination.Server]; !ok {
appDistribution[a.Spec.Destination.Server] = 0
}
appDistribution[a.Spec.Destination.Server]++
}
return appDistribution
}

// UpdateShard will update the shard of ClusterSharding when the shard has changed.
func (sharding *ClusterSharding) UpdateShard(shard int) bool {
sharding.lock.Lock()
defer sharding.lock.Unlock()
if shard != sharding.Shard {
sharding.Shard = shard
return true
}
return false
}
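The GetAppDistribution comment above warns against calling it while the sharding lock is held (for example from a DistributionFunction), since it acquires the same lock. A minimal standalone sketch of the pattern it uses (snapshot the guarded state under a read lock, release it, then compute) is shown below; appShardingSketch and AppDistribution are hypothetical names, not Argo CD APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// appShardingSketch stands in for ClusterSharding: a lock-guarded map of
// application name -> destination server.
type appShardingSketch struct {
	lock sync.RWMutex
	apps map[string]string
}

// AppDistribution snapshots the guarded map under a read lock, releases the
// lock, and only then builds the per-destination counts, so no caller-held
// lock is required while the counting runs.
func (s *appShardingSketch) AppDistribution() map[string]int {
	s.lock.RLock()
	apps := make(map[string]string, len(s.apps))
	for name, server := range s.apps {
		apps[name] = server
	}
	s.lock.RUnlock()

	dist := make(map[string]int)
	for _, server := range apps {
		dist[server]++
	}
	return dist
}

func main() {
	s := &appShardingSketch{apps: map[string]string{
		"app-a": "https://cluster-1",
		"app-b": "https://cluster-1",
		"app-c": "https://cluster-2",
	}}
	fmt.Println(s.AppDistribution()) // map[https://cluster-1:2 https://cluster-2:1]
}
```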
5 changes: 4 additions & 1 deletion controller/sharding/sharding.go
@@ -485,7 +485,10 @@ func GetClusterSharding(kubeClient kubernetes.Interface, settingsMgr *settings.S
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %w", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
// only log the conflict warning below when the update actually returned an error
if err != nil {
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
}
errors.CheckError(err)
} else {
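The sharding.go change makes the retry loop in GetClusterSharding log the conflict warning only when an error actually occurred. The sketch below is stdlib only, with a simulated conflict error; getShardOnce and errConflict are hypothetical stand-ins for one attempt at GetOrUpdateShardFromConfigMap. It shows the same retry-on-conflict shape: break silently on success, stop on non-conflict errors, and warn only when retrying a conflict.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errConflict simulates the optimistic-concurrency conflict hit when two
// controller replicas update the shard-mapping ConfigMap at the same time.
var errConflict = errors.New("conflict updating shard mapping configmap")

// getShardOnce fakes a single attempt: the first try conflicts, the second succeeds.
func getShardOnce(attempt int) (int, error) {
	if attempt == 0 {
		return -1, errConflict
	}
	return 2, nil
}

func main() {
	var (
		shard int
		err   error
	)
	for i := 0; i <= 3; i++ {
		shard, err = getShardOnce(i)
		if err == nil {
			break // success: no warning is logged on this iteration
		}
		if !errors.Is(err, errConflict) {
			err = fmt.Errorf("unable to get shard: %w", err)
			break // non-conflict errors are not retried
		}
		log.Printf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("assigned shard:", shard)
}
```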
