From 72c2936283ff1b16e13fce4d64cdad096abf0875 Mon Sep 17 00:00:00 2001 From: Michael Maxwell Date: Thu, 18 Aug 2022 11:01:55 -0700 Subject: [PATCH] ProductCatalogService Cleanup (#317) --- CHANGELOG.md | 2 + .../{server.go => main.go} | 220 ++++++------------ 2 files changed, 71 insertions(+), 151 deletions(-) rename src/productcatalogservice/{server.go => main.go} (50%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3388e161de..bbf73ae8dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,3 +77,5 @@ significant modifications will be credited to OpenTelemetry Authors. ([#293](https://github.com/open-telemetry/opentelemetry-demo/pull/293)) * Enable Locust loadgen environment variable config options ([#316](https://github.com/open-telemetry/opentelemetry-demo/pull/316)) +* Simplified and cleaned up ProductCatalogService +([#317](https://github.com/open-telemetry/opentelemetry-demo/pull/317)) diff --git a/src/productcatalogservice/server.go b/src/productcatalogservice/main.go similarity index 50% rename from src/productcatalogservice/server.go rename to src/productcatalogservice/main.go index 70b2285741..1e9457b63c 100644 --- a/src/productcatalogservice/server.go +++ b/src/productcatalogservice/main.go @@ -16,17 +16,11 @@ package main import ( "context" - "flag" "fmt" - "google.golang.org/grpc/credentials/insecure" "io/ioutil" "net" "os" - "os/signal" "strings" - "sync" - "syscall" - "time" pb "github.com/opentelemetry/opentelemetry-demo/src/productcatalogservice/genproto/hipstershop" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -48,35 +42,17 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/grpc/credentials/insecure" ) var ( - cat pb.ListProductsResponse - catalogMutex *sync.Mutex - log *logrus.Logger - extraLatency time.Duration - - port = "3550" - - reloadCatalog bool + log *logrus.Logger + catalog []*pb.Product ) func init() { log = logrus.New() - log.Formatter = &logrus.JSONFormatter{ - FieldMap: logrus.FieldMap{ - logrus.FieldKeyTime: "timestamp", - logrus.FieldKeyLevel: "severity", - logrus.FieldKeyMsg: "message", - }, - TimestampFormat: time.RFC3339Nano, - } - log.Out = os.Stdout - catalogMutex = &sync.Mutex{} - err := readCatalogFile(&cat) - if err != nil { - log.Warnf("could not parse product catalog") - } + catalog = readCatalogFile() } func InitTracerProvider() *sdktrace.TracerProvider { @@ -84,11 +60,9 @@ func InitTracerProvider() *sdktrace.TracerProvider { exporter, err := otlptracegrpc.New(ctx) if err != nil { - log.Fatal(err) + log.Fatalf("OTLP Trace gRPC Creation: %v", err) } - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exporter), - ) + tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)) otel.SetTracerProvider(tp) otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) return tp @@ -98,65 +72,30 @@ func main() { tp := InitTracerProvider() defer func() { if err := tp.Shutdown(context.Background()); err != nil { - log.Printf("Error shutting down tracer provider: %v", err) - } - }() - - flag.Parse() - - // set injected latency - if s := os.Getenv("EXTRA_LATENCY"); s != "" { - v, err := time.ParseDuration(s) - if err != nil { - log.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err) - } - extraLatency = v - log.Infof("extra latency enabled (duration: %v)", extraLatency) - } else { - extraLatency = time.Duration(0) - } - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGUSR2) - go func() { - for { - sig := <-sigs - log.Printf("Received signal: %s", sig) - if sig == syscall.SIGUSR1 { - reloadCatalog = true - log.Infof("Enable catalog reloading") - } else { - reloadCatalog = false - log.Infof("Disable catalog reloading") - } + log.Fatalf("Tracer Provider Shutdown: %v", err) } }() + svc := &productCatalog{} var port string mustMapEnv(&port, "PRODUCT_CATALOG_SERVICE_PORT") + mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR") - log.Infof("starting grpc server at :%s", port) - run(port) - select {} -} + log.Infof("ProductCatalogService gRPC server started on port: %s", port) -func run(port string) string { - l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) + ln, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) if err != nil { - log.Fatal(err) + log.Fatalf("TCP Listen: %v", err) } + var srv *grpc.Server = grpc.NewServer( grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()), ) - svc := &productCatalog{} - mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR") - pb.RegisterProductCatalogServiceServer(srv, svc) healthpb.RegisterHealthServer(srv, svc) - go srv.Serve(l) - return l.Addr().String() + srv.Serve(ln) } type productCatalog struct { @@ -164,38 +103,26 @@ type productCatalog struct { pb.UnimplementedProductCatalogServiceServer } -func readCatalogFile(catalog *pb.ListProductsResponse) error { - catalogMutex.Lock() - defer catalogMutex.Unlock() +func readCatalogFile() []*pb.Product { catalogJSON, err := ioutil.ReadFile("products.json") if err != nil { - log.Fatalf("failed to open product catalog json file: %v", err) - return err - } - if err := protojson.Unmarshal(catalogJSON, catalog); err != nil { - log.Warnf("failed to parse the catalog JSON: %v", err) - return err + log.Fatalf("Reading Catalog File: %v", err) } - log.Info("successfully parsed product catalog json") - return nil -} -func parseCatalog() []*pb.Product { - if reloadCatalog || len(cat.Products) == 0 { - err := readCatalogFile(&cat) - if err != nil { - return []*pb.Product{} - } + var res pb.ListProductsResponse + if err := protojson.Unmarshal(catalogJSON, &res); err != nil { + log.Fatalf("Parsing Catalog JSON: %v", err) } - return cat.Products + + return res.Products } -func mustMapEnv(target *string, envKey string) { - v := os.Getenv(envKey) - if v == "" { - panic(fmt.Sprintf("environment variable %q not set", envKey)) +func mustMapEnv(target *string, key string) { + value, present := os.LookupEnv(key) + if !present { + log.Fatalf("Environment Variable Not Set: %q", key) } - *target = v + *target = value } func (p *productCatalog) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { @@ -209,14 +136,10 @@ func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Hea func (p *productCatalog) ListProducts(ctx context.Context, req *pb.Empty) (*pb.ListProductsResponse, error) { span := trace.SpanFromContext(ctx) - time.Sleep(extraLatency) - var ps []*pb.Product - ps = parseCatalog() - span.SetAttributes( - attribute.Int("app.products.count", len(ps)), + attribute.Int("app.products.count", len(catalog)), ) - return &pb.ListProductsResponse{Products: ps}, nil + return &pb.ListProductsResponse{Products: catalog}, nil } func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) { @@ -225,82 +148,77 @@ func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductReque attribute.String("app.product.id", req.Id), ) - time.Sleep(extraLatency) - - // conditional break if feature flag is enabled on a specific product + // GetProduct will fail on a specific product when feature flag is enabled if p.checkProductFailure(ctx, req.Id) { - msg := fmt.Sprintf("interal error: product catalog feature flag for failure is enabled") + msg := fmt.Sprintf("Error: ProductCatalogService Fail Feature Flag Enabled") span.SetStatus(otelcodes.Error, msg) span.AddEvent(msg) return nil, status.Errorf(codes.Internal, msg) } var found *pb.Product - for i := 0; i < len(parseCatalog()); i++ { - if req.Id == parseCatalog()[i].Id { - found = parseCatalog()[i] + for _, product := range catalog { + if req.Id == product.Id { + found = product + break } } if found == nil { - msg := fmt.Sprintf("no product with ID %s", req.Id) + msg := fmt.Sprintf("Product Not Found: %s", req.Id) span.SetStatus(otelcodes.Error, msg) span.AddEvent(msg) return nil, status.Errorf(codes.NotFound, msg) - } else { - msg := fmt.Sprintf("found product with ID %s, name %s", req.Id, found.Name) - span.AddEvent(msg) - span.SetAttributes( - attribute.String("app.product.name", found.Name), - ) } + + msg := fmt.Sprintf("Product Found - ID: %s, Name: %s", req.Id, found.Name) + span.AddEvent(msg) + span.SetAttributes( + attribute.String("app.product.name", found.Name), + ) return found, nil } func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProductsRequest) (*pb.SearchProductsResponse, error) { span := trace.SpanFromContext(ctx) - time.Sleep(extraLatency) - // Intepret query as a substring match in name or description. - var ps []*pb.Product - for _, p := range parseCatalog() { - if strings.Contains(strings.ToLower(p.Name), strings.ToLower(req.Query)) || - strings.Contains(strings.ToLower(p.Description), strings.ToLower(req.Query)) { - ps = append(ps, p) + + var result []*pb.Product + for _, product := range catalog { + if strings.Contains(strings.ToLower(product.Name), strings.ToLower(req.Query)) || + strings.Contains(strings.ToLower(product.Description), strings.ToLower(req.Query)) { + result = append(result, product) } } span.SetAttributes( - attribute.Int("app.products.count", len(ps)), + attribute.Int("app.products.count", len(result)), ) - return &pb.SearchProductsResponse{Results: ps}, nil + return &pb.SearchProductsResponse{Results: result}, nil } func (p *productCatalog) checkProductFailure(ctx context.Context, id string) bool { + if id != "OLJCESPC7Z" { + return false + } - if id == "OLJCESPC7Z" { - conn, err := createClient(ctx, p.featureFlagSvcAddr) - if err != nil { - //report the error but don't fail - span := trace.SpanFromContext(ctx) - span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to connect to feature flag service"))) - return false - } - defer conn.Close() - - ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{ - Name: "productCatalogFailure", - }) - if err != nil { - span := trace.SpanFromContext(ctx) - span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to retrieve product catalog feature flag"))) - return false - } - - if ffResponse.GetFlag().Enabled { - return true - } + conn, err := createClient(ctx, p.featureFlagSvcAddr) + if err != nil { + span := trace.SpanFromContext(ctx) + span.AddEvent("error", trace.WithAttributes(attribute.String("message", "Feature Flag Connection Failed"))) + return false + } + defer conn.Close() + flagName := "productCatalogFailure" + ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{ + Name: flagName, + }) + if err != nil { + span := trace.SpanFromContext(ctx) + span.AddEvent("error", trace.WithAttributes(attribute.String("message", fmt.Sprintf("GetFlag Failed: %s", flagName)))) + return false } - return false + + return ffResponse.GetFlag().Enabled } func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {