Skip to content

Commit

Permalink
internal/coordinator/pool: add ec2 pool
Browse files Browse the repository at this point in the history
The EC2 buildlet pool added by this commit will manage the lifecycle
of buildlets running on EC2. EC2 VMs will only be created for the
ARM64 architecture. As VMs are requested, the pool will ensure that
there are enough resources for the VM to be created and keep track of
the VMs created. Once a VM is destroyed, the resources will be made
available for other VMs. This pool will only keep instances as they
are needed.

Updates golang/go#36841
Updates golang/go#38337

Change-Id: Ic777485c0b0a69ec13726c58b49e9fdc1df4777e
Reviewed-on: https://go-review.googlesource.com/c/build/+/247907
Run-TryBot: Carlos Amedee <[email protected]>
TryBot-Result: Gobot Gobot <[email protected]>
Reviewed-by: Alexander Rakoczy <[email protected]>
Reviewed-by: Dmitri Shuralyov <[email protected]>
  • Loading branch information
cagedmantis committed Aug 18, 2020
1 parent 078b759 commit 3dadacc
Show file tree
Hide file tree
Showing 5 changed files with 1,154 additions and 14 deletions.
361 changes: 361 additions & 0 deletions internal/coordinator/pool/ec2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,361 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build go1.13
// +build linux darwin

package pool

import (
"context"
"errors"
"fmt"
"html"
"io"
"log"
"sync"
"time"

"golang.org/x/build/buildenv"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/cloud"
"golang.org/x/build/internal/spanlog"
)

var _ Buildlet = (*EC2Buildlet)(nil)

// ec2Buildlet is the package level buildlet pool.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
var ec2Buildlet *EC2Buildlet

// EC2BuildetPool retrieves the package level EC2Buildlet pool set by the constructor.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
func EC2BuildetPool() *EC2Buildlet {
return ec2Buildlet
}

func init() {
// initializes a basic package level ec2Buildlet pool to enable basic testing in other
// packages.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
ec2Buildlet = &EC2Buildlet{
ledger: newLedger(),
}
}

// awsClient represents the aws client used to interact with AWS. This is a partial
// implementation of pool.AWSClient.
type awsClient interface {
DestroyInstances(ctx context.Context, instIDs ...string) error
Quota(ctx context.Context, service, code string) (int64, error)
InstanceTypesARM(ctx context.Context) ([]*cloud.InstanceType, error)
RunningInstances(ctx context.Context) ([]*cloud.Instance, error)
}

// EC2Opt is optional configuration for the the buildlet.
type EC2Opt func(*EC2Buildlet)

// WithVMDeleteTimeout sets the VM deletion timeout for all EC2 VMs.
func WithVMDeleteTimeout(timeout time.Duration) EC2Opt {
return func(eb *EC2Buildlet) {
eb.vmDeleteTimeout = timeout
}
}

// EC2Buildlet manages a pool of AWS EC2 buildlets.
type EC2Buildlet struct {
once sync.Once
// done channel closing will signal the pollers to discontinue polling
done chan struct{}

// awsClient is the client used to interact with AWS services.
awsClient awsClient
// buildEnv contains the build enviornment settings.
buildEnv *buildenv.Environment
// buildletClient is the client used to create a buildlet.
buildletClient ec2BuildletClient
// hosts provides the host configuration for all hosts. It is passed in to facilitate
// testing.
hosts map[string]*dashboard.HostConfig
// isRemoteBuildletFunc informs the caller is a VM instance is being used as a remote
// buildlet.
//
// TODO(golang.org/issues/38337) remove once we find a way to pass in remote buildlet
// information at the get buidlet request.
isRemoteBuildlet IsRemoteBuildletFunc
// ledger tracks instances and their resource allocations.
ledger *ledger
// vmDeleteTimeout contains the timeout used to determine if a VM should be deleted.
vmDeleteTimeout time.Duration
}

// ec2BuildletClient represents an EC2 buildlet client in the buildlet package.
type ec2BuildletClient interface {
StartNewVM(ctx context.Context, buildEnv *buildenv.Environment, hconf *dashboard.HostConfig, vmName, hostType string, opts *buildlet.VMOpts) (*buildlet.Client, error)
}

