Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/system: system test for Jaeger Thrift/HTTP #3114

Merged
merged 4 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added testdata/jaeger/span.thrift
Binary file not shown.
89 changes: 76 additions & 13 deletions tests/system/apmserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
import json
import os
import re
import sets
import shutil
import sys
import threading
import time
import unittest
from time import gmtime, strftime
from urlparse import urlparse

import sys
import time

from elasticsearch import Elasticsearch
from nose.tools import nottest
import requests

sys.path.append(os.path.join(os.path.dirname(__file__), '..',
Expand Down Expand Up @@ -45,7 +45,7 @@ def tearDown(self):
@classmethod
def setUpClass(cls):
cls.apm_version = "8.0.0"
cls.day = strftime("%Y.%m.%d", gmtime())
cls.day = time.strftime("%Y.%m.%d", time.gmtime())
cls.beat_name = "apm-server"
cls.beat_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), "..", ".."))
Expand Down Expand Up @@ -112,10 +112,11 @@ def get_kibana_url(user="", password=""):
)

def get_payload_path(self, name):
return self._beat_path_join(
'testdata',
'intake-v2',
name)
return self.get_testdata_path('intake-v2', name)

@nottest
def get_testdata_path(self, *names):
return self._beat_path_join('testdata', *names)

def get_payload(self, name):
with open(self.get_payload_path(name)) as f:
Expand Down Expand Up @@ -308,15 +309,17 @@ def setUp(self):
super(ElasticTest, self).setUp()

def load_docs_with_template(self, data_path, url, endpoint, expected_events_count,
query_index=None, max_timeout=10):
query_index=None, max_timeout=10, extra_headers=None):

if query_index is None:
query_index = self.index_name_pattern

headers = {'content-type': 'application/x-ndjson'}
if extra_headers:
headers.update(extra_headers)

with open(data_path) as f:
r = requests.post(url,
data=f,
headers={'content-type': 'application/x-ndjson'})
r = requests.post(url, data=f, headers=headers)
assert r.status_code == 202, r.status_code

# Wait to give documents some time to be sent to the index
Expand Down Expand Up @@ -363,6 +366,66 @@ def logged_requests(self, url="/intake/v2/events"):
if jline.get("logger") == "request" and u.path == url:
yield jline

def approve_docs(self, base_path, received, doc_type):
"""
approve_docs compares the received documents to those contained
in the file at ${base_path}.approved.json. If that file does not
exist, then it is considered equivalent to a lack of documents.

Only the document _source is compared, and we ignore differences
in some context-sensitive fields such as the "observer", which
may vary between test runs.
"""
base_path = self._beat_path_join(os.path.dirname(__file__), base_path)
approved_path = base_path + '.approved.json'
received_path = base_path + '.received.json'

try:
with open(approved_path) as f:
approved = json.load(f)
except IOError:
approved = []

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])

