diff --git a/beater/jaeger/server_test.go b/beater/jaeger/server_test.go index 196d244784d..c3bac1d0eb1 100644 --- a/beater/jaeger/server_test.go +++ b/beater/jaeger/server_test.go @@ -62,14 +62,14 @@ func TestApprovals(t *testing.T) { tc.setup(t) defer tc.teardown(t) - f := filepath.Join("testdata", name) + f := filepath.Join("..", "..", "testdata", "jaeger", name) data, err := ioutil.ReadFile(f + ".json") require.NoError(t, err) var request api_v2.PostSpansRequest require.NoError(t, json.Unmarshal(data, &request)) require.NoError(t, tc.sendBatchGRPC(request.Batch)) - require.NoError(t, approvals.ApproveEvents(tc.events, f, "")) + require.NoError(t, approvals.ApproveEvents(tc.events, f)) tc.events = nil thriftBatch := &jaegerthrift.Batch{ @@ -77,7 +77,7 @@ func TestApprovals(t *testing.T) { Spans: jaegerthriftconv.FromDomain(request.Batch.Spans), } require.NoError(t, tc.sendBatchHTTP(thriftBatch)) - require.NoError(t, approvals.ApproveEvents(tc.events, f, "")) + require.NoError(t, approvals.ApproveEvents(tc.events, f)) }) } } diff --git a/beater/jaeger/testdata/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json similarity index 100% rename from beater/jaeger/testdata/batch_0.approved.json rename to testdata/jaeger/batch_0.approved.json diff --git a/beater/jaeger/testdata/batch_0.json b/testdata/jaeger/batch_0.json similarity index 100% rename from beater/jaeger/testdata/batch_0.json rename to testdata/jaeger/batch_0.json diff --git a/beater/jaeger/testdata/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json similarity index 100% rename from beater/jaeger/testdata/batch_1.approved.json rename to testdata/jaeger/batch_1.approved.json diff --git a/beater/jaeger/testdata/batch_1.json b/testdata/jaeger/batch_1.json similarity index 100% rename from beater/jaeger/testdata/batch_1.json rename to testdata/jaeger/batch_1.json diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index cdaed3100ea..605cfc228c2 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -165,9 +165,17 @@ class ServerSetUpBaseTest(BaseTest): sourcemap_url = "{}/{}".format(host, 'assets/v1/sourcemaps') expvar_url = "{}/{}".format(host, 'debug/vars') + jaeger_grpc_host = "localhost:14250" + jaeger_http_host = "localhost:14268" + jaeger_http_url = "http://{}/{}".format(jaeger_http_host, 'api/traces') + def config(self): return {"ssl_enabled": "false", "queue_flush": 0, + "jaeger_grpc_enabled": "true", + "jaeger_grpc_host": self.jaeger_grpc_host, + "jaeger_http_enabled": "true", + "jaeger_http_host": self.jaeger_http_host, "path": os.path.abspath(self.working_dir) + "/log/*"} def setUp(self): @@ -325,15 +333,31 @@ def load_docs_with_template(self, data_path, url, endpoint, expected_events_coun # Wait to give documents some time to be sent to the index # This is not required but speeds up the tests time.sleep(2) - self.es.indices.refresh(index=query_index) + self.wait_for_events(endpoint, expected_events_count, index=query_index) + + def wait_for_events(self, processor_name, expected_count, index=None, max_timeout=10): + """ + wait_for_events waits for an expected number of event docs with the given + 'processor.name' value, and returns the hits when found. + """ + if index is None: + index = self.index_name_pattern + + self.es.indices.refresh(index=index) + + query = {"term": {"processor.name": processor_name}} + result = {} # TODO(axw) use "nonlocal" when we migrate to Python 3 + def get_docs(): + hits = self.es.search(index=index, body={"query": query})['hits'] + result['docs'] = hits['hits'] + return hits['total']['value'] == expected_count self.wait_until( - lambda: (self.es.count(index=query_index, body={ - "query": {"term": {"processor.name": endpoint}}} - )['count'] == expected_events_count), + get_docs, max_timeout=max_timeout, - name="{} documents to reach {}".format(endpoint, expected_events_count), + name="{} documents to reach {}".format(processor_name, expected_count), ) + return result['docs'] def check_backend_error_sourcemap(self, index, count=1): rs = self.es.search(index=index, params={"rest_total_hits_as_int": "true"}) @@ -366,7 +390,7 @@ def logged_requests(self, url="/intake/v2/events"): if jline.get("logger") == "request" and u.path == url: yield jline - def approve_docs(self, base_path, received, doc_type): + def approve_docs(self, base_path, received): """ approve_docs compares the received documents to those contained in the file at ${base_path}.approved.json. If that file does not @@ -386,8 +410,22 @@ def approve_docs(self, base_path, received, doc_type): except IOError: approved = [] + # get_doc_id returns a value suitable for sorting and identifying + # documents: either a unique ID, or a timestamp. This is necessary + # since not all event types require a unique ID (namely, errors do + # not.) + # + # We return (0, doc['error']['id']) when the event type is 'error' + # if that field exists, otherwise returns (1, doc['@timestamp']). + # The first tuple element exists to sort IDs before timestamps. + def get_doc_id(doc): + doc_type = doc['processor']['event'] + if 'id' in doc[doc_type]: + return (0, doc[doc_type]['id']) + return (1, doc['@timestamp']) + received = [doc['_source'] for doc in received] - received.sort(key=lambda source: source[doc_type]['id']) + received.sort(key=get_doc_id) try: for rec in received: @@ -397,20 +435,20 @@ def approve_docs(self, base_path, received, doc_type): # # We don't compare the observer values between received/approved, # as they are dependent on the environment. - rec_id = rec[doc_type]['id'] + rec_id = get_doc_id(rec) rec_observer = rec['observer'] self.assertEqual(sets.Set(rec_observer.keys()), sets.Set( ["hostname", "version", "id", "ephemeral_id", "type", "version_major"])) assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".") for appr in approved: - if appr[doc_type]['id'] == rec_id: + if get_doc_id(appr) == rec_id: rec['observer'] = appr['observer'] break assert len(received) == len(approved) for i, rec in enumerate(received): appr = approved[i] - rec_id = rec[doc_type]['id'] - assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id) + rec_id = get_doc_id(rec) + assert rec_id == get_doc_id(appr), "New entry with id {}".format(rec_id) for k, v in rec.items(): self.assertEqual(v, appr[k]) except Exception as exc: @@ -480,13 +518,7 @@ def upload_sourcemap(self, file_name='bundle_no_mapping.js.map', class ClientSideElasticTest(ClientSideBaseTest, ElasticTest): def wait_for_sourcemaps(self, expected_ct=1): - idx = self.index_smap - self.wait_until( - lambda: (self.es.count(index=idx, body={ - "query": {"term": {"processor.name": 'sourcemap'}}} - )['count'] == expected_ct), - name="{} sourcemaps to ingest".format(expected_ct), - ) + self.wait_for_events('sourcemap', expected_ct, index=self.index_smap) def check_rum_error_sourcemap(self, updated, expected_err=None, count=1): rs = self.es.search(index=self.index_error, params={"rest_total_hits_as_int": "true"}) diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index bd6b5cbe96e..cfe9f3ada13 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -8,6 +8,10 @@ apm-server: jaeger.http.enabled: {{ jaeger_http_enabled }} jaeger.http.host: {{ jaeger_http_host }} {% endif %} + {% if jaeger_grpc_enabled %} + jaeger.grpc.enabled: {{ jaeger_grpc_enabled }} + jaeger.grpc.host: {{ jaeger_grpc_host }} + {% endif %} {% if api_key_enabled %} api_key.enabled: {{ api_key_enabled }} diff --git a/tests/system/jaeger_batch_0.approved.json b/tests/system/jaeger_batch_0.approved.json new file mode 100644 index 00000000000..9c4097fb0e6 --- /dev/null +++ b/tests/system/jaeger_batch_0.approved.json @@ -0,0 +1,246 @@ +[ + { + "transaction": { + "sampled": true, + "name": "Driver::findNearest", + "result": "Success", + "duration": { + "us": 243417 + }, + "type": "custom", + "id": "7be2fd98d0973be3" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + }, + "observer": { + "ephemeral_id": "70e2f245-9936-4ffc-b23a-147e39f95605", + "version_major": 8, + "hostname": "alloy", + "version": "8.0.0", + "type": "apm-server", + "id": "a27a1bdf-fa86-4f3d-a818-c66fe8fd1667" + }, + "timestamp": { + "us": 1576827704953864 + }, + "@timestamp": "2019-12-20T07:41:44.953Z", + "labels": { + "peer_port": 50535, + "as": "thrift", + "sampler_param": true, + "peer_ipv4": 2130706433, + "sampler_type": "const", + "peer_service": "driver-client" + }, + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "host": { + "ip": "10.0.0.13", + "hostname": "host01", + "name": "host01" + }, + "service": { + "node": { + "name": "host01" + }, + "name": "driver", + "language": { + "name": "Go" + } + }, + "ecs": { + "version": "1.2.0" + }, + "processor": { + "name": "transaction", + "event": "transaction" + } + }, + { + "parent": { + "id": "7be2fd98d0973be3" + }, + "transaction": { + "type": "custom", + "id": "7be2fd98d0973be3" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + }, + "observer": { + "ephemeral_id": "17cdbd41-10e5-463b-8238-88d55539d283", + "version_major": 8, + "hostname": "alloy", + "version": "8.0.0", + "type": "apm-server", + "id": "0ccc2e07-dbca-4f92-b08c-ba9ca93daf1a" + }, + "timestamp": { + "us": 1576827705007552 + }, + "@timestamp": "2019-12-20T07:41:45.007Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "host": { + "ip": "10.0.0.13", + "hostname": "host01", + "name": "host01" + }, + "service": { + "node": { + "name": "host01" + }, + "name": "driver", + "language": { + "name": "Go" + } + }, + "ecs": { + "version": "1.2.0" + }, + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "Retrying GetDriver after error" + } + }, + "processor": { + "name": "error", + "event": "error" + } + }, + { + "parent": { + "id": "7be2fd98d0973be3" + }, + "transaction": { + "type": "custom", + "id": "7be2fd98d0973be3" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + }, + "observer": { + "ephemeral_id": "17cdbd41-10e5-463b-8238-88d55539d283", + "version_major": 8, + "hostname": "alloy", + "version": "8.0.0", + "type": "apm-server", + "id": "0ccc2e07-dbca-4f92-b08c-ba9ca93daf1a" + }, + "timestamp": { + "us": 1576827705089431 + }, + "@timestamp": "2019-12-20T07:41:45.089Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "host": { + "ip": "10.0.0.13", + "hostname": "host01", + "name": "host01" + }, + "service": { + "node": { + "name": "host01" + }, + "name": "driver", + "language": { + "name": "Go" + } + }, + "ecs": { + "version": "1.2.0" + }, + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "Retrying GetDriver after error" + } + }, + "processor": { + "name": "error", + "event": "error" + } + }, + { + "parent": { + "id": "7be2fd98d0973be3" + }, + "transaction": { + "type": "custom", + "id": "7be2fd98d0973be3" + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + }, + "observer": { + "ephemeral_id": "17cdbd41-10e5-463b-8238-88d55539d283", + "version_major": 8, + "hostname": "alloy", + "version": "8.0.0", + "type": "apm-server", + "id": "0ccc2e07-dbca-4f92-b08c-ba9ca93daf1a" + }, + "timestamp": { + "us": 1576827705172530 + }, + "@timestamp": "2019-12-20T07:41:45.172Z", + "agent": { + "ephemeral_id": "624386e9c81d2980", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "host": { + "ip": "10.0.0.13", + "hostname": "host01", + "name": "host01" + }, + "service": { + "node": { + "name": "host01" + }, + "name": "driver", + "language": { + "name": "Go" + } + }, + "ecs": { + "version": "1.2.0" + }, + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "Retrying GetDriver after error" + } + }, + "processor": { + "name": "error", + "event": "error" + } + } +] \ No newline at end of file diff --git a/tests/system/jaegergrpc/main.go b/tests/system/jaegergrpc/main.go new file mode 100644 index 00000000000..08e1808a61c --- /dev/null +++ b/tests/system/jaegergrpc/main.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "google.golang.org/grpc" +) + +var ( + serverAddr = flag.String("addr", "localhost:14250", "Jaeger gRPC server address") + insecure = flag.Bool("insecure", false, "Disable certificate verification") +) + +func main() { + flag.Parse() + if flag.NArg() == 0 { + fmt.Fprintf(os.Stderr, "Usage: %s [flags] [ ...]\n", filepath.Base(os.Args[0])) + os.Exit(2) + } + + var opts []grpc.DialOption + if *insecure { + opts = append(opts, grpc.WithInsecure()) + } + + conn, err := grpc.Dial(*serverAddr, opts...) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + client := api_v2.NewCollectorServiceClient(conn) + for _, arg := range flag.Args() { + request, err := decodeRequest(arg) + if err != nil { + log.Fatal(err) + } + _, err = client.PostSpans(context.Background(), request) + if err != nil { + log.Fatal(err) + } + } +} + +func decodeRequest(filename string) (*api_v2.PostSpansRequest, error) { + var request api_v2.PostSpansRequest + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + return &request, json.NewDecoder(f).Decode(&request) +} diff --git a/tests/system/test_integration.py b/tests/system/test_integration.py index e7aca83d592..96a8870acac 100644 --- a/tests/system/test_integration.py +++ b/tests/system/test_integration.py @@ -1,7 +1,4 @@ from datetime import datetime, timedelta -import json -import os -import sys import time import requests @@ -79,12 +76,12 @@ def test_load_docs_with_template_and_add_transaction(self): # compare existing ES documents for transactions with new ones rs = self.es.search(index=self.index_transaction) assert rs['hits']['total']['value'] == 4, "found {} documents".format(rs['count']) - self.approve_docs('transaction', rs['hits']['hits'], 'transaction') + self.approve_docs('transaction', rs['hits']['hits']) # compare existing ES documents for spans with new ones rs = self.es.search(index=self.index_span) assert rs['hits']['total']['value'] == 5, "found {} documents".format(rs['count']) - self.approve_docs('spans', rs['hits']['hits'], 'span') + self.approve_docs('spans', rs['hits']['hits']) def test_load_docs_with_template_and_add_error(self): """ @@ -97,7 +94,7 @@ def test_load_docs_with_template_and_add_error(self): # compare existing ES documents for errors with new ones rs = self.es.search(index=self.index_error) assert rs['hits']['total']['value'] == 4, "found {} documents".format(rs['count']) - self.approve_docs('error', rs['hits']['hits'], 'error') + self.approve_docs('error', rs['hits']['hits']) self.check_backend_error_sourcemap(self.index_error, count=4) diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py index 90ac4d5b29b..978eb21ebf3 100644 --- a/tests/system/test_jaeger.py +++ b/tests/system/test_jaeger.py @@ -1,6 +1,6 @@ +import os import re - -import requests +import subprocess from apmserver import integration_test from apmserver import ElasticTest @@ -8,22 +8,26 @@ @integration_test class Test(ElasticTest): - jaeger_http_host = "localhost:14268" - def setUp(self): super(Test, self).setUp() self.wait_until(lambda: self.log_contains("Listening for Jaeger HTTP"), name="Jaeger HTTP listener started") + self.wait_until(lambda: self.log_contains("Listening for Jaeger gRPC"), name="Jaeger gRPC listener started") - # Extract the Jaeger HTTP server address. - match = re.search("Listening for Jaeger HTTP requests on: (.*)$", self.get_log(), re.MULTILINE) - listen_addr = match.group(1) - self.jaeger_http_url = "http://{}/{}".format(listen_addr, 'api/traces') + # Extract the Jaeger server addresses. + log = self.get_log() + match = re.search("Listening for Jaeger HTTP requests on: (.*)$", log, re.MULTILINE) + self.jaeger_http_url = "http://{}/{}".format(match.group(1), 'api/traces') + match = re.search("Listening for Jaeger gRPC requests on: (.*)$", log, re.MULTILINE) + self.jaeger_grpc_addr = match.group(1) def config(self): cfg = super(Test, self).config() cfg.update({ + "jaeger_grpc_enabled": "true", "jaeger_http_enabled": "true", - "jaeger_http_host": "localhost:0", # Listen on a dynamic port + # Listen on dynamic ports + "jaeger_grpc_host": "localhost:0", + "jaeger_http_host": "localhost:0", }) return cfg @@ -34,8 +38,25 @@ def test_jaeger_http(self): jaeger_span_thrift = self.get_testdata_path('jaeger', 'span.thrift') self.load_docs_with_template(jaeger_span_thrift, self.jaeger_http_url, 'transaction', 1, extra_headers={"content-type": "application/vnd.apache.thrift.binary"}) + self.assert_no_logged_warnings() + transaction_docs = self.wait_for_events('transaction', 1) + self.approve_docs('jaeger_span', transaction_docs) + + def test_jaeger_grpc(self): + """ + This test sends a Jaeger batch over gRPC, and verifies that the spans are indexed. + """ + jaeger_request_data = self.get_testdata_path('jaeger', 'batch_0.json') - rs = self.es.search(index=self.index_transaction) - assert rs['hits']['total']['value'] == 1, "found {} documents".format(rs['count']) - self.approve_docs('jaeger_span', rs['hits']['hits'], 'transaction') + client = os.path.join(os.path.dirname(__file__), 'jaegergrpc') + subprocess.check_call(['go', 'run', client, + '-addr', self.jaeger_grpc_addr, + '-insecure', + jaeger_request_data, + ]) + + self.assert_no_logged_warnings() + transaction_docs = self.wait_for_events('transaction', 1) + error_docs = self.wait_for_events('error', 3) + self.approve_docs('jaeger_batch_0', transaction_docs + error_docs)