未验证 提交 73e39f17 编写于 作者: L Luther Monson 提交者: GitHub

Merge pull request #32134 from luthermonson/enqueue-fixes-master

...@@ -54,7 +54,6 @@ ...@@ -54,7 +54,6 @@
userNodeRemoveCleanupAnnotationOld = "nodes.management.cattle.io/user-node-remove-cleanup" userNodeRemoveCleanupAnnotationOld = "nodes.management.cattle.io/user-node-remove-cleanup"
userNodeRemoveFinalizerPrefix = "clusterscoped.controller.cattle.io/user-node-remove_" userNodeRemoveFinalizerPrefix = "clusterscoped.controller.cattle.io/user-node-remove_"
userNodeRemoveAnnotationPrefix = "lifecycle.cattle.io/create.user-node-remove_" userNodeRemoveAnnotationPrefix = "lifecycle.cattle.io/create.user-node-remove_"
userScaledownAnnotation = "nodes.management.cattle.io/scaledown"
) )
// aliases maps Schema field => driver field // aliases maps Schema field => driver field
...@@ -290,16 +289,6 @@ func (m *Lifecycle) Remove(obj *v3.Node) (runtime.Object, error) { ...@@ -290,16 +289,6 @@ func (m *Lifecycle) Remove(obj *v3.Node) (runtime.Object, error) {
return m.deleteV1Node(newObj.(*v3.Node)) return m.deleteV1Node(newObj.(*v3.Node))
} }
func (m *Lifecycle) enqueueNodePool(obj *v3.Node) {
logrus.Debugf("[node-controller] enqueing node pool %s", obj.Spec.NodePoolName)
pool, err := m.getNodePool(obj.Spec.NodePoolName)
if err != nil {
logrus.Errorf("[node-controller] enqueue nodepool error %s: %s", obj.Spec.NodePoolName, err)
return
}
m.nodePoolController.Enqueue(pool.Namespace, pool.Name)
}
func (m *Lifecycle) provision(driverConfig, nodeDir string, obj *v3.Node) (*v3.Node, error) { func (m *Lifecycle) provision(driverConfig, nodeDir string, obj *v3.Node) (*v3.Node, error) {
configRawMap := map[string]interface{}{} configRawMap := map[string]interface{}{}
if err := json.Unmarshal([]byte(driverConfig), &configRawMap); err != nil { if err := json.Unmarshal([]byte(driverConfig), &configRawMap); err != nil {
...@@ -492,9 +481,6 @@ func (m *Lifecycle) ready(obj *v3.Node) (*v3.Node, error) { ...@@ -492,9 +481,6 @@ func (m *Lifecycle) ready(obj *v3.Node) (*v3.Node, error) {
return nil, err return nil, err
} }
// if the node provision failed let the pool schedule it for delete
defer m.enqueueNodePool(obj)
// Provision in the background so we can poll and save the config // Provision in the background so we can poll and save the config
done := make(chan error) done := make(chan error)
go func() { go func() {
...@@ -524,35 +510,6 @@ func (m *Lifecycle) ready(obj *v3.Node) (*v3.Node, error) { ...@@ -524,35 +510,6 @@ func (m *Lifecycle) ready(obj *v3.Node) (*v3.Node, error) {
return obj, err return obj, err
} }
func (m *Lifecycle) scaledown(obj *v3.Node) (runtime.Object, error) {
logrus.Debugf("[node] checking when to scaledown node %s", obj.Name)
scaledown, err := time.Parse(time.RFC3339, obj.Spec.ScaledownTime)
if err != nil {
return obj, fmt.Errorf("[node] failed to parse scaledown time, is it in RFC3339? %s", obj.Spec.ScaledownTime)
}
// time to scaledown, send to nodepool to delete the node
pool, err := m.getNodePool(obj.Spec.NodePoolName)
if err != nil {
if kerror.IsNotFound(err) {
return obj, nil
}
return obj, err
}
if scaledown.Before(time.Now()) {
logrus.Debugf("[node] enqueing nodepool %s for scaledown immediately", pool.Name)
m.nodePoolController.Enqueue(pool.Namespace, pool.Name)
} else if after := scaledown.Sub(time.Now()); after > 0 {
logrus.Debugf("[node] enqueing nodepool %s for scaledown in %s", pool.Name, after)
m.nodePoolController.EnqueueAfter(pool.Namespace, pool.Name, after+(5*time.Second))
}
copy := obj.DeepCopy()
copy.Annotations[userScaledownAnnotation] = "true"
return m.nodeClient.Update(copy)
}
func (m *Lifecycle) sync(key string, obj *v3.Node) (runtime.Object, error) { func (m *Lifecycle) sync(key string, obj *v3.Node) (runtime.Object, error) {
if obj == nil || obj.DeletionTimestamp != nil { if obj == nil || obj.DeletionTimestamp != nil {
return nil, nil return nil, nil
...@@ -567,12 +524,6 @@ func (m *Lifecycle) sync(key string, obj *v3.Node) (runtime.Object, error) { ...@@ -567,12 +524,6 @@ func (m *Lifecycle) sync(key string, obj *v3.Node) (runtime.Object, error) {
} }
func (m *Lifecycle) Updated(obj *v3.Node) (runtime.Object, error) { func (m *Lifecycle) Updated(obj *v3.Node) (runtime.Object, error) {
if obj.Spec.ScaledownTime != "" {
if scaledownAnnotation, ok := obj.Annotations[userScaledownAnnotation]; !ok || scaledownAnnotation != "true" {
return m.scaledown(obj)
}
}
newObj, err := v32.NodeConditionProvisioned.Once(obj, func() (runtime.Object, error) { newObj, err := v32.NodeConditionProvisioned.Once(obj, func() (runtime.Object, error) {
if obj.Status.NodeTemplateSpec == nil { if obj.Status.NodeTemplateSpec == nil {
m.setWaiting(obj) m.setWaiting(obj)
......
...@@ -353,6 +353,8 @@ func (c *Controller) createOrCheckNodes(nodePool *v3.NodePool, allNodes []*v3.No ...@@ -353,6 +353,8 @@ func (c *Controller) createOrCheckNodes(nodePool *v3.NodePool, allNodes []*v3.No
} }
if node.Spec.ScaledownTime != "" { if node.Spec.ScaledownTime != "" {
logrus.Debugf("[nodepool] scaledown time detected for %s: %s and now it is %s",
node.Name, node.Spec.ScaledownTime, time.Now().Format(time.RFC3339))
scaledown, err := time.Parse(time.RFC3339, node.Spec.ScaledownTime) scaledown, err := time.Parse(time.RFC3339, node.Spec.ScaledownTime)
if err != nil { if err != nil {
logrus.Errorf("[nodepool] failed to parse scaledown time, is it in RFC3339? %s: %s", node.Spec.ScaledownTime, err) logrus.Errorf("[nodepool] failed to parse scaledown time, is it in RFC3339? %s: %s", node.Spec.ScaledownTime, err)
...@@ -368,6 +370,9 @@ func (c *Controller) createOrCheckNodes(nodePool *v3.NodePool, allNodes []*v3.No ...@@ -368,6 +370,9 @@ func (c *Controller) createOrCheckNodes(nodePool *v3.NodePool, allNodes []*v3.No
quantity-- quantity--
continue continue
} }
// scaledown happening in the future, enqueue after to check again later
c.NodePoolController.EnqueueAfter(nodePool.Namespace, nodePool.Name, scaledown.Sub(time.Now()))
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册