Skip to content

Commit

Permalink
Merge branch 'master' into otel-jvm-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Mar 31, 2021
2 parents 9f2f373 + efb4c16 commit fdce970
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 31 deletions.
11 changes: 8 additions & 3 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,24 @@ apm-server:

#-- General RUM settings

# A list of service names to allow, to limit service-specific indices and data streams
# created for unauthenticated RUM events.
# If the list is empty, any service name is allowed.
#allow_service_names: []

# A list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
# Allowed origins in this setting can have * to match anything (eg.: http://*.example.com)
# If an item in the list is a single '*', everything will be allowed.
#allow_origins : ['*']
#allow_origins: ['*']

# A list of Access-Control-Allow-Headers to allow RUM requests, in addition to "Content-Type",
# "Content-Encoding", and "Accept"
#allow_headers : []
#allow_headers: []

# Custom HTTP headers to add to RUM responses, e.g. for security policy compliance.
#response_headers :
#response_headers:
# X-My-Header: Contents of the header

# Regexp to be matched against a stacktrace frame's `file_name` and `abs_path` attributes.
Expand Down
11 changes: 8 additions & 3 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,24 @@ apm-server:

#-- General RUM settings

# A list of service names to allow, to limit service-specific indices and data streams
# created for unauthenticated RUM events.
# If the list is empty, any service name is allowed.
#allow_service_names: []

# A list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
# Allowed origins in this setting can have * to match anything (eg.: http://*.example.com)
# If an item in the list is a single '*', everything will be allowed.
#allow_origins : ['*']
#allow_origins: ['*']

# A list of Access-Control-Allow-Headers to allow RUM requests, in addition to "Content-Type",
# "Content-Encoding", and "Accept"
#allow_headers : []
#allow_headers: []

# Custom HTTP headers to add to RUM responses, e.g. for security policy compliance.
#response_headers :
#response_headers:
# X-My-Header: Contents of the header

# Regexp to be matched against a stacktrace frame's `file_name` and `abs_path` attributes.
Expand Down
11 changes: 8 additions & 3 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,24 @@ apm-server:

#-- General RUM settings

# A list of service names to allow, to limit service-specific indices and data streams
# created for unauthenticated RUM events.
# If the list is empty, any service name is allowed.
#allow_service_names: []

# A list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
# Allowed origins in this setting can have * to match anything (eg.: http://*.example.com)
# If an item in the list is a single '*', everything will be allowed.
#allow_origins : ['*']
#allow_origins: ['*']

# A list of Access-Control-Allow-Headers to allow RUM requests, in addition to "Content-Type",
# "Content-Encoding", and "Accept"
#allow_headers : []
#allow_headers: []

# Custom HTTP headers to add to RUM responses, e.g. for security policy compliance.
#response_headers :
#response_headers:
# X-My-Header: Contents of the header

