Skip to content

Commit

Permalink
internal/gomote, internal/gomote/protos: add the upload file endpoint
Browse files Browse the repository at this point in the history
This change adds the upload file endpoint which will be used by the
gomote clients to upload files to GCS before they are retrieved by a
gomote instance. The endpoint generates a signed URL and associated
fields which must be used in the upload.

For golang/go#47521
Updates golang/go#48742

Change-Id: Id85a55b41b8211b3aae8c2e30245a0b71ecfa238
Reviewed-on: https://go-review.googlesource.com/c/build/+/397595
Trust: Carlos Amedee <[email protected]>
Reviewed-by: Heschi Kreinick <[email protected]>
  • Loading branch information
cagedmantis committed Apr 8, 2022
1 parent a7bbee6 commit fc95939
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 197 deletions.
5 changes: 5 additions & 0 deletions buildenv/envs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ type Environment struct {
// services used by IAP enabled HTTP paths.
// map[backend-service-name]service_id
iapServiceIDs map[string]string

// GomoteTransferBucket is the bucket used by the gomote GRPC service
// to transfer files between gomote clients and the gomote instances.
GomoteTransferBucket string
}

// ComputePrefix returns the URI prefix for Compute Engine resources in a project.
Expand Down Expand Up @@ -304,6 +308,7 @@ var Production = &Environment{
"coordinator-internal-iap": "7963570695201399464",
"relui-internal": "155577380958854618",
},
GomoteTransferBucket: "gomote-transfer",
}

