From 4e5ca1a4a1813c8dcff6085e344fda03be877722 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 9 Jan 2020 08:56:28 +0800 Subject: [PATCH] tests/system: system test for Jaeger Thrift/HTTP (#3114) * tests/system: system test for Jaeger Thrift/HTTP --- testdata/jaeger/span.thrift | Bin 0 -> 392 bytes tests/system/apmserver.py | 89 +++++++++++++++++++++---- tests/system/config/apm-server.yml.j2 | 5 ++ tests/system/jaeger_span.approved.json | 59 ++++++++++++++++ tests/system/test_integration.py | 54 --------------- tests/system/test_jaeger.py | 41 ++++++++++++ 6 files changed, 181 insertions(+), 67 deletions(-) create mode 100644 testdata/jaeger/span.thrift create mode 100644 tests/system/jaeger_span.approved.json create mode 100644 tests/system/test_jaeger.py diff --git a/testdata/jaeger/span.thrift b/testdata/jaeger/span.thrift new file mode 100644 index 0000000000000000000000000000000000000000..c45a2901be595a4e6a1de40dbd96d79809c58e45 GIT binary patch literal 392 zcmZ8d%T2^E6r4C4$U0eqL=7AhIf2~*xNxO|l}Rix%EyYmk@iFb+&Rzmln*@=?^;}!=KEsLYj2NdD?W!2j+w151 wMM$xBf*q29Iur3X)4Wryy^J}ke_iXL*Ty&5{Mr;Bc{g_<1W{mfCqI7z-+^5`ng9R* literal 0 HcmV?d00001 diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index df63fd4a8f0..cdaed3100ea 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -2,16 +2,16 @@ import json import os import re +import sets import shutil +import sys import threading +import time import unittest -from time import gmtime, strftime from urlparse import urlparse -import sys -import time - from elasticsearch import Elasticsearch +from nose.tools import nottest import requests sys.path.append(os.path.join(os.path.dirname(__file__), '..', @@ -45,7 +45,7 @@ def tearDown(self): @classmethod def setUpClass(cls): cls.apm_version = "8.0.0" - cls.day = strftime("%Y.%m.%d", gmtime()) + cls.day = time.strftime("%Y.%m.%d", time.gmtime()) cls.beat_name = "apm-server" cls.beat_path = os.path.abspath(os.path.join( os.path.dirname(__file__), "..", "..")) @@ -112,10 +112,11 @@ def get_kibana_url(user="", password=""): ) def get_payload_path(self, name): - return self._beat_path_join( - 'testdata', - 'intake-v2', - name) + return self.get_testdata_path('intake-v2', name) + + @nottest + def get_testdata_path(self, *names): + return self._beat_path_join('testdata', *names) def get_payload(self, name): with open(self.get_payload_path(name)) as f: @@ -308,15 +309,17 @@ def setUp(self): super(ElasticTest, self).setUp() def load_docs_with_template(self, data_path, url, endpoint, expected_events_count, - query_index=None, max_timeout=10): + query_index=None, max_timeout=10, extra_headers=None): if query_index is None: query_index = self.index_name_pattern + headers = {'content-type': 'application/x-ndjson'} + if extra_headers: + headers.update(extra_headers) + with open(data_path) as f: - r = requests.post(url, - data=f, - headers={'content-type': 'application/x-ndjson'}) + r = requests.post(url, data=f, headers=headers) assert r.status_code == 202, r.status_code # Wait to give documents some time to be sent to the index @@ -363,6 +366,66 @@ def logged_requests(self, url="/intake/v2/events"): if jline.get("logger") == "request" and u.path == url: yield jline + def approve_docs(self, base_path, received, doc_type): + """ + approve_docs compares the received documents to those contained + in the file at ${base_path}.approved.json. If that file does not + exist, then it is considered equivalent to a lack of documents. + + Only the document _source is compared, and we ignore differences + in some context-sensitive fields such as the "observer", which + may vary between test runs. + """ + base_path = self._beat_path_join(os.path.dirname(__file__), base_path) + approved_path = base_path + '.approved.json' + received_path = base_path + '.received.json' + + try: + with open(approved_path) as f: + approved = json.load(f) + except IOError: + approved = [] + + received = [doc['_source'] for doc in received] + received.sort(key=lambda source: source[doc_type]['id']) + + try: + for rec in received: + # Overwrite received observer values with the approved ones, + # in order to avoid noise in the 'approvals' diff if there are + # any other changes. + # + # We don't compare the observer values between received/approved, + # as they are dependent on the environment. + rec_id = rec[doc_type]['id'] + rec_observer = rec['observer'] + self.assertEqual(sets.Set(rec_observer.keys()), sets.Set( + ["hostname", "version", "id", "ephemeral_id", "type", "version_major"])) + assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".") + for appr in approved: + if appr[doc_type]['id'] == rec_id: + rec['observer'] = appr['observer'] + break + assert len(received) == len(approved) + for i, rec in enumerate(received): + appr = approved[i] + rec_id = rec[doc_type]['id'] + assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id) + for k, v in rec.items(): + self.assertEqual(v, appr[k]) + except Exception as exc: + with open(received_path, 'w') as f: + json.dump(received, f, indent=4, separators=(',', ': ')) + + # Create a dynamic Exception subclass so we can fake its name to look like the original exception. + class ApprovalException(Exception): + def __init__(self, cause): + super(ApprovalException, self).__init__(cause.message) + + def __str__(self): + return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff." + ApprovalException.__name__ = type(exc).__name__ + raise ApprovalException, exc, sys.exc_info()[2] class ClientSideBaseTest(ServerBaseTest): sourcemap_url = 'http://localhost:8200/assets/v1/sourcemaps' diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index 97db0c7918b..bd6b5cbe96e 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -4,6 +4,11 @@ apm-server: host: "localhost:8200" secret_token: {{ secret_token }} + {% if jaeger_http_enabled %} + jaeger.http.enabled: {{ jaeger_http_enabled }} + jaeger.http.host: {{ jaeger_http_host }} + {% endif %} + {% if api_key_enabled %} api_key.enabled: {{ api_key_enabled }} api_key.limit: {{ api_key_limit | default(100) }} diff --git a/tests/system/jaeger_span.approved.json b/tests/system/jaeger_span.approved.json new file mode 100644 index 00000000000..8110030fcc0 --- /dev/null +++ b/tests/system/jaeger_span.approved.json @@ -0,0 +1,59 @@ +[ + { + "transaction": { + "sampled": true, + "name": "test_span", + "result": "Success", + "duration": { + "us": 2 + }, + "type": "custom", + "id": "5025e08c7fef6542" + }, + "trace": { + "id": "00000000000000005025e08c7fef6542" + }, + "observer": { + "ephemeral_id": "39dd6032-fa6c-4b9e-9fab-f35ea6ef8dfe", + "version_major": 8, + "hostname": "alloy", + "version": "8.0.0", + "type": "apm-server", + "id": "901ad5b3-b313-4c2d-a7a0-2d188005f25e" + }, + "timestamp": { + "us": 1578451731616515 + }, + "@timestamp": "2020-01-08T02:48:51.616Z", + "labels": { + "sampler_type": "const", + "sampler_param": true + }, + "agent": { + "ephemeral_id": "3a5c6b00dd41a605", + "name": "Jaeger/Go", + "version": "2.21.2-dev" + }, + "host": { + "ip": "10.1.1.101", + "hostname": "alloy", + "name": "alloy" + }, + "service": { + "node": { + "name": "alloy" + }, + "name": "test_service", + "language": { + "name": "Go" + } + }, + "ecs": { + "version": "1.2.0" + }, + "processor": { + "name": "transaction", + "event": "transaction" + } + } +] \ No newline at end of file diff --git a/tests/system/test_integration.py b/tests/system/test_integration.py index 9dd6a450236..e7aca83d592 100644 --- a/tests/system/test_integration.py +++ b/tests/system/test_integration.py @@ -9,7 +9,6 @@ from apmserver import integration_test from apmserver import ClientSideElasticTest, ElasticTest, ExpvarBaseTest from apmserver import OverrideIndicesTest, OverrideIndicesFailureTest -from sets import Set @integration_test @@ -102,59 +101,6 @@ def test_load_docs_with_template_and_add_error(self): self.check_backend_error_sourcemap(self.index_error, count=4) - def approve_docs(self, base_path, received, doc_type): - base_path = self._beat_path_join(os.path.dirname(__file__), base_path) - approved_path = base_path + '.approved.json' - received_path = base_path + '.received.json' - - try: - with open(approved_path) as f: - approved = json.load(f) - except IOError: - approved = [] - - received = [doc['_source'] for doc in received] - received.sort(key=lambda source: source[doc_type]['id']) - - try: - for rec in received: - # Overwrite received observer values with the approved ones, - # in order to avoid noise in the 'approvals' diff if there are - # any other changes. - # - # We don't compare the observer values between received/approved, - # as they are dependent on the environment. - rec_id = rec[doc_type]['id'] - rec_observer = rec['observer'] - self.assertEqual(Set(rec_observer.keys()), Set( - ["hostname", "version", "id", "ephemeral_id", "type", "version_major"])) - assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".") - for appr in approved: - if appr[doc_type]['id'] == rec_id: - rec['observer'] = appr['observer'] - break - assert len(received) == len(approved) - for i, rec in enumerate(received): - appr = approved[i] - rec_id = rec[doc_type]['id'] - assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id) - for k, v in rec.items(): - self.assertEqual(v, appr[k]) - except Exception as exc: - with open(received_path, 'w') as f: - json.dump(received, f, indent=4, separators=(',', ': ')) - - # Create a dynamic Exception subclass so we can fake its name to look like the original exception. - class ApprovalException(Exception): - def __init__(self, cause): - super(ApprovalException, self).__init__(cause.message) - - def __str__(self): - return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff." - ApprovalException.__name__ = type(exc).__name__ - - raise ApprovalException, exc, sys.exc_info()[2] - @integration_test class EnrichEventIntegrationTest(ClientSideElasticTest): diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py new file mode 100644 index 00000000000..90ac4d5b29b --- /dev/null +++ b/tests/system/test_jaeger.py @@ -0,0 +1,41 @@ +import re + +import requests + +from apmserver import integration_test +from apmserver import ElasticTest + + +@integration_test +class Test(ElasticTest): + jaeger_http_host = "localhost:14268" + + def setUp(self): + super(Test, self).setUp() + self.wait_until(lambda: self.log_contains("Listening for Jaeger HTTP"), name="Jaeger HTTP listener started") + + # Extract the Jaeger HTTP server address. + match = re.search("Listening for Jaeger HTTP requests on: (.*)$", self.get_log(), re.MULTILINE) + listen_addr = match.group(1) + self.jaeger_http_url = "http://{}/{}".format(listen_addr, 'api/traces') + + def config(self): + cfg = super(Test, self).config() + cfg.update({ + "jaeger_http_enabled": "true", + "jaeger_http_host": "localhost:0", # Listen on a dynamic port + }) + return cfg + + def test_jaeger_http(self): + """ + This test sends a Jaeger span in Thrift encoding over HTTP, and verifies that it is indexed. + """ + jaeger_span_thrift = self.get_testdata_path('jaeger', 'span.thrift') + self.load_docs_with_template(jaeger_span_thrift, self.jaeger_http_url, 'transaction', 1, + extra_headers={"content-type": "application/vnd.apache.thrift.binary"}) + self.assert_no_logged_warnings() + + rs = self.es.search(index=self.index_transaction) + assert rs['hits']['total']['value'] == 1, "found {} documents".format(rs['count']) + self.approve_docs('jaeger_span', rs['hits']['hits'], 'transaction')