# Regexp to be matched against a stacktrace frame's `file_name` and `abs_path` attributes.
Expand Down
1 change: 1 addition & 0 deletions apmpackage/apm/0.2.0/agent/input/template.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ apm-server:
rum:
enabled: {{enable_rum}}
source_mapping.elasticsearch.api_key: {{sourcemap_api_key}}
allow_service_names: {{rum_allow_service_names}}
allow_origins: {{rum_allow_origins}}
allow_headers: {{rum_allow_headers}}
library_pattern: {{rum_library_pattern}}
Expand Down
5 changes: 4 additions & 1 deletion apmpackage/apm/0.2.0/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# newer versions go on top
- version: "0.2.0"
changes:
- description: Introduce a configurable default service environment
- description: added support for apm-server.rum.allow_service_names
type: enhancement # can be one of: enhancement, bugfix, breaking-change
link: https://github.com/elastic/apm-server/pull/5030
- description: introduce a configurable default service environment
type: enhancement # can be one of: enhancement, bugfix, breaking-change
link: https://github.com/elastic/apm-server/pull/4861
- version: "0.1.0"
Expand Down
7 changes: 7 additions & 0 deletions apmpackage/apm/0.2.0/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ policy_templates:
description: Default service environment to record in events which have no service environment defined.
required: false
show_user: false
- name: rum_allow_service_names
type: text
title: RUM - Allowed Service Names
description: Allowed service names for events sent by RUM agents.
multi: true
required: false
show_user: false
- name: rum_allow_origins
type: text
title: RUM - Origin Headers
Expand Down
10 changes: 6 additions & 4 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func TestUnpackConfig(t *testing.T) {
"limit": 7200,
"lru_size": 2000,
},
"allow_origins": []string{"example*"},
"allow_headers": []string{"Authorization"},
"allow_service_names": []string{"opbeans-rum"},
"allow_origins": []string{"example*"},
"allow_headers": []string{"Authorization"},
"source_mapping": map[string]interface{}{
"cache": map[string]interface{}{
"expiration": 8 * time.Minute,
Expand Down Expand Up @@ -171,8 +172,9 @@ func TestUnpackConfig(t *testing.T) {
Limit: 7200,
LruSize: 2000,
},
AllowOrigins: []string{"example*"},
AllowHeaders: []string{"Authorization"},
AllowServiceNames: []string{"opbeans-rum"},
AllowOrigins: []string{"example*"},
AllowHeaders: []string{"Authorization"},
SourceMapping: &SourceMapping{
Cache: &Cache{Expiration: 8 * time.Minute},
IndexPattern: "apm-test*",
Expand Down
1 change: 1 addition & 0 deletions beater/config/rum.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
type RumConfig struct {
Enabled *bool `config:"enabled"`
EventRate *EventRate `config:"event_rate"`
AllowServiceNames []string `config:"allow_service_names"`
AllowOrigins []string `config:"allow_origins"`
AllowHeaders []string `config:"allow_headers"`
ResponseHeaders map[string][]string `config:"response_headers"`
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https:/elastic/apm-server/compare/7.12\...master[View commits]
* Set `client.ip` for events from the Elastic APM iOS agent {pull}4975[4975]
* Calculate service destination metrics for OpenTelemetry spans {pull}4976[4976]
* Add exponential retries to api key and tail sampling requests {pull}4991[4991]
* Add `apm-server.rum.allow_service_names` config {pull}5030[5030]
* Ingest pipeline for translating OpenTelemetry Java metrics to Elastic APM fields {pull}4986[4986]

[float]
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration-rum.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ This setting defines the number of unique IPs that can be tracked in the cache.
Sites with many concurrent clients should consider increasing this limit.
Defaults to 1000.

[float]
[[rum-allow-service-names]]
==== `allow_service_names`
A list of permitted service names for RUM support.
This can be set to restrict RUM events to those with one of a set of known service names,
in order to limit the number of service-specific indices or data streams created.
By default this is not set, allowing any service name.

[float]
[[rum-allow-origins]]
==== `allow_origins`
Expand Down
32 changes: 32 additions & 0 deletions model/modelprocessor/nop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelprocessor

import (
"context"

"github.com/elastic/apm-server/model"
)

// Nop is a model.BatchProcessor that performs no work. It can be used as a
// placeholder wherever a BatchProcessor is required but nothing needs doing.
type Nop struct{}

// ProcessBatch implements model.BatchProcessor. It ignores both ctx and
// batch, leaves the batch untouched, and always returns nil.
func (Nop) ProcessBatch(ctx context.Context, batch *model.Batch) error {
return nil
}
66 changes: 51 additions & 15 deletions processor/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/elastic/apm-server/model/modeldecoder"
"github.com/elastic/apm-server/model/modeldecoder/rumv3"
v2 "github.com/elastic/apm-server/model/modeldecoder/v2"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/utility"
)
Expand All @@ -59,11 +60,12 @@ const (
type decodeMetadataFunc func(decoder.Decoder, *model.Metadata) error

// Processor reads an event stream (see HandleStream) and passes the decoded
// batches on to a model.BatchProcessor. Instances are created with
// BackendProcessor, RUMV2Processor or RUMV3Processor.
type Processor struct {
Mconfig modeldecoder.Config
MaxEventSize int
streamReaderPool sync.Pool
decodeMetadata decodeMetadataFunc
isRUM bool
Mconfig modeldecoder.Config
MaxEventSize int
streamReaderPool sync.Pool
decodeMetadata decodeMetadataFunc
isRUM bool
// allowedServiceNames is the set of service names accepted by this
// processor. It is nil when no restriction is configured, in which case
// HandleStream does not install the service name check.
allowedServiceNames map[string]bool
}

func BackendProcessor(cfg *config.Config) *Processor {
Expand All @@ -77,20 +79,33 @@ func BackendProcessor(cfg *config.Config) *Processor {

// RUMV2Processor returns a Processor flagged as RUM whose metadata is decoded
// with v2.DecodeNestedMetadata. When cfg.RumConfig.AllowServiceNames is
// non-empty, only those service names are accepted.
//
// cfg.RumConfig is dereferenced unconditionally, so it must be non-nil.
func RUMV2Processor(cfg *config.Config) *Processor {
return &Processor{
Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental},
MaxEventSize: cfg.MaxEventSize,
decodeMetadata: v2.DecodeNestedMetadata,
isRUM: true,
Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental},
MaxEventSize: cfg.MaxEventSize,
decodeMetadata: v2.DecodeNestedMetadata,
isRUM: true,
allowedServiceNames: makeAllowedServiceNamesMap(cfg.RumConfig.AllowServiceNames),
}
}

// RUMV3Processor returns a Processor flagged as RUM whose metadata is decoded
// with rumv3.DecodeNestedMetadata. When cfg.RumConfig.AllowServiceNames is
// non-empty, only those service names are accepted.
//
// cfg.RumConfig is dereferenced unconditionally, so it must be non-nil.
func RUMV3Processor(cfg *config.Config) *Processor {
return &Processor{
Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental},
MaxEventSize: cfg.MaxEventSize,
decodeMetadata: rumv3.DecodeNestedMetadata,
isRUM: true,
Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental},
MaxEventSize: cfg.MaxEventSize,
decodeMetadata: rumv3.DecodeNestedMetadata,
isRUM: true,
allowedServiceNames: makeAllowedServiceNamesMap(cfg.RumConfig.AllowServiceNames),
}
}

// makeAllowedServiceNamesMap converts the configured list of allowed service
// names into a set keyed by name, for constant-time membership checks.
//
// It returns nil (not an empty map) when allowed is nil or empty, so that
// callers can use a nil check to tell "no restriction" apart from a
// configured allow-list.
func makeAllowedServiceNamesMap(allowed []string) map[string]bool {
	var set map[string]bool
	for i := range allowed {
		// Allocate lazily so that an empty input yields a nil map.
		if set == nil {
			set = make(map[string]bool, len(allowed))
		}
		set[allowed[i]] = true
	}
	return set
}

func (p *Processor) readMetadata(reader *streamReader, metadata *model.Metadata) error {
Expand Down Expand Up @@ -288,13 +303,16 @@ func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limite
defer sr.release()

// first item is the metadata object
err := p.readMetadata(sr, meta)
if err != nil {
if err := p.readMetadata(sr, meta); err != nil {
// no point in continuing if we couldn't read the metadata
res.Add(err)
return res
}

var allowedServiceNamesProcessor model.BatchProcessor = modelprocessor.Nop{}
if p.allowedServiceNames != nil {
allowedServiceNamesProcessor = modelprocessor.MetadataProcessorFunc(p.restrictAllowedServiceNames)
}
requestTime := utility.RequestTime(ctx)

sp, ctx := apm.StartSpan(ctx, "Stream", "Reporter")
Expand All @@ -307,6 +325,11 @@ func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limite
if batch.Len() == 0 {
continue
}
if err := allowedServiceNamesProcessor.ProcessBatch(ctx, &batch); err != nil {
res.Add(err)
return res
}

// NOTE(axw) ProcessBatch takes ownership of batch, which means we cannot reuse
// the slice memory. We should investigate alternative interfaces between the
// processor and publisher which would enable better memory reuse, e.g. by using
Expand Down Expand Up @@ -334,6 +357,19 @@ func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limite
return res
}

// restrictAllowedServiceNames returns an InvalidInputErrType error when the
// service name in meta is not present in p.allowedServiceNames, and nil
// otherwise. HandleStream wraps it in a modelprocessor.MetadataProcessorFunc;
// ctx is unused.
//
// A nil or empty allow-list rejects every name, so this check should only be
// installed when an allow-list has actually been configured.
func (p *Processor) restrictAllowedServiceNames(ctx context.Context, meta *model.Metadata) error {
	// A plain map lookup is enough here: the allow-list is not treated as a
	// secret, so there is no need for a constant-time comparison.
	if allowed := p.allowedServiceNames[meta.Service.Name]; allowed {
		return nil
	}
	return &Error{
		Type:    InvalidInputErrType,
		Message: "service name is not allowed",
	}
}

// getStreamReader returns a streamReader that reads ND-JSON lines from r.
func (p *Processor) getStreamReader(r io.Reader) *streamReader {
if sr, ok := p.streamReaderPool.Get().(*streamReader); ok {
Expand Down
40 changes: 38 additions & 2 deletions processor/stream/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/tests/loader"
"github.com/elastic/apm-server/transform"
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestIntegrationRum(t *testing.T) {
UserAgent: model.UserAgent{Original: "rum-2.0"},
Client: model.Client{IP: net.ParseIP("192.0.0.1")}}

p := RUMV2Processor(&config.Config{MaxEventSize: 100 * 1024})
p := RUMV2Processor(&config.Config{MaxEventSize: 100 * 1024, RumConfig: &config.RumConfig{}})
actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bodyReader, batchProcessor)
assertApproveResult(t, actualResult, test.name)
})
Expand Down Expand Up @@ -184,13 +185,48 @@ func TestRUMV3(t *testing.T) {
UserAgent: model.UserAgent{Original: "rum-2.0"},
Client: model.Client{IP: net.ParseIP("192.0.0.1")}}

p := RUMV3Processor(&config.Config{MaxEventSize: 100 * 1024})
p := RUMV3Processor(&config.Config{MaxEventSize: 100 * 1024, RumConfig: &config.RumConfig{}})
actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bodyReader, batchProcessor)
assertApproveResult(t, actualResult, test.name)
})
}
}