// NewEC2Buildlet creates a new EC2 buildlet pool used to create and manage the lifecycle of
// EC2 buildlets. Information about ARM64 instance types is retrieved before starting the pool.
// EC2 quota types are also retrieved before starting the pool. The pool will continuously poll
// for quotas which limit the resources that can be consumed by the pool. It will also periodically
// search for VMs which are no longer in use or are untracked by the pool in order to delete them.
func NewEC2Buildlet(client *cloud.AWSClient, buildEnv *buildenv.Environment, hosts map[string]*dashboard.HostConfig, fn IsRemoteBuildletFunc, opts ...EC2Opt) (*EC2Buildlet, error) {
if fn == nil {
return nil, errors.New("remote buildlet check function is not set")
}
b := &EC2Buildlet{
awsClient: client,
buildEnv: buildEnv,
buildletClient: buildlet.NewEC2Client(client),
done: make(chan struct{}),
hosts: hosts,
isRemoteBuildlet: fn,
ledger: newLedger(),
vmDeleteTimeout: 45 * time.Minute, // default VM delete timeout
}
for _, opt := range opts {
opt(b)
}
if err := b.retrieveAndSetQuota(); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
if err := b.retrieveAndSetInstanceTypes(); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
go b.cleanupUnusedVMs()
go b.pollQuota()

// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
ec2Buildlet = b
return b, nil
}

// GetBuildlet retrieves a buildlet client for a newly created buildlet.
func (eb *EC2Buildlet) GetBuildlet(ctx context.Context, hostType string, lg Logger) (*buildlet.Client, error) {
hconf, ok := eb.hosts[hostType]
if !ok {
return nil, fmt.Errorf("ec2 pool: unknown host type %q", hostType)
}
instName := instanceName(hostType, 7)
log.Printf("Creating EC2 VM %q for %s", instName, hostType)
kp, err := buildlet.NewKeyPair()
if err != nil {
log.Printf("failed to create TLS key pair for %s: %s", hostType, err)
return nil, fmt.Errorf("failed to create TLS key pair: %w", err)
}

qsp := lg.CreateSpan("awaiting_ec2_quota")
err = eb.ledger.ReserveResources(ctx, instName, hconf.MachineType())
qsp.Done(err)
if err != nil {
return nil, err
}

ec2BuildletSpan := lg.CreateSpan("create_ec2_buildlet", instName)
defer func() { ec2BuildletSpan.Done(err) }()

var (
createSpan = lg.CreateSpan("create_ec2_instance", instName)
waitBuildlet spanlog.Span
curSpan = createSpan
instanceCreated bool
)
bc, err := eb.buildletClient.StartNewVM(ctx, eb.buildEnv, hconf, instName, hostType, &buildlet.VMOpts{
Zone: "", // allow the EC2 api pick an availability zone with capacity
TLS: kp,
Meta: make(map[string]string),
DeleteIn: deleteTimeoutFromContextOrValue(ctx, eb.vmDeleteTimeout),
OnInstanceRequested: func() {
log.Printf("EC2 VM %q now booting", instName)
},
OnInstanceCreated: func() {
log.Printf("EC2 VM %q now running", instName)
createSpan.Done(nil)
instanceCreated = true
waitBuildlet = lg.CreateSpan("wait_buildlet_start", instName)
curSpan = waitBuildlet
},
OnGotEC2InstanceInfo: func(inst *cloud.Instance) {
lg.LogEventTime("got_instance_info", "waiting_for_buildlet...")
eb.ledger.UpdateReservation(instName, inst.ID)
},
})
if err != nil {
curSpan.Done(err)
log.Printf("EC2 VM creation failed for %s: %v", hostType, err)
if instanceCreated {
log.Printf("EC2 VM %q failed initialize buildlet client. deleting...", instName)
eb.buildletDone(instName)
} else {
eb.ledger.Remove(instName)
}
return nil, err
}
waitBuildlet.Done(nil)
bc.SetDescription(fmt.Sprintf("EC2 VM: %s", instName))
bc.SetOnHeartbeatFailure(func() {
log.Printf("EC2 VM %q failed heartbeat", instName)
eb.buildletDone(instName)
})
return bc, nil
}

// String gives a report of capacity usage for the EC2 buildlet pool.
func (eb *EC2Buildlet) String() string {
return fmt.Sprintf("EC2 pool capacity: %s", eb.capacityString())
}

// capacityString() gives a report of capacity usage.
func (eb *EC2Buildlet) capacityString() string {
r := eb.ledger.Resources()
return fmt.Sprintf("%d instances; %d/%d CPUs", r.InstCount, r.CPUUsed, r.CPULimit)
}