var Development = &Environment{
Expand Down
16 changes: 15 additions & 1 deletion cmd/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
"golang.org/x/time/rate"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -343,13 +344,15 @@ func main() {
}
sshCA := mustRetrieveSSHCertificateAuthority()

var gomoteBucket string
var opts []grpc.ServerOption
if *buildEnvName == "" && *mode != "dev" && metadata.OnGCE() {
projectID, err := metadata.ProjectID()
if err != nil {
log.Fatalf("metadata.ProjectID() = %v", err)
}
env := buildenv.ByProjectID(projectID)
gomoteBucket = env.GomoteTransferBucket
var coordinatorBackend, serviceID = "coordinator-internal-iap", ""
if serviceID = env.IAPServiceID(coordinatorBackend); serviceID == "" {
log.Fatalf("unable to retrieve Service ID for backend service=%q", coordinatorBackend)
Expand All @@ -363,7 +366,7 @@ func main() {
dashV1 := legacydash.Handler(gce.GoDSClient(), maintnerClient, string(masterKey()), grpcServer)
dashV2 := &builddash.Handler{Datastore: gce.GoDSClient(), Maintner: maintnerClient}
gs := &gRPCServer{dashboardURL: "https://build.golang.org"}
gomoteServer := gomote.New(remote.NewSessionPool(context.Background()), sched, sshCA)
gomoteServer := gomote.New(remote.NewSessionPool(context.Background()), sched, sshCA, gomoteBucket, mustStorageClient())
protos.RegisterCoordinatorServer(grpcServer, gs)
gomoteprotos.RegisterGomoteServiceServer(grpcServer, gomoteServer)
mux.HandleFunc("/", grpcHandlerFunc(grpcServer, handleStatus)) // Serve a status page at farmer.golang.org.
Expand Down Expand Up @@ -2202,3 +2205,14 @@ func mustRetrieveSSHCertificateAuthority() (privateKey []byte) {
}
return
}

func mustStorageClient() *storage.Client {
if metadata.OnGCE() {
return pool.NewGCEConfiguration().StorageClient()
}
storageClient, err := storage.NewClient(context.Background(), option.WithoutAuthentication())
if err != nil {
log.Fatalf("unable to create storage client: %s", err)
}
return storageClient
}
46 changes: 45 additions & 1 deletion internal/gomote/gomote.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"strings"
"time"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/access"
Expand All @@ -34,24 +36,33 @@ type scheduler interface {
GetBuildlet(ctx context.Context, si *schedule.SchedItem) (buildlet.Client, error)
}

// bucketHandle interface used to enable testing of the storage.bucketHandle.
type bucketHandle interface {
GenerateSignedPostPolicyV4(object string, opts *storage.PostPolicyV4Options) (*storage.PostPolicyV4, error)
}

// Server is a gomote server implementation.
type Server struct {
// embed the unimplemented server.
protos.UnimplementedGomoteServiceServer

bucket bucketHandle
buildlets *remote.SessionPool
gceBucketName string
scheduler scheduler
sshCertificateAuthority ssh.Signer
}

// New creates a gomote server. If the rawCAPriKey is invalid, the program will exit.
func New(rsp *remote.SessionPool, sched *schedule.Scheduler, rawCAPriKey []byte) *Server {
func New(rsp *remote.SessionPool, sched *schedule.Scheduler, rawCAPriKey []byte, gomoteGCSBucket string, storageClient *storage.Client) *Server {
signer, err := ssh.ParsePrivateKey(rawCAPriKey)
if err != nil {
log.Fatalf("unable to parse raw certificate authority private key into signer=%s", err)
}
return &Server{
bucket: storageClient.Bucket(gomoteGCSBucket),
buildlets: rsp,
gceBucketName: gomoteGCSBucket,
scheduler: sched,
sshCertificateAuthority: signer,
}
Expand Down Expand Up @@ -336,6 +347,39 @@ func (s *Server) SignSSHKey(ctx context.Context, req *protos.SignSSHKeyRequest)
}, nil
}

// UploadFile creates a URL and a set of HTTP post fields which are used to upload a file to a staging GCS bucket. Uploaded files are made available to the
// gomote instances via a subsequent call to one of the WriteFromURL endpoints.
func (s *Server) UploadFile(ctx context.Context, req *protos.UploadFileRequest) (*protos.UploadFileResponse, error) {
_, err := access.IAPFromContext(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
}
url, fields, err := s.signURLForUpload(uuid.NewString())
if err != nil {
log.Printf("unable to create signed URL: %s", err)
return nil, status.Errorf(codes.Internal, "unable to create signed url")
}
return &protos.UploadFileResponse{
Url: url,
Fields: fields,
}, nil
}

// signURLForUpload generates a signed URL and a set of http Post fields to be used to upload an object to GCS without authenticating.
func (s *Server) signURLForUpload(object string) (url string, fields map[string]string, err error) {
if object == "" {
return "", nil, errors.New("invalid object name")
}
pv4, err := s.bucket.GenerateSignedPostPolicyV4(object, &storage.PostPolicyV4Options{
Expires: time.Now().Add(10 * time.Minute),
Insecure: false,
})
if err != nil {
return "", nil, fmt.Errorf("unable to generate signed url: %w", err)
}
return pv4.URL, pv4.Fields, nil
}

// WriteTGZFromURL will instruct the gomote instance to download the tar.gz from the provided URL. The tar.gz file will be unpacked in the work directory
// relative to the directory provided.
func (s *Server) WriteTGZFromURL(ctx context.Context, req *protos.WriteTGZFromURLRequest) (*protos.WriteTGZFromURLResponse, error) {
Expand Down
62 changes: 62 additions & 0 deletions internal/gomote/gomote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ package gomote

import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"

"cloud.google.com/go/storage"
"github.com/google/go-cmp/cmp"
"golang.org/x/build/internal/access"
"golang.org/x/build/internal/coordinator/remote"
Expand All @@ -27,13 +29,17 @@ import (
"google.golang.org/protobuf/testing/protocmp"
)

const testBucketName = "unit-testing-bucket"

func fakeGomoteServer(t *testing.T, ctx context.Context) protos.GomoteServiceServer {
signer, err := ssh.ParsePrivateKey([]byte(devCertCAPrivate))
if err != nil {
t.Fatalf("unable to parse raw certificate authority private key into signer=%s", err)
}
return &Server{
bucket: &fakeBucketHandler{bucketName: testBucketName},
buildlets: remote.NewSessionPool(ctx),
gceBucketName: testBucketName,
scheduler: schedule.NewFake(),
sshCertificateAuthority: signer,
}
Expand Down Expand Up @@ -675,6 +681,48 @@ func TestSignSSHKeyError(t *testing.T) {
}
}

func TestUploadFile(t *testing.T) {
ctx := access.FakeContextWithOutgoingIAPAuth(context.Background(), fakeIAP())
client := setupGomoteTest(t, context.Background())
_ = mustCreateInstance(t, client, fakeIAP())
if _, err := client.UploadFile(ctx, &protos.UploadFileRequest{}); err != nil {
t.Fatalf("client.UploadFile(ctx, req) = response, %s; want no error", err)
}
}

func TestUploadFileError(t *testing.T) {
// This test will create a gomote instance and attempt to call UploadFile.
// If overrideID is set to true, the test will use a different gomoteID than the
// the one created for the test.
testCases := []struct {
desc string
ctx context.Context
overrideID bool
filename string
wantCode codes.Code
}{
{
desc: "unauthenticated request",
ctx: context.Background(),
wantCode: codes.Unauthenticated,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
client := setupGomoteTest(t, context.Background())
_ = mustCreateInstance(t, client, fakeIAP())
req := &protos.UploadFileRequest{}
got, err := client.UploadFile(tc.ctx, req)
if err != nil && status.Code(err) != tc.wantCode {
t.Fatalf("unexpected error: %s; want %s", err, tc.wantCode)
}
if err == nil {
t.Fatalf("client.UploadFile(ctx, %v) = %v, nil; want error", req, got)
}
})
}
}

func TestWriteTGZFromURL(t *testing.T) {
ctx := access.FakeContextWithOutgoingIAPAuth(context.Background(), fakeIAP())
client := setupGomoteTest(t, context.Background())
Expand Down Expand Up @@ -871,3 +919,17 @@ OfjWFhdu6e4JYiVfN7ZYAAAAE3Rlc3R1c2VyQGdvbGFuZy5vcmcBAg==
// devCertCAPublic is a public SSH CA certificate to be used for development.
devCertCAPublic = `ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJV3YUncNv+hXneJEO3VEuxxOfjWFhdu6e4JYiVfN7ZY [email protected]`
)

type fakeBucketHandler struct{ bucketName string }

func (fbc *fakeBucketHandler) GenerateSignedPostPolicyV4(object string, opts *storage.PostPolicyV4Options) (*storage.PostPolicyV4, error) {
if object == "" || opts == nil {
return nil, errors.New("invalid arguments")
}
return &storage.PostPolicyV4{
URL: fmt.Sprintf("https://localhost/%s/%s", fbc.bucketName, object),
Fields: map[string]string{
"x-permission-to-post": "granted",
},
}, nil
}
Loading

0 comments on commit fc95939

Please sign in to comment.