Skip to content

Commit

Permalink
ProductCatalogService Cleanup (open-telemetry#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
mic-max authored Aug 18, 2022
1 parent 1d3a68c commit 72c2936
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 151 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ significant modifications will be credited to OpenTelemetry Authors.
([#293](https:/open-telemetry/opentelemetry-demo/pull/293))
* Enable Locust loadgen environment variable config options
([#316](https:/open-telemetry/opentelemetry-demo/pull/316))
* Simplified and cleaned up ProductCatalogService
([#317](https:/open-telemetry/opentelemetry-demo/pull/317))
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@ package main

import (
"context"
"flag"
"fmt"
"google.golang.org/grpc/credentials/insecure"
"io/ioutil"
"net"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

pb "github.com/opentelemetry/opentelemetry-demo/src/productcatalogservice/genproto/hipstershop"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -48,47 +42,27 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/credentials/insecure"
)

var (
cat pb.ListProductsResponse
catalogMutex *sync.Mutex
log *logrus.Logger
extraLatency time.Duration

port = "3550"

reloadCatalog bool
log *logrus.Logger
catalog []*pb.Product
)

func init() {
log = logrus.New()
log.Formatter = &logrus.JSONFormatter{
FieldMap: logrus.FieldMap{
logrus.FieldKeyTime: "timestamp",
logrus.FieldKeyLevel: "severity",
logrus.FieldKeyMsg: "message",
},
TimestampFormat: time.RFC3339Nano,
}
log.Out = os.Stdout
catalogMutex = &sync.Mutex{}
err := readCatalogFile(&cat)
if err != nil {
log.Warnf("could not parse product catalog")
}
catalog = readCatalogFile()
}

func InitTracerProvider() *sdktrace.TracerProvider {
ctx := context.Background()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
log.Fatal(err)
log.Fatalf("OTLP Trace gRPC Creation: %v", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
)
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp
Expand All @@ -98,104 +72,57 @@ func main() {
tp := InitTracerProvider()
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()

flag.Parse()

// set injected latency
if s := os.Getenv("EXTRA_LATENCY"); s != "" {
v, err := time.ParseDuration(s)
if err != nil {
log.Fatalf("failed to parse EXTRA_LATENCY (%s) as time.Duration: %+v", v, err)
}
extraLatency = v
log.Infof("extra latency enabled (duration: %v)", extraLatency)
} else {
extraLatency = time.Duration(0)
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGUSR2)
go func() {
for {
sig := <-sigs
log.Printf("Received signal: %s", sig)
if sig == syscall.SIGUSR1 {
reloadCatalog = true
log.Infof("Enable catalog reloading")
} else {
reloadCatalog = false
log.Infof("Disable catalog reloading")
}
log.Fatalf("Tracer Provider Shutdown: %v", err)
}
}()

svc := &productCatalog{}
var port string
mustMapEnv(&port, "PRODUCT_CATALOG_SERVICE_PORT")
mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR")

log.Infof("starting grpc server at :%s", port)
run(port)
select {}
}
log.Infof("ProductCatalogService gRPC server started on port: %s", port)

func run(port string) string {
l, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
ln, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatal(err)
log.Fatalf("TCP Listen: %v", err)
}

var srv *grpc.Server = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)

svc := &productCatalog{}
mustMapEnv(&svc.featureFlagSvcAddr, "FEATURE_FLAG_GRPC_SERVICE_ADDR")

pb.RegisterProductCatalogServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
go srv.Serve(l)
return l.Addr().String()
srv.Serve(ln)
}

type productCatalog struct {
featureFlagSvcAddr string
pb.UnimplementedProductCatalogServiceServer
}

func readCatalogFile(catalog *pb.ListProductsResponse) error {
catalogMutex.Lock()
defer catalogMutex.Unlock()
func readCatalogFile() []*pb.Product {
catalogJSON, err := ioutil.ReadFile("products.json")
if err != nil {
log.Fatalf("failed to open product catalog json file: %v", err)
return err
}
if err := protojson.Unmarshal(catalogJSON, catalog); err != nil {
log.Warnf("failed to parse the catalog JSON: %v", err)
return err
log.Fatalf("Reading Catalog File: %v", err)
}
log.Info("successfully parsed product catalog json")
return nil
}

func parseCatalog() []*pb.Product {
if reloadCatalog || len(cat.Products) == 0 {
err := readCatalogFile(&cat)
if err != nil {
return []*pb.Product{}
}
var res pb.ListProductsResponse
if err := protojson.Unmarshal(catalogJSON, &res); err != nil {
log.Fatalf("Parsing Catalog JSON: %v", err)
}
return cat.Products

return res.Products
}

func mustMapEnv(target *string, envKey string) {
v := os.Getenv(envKey)
if v == "" {
panic(fmt.Sprintf("environment variable %q not set", envKey))
func mustMapEnv(target *string, key string) {
value, present := os.LookupEnv(key)
if !present {
log.Fatalf("Environment Variable Not Set: %q", key)
}
*target = v
*target = value
}

func (p *productCatalog) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
Expand All @@ -209,14 +136,10 @@ func (p *productCatalog) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Hea
func (p *productCatalog) ListProducts(ctx context.Context, req *pb.Empty) (*pb.ListProductsResponse, error) {
span := trace.SpanFromContext(ctx)

time.Sleep(extraLatency)
var ps []*pb.Product
ps = parseCatalog()

span.SetAttributes(
attribute.Int("app.products.count", len(ps)),
attribute.Int("app.products.count", len(catalog)),
)
return &pb.ListProductsResponse{Products: ps}, nil
return &pb.ListProductsResponse{Products: catalog}, nil
}

func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductRequest) (*pb.Product, error) {
Expand All @@ -225,82 +148,77 @@ func (p *productCatalog) GetProduct(ctx context.Context, req *pb.GetProductReque
attribute.String("app.product.id", req.Id),
)

time.Sleep(extraLatency)

// conditional break if feature flag is enabled on a specific product
// GetProduct will fail on a specific product when feature flag is enabled
if p.checkProductFailure(ctx, req.Id) {
msg := fmt.Sprintf("interal error: product catalog feature flag for failure is enabled")
msg := fmt.Sprintf("Error: ProductCatalogService Fail Feature Flag Enabled")
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.Internal, msg)
}

var found *pb.Product
for i := 0; i < len(parseCatalog()); i++ {
if req.Id == parseCatalog()[i].Id {
found = parseCatalog()[i]
for _, product := range catalog {
if req.Id == product.Id {
found = product
break
}
}

if found == nil {
msg := fmt.Sprintf("no product with ID %s", req.Id)
msg := fmt.Sprintf("Product Not Found: %s", req.Id)
span.SetStatus(otelcodes.Error, msg)
span.AddEvent(msg)
return nil, status.Errorf(codes.NotFound, msg)
} else {
msg := fmt.Sprintf("found product with ID %s, name %s", req.Id, found.Name)
span.AddEvent(msg)
span.SetAttributes(
attribute.String("app.product.name", found.Name),
)
}

msg := fmt.Sprintf("Product Found - ID: %s, Name: %s", req.Id, found.Name)
span.AddEvent(msg)
span.SetAttributes(
attribute.String("app.product.name", found.Name),
)
return found, nil
}

func (p *productCatalog) SearchProducts(ctx context.Context, req *pb.SearchProductsRequest) (*pb.SearchProductsResponse, error) {
span := trace.SpanFromContext(ctx)
time.Sleep(extraLatency)
// Intepret query as a substring match in name or description.
var ps []*pb.Product
for _, p := range parseCatalog() {
if strings.Contains(strings.ToLower(p.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(p.Description), strings.ToLower(req.Query)) {
ps = append(ps, p)

var result []*pb.Product
for _, product := range catalog {
if strings.Contains(strings.ToLower(product.Name), strings.ToLower(req.Query)) ||
strings.Contains(strings.ToLower(product.Description), strings.ToLower(req.Query)) {
result = append(result, product)
}
}
span.SetAttributes(
attribute.Int("app.products.count", len(ps)),
attribute.Int("app.products.count", len(result)),
)
return &pb.SearchProductsResponse{Results: ps}, nil
return &pb.SearchProductsResponse{Results: result}, nil
}

func (p *productCatalog) checkProductFailure(ctx context.Context, id string) bool {
if id != "OLJCESPC7Z" {
return false
}

if id == "OLJCESPC7Z" {
conn, err := createClient(ctx, p.featureFlagSvcAddr)
if err != nil {
//report the error but don't fail
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to connect to feature flag service")))
return false
}
defer conn.Close()

ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{
Name: "productCatalogFailure",
})
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "failed to retrieve product catalog feature flag")))
return false
}

if ffResponse.GetFlag().Enabled {
return true
}
conn, err := createClient(ctx, p.featureFlagSvcAddr)
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", "Feature Flag Connection Failed")))
return false
}
defer conn.Close()

flagName := "productCatalogFailure"
ffResponse, err := pb.NewFeatureFlagServiceClient(conn).GetFlag(ctx, &pb.GetFlagRequest{
Name: flagName,
})
if err != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("error", trace.WithAttributes(attribute.String("message", fmt.Sprintf("GetFlag Failed: %s", flagName))))
return false
}
return false

return ffResponse.GetFlag().Enabled
}

func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
Expand Down

0 comments on commit 72c2936

Please sign in to comment.