// WriteHTMLStatus writes the status of the EC2 buildlet pool to an io.Writer.
func (eb *EC2Buildlet) WriteHTMLStatus(w io.Writer) {
fmt.Fprintf(w, "<b>EC2 pool</b> capacity: %s", eb.capacityString())

active := eb.ledger.ResourceTime()
if len(active) > 0 {
fmt.Fprintf(w, "<ul>")
for _, inst := range active {
fmt.Fprintf(w, "<li>%v, %s</li>\n", html.EscapeString(inst.Name), friendlyDuration(time.Since(inst.Creation)))
}
fmt.Fprintf(w, "</ul>")
}
}

// buildletDone issues a call to destroy the EC2 instance and removes
// the instance from the ledger. Removing the instance from the ledger
// also releases any resources allocated to that instance. If an instance
// is not found in the ledger or on EC2 then an error is logged. All
// untracked instances will be cleaned up by the polling cleanupUnusedVMs
// method.
func (eb *EC2Buildlet) buildletDone(instName string) {
vmID := eb.ledger.InstanceID(instName)
if vmID == "" {
log.Printf("EC2 vm %s not found", instName)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := eb.awsClient.DestroyInstances(ctx, vmID); err != nil {
log.Printf("EC2 VM %s deletion failed: %s", instName, err)
}
eb.ledger.Remove(instName)
}

// Close stops the pollers used by the EC2Buildlet pool from running.
func (eb *EC2Buildlet) Close() {
eb.once.Do(func() {
close(eb.done)
})
}

// retrieveAndSetQuota queries EC2 for account relevant quotas and sets the quota in the ledger.
func (eb *EC2Buildlet) retrieveAndSetQuota() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

cpuQuota, err := eb.awsClient.Quota(ctx, cloud.QuotaServiceEC2, cloud.QuotaCodeCPUOnDemand)
if err != nil {
log.Printf("unable to query for cpu quota: %s", err)
return err
}
eb.ledger.SetCPULimit(cpuQuota)
return nil
}

// pollQuota repeatedly polls for the EC2 quota data and sets the quota data in
// the ledger. It stops polling once the done channel has been closed.
func (eb *EC2Buildlet) pollQuota() {
t := time.NewTicker(time.Hour)
defer t.Stop()
for {
select {
case <-t.C:
err := eb.retrieveAndSetQuota()
if err != nil {
log.Printf("polling for EC2 quota failed: %s", err)
}
case <-eb.done:
// closing the done channel signals the end of the polling loop.
log.Printf("stopped polling for EC2 quota")
return
}
}
}

// retrieveAndSetInstanceTypes retrieves the ARM64 instance types from the EC2
// service and sets them in the ledger.
func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

its, err := eb.awsClient.InstanceTypesARM(ctx)
if err != nil {
return fmt.Errorf("unable to retrieve instance types: %w", err)
}
eb.ledger.UpdateInstanceTypes(its)
log.Printf("ec2 buildlet pool instance types updated")
return nil
}

// cleanupUnusedVMs periodically queries for VMs which are not tracked in the ledger and
// deletes them. If the done channel has been closed then the polling will exit.
func (eb *EC2Buildlet) cleanupUnusedVMs() {
t := time.NewTicker(2 * time.Minute)
defer t.Stop()
for {
select {
case <-t.C:
log.Printf("cleaning up unused EC2 instances")
eb.destroyUntrackedInstances()
case <-eb.done:
// closing the done channel signals the end of the polling loop.
log.Printf("stopped cleaning up unused EC2 instances")
return
}
}
}

// destroyUntrackedInstances searches for VMs which exist but are not being tracked in the
// ledger and deletes them.
func (eb *EC2Buildlet) destroyUntrackedInstances() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

insts, err := eb.awsClient.RunningInstances(ctx)
if err != nil {
log.Printf("failed to query for instances: %s", err)
return
}
deleteInsts := make([]string, 0, len(insts))
for _, inst := range insts {
if eb.isRemoteBuildlet(inst.Name) {
// Remote buildlets have their own expiration mechanism that respects active SSH sessions.
log.Printf("destroyUntrackedInstances: skipping remote buildlet %q", inst.Name)
continue
}
if id := eb.ledger.InstanceID(inst.Name); id != "" {
continue
}
deleteInsts = append(deleteInsts, inst.ID)
log.Printf("queued for deleting untracked EC2 VM %q with id %q", inst.Name, inst.ID)
}
if len(deleteInsts) == 0 {
return
}
if err := eb.awsClient.DestroyInstances(ctx, deleteInsts...); err != nil {
log.Printf("failed cleaning EC2 VMs: %s", err)
}
}
Loading

0 comments on commit 3dadacc

Please sign in to comment.