try:
for rec in received:
# Overwrite received observer values with the approved ones,
# in order to avoid noise in the 'approvals' diff if there are
# any other changes.
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_observer = rec['observer']
self.assertEqual(sets.Set(rec_observer.keys()), sets.Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
with open(received_path, 'w') as f:
json.dump(received, f, indent=4, separators=(',', ': '))

# Create a dynamic Exception subclass so we can fake its name to look like the original exception.
class ApprovalException(Exception):
def __init__(self, cause):
super(ApprovalException, self).__init__(cause.message)

def __str__(self):
return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff."
ApprovalException.__name__ = type(exc).__name__
raise ApprovalException, exc, sys.exc_info()[2]

class ClientSideBaseTest(ServerBaseTest):
sourcemap_url = 'http://localhost:8200/assets/v1/sourcemaps'
Expand Down
5 changes: 5 additions & 0 deletions tests/system/config/apm-server.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ apm-server:
host: "localhost:8200"
secret_token: {{ secret_token }}

{% if jaeger_http_enabled %}
jaeger.http.enabled: {{ jaeger_http_enabled }}
jaeger.http.host: {{ jaeger_http_host }}
{% endif %}

{% if api_key_enabled %}
api_key.enabled: {{ api_key_enabled }}
api_key.limit: {{ api_key_limit | default(100) }}
Expand Down
59 changes: 59 additions & 0 deletions tests/system/jaeger_span.approved.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[
{
"transaction": {
"sampled": true,
"name": "test_span",
"result": "Success",
"duration": {
"us": 2
},
"type": "custom",
"id": "5025e08c7fef6542"
},
"trace": {
"id": "00000000000000005025e08c7fef6542"
},
"observer": {
"ephemeral_id": "39dd6032-fa6c-4b9e-9fab-f35ea6ef8dfe",
"version_major": 8,
"hostname": "alloy",
"version": "8.0.0",
"type": "apm-server",
"id": "901ad5b3-b313-4c2d-a7a0-2d188005f25e"
},
"timestamp": {
"us": 1578451731616515
},
"@timestamp": "2020-01-08T02:48:51.616Z",
"labels": {
"sampler_type": "const",
"sampler_param": true
},
"agent": {
"ephemeral_id": "3a5c6b00dd41a605",
"name": "Jaeger/Go",
"version": "2.21.2-dev"
},
"host": {
"ip": "10.1.1.101",
"hostname": "alloy",
"name": "alloy"
},
"service": {
"node": {
"name": "alloy"
},
"name": "test_service",
"language": {
"name": "Go"
}
},
"ecs": {
"version": "1.2.0"
},
"processor": {
"name": "transaction",
"event": "transaction"
}
}
]
54 changes: 0 additions & 54 deletions tests/system/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from apmserver import integration_test
from apmserver import ClientSideElasticTest, ElasticTest, ExpvarBaseTest
from apmserver import OverrideIndicesTest, OverrideIndicesFailureTest
from sets import Set


@integration_test
Expand Down Expand Up @@ -102,59 +101,6 @@ def test_load_docs_with_template_and_add_error(self):

self.check_backend_error_sourcemap(self.index_error, count=4)

def approve_docs(self, base_path, received, doc_type):
base_path = self._beat_path_join(os.path.dirname(__file__), base_path)
approved_path = base_path + '.approved.json'
received_path = base_path + '.received.json'

try:
with open(approved_path) as f:
approved = json.load(f)
except IOError:
approved = []

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])

try:
for rec in received:
# Overwrite received observer values with the approved ones,
# in order to avoid noise in the 'approvals' diff if there are
# any other changes.
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_observer = rec['observer']
self.assertEqual(Set(rec_observer.keys()), Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
with open(received_path, 'w') as f:
json.dump(received, f, indent=4, separators=(',', ': '))

# Create a dynamic Exception subclass so we can fake its name to look like the original exception.
class ApprovalException(Exception):
def __init__(self, cause):
super(ApprovalException, self).__init__(cause.message)

def __str__(self):
return self.message + "\n\nReceived data differs from approved data. Run 'make update' and then 'approvals' to verify the diff."
ApprovalException.__name__ = type(exc).__name__

raise ApprovalException, exc, sys.exc_info()[2]


@integration_test
class EnrichEventIntegrationTest(ClientSideElasticTest):
Expand Down
41 changes: 41 additions & 0 deletions tests/system/test_jaeger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import re

import requests

from apmserver import integration_test
from apmserver import ElasticTest


@integration_test
class Test(ElasticTest):
jaeger_http_host = "localhost:14268"

def setUp(self):
super(Test, self).setUp()
self.wait_until(lambda: self.log_contains("Listening for Jaeger HTTP"), name="Jaeger HTTP listener started")

# Extract the Jaeger HTTP server address.
match = re.search("Listening for Jaeger HTTP requests on: (.*)$", self.get_log(), re.MULTILINE)
listen_addr = match.group(1)
self.jaeger_http_url = "http://{}/{}".format(listen_addr, 'api/traces')

def config(self):
cfg = super(Test, self).config()
cfg.update({
"jaeger_http_enabled": "true",
"jaeger_http_host": "localhost:0", # Listen on a dynamic port
})
return cfg

def test_jaeger_http(self):
"""
This test sends a Jaeger span in Thrift encoding over HTTP, and verifies that it is indexed.
"""
jaeger_span_thrift = self.get_testdata_path('jaeger', 'span.thrift')
self.load_docs_with_template(jaeger_span_thrift, self.jaeger_http_url, 'transaction', 1,
extra_headers={"content-type": "application/vnd.apache.thrift.binary"})
self.assert_no_logged_warnings()

rs = self.es.search(index=self.index_transaction)
assert rs['hits']['total']['value'] == 1, "found {} documents".format(rs['count'])
self.approve_docs('jaeger_span', rs['hits']['hits'], 'transaction')