func TestRUMAllowedServiceNames(t *testing.T) {
for _, test := range []struct {
AllowServiceNames []string
ExpectedResult *Result
}{{
AllowServiceNames: nil,
ExpectedResult: &Result{
Accepted: 2,
},
}, {
AllowServiceNames: []string{"apm-agent-js"}, // matches what's in test data
ExpectedResult: &Result{
Accepted: 2,
},
}, {
AllowServiceNames: []string{"reject_everything"},
ExpectedResult: &Result{
Accepted: 0,
Errors: []*Error{{Type: InvalidInputErrType, Message: "service name is not allowed"}},
},
}} {
p := RUMV2Processor(&config.Config{
MaxEventSize: 100 * 1024,
RumConfig: &config.RumConfig{AllowServiceNames: test.AllowServiceNames},
})

b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/transactions_spans_rum.ndjson"))
require.NoError(t, err)
bodyReader := bytes.NewBuffer(b)

result := p.HandleStream(context.Background(), nil, &model.Metadata{}, bodyReader, modelprocessor.Nop{})
assert.Equal(t, test.ExpectedResult, result)
}
}

func TestRateLimiting(t *testing.T) {
b, err := loader.LoadDataAsBytes("../testdata/intake-v2/ratelimit.ndjson")
require.NoError(t, err)
Expand Down
Loading

0 comments on commit fdce970

Please sign in to comment.