diff --git a/CHANGELOG.md b/CHANGELOG.md index 1829549ca0b..80731b0eadc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,27 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. + This `Registration` can be used to unregister callbacks. (#3522) +- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) + ### Removed - The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) +### Changed + +- Global error handler uses an atomic value instead of a mutex. (#3543) +- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) +- Add `NewMetricProducer` to `go.opentelemetry.io/otel/bridge/opencensus`, which can be used to pass OpenCensus metrics to an OpenTelemetry Reader. (#3541) +- Global logger uses an atomic value instead of a mutex. (#3545) + +### Deprecated + +- The `NewMetricExporter` in `go.opentelemetry.io/otel/bridge/opencensus` is deprecated. Use `NewMetricProducer` instead. (#3541) + ## [1.11.2/0.34.0] 2022-12-05 ### Added diff --git a/bridge/opencensus/metric.go b/bridge/opencensus/metric.go index df22c874ab8..040dbdfeb8d 100644 --- a/bridge/opencensus/metric.go +++ b/bridge/opencensus/metric.go @@ -22,6 +22,7 @@ import ( ocmetricdata "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" + "go.opencensus.io/metric/metricproducer" "go.opentelemetry.io/otel" internal "go.opentelemetry.io/otel/bridge/opencensus/internal/ocmetric" @@ -33,6 +34,36 @@ import ( const scopeName = "go.opentelemetry.io/otel/bridge/opencensus" +type producer struct { + manager *metricproducer.Manager +} + +// NewMetricProducer returns a metric.Producer that fetches metrics from +// OpenCensus. +func NewMetricProducer() metric.Producer { + return &producer{ + manager: metricproducer.GlobalManager(), + } +} + +func (p *producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) { + producers := p.manager.GetAll() + data := []*ocmetricdata.Metric{} + for _, ocProducer := range producers { + data = append(data, ocProducer.Read()...) + } + otelmetrics, err := internal.ConvertMetrics(data) + if len(otelmetrics) == 0 { + return nil, err + } + return []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{ + Name: scopeName, + }, + Metrics: otelmetrics, + }}, err +} + // exporter implements the OpenCensus metric Exporter interface using an // OpenTelemetry base exporter. type exporter struct { @@ -42,6 +73,7 @@ type exporter struct { // NewMetricExporter returns an OpenCensus exporter that exports to an // OpenTelemetry (push) exporter. +// Deprecated: Use NewMetricProducer instead. func NewMetricExporter(base metric.Exporter, res *resource.Resource) metricexport.Exporter { return &exporter{base: base, res: res} } diff --git a/bridge/opencensus/metric_test.go b/bridge/opencensus/metric_test.go index 57350668362..ed348901f4e 100644 --- a/bridge/opencensus/metric_test.go +++ b/bridge/opencensus/metric_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" ocmetricdata "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" ocresource "go.opencensus.io/resource" "go.opentelemetry.io/otel/attribute" @@ -32,6 +33,134 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +func TestMetricProducer(t *testing.T) { + now := time.Now() + for _, tc := range []struct { + desc string + input []*ocmetricdata.Metric + expected []metricdata.ScopeMetrics + expectErr bool + }{ + { + desc: "empty", + expected: nil, + }, + { + desc: "success", + input: []*ocmetricdata.Metric{ + { + Resource: &ocresource.Resource{ + Labels: map[string]string{ + "R1": "V1", + "R2": "V2", + }, + }, + TimeSeries: []*ocmetricdata.TimeSeries{ + { + StartTime: now, + Points: []ocmetricdata.Point{ + {Value: int64(123), Time: now}, + }, + }, + }, + }, + }, + expected: []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{ + Name: scopeName, + }, + Metrics: []metricdata.Metrics{ + { + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Value: 123, + }, + }, + }, + }, + }, + }}, + }, + { + desc: "partial success", + input: []*ocmetricdata.Metric{ + { + Descriptor: ocmetricdata.Descriptor{ + Name: "foo.com/bad-point", + Description: "a bad type", + Unit: ocmetricdata.UnitDimensionless, + Type: ocmetricdata.TypeGaugeDistribution, + }, + }, + { + Resource: &ocresource.Resource{ + Labels: map[string]string{ + "R1": "V1", + "R2": "V2", + }, + }, + TimeSeries: []*ocmetricdata.TimeSeries{ + { + StartTime: now, + Points: []ocmetricdata.Point{ + {Value: int64(123), Time: now}, + }, + }, + }, + }, + }, + expected: []metricdata.ScopeMetrics{{ + Scope: instrumentation.Scope{ + Name: scopeName, + }, + Metrics: []metricdata.Metrics{ + { + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(), + StartTime: now, + Time: now, + Value: 123, + }, + }, + }, + }, + }, + }}, + expectErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fakeProducer := &fakeOCProducer{metrics: tc.input} + metricproducer.GlobalManager().AddProducer(fakeProducer) + defer metricproducer.GlobalManager().DeleteProducer(fakeProducer) + output, err := NewMetricProducer().Produce(context.Background()) + if tc.expectErr { + require.Error(t, err) + } else { + require.Nil(t, err) + } + require.Equal(t, len(output), len(tc.expected)) + for i := range output { + metricdatatest.AssertEqual(t, tc.expected[i], output[i]) + } + }) + } +} + +type fakeOCProducer struct { + metrics []*ocmetricdata.Metric +} + +func (f *fakeOCProducer) Read() []*ocmetricdata.Metric { + return f.metrics +} + func TestPushMetricsExporter(t *testing.T) { now := time.Now() for _, tc := range []struct { diff --git a/example/opencensus/main.go b/example/opencensus/main.go index c35c47cac09..8afbd9e5b9b 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -22,7 +22,6 @@ import ( ocmetric "go.opencensus.io/metric" "go.opencensus.io/metric/metricdata" - "go.opencensus.io/metric/metricexport" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats" "go.opencensus.io/stats/view" @@ -34,7 +33,6 @@ import ( "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -103,20 +101,12 @@ func tracing(otExporter sdktrace.SpanExporter) { // monitoring demonstrates creating an IntervalReader using the OpenTelemetry // exporter to send metrics to the exporter by using either an OpenCensus // registry or an OpenCensus view. -func monitoring(otExporter metric.Exporter) error { - log.Println("Using the OpenTelemetry stdoutmetric exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.") - ocExporter := opencensus.NewMetricExporter(otExporter, resource.Default()) - intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter) - if err != nil { - return fmt.Errorf("failed to create interval reader: %w", err) - } - intervalReader.ReportingInterval = 10 * time.Second - log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdoutmetric exporter.") - err = intervalReader.Start() - if err != nil { - return fmt.Errorf("failed to start interval reader: %w", err) - } - defer intervalReader.Stop() +func monitoring(exporter metric.Exporter) error { + log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.") + reader := metric.NewPeriodicReader(exporter) + // Register the OpenCensus metric Producer to add metrics from OpenCensus to the output. + reader.RegisterProducer(opencensus.NewMetricProducer()) + metric.NewMeterProvider(metric.WithReader(reader)) log.Println("Registering a gauge metric using an OpenCensus registry.") r := ocmetric.NewRegistry() diff --git a/example/prometheus/main.go b/example/prometheus/main.go index bc15f041486..39015994517 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -68,7 +68,7 @@ func main() { if err != nil { log.Fatal(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { n := -10. + rand.Float64()*(90.) // [-10, 100) gauge.Observe(ctx, n, attrs...) }) diff --git a/handler.go b/handler.go index 36cf09f7290..ecd363ab516 100644 --- a/handler.go +++ b/handler.go @@ -17,7 +17,8 @@ package otel // import "go.opentelemetry.io/otel" import ( "log" "os" - "sync" + "sync/atomic" + "unsafe" ) var ( @@ -34,28 +35,26 @@ var ( ) type delegator struct { - lock *sync.RWMutex - eh ErrorHandler + delegate unsafe.Pointer } func (d *delegator) Handle(err error) { - d.lock.RLock() - defer d.lock.RUnlock() - d.eh.Handle(err) + d.getDelegate().Handle(err) +} + +func (d *delegator) getDelegate() ErrorHandler { + return *(*ErrorHandler)(atomic.LoadPointer(&d.delegate)) } // setDelegate sets the ErrorHandler delegate. func (d *delegator) setDelegate(eh ErrorHandler) { - d.lock.Lock() - defer d.lock.Unlock() - d.eh = eh + atomic.StorePointer(&d.delegate, unsafe.Pointer(&eh)) } func defaultErrorHandler() *delegator { - return &delegator{ - lock: &sync.RWMutex{}, - eh: &errLogger{l: log.New(os.Stderr, "", log.LstdFlags)}, - } + d := &delegator{} + d.setDelegate(&errLogger{l: log.New(os.Stderr, "", log.LstdFlags)}) + return d } // errLogger logs errors if no delegate is set, otherwise they are delegated. diff --git a/handler_test.go b/handler_test.go index 32906198f8c..b6b9b20cae0 100644 --- a/handler_test.go +++ b/handler_test.go @@ -54,7 +54,7 @@ type HandlerTestSuite struct { func (s *HandlerTestSuite) SetupSuite() { s.errCatcher = new(testErrCatcher) - s.origHandler = globalErrorHandler.eh + s.origHandler = globalErrorHandler.getDelegate() globalErrorHandler.setDelegate(&errLogger{l: log.New(s.errCatcher, "", 0)}) } @@ -111,12 +111,12 @@ func (s *HandlerTestSuite) TestAllowMultipleSets() { secondary := &errLogger{l: log.New(notUsed, "", 0)} SetErrorHandler(secondary) s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler") - s.Require().Same(globalErrorHandler.eh, secondary, "new Handler not set") + s.Require().Same(globalErrorHandler.getDelegate(), secondary, "new Handler not set") tertiary := &errLogger{l: log.New(notUsed, "", 0)} SetErrorHandler(tertiary) s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler") - s.Assert().Same(globalErrorHandler.eh, tertiary, "user Handler not overridden") + s.Assert().Same(globalErrorHandler.getDelegate(), tertiary, "user Handler not overridden") } func TestHandlerTestSuite(t *testing.T) { diff --git a/internal/global/internal_logging.go b/internal/global/internal_logging.go index ccb3258711a..293c08961fb 100644 --- a/internal/global/internal_logging.go +++ b/internal/global/internal_logging.go @@ -17,7 +17,8 @@ package global // import "go.opentelemetry.io/otel/internal/global" import ( "log" "os" - "sync" + "sync/atomic" + "unsafe" "github.com/go-logr/logr" "github.com/go-logr/stdr" @@ -27,37 +28,36 @@ import ( // // The default logger uses stdr which is backed by the standard `log.Logger` // interface. This logger will only show messages at the Error Level. -var globalLogger logr.Logger = stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)) -var globalLoggerLock = &sync.RWMutex{} +var globalLogger unsafe.Pointer + +func init() { + SetLogger(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))) +} // SetLogger overrides the globalLogger with l. // // To see Info messages use a logger with `l.V(1).Enabled() == true` // To see Debug messages use a logger with `l.V(5).Enabled() == true`. func SetLogger(l logr.Logger) { - globalLoggerLock.Lock() - defer globalLoggerLock.Unlock() - globalLogger = l + atomic.StorePointer(&globalLogger, unsafe.Pointer(&l)) +} + +func getLogger() logr.Logger { + return *(*logr.Logger)(atomic.LoadPointer(&globalLogger)) } // Info prints messages about the general state of the API or SDK. // This should usually be less then 5 messages a minute. func Info(msg string, keysAndValues ...interface{}) { - globalLoggerLock.RLock() - defer globalLoggerLock.RUnlock() - globalLogger.V(1).Info(msg, keysAndValues...) + getLogger().V(1).Info(msg, keysAndValues...) } // Error prints messages about exceptional states of the API or SDK. func Error(err error, msg string, keysAndValues ...interface{}) { - globalLoggerLock.RLock() - defer globalLoggerLock.RUnlock() - globalLogger.Error(err, msg, keysAndValues...) + getLogger().Error(err, msg, keysAndValues...) } // Debug prints messages about all internal changes in the API or SDK. func Debug(msg string, keysAndValues ...interface{}) { - globalLoggerLock.RLock() - defer globalLoggerLock.RUnlock() - globalLogger.V(5).Info(msg, keysAndValues...) + getLogger().V(5).Info(msg, keysAndValues...) } diff --git a/internal/tools/go.mod b/internal/tools/go.mod index d9ae24949e3..441ab33e3dc 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/build-tools/dbotconf v0.3.0 go.opentelemetry.io/build-tools/multimod v0.3.0 go.opentelemetry.io/build-tools/semconvgen v0.3.0 - golang.org/x/tools v0.3.0 + golang.org/x/tools v0.4.0 ) require ( @@ -188,7 +188,7 @@ require ( golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.7.0 // indirect - golang.org/x/net v0.2.0 // indirect + golang.org/x/net v0.3.0 // indirect golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.3.0 // indirect golang.org/x/text v0.5.0 // indirect diff --git a/internal/tools/go.sum b/internal/tools/go.sum index 4c2f1b835ed..4db947ba629 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -730,8 +730,8 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= -golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/net v0.3.0 h1:VWL6FNY2bEEmsGVKabSlHu5Irp34xmMRoqb/9lF9lxk= +golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -822,7 +822,7 @@ golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.2.0 h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM= +golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -917,8 +917,8 @@ golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM= -golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= +golang.org/x/tools v0.4.0 h1:7mTAgkunk3fr4GAloyyCasadO6h9zSsQZbwvcaIciV4= +golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/metric/example_test.go b/metric/example_test.go index bc94d7572ef..dc22989f627 100644 --- a/metric/example_test.go +++ b/metric/example_test.go @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() { panic(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, + _, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, func(ctx context.Context) { // instrument.WithCallbackFunc(func(ctx context.Context) { //Do Work to get the real memoryUsage @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() { gcCount, _ := meter.AsyncInt64().Counter("gcCount") gcPause, _ := meter.SyncFloat64().Histogram("gcPause") - err := meter.RegisterCallback([]instrument.Asynchronous{ + _, err := meter.RegisterCallback([]instrument.Asynchronous{ heapAlloc, gcCount, }, diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 0fa924f397c..e8c83578459 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -15,6 +15,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global" import ( + "container/list" "context" "sync" "sync/atomic" @@ -109,7 +110,8 @@ type meter struct { mtx sync.Mutex instruments []delegatedInstrument - callbacks []delegatedCallback + + registry list.List delegate atomic.Value // metric.Meter } @@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { inst.setDelegate(meter) } - for _, callback := range m.callbacks { - callback.setDelegate(meter) + for e := m.registry.Front(); e != nil; e = e.Next() { + r := e.Value.(*registration) + r.setDelegate(meter) + m.registry.Remove(e) } m.instruments = nil - m.callbacks = nil + m.registry.Init() } // AsyncInt64 is the namespace for the Asynchronous Integer instruments. @@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { insts = unwrapInstruments(insts) - return del.RegisterCallback(insts, function) + return del.RegisterCallback(insts, f) } m.mtx.Lock() defer m.mtx.Unlock() - m.callbacks = append(m.callbacks, delegatedCallback{ - instruments: insts, - function: function, - }) - return nil + reg := ®istration{instruments: insts, function: f} + e := m.registry.PushBack(reg) + reg.unreg = func() error { + m.mtx.Lock() + _ = m.registry.Remove(e) + m.mtx.Unlock() + return nil + } + return reg, nil } type wrapped interface { @@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { return (*sfInstProvider)(m) } -type delegatedCallback struct { +type registration struct { instruments []instrument.Asynchronous function func(context.Context) + + unreg func() error + unregMu sync.Mutex } -func (c *delegatedCallback) setDelegate(m metric.Meter) { +func (c *registration) setDelegate(m metric.Meter) { insts := unwrapInstruments(c.instruments) - err := m.RegisterCallback(insts, c.function) + + c.unregMu.Lock() + defer c.unregMu.Unlock() + + if c.unreg == nil { + // Unregister already called. + return + } + + reg, err := m.RegisterCallback(insts, c.function) if err != nil { otel.Handle(err) } + + c.unreg = reg.Unregister +} + +func (c *registration) Unregister() error { + c.unregMu.Lock() + defer c.unregMu.Unlock() + if c.unreg == nil { + // Unregister already called. + return nil + } + + var err error + err, c.unreg = c.unreg(), nil + return err } type afInstProvider meter diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 8865f06d57b..15a0bf877af 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) { _, _ = mtr.SyncInt64().Counter(name) _, _ = mtr.SyncInt64().UpDownCounter(name) _, _ = mtr.SyncInt64().Histogram(name) - _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) + _, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) if !once { wg.Done() once = true @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) { close(finish) } +func TestUnregisterRace(t *testing.T) { + mtr := &meter{} + reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + finish := make(chan struct{}) + go func() { + for i, once := 0, false; ; i++ { + _ = reg.Unregister() + if !once { + wg.Done() + once = true + } + select { + case <-finish: + return + default: + } + } + }() + _ = reg.Unregister() + + wg.Wait() + mtr.setDelegate(metric.NewNoopMeterProvider()) + close(finish) +} + func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) { afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) @@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun _, err = m.AsyncInt64().Gauge("test_Async_Gauge") assert.NoError(t, err) - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { afcounter.Observe(ctx, 3) - })) + }) + require.NoError(t, err) sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) @@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) { assert.IsType(t, &afCounter{}, actr) assert.Equal(t, 1, mp.count) } + +func TestRegistrationDelegation(t *testing.T) { + // globalMeterProvider := otel.GetMeterProvider + globalMeterProvider := &meterProvider{} + + m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test") + require.IsType(t, &meter{}, m) + mImpl := m.(*meter) + + actr, err := m.AsyncFloat64().Counter("test_Async_Counter") + require.NoError(t, err) + + var called0 bool + reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called0 = true + }) + require.NoError(t, err) + require.Equal(t, 1, mImpl.registry.Len(), "callback not registered") + // This means reg0 should not be delegated. + assert.NoError(t, reg0.Unregister()) + assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered") + + var called1 bool + reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called1 = true + }) + require.NoError(t, err) + require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered") + + mp := &testMeterProvider{} + + // otel.SetMeterProvider(mp) + globalMeterProvider.setDelegate(mp) + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + require.False(t, called0, "pre-delegation unregistered callback called") + require.True(t, called1, "callback not called") + + called1 = false + assert.NoError(t, reg1.Unregister(), "unregister second callback") + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + assert.False(t, called1, "unregistered callback called") + + assert.NotPanics(t, func() { + assert.NoError(t, reg1.Unregister(), "duplicate unregister calls") + }) +} diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index ac6e93ebe38..53dfbc7528d 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { - m.callbacks = append(m.callbacks, function) +func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { + m.callbacks = append(m.callbacks, f) + return testReg{ + f: func(idx int) func() { + return func() { m.callbacks[idx] = nil } + }(len(m.callbacks) - 1), + }, nil +} + +type testReg struct { + f func() +} + +func (r testReg) Unregister() error { + r.f() return nil } @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider { func (m *testMeter) collect() { ctx := context.Background() for _, f := range m.callbacks { + if f == nil { + // Unregister. + continue + } f(ctx) } } diff --git a/metric/meter.go b/metric/meter.go index 23e6853afbb..3a505264ca0 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -51,14 +51,30 @@ type Meter interface { // To Observe data with instruments it must be registered in a callback. AsyncFloat64() asyncfloat64.InstrumentProvider - // RegisterCallback captures the function that will be called during Collect. - // - // It is only valid to call Observe within the scope of the passed function, - // and only on the instruments that were registered with this call. - RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error - // SyncInt64 is the namespace for the Synchronous Integer instruments SyncInt64() syncint64.InstrumentProvider // SyncFloat64 is the namespace for the Synchronous Float instruments SyncFloat64() syncfloat64.InstrumentProvider + + // RegisterCallback registers f to be called during the collection of a + // measurement cycle. + // + // If Unregister of the returned Registration is called, f needs to be + // unregistered and not called during collection. + // + // The instruments f is registered with are the only instruments that f may + // observe values for. + // + // If no instruments are passed, f should not be registered nor called + // during collection. + RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error) +} + +// Registration is an token representing the unique registration of a callback +// for a set of instruments with a Meter. +type Registration interface { + // Unregister removes the callback registration from a Meter. + // + // This method needs to be idempotent and concurrent safe. + Unregister() error } diff --git a/metric/noop.go b/metric/noop.go index e8b9a9a1458..7454a790337 100644 --- a/metric/noop.go +++ b/metric/noop.go @@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider { } // RegisterCallback creates a register callback that does not record any metrics. -func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error { - return nil +func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) { + return noopReg{}, nil } +type noopReg struct{} + +func (noopReg) Unregister() error { return nil } + type nonrecordingAsyncFloat64Instrument struct { instrument.Asynchronous } diff --git a/sdk/metric/config.go b/sdk/metric/config.go index c78b0416415..c837df8b76f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error { errs = append(errs, err) } } - switch len(errs) { - case 0: - return nil - case 1: - return errs[0] - default: - return fmt.Errorf("%v", errs) - } + return unifyErrors(errs) + } +} + +// unifyErrors combines multiple errors into a single error. +func unifyErrors(errs []error) error { + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return fmt.Errorf("%v", errs) } } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index a924d879d00..dc5eff2eee2 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -28,12 +28,13 @@ import ( ) type reader struct { - producer producer - temporalityFunc TemporalitySelector - aggregationFunc AggregationSelector - collectFunc func(context.Context) (metricdata.ResourceMetrics, error) - forceFlushFunc func(context.Context) error - shutdownFunc func(context.Context) error + producer sdkProducer + externalProducers []Producer + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + collectFunc func(context.Context) (metricdata.ResourceMetrics, error) + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Reader = (*reader)(nil) @@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n return r.aggregationFunc(kind) } -func (r *reader) register(p producer) { r.producer = p } +func (r *reader) register(p sdkProducer) { r.producer = p } +func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) } func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 0ebfadf33a3..48a8b291e77 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -28,9 +28,13 @@ import ( // manualReader is a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - producer atomic.Value + sdkProducer atomic.Value shutdownOnce sync.Once + mu sync.Mutex + isShutdown bool + externalProducers atomic.Value + temporalitySelector TemporalitySelector aggregationSelector AggregationSelector } @@ -41,22 +45,39 @@ var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { cfg := newManualReaderConfig(opts) - return &manualReader{ + r := &manualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, } + r.externalProducers.Store([]Producer{}) + return r } -// register stores the Producer which enables the caller to read -// metrics on demand. -func (mr *manualReader) register(p producer) { +// register stores the sdkProducer which enables the caller +// to read metrics from the SDK on demand. +func (mr *manualReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register manual reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer stores the external Producer which enables the caller +// to read metrics on demand. +func (mr *manualReader) RegisterProducer(p Producer) { + mr.mu.Lock() + defer mr.mu.Unlock() + if mr.isShutdown { + return + } + currentProducers := mr.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + mr.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -77,18 +98,23 @@ func (mr *manualReader) Shutdown(context.Context) error { err := ErrReaderShutdown mr.shutdownOnce.Do(func() { // Any future call to Collect will now return ErrReaderShutdown. - mr.producer.Store(produceHolder{ + mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) + mr.mu.Lock() + defer mr.mu.Unlock() + mr.isShutdown = true + // release references to Producer(s) + mr.externalProducers.Store([]Producer{}) err = nil }) return err } -// Collect gathers all metrics from the SDK, calling any callbacks necessary. -// Collect will return an error if called after shutdown. +// Collect gathers all metrics from the SDK and other Producers, calling any +// callbacks necessary. Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := mr.producer.Load() + p := mr.sdkProducer.Load() if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -103,7 +129,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + var errs []error + for _, producer := range mr.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + errs = append(errs, err) + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, unifyErrors(errs) } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 418827e9672..c2c515af35c 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { for _, inst := range insts { // Only register if at least one instrument has a non-drop aggregation. // Otherwise, calling f during collection will be wasted computation. @@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context } } // All insts use drop aggregation. - return nil + return noopRegister{}, nil } -func (m *meter) registerCallback(f func(context.Context)) error { - m.pipes.registerCallback(f) +type noopRegister struct{} + +func (noopRegister) Unregister() error { return nil } +type callback func(context.Context) + +func (m *meter) registerCallback(c callback) (metric.Registration, error) { + return m.pipes.registerCallback(c), nil +} + // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { return syncInt64Provider{m.instProviderInt64} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 013a52e5924..d904b118ad4 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -103,11 +103,63 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) { m := NewMeterProvider().Meter("callback-concurrency") go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) wg.Done() }() go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + wg.Done() + }() + wg.Wait() +} + +func TestNoopCallbackUnregisterConcurrency(t *testing.T) { + m := NewMeterProvider().Meter("noop-unregister-concurrency") + reg, err := m.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = reg.Unregister() + wg.Done() + }() + go func() { + _ = reg.Unregister() + wg.Done() + }() + wg.Wait() +} + +func TestCallbackUnregisterConcurrency(t *testing.T) { + reader := NewManualReader() + provider := NewMeterProvider(WithReader(reader)) + meter := provider.Meter("unregister-concurrency") + + actr, err := meter.AsyncFloat64().Counter("counter") + require.NoError(t, err) + + ag, err := meter.AsyncInt64().Gauge("gauge") + require.NoError(t, err) + + i := []instrument.Asynchronous{actr} + regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + i = []instrument.Asynchronous{ag} + regG, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() + wg.Done() + }() + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() wg.Done() }() wg.Wait() @@ -126,7 +178,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().Counter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -150,7 +202,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().UpDownCounter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -174,7 +226,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncInt64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -196,7 +248,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().Counter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -220,7 +272,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().UpDownCounter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -244,7 +296,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncFloat64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -418,7 +470,7 @@ func TestMetersProvideScope(t *testing.T) { m1 := mp.Meter("scope1") ctr1, err := m1.AsyncFloat64().Counter("ctr1") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { ctr1.Observe(ctx, 5) }) assert.NoError(t, err) @@ -426,7 +478,7 @@ func TestMetersProvideScope(t *testing.T) { m2 := mp.Meter("scope2") ctr2, err := m2.AsyncInt64().Counter("ctr2") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { ctr2.Observe(ctx, 7) }) assert.NoError(t, err) @@ -480,6 +532,53 @@ func TestMetersProvideScope(t *testing.T) { metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } +func TestUnregisterUnregisters(t *testing.T) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("TestUnregisterUnregisters") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + reg, err := m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true }) + require.NoError(t, err) + + ctx := context.Background() + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.True(t, called, "callback not called for registered callback") + + called = false + require.NoError(t, reg.Unregister(), "unregister") + + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.False(t, called, "callback called for unregistered callback") +} + func TestRegisterCallbackDropAggregations(t *testing.T) { aggFn := func(InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} @@ -507,14 +606,15 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { require.NoError(t, err) var called bool - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + _, err = m.RegisterCallback([]instrument.Asynchronous{ int64Counter, int64UpDownCounter, int64Gauge, floag64Counter, floag64UpDownCounter, floag64Gauge, - }, func(context.Context) { called = true })) + }, func(context.Context) { called = true }) + require.NoError(t, err) data, err := r.Collect(context.Background()) require.NoError(t, err) @@ -538,10 +638,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afcounter", @@ -564,10 +665,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afupdowncounter", @@ -590,10 +692,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afgauge", @@ -614,10 +717,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aicounter", @@ -640,10 +744,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aiupdowncounter", @@ -666,10 +771,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aigauge", diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 00ba1305595..8425e42e16a 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), } + r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade // periodicReader is a Reader that continuously collects and exports metric // data at a set interval. type periodicReader struct { - producer atomic.Value + sdkProducer atomic.Value + + mu sync.Mutex + isShutdown bool + externalProducers atomic.Value timeout time.Duration exporter Exporter @@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { } // register registers p as the producer of this reader. -func (r *periodicReader) register(p producer) { +func (r *periodicReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register periodic reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer registers p as an external Producer of this reader. +func (r *periodicReader) RegisterProducer(p Producer) { + r.mu.Lock() + defer r.mu.Unlock() + if r.isShutdown { + return + } + currentProducers := r.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + r.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error { } // Collect gathers and returns all metric data related to the Reader from -// the SDK. The returned metric data is not exported to the configured -// exporter, it is left to the caller to handle that if desired. +// the SDK and other Producers. The returned metric data is not exported +// to the configured exporter, it is left to the caller to handle that if +// desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.producer.Load()) + return r.collect(ctx, r.sdkProducer.Load()) } // collect unwraps p as a produceHolder and returns its produce results. @@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata err := fmt.Errorf("periodic reader: invalid producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + var errs []error + for _, producer := range r.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + errs = append(errs, err) + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, unifyErrors(errs) } // export exports metric data m using r's exporter. @@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { <-r.done // Any future call to Collect will now return ErrReaderShutdown. - ph := r.producer.Swap(produceHolder{ + ph := r.sdkProducer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) @@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { if err == nil || err == ErrReaderShutdown { err = sErr } + + r.mu.Lock() + defer r.mu.Unlock() + r.isShutdown = true + // release references to Producer(s) + r.externalProducers.Store([]Producer{}) }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index d48c1a7de8e..138aae48944 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() { } ts.ErrReader = NewPeriodicReader(e) - ts.ErrReader.register(testProducer{}) + ts.ErrReader.register(testSDKProducer{}) + ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetricsAB. + assert.Equal(t, testResourceMetricsAB, m) return assert.AnError }, } r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { called = new(bool) return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetricsA. + assert.Equal(t, testResourceMetricsAB, m) *called = true return assert.AnError }, @@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index bc6901e5775..f9938bf617f 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -15,6 +15,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "container/list" "context" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "sync" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -75,7 +77,7 @@ type pipeline struct { sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) + callbacks list.List } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -94,10 +96,15 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { } // addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(callback func(context.Context)) { +func (p *pipeline) addCallback(c callback) (unregister func()) { p.Lock() defer p.Unlock() - p.callbacks = append(p.callbacks, callback) + e := p.callbacks.PushBack(c) + return func() { + p.Lock() + p.callbacks.Remove(e) + p.Unlock() + } } // callbackKey is a context key type used to identify context that came from the SDK. @@ -112,14 +119,15 @@ const produceKey callbackKey = 0 // // This method is safe to call concurrently. func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + ctx = context.WithValue(ctx, produceKey, struct{}{}) + p.Lock() defer p.Unlock() - ctx = context.WithValue(ctx, produceKey, struct{}{}) - - for _, callback := range p.callbacks { + for e := p.callbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) - callback(ctx) + f := e.Value.(callback) + f(ctx) if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. return metricdata.ResourceMetrics{}, err @@ -439,10 +447,21 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(fn func(context.Context)) { - for _, pipe := range p { - pipe.addCallback(fn) +func (p pipelines) registerCallback(c callback) metric.Registration { + unregs := make([]func(), len(p)) + for i, pipe := range p { + unregs[i] = pipe.addCallback(c) + } + return unregisterFuncs(unregs) +} + +type unregisterFuncs []func() + +func (u unregisterFuncs) Unregister() error { + for _, f := range u { + f() } + return nil } // resolver facilitates resolving Aggregators an instrument needs to aggregate diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index aa9d50ef666..c52cc58dff2 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -51,7 +51,12 @@ type Reader interface { // register registers a Reader with a MeterProvider. // The producer argument allows the Reader to signal the sdk to collect // and send aggregated metric measurements. - register(producer) + register(sdkProducer) + + // RegisterProducer registers a an external Producer with this Reader. + // The Producer is used as a source of aggregated metric data which is + // incorporated into metrics collected from the SDK. + RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -84,14 +89,22 @@ type Reader interface { Shutdown(context.Context) error } -// producer produces metrics for a Reader. -type producer interface { +// sdkProducer produces metrics for a Reader. +type sdkProducer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. produce(context.Context) (metricdata.ResourceMetrics, error) } +// Producer produces metrics for a Reader from an external source. +type Producer interface { + // Produce returns aggregated metrics from an external source. + // + // This method should be safe to call concurrently. + Produce(context.Context) ([]metricdata.ScopeMetrics, error) +} + // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct { diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 28b249bd3e2..191ab39945b 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -57,16 +57,25 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() { ts.ErrorIs(err, ErrReaderNotRegistered) } -func (ts *readerTestSuite) TestProducer() { - ts.Reader.register(testProducer{}) +func (ts *readerTestSuite) TestSDKProducer() { + ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) - ts.Equal(testMetrics, m) + ts.Equal(testResourceMetricsA, m) +} + +func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) + m, err := ts.Reader.Collect(context.Background()) + ts.NoError(err) + ts.Equal(testResourceMetricsAB, m) } func (ts *readerTestSuite) TestCollectAfterShutdown() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m, err := ts.Reader.Collect(ctx) @@ -76,27 +85,29 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { func (ts *readerTestSuite) TestShutdownTwice() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { - p0 := testProducer{ + p0 := testSDKProducer{ produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { // Differentiate this producer from the second by returning an // error. - return testMetrics, assert.AnError + return testResourceMetricsA, assert.AnError }, } - p1 := testProducer{} + p1 := testSDKProducer{} ts.Reader.register(p0) // This should be ignored. @@ -106,11 +117,46 @@ func (ts *readerTestSuite) TestMultipleRegister() { ts.Equal(assert.AnError, err) } +func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{}, assert.AnError + }, + }, + ) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil + }, + }, + ) + + m, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) + ts.Equal(testResourceMetricsAB, m) +} + +func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader.register(testSDKProducer{ + produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { + return metricdata.ResourceMetrics{}, assert.AnError + }}) + ts.Reader.RegisterProducer(testExternalProducer{}) + + m, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) + ts.Equal(metricdata.ResourceMetrics{}, m) +} + func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -141,49 +187,85 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) ts.Equal(metricdata.ResourceMetrics{}, m) } -var testMetrics = metricdata.ResourceMetrics{ - Resource: resource.NewSchemaless(attribute.String("test", "Reader")), - ScopeMetrics: []metricdata.ScopeMetrics{{ - Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, - Metrics: []metricdata.Metrics{{ - Name: "fake data", - Description: "Data used to test a reader", - Unit: unit.Dimensionless, - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{{ - Attributes: attribute.NewSet(attribute.String("user", "alice")), - StartTime: time.Now(), - Time: time.Now().Add(time.Second), - Value: -1, - }}, - }, - }}, +var testScopeMetricsA = metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, + Metrics: []metricdata.Metrics{{ + Name: "fake data", + Description: "Data used to test a reader", + Unit: unit.Dimensionless, + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "alice")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: -1, + }}, + }, + }}, +} + +var testScopeMetricsB = metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/external"}, + Metrics: []metricdata.Metrics{{ + Name: "fake scope data", + Description: "Data used to test a Producer reader", + Unit: unit.Milliseconds, + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "ben")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: 10, + }}, + }, }}, } -type testProducer struct { +var testResourceMetricsA = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA}, +} + +var testResourceMetricsAB = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB}, +} + +type testSDKProducer struct { produceFunc func(context.Context) (metricdata.ResourceMetrics, error) } -func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + if p.produceFunc != nil { + return p.produceFunc(ctx) + } + return testResourceMetricsA, nil +} + +type testExternalProducer struct { + produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) +} + +func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { if p.produceFunc != nil { return p.produceFunc(ctx) } - return testMetrics, nil + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil } func benchReaderCollectFunc(r Reader) func(*testing.B) { ctx := context.Background() - r.register(testProducer{}) + r.register(testSDKProducer{}) // Store bechmark results in a closure to prevent the compiler from // inlining and skipping the function. @@ -198,7 +280,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { for n := 0; n < b.N; n++ { collectedMetrics, err = r.Collect(ctx) - assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) + assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } } } diff --git a/sdk/trace/sampling.go b/sdk/trace/sampling.go index a6dcf4b307c..ae63c7b6a66 100644 --- a/sdk/trace/sampling.go +++ b/sdk/trace/sampling.go @@ -163,10 +163,10 @@ func NeverSample() Sampler { // the root(Sampler) is used to make sampling decision. If the span has // a parent, depending on whether the parent is remote and whether it // is sampled, one of the following samplers will apply: -// - remoteParentSampled(Sampler) (default: AlwaysOn) -// - remoteParentNotSampled(Sampler) (default: AlwaysOff) -// - localParentSampled(Sampler) (default: AlwaysOn) -// - localParentNotSampled(Sampler) (default: AlwaysOff) +// - remoteParentSampled(Sampler) (default: AlwaysOn) +// - remoteParentNotSampled(Sampler) (default: AlwaysOff) +// - localParentSampled(Sampler) (default: AlwaysOn) +// - localParentNotSampled(Sampler) (default: AlwaysOff) func ParentBased(root Sampler, samplers ...ParentBasedSamplerOption) Sampler { return parentBased{ root: root,