提交 c2365749 编写于 作者: L liqingping

Merge branch '22-server-profilings' into 'develop'

Resolve "server汇报profilings接口"

See merge request platform/CloudNative4AI/cluster-lifecycle/di-orchestrator!56
......@@ -87,7 +87,7 @@ lint:
test: ginkgo ## Run tests.
$(GINKGO) -nodes 4 -v -cover -coverprofile=coverage.out ./pkg/...
go tool cover -func=./pkg/controllers/coverage.out
go tool cover -func=./pkg/server/http/coverage.out
go tool cover -func=./pkg/server/coverage.out
go tool cover -func=./pkg/common/gpuallocator/coverage.out
##@ Build
......
......@@ -16,45 +16,46 @@ limitations under the License.
package server
import (
"context"
"flag"
"fmt"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
cmdcommon "opendilab.org/di-orchestrator/cmd/common"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
gpualloc "opendilab.org/di-orchestrator/pkg/common/gpuallocator"
serverdynamic "opendilab.org/di-orchestrator/pkg/server/dynamic"
serverhttp "opendilab.org/di-orchestrator/pkg/server/http"
dicontext "opendilab.org/di-orchestrator/pkg/context"
"opendilab.org/di-orchestrator/pkg/server"
)
type CreateOptions struct {
cmdcommon.GenericFlags
ServerBindAddress string
GPUAllocPolicy string
ProbeAddress string
MetricAddress string
}
func NewCreateOptions(genFlags cmdcommon.GenericFlags) *CreateOptions {
return &CreateOptions{
GenericFlags: genFlags,
ServerBindAddress: ":8080",
GPUAllocPolicy: gpualloc.SimpleGPUAllocPolicy,
ServerBindAddress: ":8081",
ProbeAddress: ":8080",
MetricAddress: ":8089",
}
}
func (o *CreateOptions) AddFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&o.ServerBindAddress, "server-bind-address", "b", o.ServerBindAddress,
cmd.Flags().StringVarP(&o.ServerBindAddress, "server-bind-address", "s", o.ServerBindAddress,
"The address for server to bind to.")
cmd.Flags().StringVarP(&o.GPUAllocPolicy, "gpu-alloc-policy", "p", o.GPUAllocPolicy,
"The policy for server to allocate gpus to pods.")
cmd.Flags().StringVarP(&o.ProbeAddress, "probe-address", "p", o.ProbeAddress,
"The address for probe to connect to.")
cmd.Flags().StringVar(&o.MetricAddress, "metric-addr", o.MetricAddress, "The address the metric endpoint binds to.")
}
// serverCmd represents the server command
......@@ -78,32 +79,55 @@ Examples:
return serverCmd
}
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(div2alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func runCommand(cmd *cobra.Command, options *CreateOptions) error {
flag.Parse()
logger := zap.New(zap.UseFlagOptions(options.GenericFlags.ZapOpts))
cfg, err := ctrl.GetConfig()
ctrl.SetLogger(logger)
config := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: options.MetricAddress,
HealthProbeBindAddress: options.ProbeAddress,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}
kubeClient := kubernetes.NewForConfigOrDie(cfg)
dynamicClient := dynamic.NewForConfigOrDie(cfg)
dif := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, serverdynamic.ResyncPeriod, corev1.NamespaceAll, nil)
dyi := serverdynamic.NewDynamicInformer(dif)
// start dynamic informer
stopCh := make(chan struct{})
go dif.Start(stopCh)
diGVR := schema.GroupVersionResource{
Group: div2alpha1.GroupVersion.Group,
Version: div2alpha1.GroupVersion.Version,
Resource: "dijobs",
ctx := dicontext.NewContext(context.Background(),
config,
mgr.GetClient(),
mgr.GetEventRecorderFor("di-operator"),
ctrl.Log.WithName("di-operator"))
diServer := server.NewDIServer(ctx, options.ServerBindAddress)
mgr.Add(diServer)
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
return err
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
return err
}
diclient := dynamicClient.Resource(diGVR)
diServer := serverhttp.NewDIServer(kubeClient, diclient, logger, dyi, options.GPUAllocPolicy)
if err := diServer.Start(options.ServerBindAddress); err != nil {
return fmt.Errorf("failed to start di-server: %v", err)
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
return err
}
return nil
}
......@@ -4125,6 +4125,7 @@ rules:
- ""
resources:
- events
- nodes
- pods
- services
verbs:
......@@ -4139,7 +4140,6 @@ rules:
- ""
resources:
- namespaces
- nodes
verbs:
- get
- list
......@@ -4285,12 +4285,12 @@ spec:
spec:
containers:
- args:
- --zap-devel=true
- --probe-addr=:8080
- --metric-addr=:8443
- --leader-elect
command:
- /di-orchestrator
- --zap-devel=true
- operator
envFrom:
- configMapRef:
......@@ -4340,8 +4340,8 @@ spec:
spec:
containers:
- args:
- --server-bind-address=:8080
- --gpu-alloc-policy=simple
- --zap-devel=true
- --server-bind-address=:8081
command:
- /di-orchestrator
- server
......
......@@ -20,8 +20,7 @@ spec:
- server
args:
- --zap-devel=true
- --server-bind-address=:8080
- --gpu-alloc-policy=simple
- --server-bind-address=:8081
image: registry.sensetime.com/cloudnative4ai/di-orchestrator:v1.0.0
imagePullPolicy: Always
name: server
......
......@@ -10,6 +10,7 @@ rules:
- ""
resources:
- events
- nodes
- pods
- services
verbs:
......@@ -24,7 +25,6 @@ rules:
- ""
resources:
- namespaces
- nodes
verbs:
- get
- list
......
......@@ -145,6 +145,10 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fvbommel/sortorder v1.0.1/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
......@@ -204,6 +208,13 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE=
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gobuffalo/here v0.6.0/go.mod h1:wAG085dHOYqUpf+Ap+WOdrPTp5IYcDAs/x7PLa8Y5fM=
......@@ -334,6 +345,7 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
......@@ -356,6 +368,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
......@@ -370,6 +384,8 @@ github.com/markbates/pkger v0.17.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQ
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
......@@ -514,6 +530,10 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
......@@ -567,6 +587,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
......@@ -707,6 +728,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
......
......@@ -4,19 +4,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)
type DIJobRequestParams struct {
JobID []string `json:"jobID"`
Generation []string `json:"generation"`
}
const (
RequestParamTypeJobID string = "job_id"
RequestParamTypeGeneration string = "generation"
)
type DIJobRequest struct {
JobID string `json:"jobID"`
Replicas int `json:"replicas"`
Replicas int `json:"replicas"`
}
type ResourceQuantity struct {
......
......@@ -52,8 +52,8 @@ func NewDIJobReconciler(scheme *runtime.Scheme, ctx dicontext.Context) *DIJobRec
//+kubebuilder:rbac:groups=diengine.opendilab.org,resources=dijobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=diengine.opendilab.org,resources=dijobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=diengine.opendilab.org,resources=dijobs/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=pods;services;events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=namespaces;nodes,verbs=get;list
//+kubebuilder:rbac:groups="",resources=pods;services;events;nodes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
......
package dynamic
import (
"fmt"
"log"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
)
func GetPodFromObject(obj interface{}) (*corev1.Pod, error) {
podUn := obj.(*unstructured.Unstructured)
var pod corev1.Pod
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podUn.UnstructuredContent(), &pod); err != nil {
log.Printf("failed to convert pod %v", err)
return nil, err
}
owners := pod.GetOwnerReferences()
for _, owner := range owners {
if owner.Kind == div2alpha1.KindDIJob {
return &pod, nil
}
}
return nil, fmt.Errorf("pod %s not belong to DIJob", pod.Name)
}
func GetServiceFromObject(obj interface{}) (*corev1.Service, error) {
svcUn := obj.(*unstructured.Unstructured)
var service corev1.Service
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(svcUn.UnstructuredContent(), &service); err != nil {
log.Printf("failed to convert service %v", err)
return nil, err
}
owners := service.GetOwnerReferences()
for _, owner := range owners {
if owner.Kind == div2alpha1.KindDIJob {
return &service, nil
}
}
return nil, fmt.Errorf("service %s not belong to DIJob", service.Name)
}
func isNotBelongToDIJobError(err error) bool {
if strings.Contains(err.Error(), "not belong to DIJob") {
return true
}
return false
}
package dynamic
import (
"log"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
)
var (
ResyncPeriod = 30 * time.Second
)
type Informers struct {
DIInformer informers.GenericInformer
PodInformer informers.GenericInformer
NodeInformer informers.GenericInformer
}
func NewDynamicInformer(dif dynamicinformer.DynamicSharedInformerFactory) Informers {
// add DIJob informer
diGVR := schema.GroupVersionResource{
Group: div2alpha1.GroupVersion.Group,
Version: div2alpha1.GroupVersion.Version,
Resource: "dijobs",
}
// add pod informer
podGVR := schema.GroupVersionResource{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Resource: "pods",
}
// add node infomer
nodeGVR := schema.GroupVersionResource{
Group: corev1.SchemeGroupVersion.Group,
Version: corev1.SchemeGroupVersion.Version,
Resource: "nodes",
}
dyi := Informers{
DIInformer: dif.ForResource(diGVR),
PodInformer: dif.ForResource(podGVR),
NodeInformer: dif.ForResource(nodeGVR),
}
dyi.DIInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// on add object
log.Printf("new DIJob: %s/%s", obj.(*unstructured.Unstructured).GetNamespace(), obj.(*unstructured.Unstructured).GetName())
},
UpdateFunc: func(old, new interface{}) {
// on update object
},
DeleteFunc: func(obj interface{}) {
// on delete object
},
},
)
dyi.PodInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// on add object
pod, err := GetPodFromObject(obj)
if err != nil {
if isNotBelongToDIJobError(err) {
dyi.PodInformer.Informer().GetIndexer().Delete(obj)
}
return
}
log.Printf("new pod: %s/%s", pod.GetNamespace(), pod.GetName())
},
UpdateFunc: func(old, new interface{}) {},
DeleteFunc: func(obj interface{}) {
// on delete object
},
},
)
dyi.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
},
)
return dyi
}
package server
import (
"context"
"fmt"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
apiequality "k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
dicommon "opendilab.org/di-orchestrator/pkg/common"
commontypes "opendilab.org/di-orchestrator/pkg/common/types"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
func (s *DIServer) getRequestJob(c *gin.Context) (*div2alpha1.DIJob, int, error) {
// get request params from request
rawID := c.Param("id")
namespace, name, generation, err := parseJobID(rawID)
if err != nil {
return nil, -1, err
}
job := &div2alpha1.DIJob{}
err = s.ctx.Get(context.Background(), types.NamespacedName{namespace, name}, job)
if err != nil {
return nil, -1, err
}
return job, generation, nil
}
func (s *DIServer) getReplicas(c *gin.Context) {
// get request params from request
job, generation, err := s.getRequestJob(c)
if err != nil {
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
log := s.ctx.Log.WithName("getReplicas").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
if int32(generation) != job.Status.Generation {
err := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("request generation %d is not matched with job generation %d", generation, job.Status.Generation)}
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
reps, err := s.getNamespacedReplicas(job, generation)
if err != nil {
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
log.Info("successfully get replicas")
data, statusCode := s.buildResponse(reps, "successfully get replicas", nil)
c.JSON(statusCode, data)
}
func (s *DIServer) getNamespacedReplicas(job *div2alpha1.DIJob, generation int) ([]string, error) {
log := s.ctx.Log.WithName("getNamespacedReplicas").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
// list pods that belong to the DIJob
pods, err := s.ctx.ListJobPods(job)
if err != nil {
log.Error(err, "failed to list collectors and learners")
return nil, err
}
// get access urls
var urls []string
for _, pod := range pods {
if pod.Status.PodIP == "" || pod.Annotations[dicommon.AnnotationGeneration] != strconv.Itoa(generation) {
continue
}
replicas, _ := strconv.Atoi(pod.Annotations[dicommon.AnnotationReplicas])
rank, _ := strconv.Atoi(pod.Annotations[dicommon.AnnotationRank])
if urls == nil {
urls = make([]string, replicas)
}
port, found := diutil.GetDefaultPortFromPod(pod)
if !found {
port = dicommon.DefaultPort
}
podIP := pod.Status.PodIP
url := fmt.Sprintf("%s:%d", podIP, port)
urls[rank] = url
}
return urls, nil
}
// add replicas api
func (s *DIServer) addReplicas(c *gin.Context) {
// get request params from request
job, generation, err := s.getRequestJob(c)
if err != nil {
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
var reqs commontypes.DIJobRequest
if err = c.ShouldBindJSON(&reqs); err != nil {
dierr := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: err.Error()}
data, statusCode := s.buildResponse(nil, "", dierr)
c.JSON(statusCode, data)
return
}
log := s.ctx.Log.WithName("addReplicas").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
// add replicas
if int32(generation) != job.Status.Generation {
err := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("request generation %d is not matched with job generation %d", generation, job.Status.Generation)}
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
if !job.Spec.Preemptible {
oldStatus := job.Status.DeepCopy()
job.Status.Replicas += int32(reqs.Replicas)
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := s.ctx.UpdateDIJobStatusInCluster(job); err != nil {
log.Error(err, "failed to update DIJobStatus", "job", job.Name)
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
}
}
log.Info("successfully add replicas", "number", reqs.Replicas)
data, statusCode := s.buildResponse(nil, "successfully add replicas", nil)
c.JSON(statusCode, data)
}
// delete replicas api
func (s *DIServer) deleteReplicas(c *gin.Context) {
// get request body
job, generation, err := s.getRequestJob(c)
if err != nil {
dierr := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: err.Error()}
data, statusCode := s.buildResponse(nil, "", dierr)
c.JSON(statusCode, data)
return
}
log := s.ctx.Log.WithName("deleteReplicas").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
var reqs commontypes.DIJobRequest
if err = c.ShouldBindJSON(&reqs); err != nil {
dierr := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: err.Error()}
data, statusCode := s.buildResponse(nil, "", dierr)
c.JSON(statusCode, data)
return
}
// delete replicas
if int32(generation) != job.Status.Generation {
err := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("request generation %d is not matched with job generation %d", generation, job.Status.Generation)}
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
if !job.Spec.Preemptible {
oldStatus := job.Status.DeepCopy()
job.Status.Replicas -= int32(reqs.Replicas)
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := s.ctx.UpdateDIJobStatusInCluster(job); err != nil {
log.Error(err, "failed to update DIJobStatus", "job", job.Name)
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
}
}
log.Info("successfully delete replicas", "number", reqs.Replicas)
data, statusCode := s.buildResponse(nil, "successfully delete replicas", nil)
c.JSON(statusCode, data)
}
func (s *DIServer) profilings(c *gin.Context) {
// get request body
job, generation, err := s.getRequestJob(c)
if err != nil {
dierr := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: err.Error()}
data, statusCode := s.buildResponse(nil, "", dierr)
c.JSON(statusCode, data)
return
}
log := s.ctx.Log.WithName("deleteReplicas").WithValues("job", diutil.NamespacedName(job.Namespace, job.Name))
var reqs div2alpha1.Profilings
if err = c.ShouldBindJSON(&reqs); err != nil {
dierr := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: err.Error()}
data, statusCode := s.buildResponse(nil, "", dierr)
c.JSON(statusCode, data)
return
}
if int32(generation) != job.Status.Generation {
err := &commontypes.DIError{Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("request generation %d is not matched with job generation %d", generation, job.Status.Generation)}
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
oldStatus := job.Status.DeepCopy()
job.Status.Profilings = reqs
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := s.ctx.UpdateDIJobStatusInCluster(job); err != nil {
log.Error(err, "failed to update DIJobStatus", "job", job.Name)
data, statusCode := s.buildResponse(nil, "", err)
c.JSON(statusCode, data)
return
}
}
log.Info("successfully report profilings")
data, statusCode := s.buildResponse(nil, "successfully report profilings", nil)
c.JSON(statusCode, data)
}
func (s *DIServer) buildResponse(reps []string, msg string, err error) (commontypes.Response, int) {
log := s.ctx.Log.WithName("DIServer")
var success bool = true
var code int = commontypes.CodeSuccess
var statusCode int = http.StatusOK
if err != nil {
success = false
code = commontypes.CodeFailed
msg = err.Error()
// define status code
if commontypes.IsNotFound(err) || k8serrors.IsNotFound(err) {
statusCode = http.StatusNotFound
} else if commontypes.IsAlreadyExists(err) || k8serrors.IsAlreadyExists(err) {
statusCode = http.StatusConflict
} else if commontypes.IsBadRequest(err) || k8serrors.IsBadRequest(err) {
statusCode = http.StatusBadRequest
} else if commontypes.IsNotImplemented(err) {
statusCode = http.StatusNotImplemented
} else {
statusCode = http.StatusInternalServerError
}
log.Error(err, "failed to process request")
}
// build response
rep := commontypes.Response{
Success: success,
Code: code,
Message: msg,
Data: reps,
}
return rep, statusCode
}
package http
import (
"context"
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
commontypes "opendilab.org/di-orchestrator/pkg/common/types"
)
var (
statusUpdateRetries = 3
statusUpdatedPauseDuration = 50 * time.Millisecond
)
func (s *DIServer) getDIJob(namespace, name string) (*div2alpha1.DIJob, error) {
diUn, err := s.DIClient.Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
var job div2alpha1.DIJob
err = runtime.DefaultUnstructuredConverter.FromUnstructured(diUn.UnstructuredContent(), &job)
if err != nil {
errMsg := fmt.Sprintf("failed to convert unstructured: %s", diUn.UnstructuredContent())
return nil, fmt.Errorf(errMsg)
}
return &job, nil
}
func (s *DIServer) getCachedDIJobByKey(key string) (*div2alpha1.DIJob, error) {
obj, exists, err := s.dyi.DIInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get DIJob: %s", err)
return nil, fmt.Errorf(errMsg)
}
if !exists {
errMsg := fmt.Sprintf("DIJob: %s not exists in cache", key)
return nil, &commontypes.DIError{Type: commontypes.ErrorNotFound, Message: errMsg}
}
diUn := obj.(*unstructured.Unstructured)
var diJob div2alpha1.DIJob
err = runtime.DefaultUnstructuredConverter.FromUnstructured(diUn.UnstructuredContent(), &diJob)
if err != nil {
errMsg := fmt.Sprintf("failed to convert unstructured: %s", diUn.UnstructuredContent())
return nil, fmt.Errorf(errMsg)
}
return &diJob, nil
}
func (s *DIServer) needMultiDDPLearnerPod(resource commontypes.ResourceQuantity) (bool, error) {
if err := s.SyncNodes(); err != nil {
return false, err
}
gpusMajority := s.gpuAllocator.NumGPUsOfMajorityNodeType()
if gpusMajority <= 0 {
return false, nil
}
if int(resource.GPU.Value()) > gpusMajority {
return true, nil
}
return false, nil
}
func (s *DIServer) updateDIJobStatusInCluster(job *div2alpha1.DIJob) error {
var err error
for i := 0; i < statusUpdateRetries; i++ {
newJob := &div2alpha1.DIJob{}
job, err := s.getDIJob(job.Namespace, job.Name)
if err != nil {
break
}
newJob.Status = job.Status
jobMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newJob)
if err != nil {
break
}
jobUn := &unstructured.Unstructured{Object: jobMap}
if _, err := s.DIClient.Namespace(job.Namespace).Update(context.Background(), jobUn, metav1.UpdateOptions{}); err == nil {
time.Sleep(statusUpdatedPauseDuration)
break
}
}
return err
}
package http
import (
"net/http"
)
// healthz is a liveness probe.
func healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}
// // readyz is a readiness probe.
// func readyz(isReady *atomic.Value) http.HandlerFunc {
// return func(w http.ResponseWriter, _ *http.Request) {
// if isReady == nil || !isReady.Load().(bool) {
// http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
// return
// }
// w.WriteHeader(http.StatusOK)
// }
// }
package http
import (
"fmt"
mapset "github.com/deckarep/golang-set"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
commontypes "opendilab.org/di-orchestrator/pkg/common/types"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
func (s *DIServer) getPodsByNames(namespace string, names []string) ([]*corev1.Pod, error) {
// use set to filter out duplicate items
nameSlice := []interface{}{}
for _, name := range names {
nameSlice = append(nameSlice, name)
}
nameSet := mapset.NewSetFromSlice(nameSlice)
var keys []string
var pods []*corev1.Pod
for name := range nameSet.Iterator().C {
key := diutil.NamespacedName(namespace, name.(string))
keys = append(keys, key)
}
pods, err := s.getPodsByKeys(keys)
if err != nil {
return pods, err
}
return pods, nil
}
func (s *DIServer) getPodsByKeys(keys []string) ([]*corev1.Pod, error) {
var pods []*corev1.Pod
for _, key := range keys {
pod, err := s.getPodByKey(key)
if err != nil {
return pods, err
}
pods = append(pods, pod)
}
return pods, nil
}
func (s *DIServer) getPodByKey(key string) (*corev1.Pod, error) {
obj, exists, err := s.dyi.PodInformer.Informer().GetIndexer().GetByKey(key)
if err != nil {
errMsg := fmt.Sprintf("failed to get pod: %s", err)
return nil, fmt.Errorf(errMsg)
}
if !exists {
errMsg := fmt.Sprintf("pod: %s not exists in cache", key)
return nil, &commontypes.DIError{Type: commontypes.ErrorNotFound, Message: errMsg}
}
podUn := obj.(*unstructured.Unstructured)
var pod corev1.Pod
err = runtime.DefaultUnstructuredConverter.FromUnstructured(podUn.UnstructuredContent(), &pod)
if err != nil {
errMsg := fmt.Sprintf("failed to convert unstructured: %s", podUn.UnstructuredContent())
return nil, fmt.Errorf(errMsg)
}
return &pod, nil
}
func (s *DIServer) listReplicaPodsWithSelector(jobID string, labelSelector labels.Selector) (
pods []*corev1.Pod, err error) {
// list pods that belong to the DIJob
id, err := diutil.SplitNamespaceName(jobID)
if err != nil {
return
}
pods, err = s.listPodsWithSelector(id.Namespace, labelSelector)
if err != nil {
return
}
return
}
func (s *DIServer) listPodsWithSelector(namespace string, labelSelector labels.Selector) ([]*corev1.Pod, error) {
ret, err := s.dyi.PodInformer.Lister().ByNamespace(namespace).List(labelSelector)
if err != nil {
return nil, err
}
pods := []*corev1.Pod{}
for _, obj := range ret {
podUn := obj.(*unstructured.Unstructured)
var pod corev1.Pod
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podUn.UnstructuredContent(), &pod); err != nil {
return nil, err
}
pods = append(pods, &pod)
}
return pods, nil
}
package http
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
dicommon "opendilab.org/di-orchestrator/pkg/common"
gpualloc "opendilab.org/di-orchestrator/pkg/common/gpuallocator"
commontypes "opendilab.org/di-orchestrator/pkg/common/types"
serverdynamic "opendilab.org/di-orchestrator/pkg/server/dynamic"
diutil "opendilab.org/di-orchestrator/pkg/utils"
)
var (
apiVersion = "v2alpha1"
replicasAPI = "/replicas"
)
func withAPIVersion(api string) string {
return fmt.Sprintf("/%s%s", apiVersion, api)
}
type DIServer struct {
KubeClient *kubernetes.Clientset
DIClient dynamic.NamespaceableResourceInterface
Log logr.Logger
dyi serverdynamic.Informers
gpuAllocator gpualloc.GPUAllocator
}
func NewDIServer(
kubeClient *kubernetes.Clientset,
diClient dynamic.NamespaceableResourceInterface,
log logr.Logger,
dyi serverdynamic.Informers,
gpuAllocPolicy string) *DIServer {
var gpuAllocator gpualloc.GPUAllocator
switch gpuAllocPolicy {
case gpualloc.SimpleGPUAllocPolicy:
gpuAllocator = *gpualloc.NewSimpleGPUAllocator([]*corev1.Node{})
}
return &DIServer{
KubeClient: kubeClient,
DIClient: diClient,
Log: log,
dyi: dyi,
gpuAllocator: gpuAllocator,
}
}
func (s *DIServer) Start(serverBindAddress string) error {
log := s.Log.WithName("DIServer")
http.HandleFunc(withAPIVersion(replicasAPI), s.Replicas)
http.HandleFunc("/healthz", healthz)
log.Info("Start listening on", "port", serverBindAddress)
if err := http.ListenAndServe(serverBindAddress, nil); err != nil {
return err
}
return nil
}
func (s *DIServer) SyncNodes() error {
rets, err := s.dyi.NodeInformer.Lister().List(labels.Everything())
if err != nil && !errors.IsNotFound(err) {
return err
}
var nodes []*corev1.Node
for _, ret := range rets {
un := ret.(*unstructured.Unstructured)
var node corev1.Node
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(un.UnstructuredContent(), &node); err != nil {
return err
}
nodes = append(nodes, &node)
}
s.gpuAllocator.Nodes = nodes
return nil
}
func (s *DIServer) Replicas(w http.ResponseWriter, r *http.Request) {
log := s.Log.WithName("Replicas")
var reps []string
var err error
var msg string
// handle request by request method
switch r.Method {
case "GET":
msg = "successfully get replicas"
reps, err = s.getReplicas(r)
case "POST":
msg = "successfully create replicas"
err = s.addReplicas(r)
case "DELETE":
msg = "successfully delete replicas"
err = s.deleteReplicas(r)
default:
err = &commontypes.DIError{Type: commontypes.ErrorNotImplemented, Message: fmt.Sprintf("%s not implemented", r.Method)}
log.Error(err, "method not implemented")
}
rep, statusCode := s.buildResponse(reps, msg, err)
// write response
if err = writeResponse(w, rep, statusCode); err != nil {
log.Error(err, "failed to write response")
}
}
func (s *DIServer) getReplicas(r *http.Request) ([]string, error) {
// get request params from request
rp := commontypes.DIJobRequestParams{}
params := r.URL.Query()
for k, v := range params {
switch strings.ToLower(k) {
case commontypes.RequestParamTypeJobID:
rp.JobID = v
case commontypes.RequestParamTypeGeneration:
rp.Generation = v
default:
errInfo := fmt.Sprintf("request param %s is not supported", k)
return nil, &commontypes.DIError{Type: commontypes.ErrorBadRequest, Message: errInfo}
}
}
var reps []string
var err error
if rp.JobID != nil && rp.Generation != nil {
reps, err = s.getNamespacedReplicas(rp.JobID[0], rp.Generation[0])
if err != nil {
return nil, err
}
}
return reps, nil
}
func (s *DIServer) getNamespacedReplicas(jobID string, generation string) ([]string, error) {
log := s.Log.WithName("getNamespacedReplicas")
job, err := s.getCachedDIJobByKey(jobID)
if err != nil {
log.Error(err, "failed to get owner reference")
return nil, err
}
// list pods that belong to the DIJob
labelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: diutil.GenLabels(*job),
})
if err != nil {
return nil, err
}
pods, err := s.listReplicaPodsWithSelector(jobID, labelSelector)
if err != nil {
log.Error(err, "failed to list collectors and learners")
return nil, err
}
// get access urls
var urls []string
for _, pod := range pods {
if pod.Status.PodIP == "" || pod.Annotations[dicommon.AnnotationGeneration] != generation {
continue
}
replicas, _ := strconv.Atoi(pod.Annotations[dicommon.AnnotationReplicas])
rank, _ := strconv.Atoi(pod.Annotations[dicommon.AnnotationRank])
if urls == nil {
urls = make([]string, replicas)
}
port, found := diutil.GetDefaultPortFromPod(pod)
if !found {
port = dicommon.DefaultPort
}
podIP := pod.Status.PodIP
url := fmt.Sprintf("%s:%d", podIP, port)
urls[rank] = url
}
log.Info("get replicas", "url", urls)
return urls, nil
}
// add replicas api
func (s *DIServer) addReplicas(r *http.Request) error {
log := s.Log.WithName("addReplicas")
// get request body
var req commontypes.DIJobRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
errMsg := fmt.Sprintf("failed to decode request body: %v", err)
return &commontypes.DIError{Type: commontypes.ErrorBadRequest, Message: errMsg}
}
job, err := s.getCachedDIJobByKey(req.JobID)
if err != nil {
return err
}
// add replicas
if job.Spec.Preemptible {
oldStatus := job.Status.DeepCopy()
job.Status.Replicas += int32(req.Replicas)
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := s.updateDIJobStatusInCluster(job); err != nil {
log.Error(err, "failed to update DIJobStatus", "job", job.Name)
return err
}
}
}
log.Info("successfully add replicas")
return nil
}
// delete replicas api
func (s *DIServer) deleteReplicas(r *http.Request) error {
log := s.Log.WithName("addReplicas")
// get request body
var req commontypes.DIJobRequest
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
errMsg := fmt.Sprintf("failed to decode request body: %v", err)
return &commontypes.DIError{Type: commontypes.ErrorBadRequest, Message: errMsg}
}
job, err := s.getCachedDIJobByKey(req.JobID)
if err != nil {
return err
}
// delete replicas
if job.Spec.Preemptible {
oldStatus := job.Status.DeepCopy()
job.Status.Replicas -= int32(req.Replicas)
if !apiequality.Semantic.DeepEqual(*oldStatus, job.Status) {
if err := s.updateDIJobStatusInCluster(job); err != nil {
log.Error(err, "failed to update DIJobStatus", "job", job.Name)
return err
}
}
}
log.Info("successfully delete replicas")
return nil
}
func (s *DIServer) buildResponse(reps []string, msg string, err error) (commontypes.Response, int) {
log := s.Log.WithName("DIServer")
var success bool = true
var code int = commontypes.CodeSuccess
var statusCode int = http.StatusOK
if err != nil {
success = false
code = commontypes.CodeFailed
msg = err.Error()
// define status code
if commontypes.IsNotFound(err) {
statusCode = http.StatusNotFound
} else if commontypes.IsAlreadyExists(err) {
statusCode = http.StatusConflict
} else if commontypes.IsBadRequest(err) {
statusCode = http.StatusBadRequest
} else if commontypes.IsNotImplemented(err) {
statusCode = http.StatusNotImplemented
} else {
statusCode = http.StatusInternalServerError
}
log.Error(err, "failed to process request")
}
// build response
rep := commontypes.Response{
Success: success,
Code: code,
Message: msg,
Data: reps,
}
return rep, statusCode
}
func writeResponse(w http.ResponseWriter, rep commontypes.Response, statusCode int) error {
w.Header().Set("Conten-Type", "application/json")
w.WriteHeader(statusCode)
repJSON, err := json.Marshal(rep)
if err != nil {
errMsg := fmt.Sprintf("failed to marshal json: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return err
}
_, err = w.Write(repJSON)
if err != nil {
errMsg := fmt.Sprintf("failed to write json: %s", err)
http.Error(w, errMsg, http.StatusInternalServerError)
return err
}
return nil
}
package server
import (
"context"
"github.com/gin-gonic/gin"
dicontext "opendilab.org/di-orchestrator/pkg/context"
)
var (
apiVersion = "v2alpha1"
)
type DIServer struct {
ctx dicontext.Context
serverBindAddress string
}
func NewDIServer(
ctx dicontext.Context,
serverBindAddress string) *DIServer {
return &DIServer{
ctx: ctx,
serverBindAddress: serverBindAddress,
}
}
func (s *DIServer) Start(ctx context.Context) error {
log := s.ctx.Log.WithName("DIServer")
r := gin.Default()
v2alpha1 := r.Group(apiVersion)
{
v2alpha1.GET("job/:id/replicas", s.getReplicas)
v2alpha1.POST("job/:id/replicas", s.addReplicas)
v2alpha1.DELETE("job/:id/replicas", s.deleteReplicas)
v2alpha1.POST("job/:id/profilings", s.profilings)
}
log.Info("Start listening on", "port", s.serverBindAddress)
if err := r.Run(s.serverBindAddress); err != nil {
return err
}
return nil
}
package http
package server
// import (
// "bytes"
......
......@@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package http
package server
import (
"context"
"flag"
"fmt"
"net"
"path/filepath"
......@@ -32,11 +31,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
......@@ -44,7 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
div2alpha1 "opendilab.org/di-orchestrator/pkg/api/v2alpha1"
serverdynamic "opendilab.org/di-orchestrator/pkg/server/dynamic"
dicontext "opendilab.org/di-orchestrator/pkg/context"
//+kubebuilder:scaffold:imports
)
......@@ -67,7 +63,6 @@ var (
// var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var kubeClient *kubernetes.Clientset
func TestServer(t *testing.T) {
RegisterFailHandler(Fail)
......@@ -82,7 +77,7 @@ var _ = BeforeSuite(func() {
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")},
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}
......@@ -115,38 +110,26 @@ var _ = BeforeSuite(func() {
fmt.Printf("node: %s added to cluster\n", node.Name)
}
kubeClient = kubernetes.NewForConfigOrDie(cfg)
dynamicClient := dynamic.NewForConfigOrDie(cfg)
diGVR := schema.GroupVersionResource{
Group: div2alpha1.GroupVersion.Group,
Version: div2alpha1.GroupVersion.Version,
Resource: "dijobs",
}
diclient := dynamicClient.Resource(diGVR)
dif := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, serverdynamic.ResyncPeriod, corev1.NamespaceAll, nil)
dyi := serverdynamic.NewDynamicInformer(dif)
// start dynamic informer
stopCh := make(chan struct{})
go dif.Start(stopCh)
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
logger := zap.New(zap.UseFlagOptions(&opts))
metricPort := config.GinkgoConfig.ParallelNode + 8200
metricAddress := fmt.Sprintf(":%d", metricPort)
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
MetricsBindAddress: metricAddress,
})
Expect(err).NotTo(HaveOccurred())
gpuAllocPolicy := "simple"
diServer := NewDIServer(kubeClient, diclient, logger, dyi, gpuAllocPolicy)
ctx := dicontext.NewContext(context.Background(),
cfg,
mgr.GetClient(),
mgr.GetEventRecorderFor("di-server"),
ctrl.Log.WithName("di-server"))
localServingPort = port + config.GinkgoConfig.ParallelNode
addrPort := fmt.Sprintf("%s:%d", localServingHost, localServingPort)
go func() {
err := diServer.Start(addrPort)
diServer := NewDIServer(ctx, addrPort)
mgr.Add(diServer)
err := mgr.Start(ctrl.SetupSignalHandler())
fmt.Println(err.Error())
}()
......
package server
import (
"fmt"
"strconv"
"strings"
commontypes "opendilab.org/di-orchestrator/pkg/common/types"
)
func parseJobID(jobID string) (namespace, name string, generation int, err error) {
items := strings.Split(jobID, ".")
if len(items) != 3 {
return "", "", -1, &commontypes.DIError{
Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("job id %s must be in namespace.name.generation format", jobID)}
}
gen, err := strconv.Atoi(items[2])
if err != nil {
return "", "", -1, &commontypes.DIError{
Type: commontypes.ErrorBadRequest,
Message: fmt.Sprintf("request generation %s is not a valid number", items[2]),
}
}
return items[0], items[1], gen, nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册