Commit 47aabfdc, authored by mindspore-ci-bot and committed by Gitee

!9 Delete redundant comment lines and rename variable names

Merge pull request !9 from YedongLiu/fixed_var_names
@@ -37,7 +37,7 @@ func NewServerOption() *ServerOption {
 // AddFlags adds flags for a specific CMServer to the specified FlagSet
 func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
    // chaos level will be removed once we have a formal tool to inject failures.
-   fs.IntVar(&s.ChaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the PyTorchJob created by the operator.")
+   fs.IntVar(&s.ChaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the MSJob created by the operator.")
    fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
    fs.DurationVar(&s.GCInterval, "gc-interval", 10*time.Minute, "GC interval")
    fs.StringVar(&s.ControllerConfigFile, "controller-config-file", "", "Path to file containing the controller config.")
...
@@ -106,8 +106,7 @@ func ConfigureAcceleratorsForMSJobSpec(c *msv1.MSJobSpec, accelerators map[strin
 // Cleanup cleans up user passed spec, e.g. defaulting, transforming fields.
 // TODO: move this to admission controller
 func Cleanup(c *msv1.MSJobSpec) {
-   // TODO(jlewi): Add logic to cleanup user provided spec; e.g. by filling in defaults.
-   // We should have default container images so user doesn't have to provide these.
 }

 func CRDName() string {
...
@@ -45,7 +45,6 @@ type MSJob struct {
 }

 type MSJobSpec struct {
-   // TODO(jlewi): Can we we get rid of this and use some value from Kubernetes or a random ide.
    RuntimeId string

    // ReplicaSpecs specifies the MS replicas to run.
@@ -81,11 +80,9 @@ const (
 const (
    DefaultMSContainer string = "mindspore"
-   DefaultMSImage string = "mindspore/mindspore:v0.1.0"
+   DefaultMSImage string = "mindspore/mindspore:v0.1.0-alpha"
 )

-// TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker.
-// This might be useful if you wanted to have a separate set of workers to do eval.
 type MSReplicaSpec struct {
    // Replicas is the number of desired replicas.
    // This is a pointer to distinguish between explicit zero and unspecified.
...
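The Cleanup hunk above drops the old TODO about filling in defaults without adding any defaulting logic. For reference, a minimal sketch of such a pass, built only on the DefaultMSContainer and DefaultMSImage constants from this hunk, might look like the following. The field layout it assumes (ReplicaSpecs as a slice of *MSReplicaSpec whose Template is a core/v1 pod template) is inferred from the validation code later in this diff and is not part of the commit.

package helper

import (
    msv1 "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/v1"
)

// setDefaultImages is a hypothetical defaulting pass (not in this commit): it
// fills in the default image for the "mindspore" container when the user
// leaves it empty. ReplicaSpecs/Template shapes are assumptions.
func setDefaultImages(c *msv1.MSJobSpec) {
    for _, r := range c.ReplicaSpecs {
        for i := range r.Template.Spec.Containers {
            ctr := &r.Template.Spec.Containers[i]
            if ctr.Name == msv1.DefaultMSContainer && ctr.Image == "" {
                ctr.Image = msv1.DefaultMSImage
            }
        }
    }
}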
@@ -57,7 +57,7 @@ func ValidateMSJobSpec(c *msv1.MSJobSpec) error {
        }
        if !isValidReplicaType {
-           return fmt.Errorf("tfReplicaSpec.MSReplicaType is %v but must be one of %v", r.MSReplicaType, validReplicaTypes)
+           return fmt.Errorf("msReplicaSpec.MSReplicaType is %v but must be one of %v", r.MSReplicaType, validReplicaTypes)
        }

        for _, c := range r.Template.Spec.Containers {
...
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// Package controller provides a Kubernetes controller for a TensorFlow job resource.
+// Package controller provides a Kubernetes controller for a MindSpore job resource.
 package controller

 import (
@@ -85,9 +85,9 @@ type Controller struct {
    syncHandler func(jobKey string) (bool, error)
 }

-func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Interface, tfJobClient msjobclient.Interface,
-   config msv1.ControllerConfig, tfJobInformerFactory informers.SharedInformerFactory) (*Controller, error) {
-   tfJobInformer := tfJobInformerFactory.Kubeflow().V1().MSJobs()
+func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Interface, msJobClient msjobclient.Interface,
+   config msv1.ControllerConfig, msJobInformerFactory informers.SharedInformerFactory) (*Controller, error) {
+   msJobInformer := msJobInformerFactory.Kubeflow().V1().MSJobs()

    kubeflowscheme.AddToScheme(scheme.Scheme)
    log.Debug("Creating event broadcaster")
@@ -99,22 +99,21 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
    controller := &Controller{
        KubeClient:   kubeClient,
        APIExtclient: APIExtclient,
-       MSJobClient:  tfJobClient,
+       MSJobClient:  msJobClient,
        WorkQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MSjobs"),
        recorder:     recorder,
-       // TODO(jlewi)): What to do about cluster.Cluster?
        jobs:   make(map[string]*trainer.TrainingJob),
        config: config,
    }

    log.Info("Setting up event handlers")
    // Set up an event handler for when Foo resources change
-   tfJobInformer.Informer().AddEventHandler(
+   msJobInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *msv1.MSJob:
-                   log.Debugf("filter tfjob name: %v", t.Name)
+                   log.Debugf("filter msjob name: %v", t.Name)
                    return true
                default:
                    return false
@@ -129,8 +128,8 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
            },
        })

-   controller.MSJobLister = tfJobInformer.Lister()
-   controller.MSJobSynced = tfJobInformer.Informer().HasSynced
+   controller.MSJobLister = msJobInformer.Lister()
+   controller.MSJobSynced = msJobInformer.Informer().HasSynced
    controller.syncHandler = controller.syncMSJob

    return controller, nil
@@ -216,7 +215,7 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }

-   tfJob, err := c.MSJobLister.MSJobs(ns).Get(name)
+   msJob, err := c.MSJobLister.MSJobs(ns).Get(name)

    if err != nil {
        if apierrors.IsNotFound(err) {
@@ -228,8 +227,8 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
    // Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UID's don't match.
    // The UID's won't match in the event we deleted the job and then recreated the job with the same name.
-   if cJob, ok := c.jobs[key]; !ok || cJob.UID() != tfJob.UID {
-       nc, err := trainer.NewJob(c.KubeClient, c.MSJobClient, c.recorder, tfJob, &c.config)
+   if cJob, ok := c.jobs[key]; !ok || cJob.UID() != msJob.UID {
+       nc, err := trainer.NewJob(c.KubeClient, c.MSJobClient, c.recorder, msJob, &c.config)
        if err != nil {
            return false, err
@@ -243,15 +242,13 @@ func (c *Controller) syncMSJob(key string) (bool, error) {
        return false, err
    }

-   tfJob, err = c.MSJobClient.KubeflowV1().MSJobs(tfJob.ObjectMeta.Namespace).Get(tfJob.ObjectMeta.Name, metav1.GetOptions{})
+   msJob, err = c.MSJobClient.KubeflowV1().MSJobs(msJob.ObjectMeta.Namespace).Get(msJob.ObjectMeta.Name, metav1.GetOptions{})
    if err != nil {
        return false, err
    }

-   // TODO(jlewi): This logic will need to change when/if we get rid of phases and move to conditions. At that
-   // case we should forget about a job when the appropriate condition is reached.
-   if tfJob.Status.Phase == msv1.MSJobPhaseCleanUp {
+   if msJob.Status.Phase == msv1.MSJobPhaseCleanUp {
        return true, nil
    }

    return false, nil
...
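syncMSJob above rejects keys in which the namespace or name is missing: work-queue keys for MSJobs are plain "namespace/name" strings produced by the informer machinery. A small self-contained illustration of that key format, using client-go's helper (the exact splitting call used by the controller is elided in this diff):

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // MSJob work-queue keys are "namespace/name" strings; syncMSJob splits them
    // back apart before looking the job up through MSJobLister. Illustration only,
    // not code from the operator.
    ns, name, err := cache.SplitMetaNamespaceKey("default/msjob-example")
    fmt.Println(ns, name, err) // prints: default msjob-example <nil>
}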
@@ -29,9 +29,8 @@ import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/record"

-   torchv1alpha1 "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/v1"
+   msv1 "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/v1"
    "gitee.com/mindspore/ms-operator/pkg/util/k8sutil"
-   // TOOO(jlewi): Rename to apiErrors
    "gitee.com/mindspore/ms-operator/pkg/apis/mindspore/helper"
    "gitee.com/mindspore/ms-operator/pkg/util"
 )
@@ -47,53 +46,53 @@ type MSReplicaSet struct {
    recorder record.EventRecorder
    // Job is a pointer to the TrainingJob to which this replica belongs.
    Job *TrainingJob
-   Spec torchv1alpha1.MSReplicaSpec
+   Spec msv1.MSReplicaSpec
 }

 // MSReplicas is an interface for managing a set of replicas.
 type MSReplicaSetInterface interface {
    Create() error
    Delete() error
-   GetStatus() (torchv1alpha1.MSReplicaStatus, error)
+   GetStatus() (msv1.MSReplicaStatus, error)
 }

-// MSConfig is a struct representing the TensorFlow config. This struct is turned into an environment
-// which is used by TensorFlow processes to configure themselves.
+// MSConfig is a struct representing the MindSpore config. This struct is turned into an environment
+// which is used by MindSpore processes to configure themselves.
 type MSConfig struct {
    Cluster     ClusterSpec `json:"cluster"`
    Task        TaskSpec    `json:"task"`
    Environment string      `json:"environment"`
 }

-func NewMSReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecorder, tfReplicaSpec torchv1alpha1.MSReplicaSpec, job *TrainingJob) (*MSReplicaSet, error) {
-   if tfReplicaSpec.MSReplicaType == torchv1alpha1.MASTER && *tfReplicaSpec.Replicas != 1 {
+func NewMSReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecorder, msReplicaSpec msv1.MSReplicaSpec, job *TrainingJob) (*MSReplicaSet, error) {
+   if msReplicaSpec.MSReplicaType == msv1.MASTER && *msReplicaSpec.Replicas != 1 {
        return nil, errors.New("The MASTER must have Replicas = 1")
    }

-   if tfReplicaSpec.MasterPort == nil {
-       return nil, errors.New("tfReplicaSpec.MasterPort can't be nil.")
+   if msReplicaSpec.MasterPort == nil {
+       return nil, errors.New("msReplicaSpec.MasterPort can't be nil.")
    }

    // Make sure the replica type is valid.
-   validReplicaTypes := []torchv1alpha1.MSReplicaType{torchv1alpha1.MASTER, torchv1alpha1.WORKER}
+   validReplicaTypes := []msv1.MSReplicaType{msv1.MASTER, msv1.WORKER}

    isValidReplicaType := false
    for _, t := range validReplicaTypes {
-       if t == tfReplicaSpec.MSReplicaType {
+       if t == msReplicaSpec.MSReplicaType {
            isValidReplicaType = true
            break
        }
    }

    if !isValidReplicaType {
-       return nil, fmt.Errorf("tfReplicaSpec.MSReplicaType is %v but must be one of %v", tfReplicaSpec.MSReplicaType, validReplicaTypes)
+       return nil, fmt.Errorf("msReplicaSpec.MSReplicaType is %v but must be one of %v", msReplicaSpec.MSReplicaType, validReplicaTypes)
    }

    return &MSReplicaSet{
        ClientSet: clientSet,
        recorder:  recorder,
        Job:       job,
-       Spec:      tfReplicaSpec,
+       Spec:      msReplicaSpec,
    }, nil
 }
@@ -108,7 +107,7 @@ func (s *MSReplicaSet) Labels() KubernetesLabels {
        "ms_job_name": s.Job.job.ObjectMeta.Name})
 }

-func (s *MSReplicaSet) Create(config *torchv1alpha1.ControllerConfig, worldSize int32) error {
+func (s *MSReplicaSet) Create(config *msv1.ControllerConfig, worldSize int32) error {
    // Create services
    err := s.SyncServices()
    if err != nil {
@@ -137,7 +136,7 @@ func (s *MSReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error)
        Selector: taskLabels,
        Ports: []v1.ServicePort{
            {
-               Name: "tf-port",
+               Name: "ms-port",
                Port: *s.Spec.MasterPort,
            },
        },
@@ -173,7 +172,7 @@ func (s *MSReplicaSet) CreatePodWithIndex(index int32, worldSize int32) (*v1.Pod
        masterAddr = "localhost"
    }
    rank := strconv.Itoa(int(index))
-   tfConfig := MSConfig{
+   msConfig := MSConfig{
        Cluster: s.Job.ClusterSpec(),
        Task: TaskSpec{
            Type: strings.ToLower(string(s.Spec.MSReplicaType)),
@@ -183,27 +182,26 @@ func (s *MSReplicaSet) CreatePodWithIndex(index int32, worldSize int32) (*v1.Pod
        Environment: "cloud",
    }

-   tfConfigJson, err := json.Marshal(tfConfig)
+   msConfigJson, err := json.Marshal(msConfig)
    if err != nil {
-       log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
+       log.Errorf("Job: %v serializing msConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(msConfig), err)
        return nil, err
    }

-   // TODO(jose5918) Do not need TF_CONFIG but leaving for POC
-   // Add TF_CONFIG environment variable.
+   // Add MS_CONFIG environment variable.
    for i, _ := range pod.Spec.Containers {
        // We can't get c in the loop variable because that would be by value so our modifications
        // wouldn't have any effect.
        c := &pod.Spec.Containers[i]
-       if c.Name != torchv1alpha1.DefaultMSContainer {
+       if c.Name != msv1.DefaultMSContainer {
            continue
        }
        if len(c.Env) == 0 {
            c.Env = make([]v1.EnvVar, 0)
        }
        c.Env = append(c.Env, v1.EnvVar{
-           Name:  "TF_CONFIG",
-           Value: string(tfConfigJson),
+           Name:  "MS_CONFIG",
+           Value: string(msConfigJson),
        })
        c.Env = append(c.Env, v1.EnvVar{
            Name: "MASTER_PORT",
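The loop above injects the serialized MSConfig into the "mindspore" container as the MS_CONFIG environment variable. A minimal sketch of how a process inside that container could read it back is shown below; the TaskSpec fields (Type, Index) are assumed from how the struct is populated in this hunk and are not spelled out in the diff.

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
)

// Local mirrors of the structs from this diff; TaskSpec's Index field is an
// assumption (only Type is visible in the hunk above).
type ClusterSpec map[string][]string

type TaskSpec struct {
    Type  string `json:"type"`
    Index int    `json:"index"`
}

type MSConfig struct {
    Cluster     ClusterSpec `json:"cluster"`
    Task        TaskSpec    `json:"task"`
    Environment string      `json:"environment"`
}

func main() {
    raw := os.Getenv("MS_CONFIG")
    if raw == "" {
        log.Fatal("MS_CONFIG is not set")
    }
    var cfg MSConfig
    if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
        log.Fatalf("cannot parse MS_CONFIG: %v", err)
    }
    fmt.Printf("replica type=%q index=%d environment=%q master=%v\n",
        cfg.Task.Type, cfg.Task.Index, cfg.Environment, cfg.Cluster["master"])
}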
@@ -258,7 +256,6 @@ func (s *MSReplicaSet) Delete() error {
    }

    // Services doesn't support DeleteCollection so we delete them individually.
-   // TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
    for index := int32(0); index < *s.Spec.Replicas; index++ {
        log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
        err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})
@@ -269,17 +266,17 @@ func (s *MSReplicaSet) Delete() error {
        }
    }

-   // If the ConfigMap for the default parameter server exists, we delete it
-   log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
-   _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
+   // If the ConfigMap for the default master exists, we delete it
+   log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultMasterConfigMapName())
+   _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultMasterConfigMapName(), meta_v1.GetOptions{})
    if err != nil {
        if !k8sutil.IsKubernetesResourceNotFoundError(err) {
-           log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
+           log.Errorf("Error deleting ConfigMap %v; %v", s.defaultMasterConfigMapName(), err)
            failures = true
        }
    } else {
-       log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
-       err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{})
+       log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultMasterConfigMapName())
+       err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultMasterConfigMapName(), &meta_v1.DeleteOptions{})
        if err != nil {
            log.Errorf("There was a problem deleting the ConfigMaps; %v", err)
            failures = true
@@ -293,7 +290,7 @@ func (s *MSReplicaSet) Delete() error {
 }

 // replicaStatusFromPodList returns a status from a list of pods for a job.
-func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaState {
+func replicaStatusFromPodList(l v1.PodList, name string) msv1.ReplicaState {
    var latest *v1.Pod
    for _, i := range l.Items {
        if latest == nil {
@@ -306,10 +303,10 @@ func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaSt
    }

    if latest == nil {
-       return torchv1alpha1.ReplicaStateRunning
+       return msv1.ReplicaStateRunning
    }

-   var tfState v1.ContainerState
+   var msState v1.ContainerState

    for _, i := range latest.Status.ContainerStatuses {
        if i.Name != name {
@@ -317,45 +314,45 @@ func replicaStatusFromPodList(l v1.PodList, name string) torchv1alpha1.ReplicaSt
        }

        // We need to decide whether to use the current state or the previous termination state.
-       tfState = i.State
+       msState = i.State

        // If the container previously terminated we will look at the termination to decide whether it is a retryable
        // or permanenent error.
        if i.LastTerminationState.Terminated != nil {
-           tfState = i.LastTerminationState
+           msState = i.LastTerminationState
        }
    }

-   if tfState.Running != nil || tfState.Waiting != nil {
-       return torchv1alpha1.ReplicaStateRunning
+   if msState.Running != nil || msState.Waiting != nil {
+       return msv1.ReplicaStateRunning
    }

-   if tfState.Terminated != nil {
-       if tfState.Terminated.ExitCode == 0 {
-           return torchv1alpha1.ReplicaStateSucceeded
+   if msState.Terminated != nil {
+       if msState.Terminated.ExitCode == 0 {
+           return msv1.ReplicaStateSucceeded
        }

-       if isRetryableTerminationState(tfState.Terminated) {
+       if isRetryableTerminationState(msState.Terminated) {
            // Since its a retryable error just return RUNNING.
            // We can just let Kubernetes restart the container to retry.
-           return torchv1alpha1.ReplicaStateRunning
+           return msv1.ReplicaStateRunning
        }

-       return torchv1alpha1.ReplicaStateFailed
+       return msv1.ReplicaStateFailed
    }

-   return torchv1alpha1.ReplicaStateUnknown
+   return msv1.ReplicaStateUnknown
 }

-func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) torchv1alpha1.ReplicaState {
+func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) msv1.ReplicaState {
    p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
    if err != nil {
-       return torchv1alpha1.ReplicaStateUnknown
+       return msv1.ReplicaStateUnknown
    }

    if v1.PodSucceeded == p.Status.Phase {
-       return torchv1alpha1.ReplicaStateSucceeded
+       return msv1.ReplicaStateSucceeded
    }

    labels := s.Labels()
@@ -363,33 +360,30 @@ func (s *MSReplicaSet) GetSingleReplicaStatus(index int32) torchv1alpha1.Replica
    selector, err := labels.ToSelector()
    if err != nil {
        log.Errorf("labels.ToSelector() error; %v", err)
-       return torchv1alpha1.ReplicaStateFailed
+       return msv1.ReplicaStateFailed
    }

-   // TODO(jlewi): Handle errors. We need to get the pod and looking at recent container exits.
    l, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(meta_v1.ListOptions{
-       // TODO(jlewi): Why isn't the label selector working?
        LabelSelector: selector,
    })
    if err != nil {
-       // TODO(jlewi): Are there errors that should be treated as retryable errors?
-       return torchv1alpha1.ReplicaStateFailed
+       return msv1.ReplicaStateFailed
    }

-   status := replicaStatusFromPodList(*l, torchv1alpha1.DefaultMSContainer)
+   status := replicaStatusFromPodList(*l, msv1.DefaultMSContainer)
    return status
 }

 // Status returns the status of the replica set.
-func (s *MSReplicaSet) GetStatus() (torchv1alpha1.MSReplicaStatus, error) {
-   status := torchv1alpha1.MSReplicaStatus{
+func (s *MSReplicaSet) GetStatus() (msv1.MSReplicaStatus, error) {
+   status := msv1.MSReplicaStatus{
        MSReplicaType:  s.Spec.MSReplicaType,
-       State:          torchv1alpha1.ReplicaStateUnknown,
-       ReplicasStates: make(map[torchv1alpha1.ReplicaState]int),
+       State:          msv1.ReplicaStateUnknown,
+       ReplicasStates: make(map[msv1.ReplicaState]int),
    }

-   increment := func(state torchv1alpha1.ReplicaState) {
+   increment := func(state msv1.ReplicaState) {
        v, ok := status.ReplicasStates[state]
        if ok {
            status.ReplicasStates[state] = v + 1
@@ -405,20 +399,20 @@ func (s *MSReplicaSet) GetStatus() (torchv1alpha1.MSReplicaStatus, error) {
    // Determine the overall status for the replica set based on the status of the individual
    // replicas.
    // If any of the replicas failed mark the set as failed.
-   if _, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateFailed]; ok {
-       status.State = torchv1alpha1.ReplicaStateFailed
+   if _, ok := status.ReplicasStates[msv1.ReplicaStateFailed]; ok {
+       status.State = msv1.ReplicaStateFailed
        return status, nil
    }

    // If any replicas are RUNNING mark it as RUNNING.
-   if _, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateRunning]; ok {
-       status.State = torchv1alpha1.ReplicaStateRunning
+   if _, ok := status.ReplicasStates[msv1.ReplicaStateRunning]; ok {
+       status.State = msv1.ReplicaStateRunning
        return status, nil
    }

    // If all of the replicas succeeded consider it success.
-   if v, ok := status.ReplicasStates[torchv1alpha1.ReplicaStateSucceeded]; ok && int32(v) == *s.Spec.Replicas {
-       status.State = torchv1alpha1.ReplicaStateSucceeded
+   if v, ok := status.ReplicasStates[msv1.ReplicaStateSucceeded]; ok && int32(v) == *s.Spec.Replicas {
+       status.State = msv1.ReplicaStateSucceeded
        return status, nil
    }
@@ -515,7 +509,7 @@ func (s *MSReplicaSet) SyncServices() error {
 }

 func (s *MSReplicaSet) genName(index int32) string {
-   // Truncate tfjob name to 40 characters
+   // Truncate msjob name to 40 characters
    // The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters
    // Thus genName(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces
    // See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/identifiers.md
@@ -527,7 +521,7 @@ func (s *MSReplicaSet) genPodName(index int32) string {
    return s.genName(index) + "-" + util.RandString(5)
 }

-func (s *MSReplicaSet) defaultPSConfigMapName() string {
+func (s *MSReplicaSet) defaultMasterConfigMapName() string {
    return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId)
 }
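The genName comment above spells out the naming budget: a 40-character job-name prefix plus replica type, runtime id, and index keeps the generated name inside the 63-character DNS_LABEL limit. A hypothetical standalone version of that scheme, for illustration only (the real method body is elided in this diff):

package main

import (
    "fmt"
    "strings"
)

// genName is a hypothetical reimplementation of the naming scheme described in
// the comment above: truncate the job name to 40 characters, then append the
// replica type, runtime id, and index. 40 + 1 + 6 + 1 + 4 + 1 + a short index
// stays well under the 63-character DNS_LABEL limit.
func genName(jobName, replicaType, runtimeID string, index int32) string {
    name := strings.ToLower(jobName)
    if len(name) > 40 {
        name = name[:40]
    }
    return fmt.Sprintf("%s-%s-%s-%d", name, strings.ToLower(replicaType), runtimeID, index)
}

func main() {
    fmt.Println(genName("my-very-long-mindspore-training-job-name-that-overflows", "MASTER", "u8x2", 0))
}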
@@ -34,8 +34,6 @@ import (
    "gitee.com/mindspore/ms-operator/pkg/util"
 )

-// TODO(jlewi): We should switch a New pattern and make trainingJob private so we can
-// ensure correctness on creation.
 type TrainingJob struct {
    job *msv1.MSJob
@@ -55,9 +53,7 @@ type TrainingJob struct {
    memberCounter int
 }

-// TODO(jose5918): We don't really need the cluster spec for this operator but no harm in leaving it for POC
-// ClusterSpec represents a cluster TensorFlow specification.
-// https://www.tensorflow.org/deploy/distributed#create_a_tftrainclusterspec_to_describe_the_cluster
+// ClusterSpec represents a cluster MindSpore specification.
 // It is a map from job names to network addresses.
 type ClusterSpec map[string][]string
@@ -110,7 +106,6 @@ func (j *TrainingJob) ClusterSpec() ClusterSpec {
 // createResources creates all the replicas if requested
 func (j *TrainingJob) createResources(config *msv1.ControllerConfig) error {
-   // TODO(jose5918) Need to figure out where it is best to add worldSize logic
    // Get MS worldSize by adding replicas
    worldSize := int32(0)
    for _, r := range j.Replicas {
@@ -144,7 +139,6 @@ func (j *TrainingJob) GetStatus() (msv1.State, []*msv1.MSReplicaStatus, error) {
    replicaStatuses := make([]*msv1.MSReplicaStatus, 0)

    // The state for each replica.
-   // TODO(jlewi): We will need to modify this code if we want to allow multiples of a given type of replica.
    replicaSetStates := make(map[msv1.MSReplicaType]msv1.ReplicaState)

    for _, r := range j.Replicas {
@@ -176,8 +170,6 @@ func (j *TrainingJob) GetStatus() (msv1.State, []*msv1.MSReplicaStatus, error) {
 // isRetryableTerminationState returns true if a container terminated in a state
 // that we consider retryable.
 func isRetryableTerminationState(s *v1.ContainerStateTerminated) bool {
-   // TODO(jlewi): Need to match logic in
-   // https://cs.corp.google.com/piper///depot/google3/cloud/ml/beta/job/training_job_state_util.cc?l=88
    if s.Reason == "OOMKilled" {
        // If the user's process causes an OOM and Docker kills the container,
        // the termination reason of ContainerState will be specified to
@@ -189,8 +181,6 @@ func isRetryableTerminationState(s *v1.ContainerStateTerminated) bool {
        return false
    }

-   // TODO(jlewi): Should we use the exit code reported in the termination
-   // log message and not the ExitCode reported by the container.
    if s.ExitCode >= 0 && s.ExitCode <= 127 {
        // For the exit_code in [0, 127]:
@@ -271,19 +261,12 @@ func (j *TrainingJob) setupReplicas() error {
 }

 func (j *TrainingJob) Delete() {
-   // TODO(jlewi): Delete is what should cause us to delete the Pods.
-   // we shouldn't delete the pods when the jobs finish because leaving the pods
-   // allows us to get the logs from the pods after the job finishes.
-   //
    log.Infof("MSJob %v deleted by the user", j.fullname())
    // TODO(jlewi): This logic is probably insufficient.
    if j.job.Status.Phase != msv1.MSJobPhaseCleanUp {
        j.status.Phase = msv1.MSJobPhaseCleanUp
    }

-   // TODO(jlewi): Does it make sense to explicitly delete the resources? Should
-   // we just rely on K8s garbage collection to delete the resources before
-   // deleting MSJob?
    if cErr := j.deleteResources(); cErr != nil {
        log.Errorf("trainingJob.deleteResources() error; %v", cErr)
    }
@@ -331,9 +314,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
        return err
    }

-   // TODO(jlewi): Can we determine from the CRD status whether we should
-   // Create the resources or not? We need to ensure the resources exist so for
-   // now we always call Create.
    if j.job.Status.Phase == msv1.MSJobPhaseCreating || j.job.Status.Phase == msv1.MSJobPhaseRunning {
        // We call Create to make sure all the resources exist and are running.
        if cErr := j.createResources(config); cErr != nil {
@@ -354,7 +334,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
        log.Errorf("GetStatus() for job %v returned error: %v", j.job.ObjectMeta.Name, err)
        return err
    }
-   // TODO(jlewi): We should update the Phase if we detect the job is done.
    if state == msv1.StateFailed {
        log.Errorf("Master failed Job: %v.", j.job.ObjectMeta.Name)
        j.status.Phase = msv1.MSJobPhaseDone
@@ -367,7 +346,6 @@ func (j *TrainingJob) Reconcile(config *msv1.ControllerConfig) error {
        log.Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status))
        }
    }

-   // TODO(jose5918) Need to figure out where it is best to add worldSize logic
    // Get MS worldSize by adding replicas
    worldSize := int32(0)
    for _, r := range j.Replicas {
...
@@ -33,8 +33,6 @@ import (
 const RecommendedConfigPathEnvVar = "KUBECONFIG"

-// TODO(jlewi): I think this function is used to add an owner to a resource. I think we we should use this
-// method to ensure all resources created for the TFJob are owned by the TFJob.
 func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) {
    o.SetOwnerReferences(append(o.GetOwnerReferences(), r))
 }
@@ -99,15 +97,11 @@ func JobListOpt(clusterName string) metav1.ListOptions {
 func LabelsForJob(jobName string) map[string]string {
    return map[string]string{
-       // TODO(jlewi): Need to set appropriate labels for TF.
        "ms_job": jobName,
        "app":    msv1.AppLabel,
    }
 }

-// TODO(jlewi): CascadeDeletOptions are part of garbage collection policy.
-// Do we want to use this? See
-// https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
 func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions {
    return &metav1.DeleteOptions{
        GracePeriodSeconds: func(t int64) *int64 { return &t }(gracePeriodSeconds),
...
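LabelsForJob above tags every resource with the owning job's name plus the operator's app label, and JobListOpt (elided here) turns that map into a list selector. A small illustration of the selector string such a label set produces; the app value is only a stand-in for msv1.AppLabel, which is not shown in this diff:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // The same shape of map that LabelsForJob("demo") returns; "ms-operator"
    // is a placeholder for msv1.AppLabel.
    set := labels.Set{"ms_job": "demo", "app": "ms-operator"}
    fmt.Println(labels.SelectorFromSet(set).String())
    // e.g. app=ms-operator,ms_job=demo
}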