Skip to content

Commit

Permalink
tests/system: system test for Jaeger gRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
axw authored Jan 9, 2020
1 parent 4e5ca1a commit 92da9a5
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 39 deletions.
6 changes: 3 additions & 3 deletions beater/jaeger/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ func TestApprovals(t *testing.T) {
tc.setup(t)
defer tc.teardown(t)

f := filepath.Join("testdata", name)
f := filepath.Join("..", "..", "testdata", "jaeger", name)
data, err := ioutil.ReadFile(f + ".json")
require.NoError(t, err)
var request api_v2.PostSpansRequest
require.NoError(t, json.Unmarshal(data, &request))

require.NoError(t, tc.sendBatchGRPC(request.Batch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))
require.NoError(t, approvals.ApproveEvents(tc.events, f))

tc.events = nil
thriftBatch := &jaegerthrift.Batch{
Process: thriftProcessFromModel(request.Batch.Process),
Spans: jaegerthriftconv.FromDomain(request.Batch.Spans),
}
require.NoError(t, tc.sendBatchHTTP(thriftBatch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))
require.NoError(t, approvals.ApproveEvents(tc.events, f))
})
}
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
68 changes: 50 additions & 18 deletions tests/system/apmserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,17 @@ class ServerSetUpBaseTest(BaseTest):
sourcemap_url = "{}/{}".format(host, 'assets/v1/sourcemaps')
expvar_url = "{}/{}".format(host, 'debug/vars')

jaeger_grpc_host = "localhost:14250"
jaeger_http_host = "localhost:14268"
jaeger_http_url = "http://{}/{}".format(jaeger_http_host, 'api/traces')

def config(self):
return {"ssl_enabled": "false",
"queue_flush": 0,
"jaeger_grpc_enabled": "true",
"jaeger_grpc_host": self.jaeger_grpc_host,
"jaeger_http_enabled": "true",
"jaeger_http_host": self.jaeger_http_host,
"path": os.path.abspath(self.working_dir) + "/log/*"}

def setUp(self):
Expand Down Expand Up @@ -325,15 +333,31 @@ def load_docs_with_template(self, data_path, url, endpoint, expected_events_coun
# Wait to give documents some time to be sent to the index
# This is not required but speeds up the tests
time.sleep(2)
self.es.indices.refresh(index=query_index)
self.wait_for_events(endpoint, expected_events_count, index=query_index)

def wait_for_events(self, processor_name, expected_count, index=None, max_timeout=10):
"""
wait_for_events waits for an expected number of event docs with the given
'processor.name' value, and returns the hits when found.
"""
if index is None:
index = self.index_name_pattern

self.es.indices.refresh(index=index)

query = {"term": {"processor.name": processor_name}}
result = {} # TODO(axw) use "nonlocal" when we migrate to Python 3
def get_docs():
hits = self.es.search(index=index, body={"query": query})['hits']
result['docs'] = hits['hits']
return hits['total']['value'] == expected_count

self.wait_until(
lambda: (self.es.count(index=query_index, body={
"query": {"term": {"processor.name": endpoint}}}
)['count'] == expected_events_count),
get_docs,
max_timeout=max_timeout,
name="{} documents to reach {}".format(endpoint, expected_events_count),
name="{} documents to reach {}".format(processor_name, expected_count),
)
return result['docs']

def check_backend_error_sourcemap(self, index, count=1):
rs = self.es.search(index=index, params={"rest_total_hits_as_int": "true"})
Expand Down Expand Up @@ -366,7 +390,7 @@ def logged_requests(self, url="/intake/v2/events"):
if jline.get("logger") == "request" and u.path == url:
yield jline

def approve_docs(self, base_path, received, doc_type):
def approve_docs(self, base_path, received):
"""
approve_docs compares the received documents to those contained
in the file at ${base_path}.approved.json. If that file does not
Expand All @@ -386,8 +410,22 @@ def approve_docs(self, base_path, received, doc_type):
except IOError:
approved = []

# get_doc_id returns a value suitable for sorting and identifying
# documents: either a unique ID, or a timestamp. This is necessary
# since not all event types require a unique ID (namely, errors do
# not.)
#
# We return (0, doc['error']['id']) when the event type is 'error'
# if that field exists, otherwise returns (1, doc['@timestamp']).
# The first tuple element exists to sort IDs before timestamps.
def get_doc_id(doc):
doc_type = doc['processor']['event']
if 'id' in doc[doc_type]:
return (0, doc[doc_type]['id'])
return (1, doc['@timestamp'])

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])
received.sort(key=get_doc_id)

try:
for rec in received:
Expand All @@ -397,20 +435,20 @@ def approve_docs(self, base_path, received, doc_type):
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_id = get_doc_id(rec)
rec_observer = rec['observer']
self.assertEqual(sets.Set(rec_observer.keys()), sets.Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
if get_doc_id(appr) == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
rec_id = get_doc_id(rec)
assert rec_id == get_doc_id(appr), "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
Expand Down Expand Up @@ -480,13 +518,7 @@ def upload_sourcemap(self, file_name='bundle_no_mapping.js.map',

class ClientSideElasticTest(ClientSideBaseTest, ElasticTest):
def wait_for_sourcemaps(self, expected_ct=1):
idx = self.index_smap
self.wait_until(
lambda: (self.es.count(index=idx, body={
"query": {"term": {"processor.name": 'sourcemap'}}}
)['count'] == expected_ct),
name="{} sourcemaps to ingest".format(expected_ct),
)
self.wait_for_events('sourcemap', expected_ct, index=self.index_smap)

def check_rum_error_sourcemap(self, updated, expected_err=None, count=1):
rs = self.es.search(index=self.index_error, params={"rest_total_hits_as_int": "true"})
Expand Down
4 changes: 4 additions & 0 deletions tests/system/config/apm-server.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ apm-server:
jaeger.http.enabled: {{ jaeger_http_enabled }}
jaeger.http.host: {{ jaeger_http_host }}
{% endif %}
{% if jaeger_grpc_enabled %}
jaeger.grpc.enabled: {{ jaeger_grpc_enabled }}
jaeger.grpc.host: {{ jaeger_grpc_host }}
{% endif %}

{% if api_key_enabled %}
api_key.enabled: {{ api_key_enabled }}
Expand Down
Loading

0 comments on commit 92da9a5

Please sign in to comment.