Skip to content

Commit

Permalink
tests/system: system test for Jaeger Thrift/HTTP (#3114)
Browse files Browse the repository at this point in the history
* tests/system: system test for Jaeger Thrift/HTTP
  • Loading branch information
axw authored Jan 9, 2020
1 parent 11f9d6e commit 4e5ca1a
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 67 deletions.
Binary file added testdata/jaeger/span.thrift
Binary file not shown.
89 changes: 76 additions & 13 deletions tests/system/apmserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
import json
import os
import re
import sets
import shutil
import sys
import threading
import time
import unittest
from time import gmtime, strftime
from urlparse import urlparse

import sys
import time

from elasticsearch import Elasticsearch
from nose.tools import nottest
import requests

sys.path.append(os.path.join(os.path.dirname(__file__), '..',
Expand Down Expand Up @@ -45,7 +45,7 @@ def tearDown(self):
@classmethod
def setUpClass(cls):
cls.apm_version = "8.0.0"
cls.day = strftime("%Y.%m.%d", gmtime())
cls.day = time.strftime("%Y.%m.%d", time.gmtime())
cls.beat_name = "apm-server"
cls.beat_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), "..", ".."))
Expand Down Expand Up @@ -112,10 +112,11 @@ def get_kibana_url(user="", password=""):
)

def get_payload_path(self, name):
return self._beat_path_join(
'testdata',
'intake-v2',
name)
return self.get_testdata_path('intake-v2', name)

@nottest
def get_testdata_path(self, *names):
return self._beat_path_join('testdata', *names)

def get_payload(self, name):
with open(self.get_payload_path(name)) as f:
Expand Down Expand Up @@ -308,15 +309,17 @@ def setUp(self):
super(ElasticTest, self).setUp()

def load_docs_with_template(self, data_path, url, endpoint, expected_events_count,
query_index=None, max_timeout=10):
query_index=None, max_timeout=10, extra_headers=None):

if query_index is None:
query_index = self.index_name_pattern

headers = {'content-type': 'application/x-ndjson'}
if extra_headers:
headers.update(extra_headers)

with open(data_path) as f:
r = requests.post(url,
data=f,
headers={'content-type': 'application/x-ndjson'})
r = requests.post(url, data=f, headers=headers)
assert r.status_code == 202, r.status_code

# Wait to give documents some time to be sent to the index
Expand Down Expand Up @@ -363,6 +366,66 @@ def logged_requests(self, url="/intake/v2/events"):
if jline.get("logger") == "request" and u.path == url:
yield jline

def approve_docs(self, base_path, received, doc_type):
"""
approve_docs compares the received documents to those contained
in the file at ${base_path}.approved.json. If that file does not
exist, then it is considered equivalent to a lack of documents.
Only the document _source is compared, and we ignore differences
in some context-sensitive fields such as the "observer", which
may vary between test runs.
"""
base_path = self._beat_path_join(os.path.dirname(__file__), base_path)
approved_path = base_path + '.approved.json'
received_path = base_path + '.received.json'

try:
with open(approved_path) as f:
approved = json.load(f)
except IOError:
approved = []

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])

