Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jaeger grpc system test #3118

Merged
merged 6 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions beater/jaeger/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ func TestApprovals(t *testing.T) {
tc.setup(t)
defer tc.teardown(t)

f := filepath.Join("testdata", name)
f := filepath.Join("..", "..", "testdata", "jaeger", name)
data, err := ioutil.ReadFile(f + ".json")
require.NoError(t, err)
var request api_v2.PostSpansRequest
require.NoError(t, json.Unmarshal(data, &request))

require.NoError(t, tc.sendBatchGRPC(request.Batch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))
require.NoError(t, approvals.ApproveEvents(tc.events, f))

tc.events = nil
thriftBatch := &jaegerthrift.Batch{
Process: thriftProcessFromModel(request.Batch.Process),
Spans: jaegerthriftconv.FromDomain(request.Batch.Spans),
}
require.NoError(t, tc.sendBatchHTTP(thriftBatch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))
require.NoError(t, approvals.ApproveEvents(tc.events, f))
})
}
}
Expand Down
File renamed without changes.
File renamed without changes.
68 changes: 50 additions & 18 deletions tests/system/apmserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,17 @@ class ServerSetUpBaseTest(BaseTest):
sourcemap_url = "{}/{}".format(host, 'assets/v1/sourcemaps')
expvar_url = "{}/{}".format(host, 'debug/vars')

jaeger_grpc_host = "localhost:14250"
axw marked this conversation as resolved.
Show resolved Hide resolved
jaeger_http_host = "localhost:14268"
jaeger_http_url = "http://{}/{}".format(jaeger_http_host, 'api/traces')

def config(self):
return {"ssl_enabled": "false",
"queue_flush": 0,
"jaeger_grpc_enabled": "true",
"jaeger_grpc_host": self.jaeger_grpc_host,
"jaeger_http_enabled": "true",
"jaeger_http_host": self.jaeger_http_host,
"path": os.path.abspath(self.working_dir) + "/log/*"}

def setUp(self):
Expand Down Expand Up @@ -325,15 +333,31 @@ def load_docs_with_template(self, data_path, url, endpoint, expected_events_coun
# Wait to give documents some time to be sent to the index
# This is not required but speeds up the tests
time.sleep(2)
self.es.indices.refresh(index=query_index)
self.wait_for_events(endpoint, expected_events_count, index=query_index)

def wait_for_events(self, processor_name, expected_count, index=None, max_timeout=10):
"""
wait_for_events waits for an expected number of event docs with the given
'processor.name' value, and returns the hits when found.
"""
if index is None:
index = self.index_name_pattern

self.es.indices.refresh(index=index)

query = {"term": {"processor.name": processor_name}}
result = {} # TODO(axw) use "nonlocal" when we migrate to Python 3
def get_docs():
hits = self.es.search(index=index, body={"query": query})['hits']
result['docs'] = hits['hits']
return hits['total']['value'] == expected_count

self.wait_until(
lambda: (self.es.count(index=query_index, body={
"query": {"term": {"processor.name": endpoint}}}
)['count'] == expected_events_count),
get_docs,
max_timeout=max_timeout,
name="{} documents to reach {}".format(endpoint, expected_events_count),
name="{} documents to reach {}".format(processor_name, expected_count),
)
return result['docs']

def check_backend_error_sourcemap(self, index, count=1):
rs = self.es.search(index=index, params={"rest_total_hits_as_int": "true"})
Expand Down Expand Up @@ -366,7 +390,7 @@ def logged_requests(self, url="/intake/v2/events"):
if jline.get("logger") == "request" and u.path == url:
yield jline

def approve_docs(self, base_path, received, doc_type):
def approve_docs(self, base_path, received):
"""
approve_docs compares the received documents to those contained
in the file at ${base_path}.approved.json. If that file does not
Expand All @@ -386,8 +410,22 @@ def approve_docs(self, base_path, received, doc_type):
except IOError:
approved = []

# get_doc_id returns a value suitable for sorting and identifying
# documents: either a unique ID, or a timestamp. This is necessary
# since not all event types require a unique ID (namely, errors do
# not.)
#
# We return (0, doc['error']['id']) when the event type is 'error'
# if that field exists, otherwise returns (1, doc['@timestamp']).
# The first tuple element exists to sort IDs before timestamps.
def get_doc_id(doc):
doc_type = doc['processor']['event']
if 'id' in doc[doc_type]:
return (0, doc[doc_type]['id'])
return (1, doc['@timestamp'])

received = [doc['_source'] for doc in received]
received.sort(key=lambda source: source[doc_type]['id'])
received.sort(key=get_doc_id)

try:
for rec in received:
Expand All @@ -397,20 +435,20 @@ def approve_docs(self, base_path, received, doc_type):
#
# We don't compare the observer values between received/approved,
# as they are dependent on the environment.
rec_id = rec[doc_type]['id']
rec_id = get_doc_id(rec)
rec_observer = rec['observer']
self.assertEqual(sets.Set(rec_observer.keys()), sets.Set(
["hostname", "version", "id", "ephemeral_id", "type", "version_major"]))
assert rec_observer["version"].startswith(str(rec_observer["version_major"]) + ".")
for appr in approved:
if appr[doc_type]['id'] == rec_id:
if get_doc_id(appr) == rec_id:
rec['observer'] = appr['observer']
break
assert len(received) == len(approved)
for i, rec in enumerate(received):
appr = approved[i]
rec_id = rec[doc_type]['id']
assert rec_id == appr[doc_type]['id'], "New entry with id {}".format(rec_id)
rec_id = get_doc_id(rec)
assert rec_id == get_doc_id(appr), "New entry with id {}".format(rec_id)
for k, v in rec.items():
self.assertEqual(v, appr[k])
except Exception as exc:
Expand Down Expand Up @@ -480,13 +518,7 @@ def upload_sourcemap(self, file_name='bundle_no_mapping.js.map',

class ClientSideElasticTest(ClientSideBaseTest, ElasticTest):
def wait_for_sourcemaps(self, expected_ct=1):
idx = self.index_smap
self.wait_until(
lambda: (self.es.count(index=idx, body={
"query": {"term": {"processor.name": 'sourcemap'}}}
)['count'] == expected_ct),
name="{} sourcemaps to ingest".format(expected_ct),
)
self.wait_for_events('sourcemap', expected_ct, index=self.index_smap)

def check_rum_error_sourcemap(self, updated, expected_err=None, count=1):
rs = self.es.search(index=self.index_error, params={"rest_total_hits_as_int": "true"})
Expand Down
4 changes: 4 additions & 0 deletions tests/system/config/apm-server.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ apm-server:
jaeger.http.enabled: {{ jaeger_http_enabled }}
jaeger.http.host: {{ jaeger_http_host }}
{% endif %}
{% if jaeger_grpc_enabled %}
jaeger.grpc.enabled: {{ jaeger_grpc_enabled }}
jaeger.grpc.host: {{ jaeger_grpc_host }}
{% endif %}

{% if api_key_enabled %}
api_key.enabled: {{ api_key_enabled }}
Expand Down
Loading