correctly handle domains from NodePools when honoring taints
jmdeal committed Jun 13, 2024
1 parent 835cd98 commit fc8ca88
Showing 5 changed files with 257 additions and 59 deletions.
31 changes: 14 additions & 17 deletions pkg/controllers/provisioning/provisioner.go
@@ -30,7 +30,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -223,7 +222,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
nodePoolList.OrderByWeight()

instanceTypes := map[string][]*cloudprovider.InstanceType{}
domains := map[string]sets.Set[string]{}
domainGroups := map[string]scheduler.TopologyDomainGroup{}
for _, nodePool := range nodePoolList.Items {
// Get instance type options
instanceTypeOptions, err := p.cloudProvider.GetInstanceTypes(ctx, lo.ToPtr(nodePool))
@@ -239,6 +238,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
continue
}
instanceTypes[nodePool.Name] = append(instanceTypes[nodePool.Name], instanceTypeOptions...)
nodePoolTaints := nodePool.Spec.Template.Spec.Taints

// Construct Topology Domains
for _, instanceType := range instanceTypeOptions {
@@ -248,15 +248,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
requirements.Add(instanceType.Requirements.Values()...)

for key, requirement := range requirements {
// This code used to execute a Union between domains[key] and requirement.Values().
// The downside of this is that Union is immutable and takes a copy of the set it is executed upon.
// This resulted in a lot of memory pressure on the heap and poor performance
// https://github.com/aws/karpenter/issues/3565
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
for topologyKey, requirement := range requirements {
if _, ok := domainGroups[topologyKey]; !ok {
domainGroups[topologyKey] = scheduler.NewTopologyDomainGroup()
}
for _, domain := range requirement.Values() {
domainGroups[topologyKey].Insert(domain, nodePoolTaints...)
}
}
}
@@ -265,11 +262,11 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
requirements.Add(scheduling.NewLabelRequirements(nodePool.Spec.Template.Labels).Values()...)
for key, requirement := range requirements {
if requirement.Operator() == v1.NodeSelectorOpIn {
// The following is a performance optimisation, for the explanation see the comment above
if domains[key] == nil {
domains[key] = sets.New(requirement.Values()...)
} else {
domains[key].Insert(requirement.Values()...)
if _, ok := domainGroups[key]; !ok {
domainGroups[key] = scheduler.NewTopologyDomainGroup()
}
for _, value := range requirement.Values() {
domainGroups[key].Insert(value, nodePoolTaints...)
}
}
}
@@ -279,7 +276,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
pods = p.injectVolumeTopologyRequirements(ctx, pods)

// Calculate cluster topology
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domains, pods)
topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, domainGroups, pods)
if err != nil {
return nil, fmt.Errorf("tracking topology counts, %w", err)
}
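The scheduler.TopologyDomainGroup type used above is defined in one of the changed files not rendered here. NewTopologyDomainGroup and Insert appear in the diff; the internal layout and the lookup helper (ForEachToleratedDomain below) are assumptions. A minimal sketch of the idea: each topology domain is stored together with the taint sets of the NodePools that can provide it, so a later lookup can restrict the domains to those a given pod tolerates.

// Hypothetical sketch only — the commit's actual TopologyDomainGroup may differ.
package scheduling

import (
	v1 "k8s.io/api/core/v1"
)

// TopologyDomainGroup maps each topology domain to the taint sets of the NodePools offering it.
type TopologyDomainGroup map[string][][]v1.Taint

func NewTopologyDomainGroup() TopologyDomainGroup {
	return TopologyDomainGroup{}
}

// Insert records that a NodePool with the given taints can produce nodes in this domain.
func (t TopologyDomainGroup) Insert(domain string, taints ...v1.Taint) {
	t[domain] = append(t[domain], taints)
}

// ForEachToleratedDomain calls f once for every domain offered by at least one
// NodePool whose taints are all tolerated by the pod.
func (t TopologyDomainGroup) ForEachToleratedDomain(pod *v1.Pod, f func(domain string)) {
	for domain, taintSets := range t {
		for _, taints := range taintSets {
			if toleratesAll(pod.Spec.Tolerations, taints) {
				f(domain)
				break
			}
		}
	}
}

func toleratesAll(tolerations []v1.Toleration, taints []v1.Taint) bool {
	for i := range taints {
		tolerated := false
		for j := range tolerations {
			if tolerations[j].ToleratesTaint(&taints[i]) {
				tolerated = true
				break
			}
		}
		if !tolerated {
			return false
		}
	}
	return true
}
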
16 changes: 10 additions & 6 deletions pkg/controllers/provisioning/scheduling/topology.go
@@ -52,18 +52,18 @@ type Topology struct {
// in some cases.
inverseTopologies map[uint64]*TopologyGroup
// The universe of domains by topology key
domains map[string]sets.Set[string]
domainGroups map[string]TopologyDomainGroup
// excludedPods are the pod UIDs of pods that are excluded from counting. This is used so we can simulate
// moving pods to prevent them from being double counted.
excludedPods sets.Set[string]
cluster *state.Cluster
}

func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domains map[string]sets.Set[string], pods []*v1.Pod) (*Topology, error) {
func NewTopology(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, domainGroups map[string]TopologyDomainGroup, pods []*v1.Pod) (*Topology, error) {
t := &Topology{
kubeClient: kubeClient,
cluster: cluster,
domains: domains,
domainGroups: domainGroups,
topologies: map[uint64]*TopologyGroup{},
inverseTopologies: map[uint64]*TopologyGroup{},
excludedPods: sets.New[string](),
@@ -233,7 +233,7 @@ func (t *Topology) updateInverseAntiAffinity(ctx context.Context, pod *v1.Pod, d
return err
}

tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey])
tg := NewTopologyGroup(TopologyTypePodAntiAffinity, term.TopologyKey, pod, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey])

hash := tg.Hash()
if existing, ok := t.inverseTopologies[hash]; !ok {
@@ -269,6 +269,10 @@ func (t *Topology) countDomains(ctx context.Context, tg *TopologyGroup) error {
// capture new domain values from existing nodes that may not have any pods selected by the topology group
// scheduled to them already
t.cluster.ForEachNode(func(n *state.StateNode) bool {
// ignore state nodes which are tracking in-flight NodeClaims
if n.Node == nil {
return true
}
// ignore the node if it doesn't match the topology group
if !tg.nodeFilter.Matches(n.Node) {
return true
@@ -330,7 +334,7 @@ func (t *Topology) newForTopologies(p *v1.Pod) []*TopologyGroup {
var topologyGroups []*TopologyGroup
for _, cs := range p.Spec.TopologySpreadConstraints {
topologyGroups = append(topologyGroups, NewTopologyGroup(TopologyTypeSpread, cs.TopologyKey, p, sets.New(p.Namespace),
cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domains[cs.TopologyKey]))
cs.LabelSelector, cs.MaxSkew, cs.MinDomains, cs.NodeTaintsPolicy, cs.NodeAffinityPolicy, t.domainGroups[cs.TopologyKey]))
}
return topologyGroups
}
Expand Down Expand Up @@ -367,7 +371,7 @@ func (t *Topology) newForAffinities(ctx context.Context, p *v1.Pod) ([]*Topology
if err != nil {
return nil, err
}
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domains[term.TopologyKey]))
topologyGroups = append(topologyGroups, NewTopologyGroup(topologyType, term.TopologyKey, p, namespaces, term.LabelSelector, math.MaxInt32, nil, nil, nil, t.domainGroups[term.TopologyKey]))
}
}
return topologyGroups, nil
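NewTopologyGroup now receives a TopologyDomainGroup in place of the old flat sets.Set[string] of domains. Continuing the hypothetical sketch above (same assumed package, plus the k8s.io/apimachinery/pkg/util/sets import), a topology group could seed its domain universe as follows; this is what lets NodeTaintsPolicy: Honor drop domains a pod cannot tolerate while keeping them for Ignore:

// Hypothetical sketch — an assumption about how the group's domains could be seeded, not the commit's code.
func seedDomains(tdg TopologyDomainGroup, pod *v1.Pod, taintsPolicy *v1.NodeInclusionPolicy) sets.Set[string] {
	domains := sets.New[string]()
	if taintsPolicy != nil && *taintsPolicy == v1.NodeInclusionPolicyHonor {
		// Only count domains offered by at least one NodePool whose taints the pod tolerates.
		tdg.ForEachToleratedDomain(pod, func(domain string) {
			domains.Insert(domain)
		})
		return domains
	}
	// Ignore or unset: every known domain counts.
	for domain := range tdg {
		domains.Insert(domain)
	}
	return domains
}
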
176 changes: 147 additions & 29 deletions pkg/controllers/provisioning/scheduling/topology_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package scheduling_test

import (
"fmt"
"time"

. "github.com/onsi/gomega"
@@ -1132,7 +1133,7 @@ var _ = Describe("Topology", func() {
Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x")
}

const spreadLabel = "karpenter.sh/fake-label"
const spreadLabel = "fake-label"
nodePool.Spec.Template.Labels = map[string]string{
spreadLabel: "baz",
}
@@ -1206,7 +1207,7 @@ var _ = Describe("Topology", func() {
if env.Version.Minor() < 26 {
Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x")
}
const spreadLabel = "karpenter.sh/fake-label"
const spreadLabel = "fake-label"
nodePool.Spec.Template.Labels = map[string]string{
spreadLabel: "baz",
}
@@ -1275,23 +1276,83 @@ var _ = Describe("Topology", func() {
// and should schedule all of the pods on the same node
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5))
})
FIt("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() {
It("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=ignore)", func() {
const spreadLabel = "fake-label"
const taintKey = "taint-key"
nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirement{
Key: spreadLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{"foo"},
})
taintedNodePool := test.NodePool(v1beta1.NodePool{
Spec: v1beta1.NodePoolSpec{
Template: v1beta1.NodeClaimTemplate{
Spec: v1beta1.NodeClaimSpec{
Taints: []v1.Taint{
{
Key: taintKey,
Value: "taint-value",
Effect: v1.TaintEffectNoSchedule,
},
},
Requirements: []v1.NodeSelectorRequirement{
{
Key: v1beta1.CapacityTypeLabelKey,
Operator: v1.NodeSelectorOpExists,
},
{
Key: spreadLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
},
})

honor := v1.NodeInclusionPolicyIgnore
topology := []v1.TopologySpreadConstraint{{
TopologyKey: spreadLabel,
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
MaxSkew: 1,
NodeTaintsPolicy: &honor,
}}

pods := test.UnschedulablePods(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
TopologySpreadConstraints: topology,
}, 2)

ExpectApplied(ctx, env.Client, nodePool, taintedNodePool)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)

// should fail to schedule both pods, one pod is scheduled to domain "foo" but the other can't be scheduled to domain "bar"
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1))
})
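To make the ConsistOf(1) expectation concrete: with NodeTaintsPolicy=Ignore the domain "bar" contributed by the tainted NodePool still counts toward the spread even though these pods never tolerate its taint. A small stand-alone illustration of the skew arithmetic (not Karpenter code):

package main

import "fmt"

// With domains {foo, bar} both counted, the first pod lands in "foo" and the
// skew is already at the limit; the second pod can neither join "foo" (skew 2)
// nor reach "bar" (untolerated taint), so it stays pending.
func main() {
	maxSkew := 1
	counts := map[string]int{"foo": 0, "bar": 0}

	counts["foo"]++ // first pod schedules to "foo"
	skew := counts["foo"] - counts["bar"]
	fmt.Printf("skew=%d (maxSkew=%d)\n", skew, maxSkew) // skew=1 (maxSkew=1)
}
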
It("should balance pods across a label when discovered from the provisioner (NodeTaintsPolicy=honor)", func() {
if env.Version.Minor() < 26 {
Skip("NodeTaintsPolicy ony enabled by default for K8s >= 1.26.x")
}
const spreadLabel = "karpenter.sh/fake-label"
nodePool.Spec.Template.Labels = map[string]string{
spreadLabel: "baz",
}

nodePoolTainted := test.NodePool(v1beta1.NodePool{
const spreadLabel = "fake-label"
const taintKey = "taint-key"
nodePool.Spec.Template.Spec.Requirements = append(nodePool.Spec.Template.Spec.Requirements, v1.NodeSelectorRequirement{
Key: spreadLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{"foo"},
})
taintedNodePool := test.NodePool(v1beta1.NodePool{
Spec: v1beta1.NodePoolSpec{
Template: v1beta1.NodeClaimTemplate{
Spec: v1beta1.NodeClaimSpec{
Taints: []v1.Taint{
{
Key: "taintname",
Value: "taintvalue",
Key: taintKey,
Value: "taint-value",
Effect: v1.TaintEffectNoSchedule,
},
},
@@ -1303,17 +1364,14 @@ var _ = Describe("Topology", func() {
{
Key: spreadLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{"foo", "bar"},
Values: []string{"bar"},
},
},
},
},
},
})

ExpectApplied(ctx, env.Client, nodePool, nodePoolTainted)

ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov)
honor := v1.NodeInclusionPolicyHonor
topology := []v1.TopologySpreadConstraint{{
TopologyKey: spreadLabel,
@@ -1323,20 +1381,80 @@ var _ = Describe("Topology", func() {
NodeTaintsPolicy: &honor,
}}

ExpectApplied(ctx, env.Client, nodePool)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov,
test.UnschedulablePods(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels},
ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
pods := test.UnschedulablePods(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
TopologySpreadConstraints: topology,
}, 2)

ExpectApplied(ctx, env.Client, nodePool, taintedNodePool)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)

// should schedule all pods to domain "foo", ignoring bar since pods don't tolerate
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(2))
})
It("should balance pods across a label when mutually exclusive NodePools (by taints) share domains (NodeTaintsPolicy=honor)", func() {
const spreadLabel = "fake-label"
const taintKey = "taint-key"

nodePools := lo.Map([][]string{{"foo", "bar"}, {"foo", "baz"}}, func(domains []string, i int) *v1beta1.NodePool {
return test.NodePool(v1beta1.NodePool{
Spec: v1beta1.NodePoolSpec{
Template: v1beta1.NodeClaimTemplate{
Spec: v1beta1.NodeClaimSpec{
Taints: []v1.Taint{
{
Key: taintKey,
Value: fmt.Sprintf("nodepool-%d", i),
Effect: v1.TaintEffectNoSchedule,
},
},
Requirements: []v1.NodeSelectorRequirement{
{
Key: v1beta1.CapacityTypeLabelKey,
Operator: v1.NodeSelectorOpExists,
},
{
Key: spreadLabel,
Operator: v1.NodeSelectorOpIn,
Values: domains,
},
},
},
},
},
})
})

honor := v1.NodeInclusionPolicyHonor
topology := []v1.TopologySpreadConstraint{{
TopologyKey: spreadLabel,
WhenUnsatisfiable: v1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
MaxSkew: 1,
NodeTaintsPolicy: &honor,
}}

pods := lo.Flatten(lo.Map(nodePools, func(np *v1beta1.NodePool, _ int) []*v1.Pod {
return test.UnschedulablePods(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
TopologySpreadConstraints: topology,
}, 5)...,
)
// and should schedule all of the pods on the same node
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(5))
Tolerations: []v1.Toleration{{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
Value: np.Spec.Template.Spec.Taints[0].Value,
}},
}, 2)
}))

ExpectApplied(ctx, env.Client, nodePools[0], nodePools[1])
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...)

// Expect 3 total nodes provisioned, 2 pods schedule to foo, 1 to bar, and 1 to baz
ExpectSkew(ctx, env.Client, "default", &topology[0]).To(ConsistOf(1, 2, 1))
})
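The ConsistOf(1, 2, 1) expectation follows from each pod counting only the domains of the NodePool whose taint it tolerates: nodepool-0 pods see {foo, bar}, nodepool-1 pods see {foo, baz}, and maxSkew=1 must hold over each group's own view, which forces three nodes and a 2/1/1 spread. A stand-alone illustration (not Karpenter code):

package main

import "fmt"

// Verify that a foo=2, bar=1, baz=1 end state keeps the skew within maxSkew=1
// from the viewpoint of both pod groups.
func main() {
	counts := map[string]int{"foo": 2, "bar": 1, "baz": 1}

	skew := func(domains ...string) int {
		minC, maxC := counts[domains[0]], counts[domains[0]]
		for _, d := range domains {
			if counts[d] < minC {
				minC = counts[d]
			}
			if counts[d] > maxC {
				maxC = counts[d]
			}
		}
		return maxC - minC
	}

	fmt.Println("nodepool-0 pods see skew:", skew("foo", "bar")) // 1
	fmt.Println("nodepool-1 pods see skew:", skew("foo", "baz")) // 1
}
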
})

@@ -1345,8 +1463,8 @@ var _ = Describe("Topology", func() {
if env.Version.Minor() < 26 {
Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x")
}
const spreadLabel = "karpenter.sh/fake-label"
const affinityLabel = "karpenter.sh/selector"
const spreadLabel = "fake-label"
const affinityLabel = "selector"
const affinityMismatch = "mismatch"
const affinityMatch = "value"

@@ -1416,8 +1534,8 @@ var _ = Describe("Topology", func() {
if env.Version.Minor() < 26 {
Skip("NodeAffinityPolicy ony enabled by default for K8s >= 1.26.x")
}
const spreadLabel = "karpenter.sh/fake-label"
const affinityLabel = "karpenter.sh/selector"
const spreadLabel = "fake-label"
const affinityLabel = "selector"
const affinityMismatch = "mismatch"
const affinityMatch = "value"
