Skip to content

Commit

Permalink
user quota. microsoft/pai#5503
Browse files Browse the repository at this point in the history
Signed-off-by: siaimes <[email protected]>
  • Loading branch information
siaimes committed May 11, 2022
1 parent 66f26ac commit 1cf5282
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 0 deletions.
102 changes: 102 additions & 0 deletions pkg/algorithm/hived_algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ package algorithm

import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/microsoft/hivedscheduler/pkg/api"
"github.com/microsoft/hivedscheduler/pkg/common"
Expand All @@ -48,6 +54,8 @@ type HivedAlgorithm struct {
freeCellList map[CellChain]ChainCellList
// all affinity groups that have been allocated or are preempting other groups
affinityGroups map[string]*AlgoAffinityGroup
//
quota api.Quota

// vcFreeCellNum, allVCFreeCellNum, and totalLeftCellNum are used to track cell usage of the VCs.
// Note that these numbers count both healthy and bad cells.
Expand Down Expand Up @@ -124,6 +132,7 @@ func NewHivedAlgorithm(sConfig *api.Config) *HivedAlgorithm {
cellChains: chains,
cellTypes: cellTypes,
affinityGroups: map[string]*AlgoAffinityGroup{},
quota: *sConfig.UserQuota,
apiClusterStatus: api.ClusterStatus{
PhysicalCluster: api.PhysicalClusterStatus{},
VirtualClusters: map[api.VirtualClusterName]api.VirtualClusterStatus{},
Expand Down Expand Up @@ -895,6 +904,93 @@ func (h *HivedAlgorithm) handleSchedulingRequest(
return physicalPlacement, virtualPlacement, ""
}

// getQuota get quota from api, format: "quota":N, respond body contains this substring will be parsed as quota.
func (h *HivedAlgorithm) getQuota(username string) (
quota int32,
failedReason string,
failed bool) {

// No quota
if h.quota.Api == "" {
return -1, "", false
}

// Request
req, _ := http.NewRequest("GET", h.quota.Api+username, nil)
// For PAI, we nedd "Authorization", for user-provided api, it may not be needed.
if h.quota.Token != "" {
req.Header.Set("Authorization", "Bearer "+h.quota.Token)
}
resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
// Request failed
if err != nil {
return 0, "Get quota failed: " + err.Error(), true
}
// respond status not OK
if resp.StatusCode != 200 {
return 0, "Get quota failed: " + resp.Status, true
}
// Get body data
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
// Failed to read body
if err != nil {
return 0, "Get quota failed: Read respond body failed", true
}
// Extract quota from respond body
regex := regexp.MustCompile(`"quota":(\d+)`)
regexRet := regex.FindStringSubmatch(string(body))
// No quota found, means no quota
if len(regexRet) == 0 {
return -1, "", false
}
// Extract quota from regex
tmp, _ := strconv.ParseInt(regexRet[1], 10, 32)
quota = int32(tmp) // convert to int32
return quota, "", false
}

// scheduleUserQuota make sure that the number of SKUs used by the user does not exceed the quota.
func (h *HivedAlgorithm) scheduleUserQuota(
sr schedulingRequest) (
failedReason string,
failed bool) {
// Find username from affinityGroupName
var username string = sr.affinityGroupName[:strings.Index(sr.affinityGroupName, "~")]

// Get leaf cell quota from quota api
quota, failedReason, failed := h.getQuota(username)
// Failed to get leaf cell quota
if failed {
return failedReason, failed
}
// No quota
if quota < 0 {
return "", false
}

// Count used leaf cell count
var used int32 = 0
for group := range h.affinityGroups {
if h.affinityGroups[group].name[:strings.Index(h.affinityGroups[group].name, "~")] == username {
for pod := range h.affinityGroups[group].totalPodNums {
used += pod * h.affinityGroups[group].totalPodNums[pod]
}
}
}

// Count required leaf cell count
var required int32 = 0
for key := range sr.affinityGroupPodNums {
required += key * sr.affinityGroupPodNums[key]
}

if used+required > quota {
return "exceeded user quota", true
}
return "", false
}

// scheduleGuaranteedAffinityGroup schedules an affinity group in its VC,
// and then maps the placement in VC to the physical cluster.
func (h *HivedAlgorithm) scheduleGuaranteedAffinityGroup(
Expand All @@ -903,6 +999,12 @@ func (h *HivedAlgorithm) scheduleGuaranteedAffinityGroup(
virtualPlacement groupVirtualPlacement,
failedReason string) {

// schedule Quota
failedReason, failed := h.scheduleUserQuota(sr)
if failed {
return nil, nil, failedReason
}

// schedule in VC
virtualPlacement, failedReason = h.vcSchedulers[sr.vc].schedule(sr)
if virtualPlacement == nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Config struct {

// Specify all the virtual clusters belongs to the physical cluster
VirtualClusters *map[VirtualClusterName]VirtualClusterSpec `yaml:"virtualClusters"`
// UserQuota define the api and token for quota based scheduler
UserQuota *Quota `yaml:"userQuota,omitempty"`
}

func NewConfig(rawConfig *Config) *Config {
Expand Down Expand Up @@ -109,6 +111,9 @@ func NewConfig(rawConfig *Config) *Config {
if c.VirtualClusters == nil {
c.VirtualClusters = defaultVirtualClusters()
}
if c.UserQuota == nil {
c.UserQuota = defaultQuota()
}
// Append default value for empty items in physical cell
defaultingPhysicalCells(c.PhysicalCluster)
// Validation
Expand Down Expand Up @@ -185,6 +190,13 @@ func defaultVirtualClusters() *map[VirtualClusterName]VirtualClusterSpec {
return &map[VirtualClusterName]VirtualClusterSpec{}
}

func defaultQuota() *Quota {
quota := Quota{}
quota.Api = ""
quota.Token = ""
return &quota
}

func InitRawConfig(configPath *string) *Config {
c := Config{}
configFilePath := *configPath
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ type ClusterStatus struct {
VirtualClusters map[VirtualClusterName]VirtualClusterStatus `json:"virtualClusters"`
}

type Quota struct {
// Api for userQuota
Api string `yaml:"api,omitempty"`
// Token for userQuota
Token string `yaml:"token,omitempty"`
}

func (pcs *PhysicalCellStatus) deepCopy() *PhysicalCellStatus {
copied := &PhysicalCellStatus{
CellStatus: pcs.CellStatus,
Expand Down

0 comments on commit 1cf5282

Please sign in to comment.