try:
for rec in received:
# Overwrite received observer values with the approved ones,
# in order to avoid noise in the 'approvals' diff if there are
# any other changes.
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_observer = rec['observer']
self.assertEqual(sets.Set(rec_observer.keys()), sets.Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
with open(received_path, 'w') as f:
json.dump(received, f, indent=4, separators=(',', ': '))

# Create a dynamic Exception subclass so we can fake its name to look like the original exception.
class ApprovalException(Exception):
def __init__(self, cause):
super(ApprovalException, self).__init__(cause.message)

def __str__(self):
return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff."
ApprovalException.__name__ = type(exc).__name__
raise ApprovalException, exc, sys.exc_info()[2]

class ClientSideBaseTest(ServerBaseTest):
sourcemap_url = 'http://localhost:8200/assets/v1/sourcemaps'
Expand Down
5 changes: 5 additions & 0 deletions tests/system/config/apm-server.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ apm-server:
host: "localhost:8200"
secret_token: {{ secret_token }}

{% if jaeger_http_enabled %}
jaeger.http.enabled: {{ jaeger_http_enabled }}
jaeger.http.host: {{ jaeger_http_host }}
{% endif %}

{% if api_key_enabled %}
api_key.enabled: {{ api_key_enabled }}
api_key.limit: {{ api_key_limit | default(100) }}
Expand Down
59 changes: 59 additions & 0 deletions tests/system/jaeger_span.approved.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[
{
"transaction": {
"sampled": true,
"name": "test_span",
"result": "Success",
"duration": {
"us": 2
},
"type": "custom",
"id": "5025e08c7fef6542"
},
"trace": {
"id": "00000000000000005025e08c7fef6542"
},
"observer": {
"ephemeral_id": "39dd6032-fa6c-4b9e-9fab-f35ea6ef8dfe",
"version_major": 8,
"hostname": "alloy",
"version": "8.0.0",
"type": "apm-server",
"id": "901ad5b3-b313-4c2d-a7a0-2d188005f25e"
},
"timestamp": {
"us": 1578451731616515
},
"@timestamp": "2020-01-08T02:48:51.616Z",
"labels": {
"sampler_type": "const",
"sampler_param": true
},
"agent": {
"ephemeral_id": "3a5c6b00dd41a605",
"name": "Jaeger/Go",
"version": "2.21.2-dev"
},
"host": {
"ip": "10.1.1.101",
"hostname": "alloy",
"name": "alloy"
},
"service": {
"node": {
"name": "alloy"
},
"name": "test_service",
"language": {
"name": "Go"
}
},
"ecs": {
"version": "1.2.0"
},
"processor": {
"name": "transaction",
"event": "transaction"
}
}
]
54 changes: 0 additions & 54 deletions tests/system/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from apmserver import integration_test
from apmserver import ClientSideElasticTest, ElasticTest, ExpvarBaseTest
from apmserver import OverrideIndicesTest, OverrideIndicesFailureTest
from sets import Set


@integration_test
Expand Down Expand Up @@ -102,59 +101,6 @@ def test_load_docs_with_template_and_add_error(self):

self.check_backend_error_sourcemap(self.index_error, count=4)

def approve_docs(self, base_path, received, doc_type):
base_path = self._beat_path_join(os.path.dirname(__file__), base_path)
approved_path = base_path + '.approved.json'
received_path = base_path + '.received.json'

try:
with open(approved_path) as f:
approved = json.load(f)
except IOError:
approved = []

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])

try:
for rec in received:
# Overwrite received observer values with the approved ones,
# in order to avoid noise in the 'approvals' diff if there are
# any other changes.
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_observer = rec['observer']
self.assertEqual(Set(rec_observer.keys()), Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
with open(received_path, 'w') as f:
json.dump(received, f, indent=4, separators=(',', ': '))

# Create a dynamic Exception subclass so we can fake its name to look like the original exception.
class ApprovalException(Exception):
def __init__(self, cause):
super(ApprovalException, self).__init__(cause.message)

def __str__(self):
return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff."
ApprovalException.__name__ = type(exc).__name__

raise ApprovalException, exc, sys.exc_info()[2]


@integration_test
class EnrichEventIntegrationTest(ClientSideElasticTest):
Expand Down
41 changes: 41 additions & 0 deletions tests/system/test_jaeger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import re

import requests

from apmserver import integration_test
from apmserver import ElasticTest


@integration_test
class Test(ElasticTest):
jaeger_http_host = "localhost:14268"

def setUp(self):
super(Test, self).setUp()
self.wait_until(lambda: self.log_contains("Listening for Jaeger HTTP"), name="Jaeger HTTP listener started")

# Extract the Jaeger HTTP server address.
match = re.search("Listening for Jaeger HTTP requests on: (.*)$", self.get_log(), re.MULTILINE)
listen_addr = match.group(1)
self.jaeger_http_url = "http://{}/{}".format(listen_addr, 'api/traces')

def config(self):
cfg = super(Test, self).config()
cfg.update({
"jaeger_http_enabled": "true",
"jaeger_http_host": "localhost:0", # Listen on a dynamic port
})
return cfg

def test_jaeger_http(self):
"""
This test sends a Jaeger span in Thrift encoding over HTTP, and verifies that it is indexed.
"""
jaeger_span_thrift = self.get_testdata_path('jaeger', 'span.thrift')
self.load_docs_with_template(jaeger_span_thrift, self.jaeger_http_url, 'transaction', 1,
extra_headers={"content-type": "application/vnd.apache.thrift.binary"})
self.assert_no_logged_warnings()

rs = self.es.search(index=self.index_transaction)
assert rs['hits']['total']['value'] == 1, "found {} documents".format(rs['count'])
self.approve_docs('jaeger_span', rs['hits']['hits'], 'transaction')

0 comments on commit 4e5ca1a

Please sign in to comment.