Skip to content

Commit

Permalink
[ML] Split up data visualizer model, remove Logger
Browse files Browse the repository at this point in the history
  • Loading branch information
qn895 committed Jun 7, 2021
1 parent aaf142c commit 403a8e3
Show file tree
Hide file tree
Showing 13 changed files with 1,337 additions and 1,046 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { estypes } from '@elastic/elasticsearch';
import { get } from 'lodash';
import { IScopedClusterClient } from 'kibana/server';
import { AggCardinality, Aggs, FieldData } from '../../types';
import {
buildBaseFilterCriteria,
buildSamplerAggregation,
getSafeAggregationName,
getSamplerAggregationsResponsePath,
} from '../../../common/utils/query_utils';
import { getDatafeedAggregations } from '../../../common/utils/datafeed_utils';
import { isPopulatedObject } from '../../../common/utils/object_utils';

/**
 * Determines which of the supplied aggregatable fields are present in the
 * documents matching the query / time range, and gathers basic stats for them.
 *
 * A single search (size 0, `track_total_hits: true`) is issued containing, per
 * field, a `<safeName>_count` filter/exists aggregation and a
 * `<safeName>_cardinality` aggregation. The aggregations are wrapped by
 * `buildSamplerAggregation` using `samplerShardSize`.
 *
 * @param client - Scoped cluster client; the search runs as the current user.
 * @param indexPatternTitle - Index (pattern) to search.
 * @param query - Query passed through to `buildBaseFilterCriteria`.
 * @param aggregatableFields - Names of the fields to check.
 * @param samplerShardSize - Shard size handed to the sampler helpers; when it is
 *   not greater than 0 the total hit count is reported as the sample count.
 * @param timeFieldName - Time field passed through to `buildBaseFilterCriteria`.
 * @param earliestMs - Lower time bound passed through to `buildBaseFilterCriteria`.
 * @param latestMs - Upper time bound passed through to `buildBaseFilterCriteria`.
 * @param datafeedConfig - Optional datafeed whose aggregations, script fields and
 *   runtime mappings are taken into account.
 * @param runtimeMappings - Optional runtime mappings (e.g. from the index pattern).
 * @returns An object holding `totalCount`, `aggregatableExistsFields` (with
 *   `sampleCount`, `count` and `cardinality` per field) and
 *   `aggregatableNotExistsFields`.
 */
export const checkAggregatableFieldsExist = async (
  client: IScopedClusterClient,
  indexPatternTitle: string,
  query: any,
  aggregatableFields: string[],
  samplerShardSize: number,
  timeFieldName: string | undefined,
  earliestMs?: number,
  latestMs?: number,
  datafeedConfig?: estypes.Datafeed,
  runtimeMappings?: estypes.RuntimeFields
) => {
  const { asCurrentUser } = client;

  const index = indexPatternTitle;
  const size = 0;
  const filterCriteria = buildBaseFilterCriteria(timeFieldName, earliestMs, latestMs, query);
  const datafeedAggregations = getDatafeedAggregations(datafeedConfig);

  const scriptFields = datafeedConfig?.script_fields;
  const datafeedRuntimeMappings = datafeedConfig?.runtime_mappings;

  // Own-property test that does not rely on the (user supplied) object's own
  // `hasOwnProperty` member.
  const hasOwn = (obj: object | null | undefined, key: string): boolean =>
    obj != null && Object.prototype.hasOwnProperty.call(obj, key);

  // Fields defined by the datafeed itself (script fields or runtime mappings).
  const isDatafeedDefinedField = (field: string): boolean =>
    hasOwn(scriptFields, field) || hasOwn(datafeedRuntimeMappings, field);

  // Start from the datafeed's own aggregations (if any); the per-field
  // existence and cardinality aggregations are added below.
  const aggs: Aggs = datafeedAggregations !== undefined ? { ...datafeedAggregations } : {};

  // Combine runtime fields from the index pattern as well as the datafeed.
  // The datafeed's mappings are spread last, so they win on a name clash.
  const combinedRuntimeMappings: estypes.RuntimeFields = {
    ...(isPopulatedObject(runtimeMappings) ? runtimeMappings : {}),
    ...(isPopulatedObject(datafeedConfig) && isPopulatedObject(datafeedConfig.runtime_mappings)
      ? datafeedConfig.runtime_mappings
      : {}),
  };

  aggregatableFields.forEach((field, i) => {
    const safeFieldName = getSafeAggregationName(field, i);

    // Existence check: number of documents in which the field exists.
    aggs[`${safeFieldName}_count`] = {
      filter: { exists: { field } },
    };

    // Script fields are not addressable by name, so their cardinality is
    // computed from the script itself.
    const cardinalityAgg: AggCardinality =
      scriptFields != null && Object.prototype.hasOwnProperty.call(scriptFields, field)
        ? { cardinality: { script: scriptFields[field].script } }
        : { cardinality: { field } };
    aggs[`${safeFieldName}_cardinality`] = cardinalityAgg;
  });

  const searchBody = {
    query: {
      bool: {
        filter: filterCriteria,
      },
    },
    ...(isPopulatedObject(aggs) ? { aggs: buildSamplerAggregation(aggs, samplerShardSize) } : {}),
    ...(isPopulatedObject(combinedRuntimeMappings)
      ? { runtime_mappings: combinedRuntimeMappings }
      : {}),
  };

  const { body } = await asCurrentUser.search({
    index,
    track_total_hits: true,
    size,
    body: searchBody,
  });

  const aggregations = body.aggregations;
  // @ts-expect-error incorrect search response type
  const totalCount = body.hits.total.value;
  const stats = {
    totalCount,
    aggregatableExistsFields: [] as FieldData[],
    aggregatableNotExistsFields: [] as FieldData[],
  };

  const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
  const sampleCount =
    samplerShardSize > 0 ? get(aggregations, ['sample', 'doc_count'], 0) : totalCount;

  aggregatableFields.forEach((field, i) => {
    const safeFieldName = getSafeAggregationName(field, i);
    const count = get(aggregations, [...aggsPath, `${safeFieldName}_count`, 'doc_count'], 0);

    // Datafeed-defined fields are reported as existing even with a zero count.
    // NOTE(review): presumably because the exists filter cannot match them — confirm.
    if (count > 0 || isDatafeedDefinedField(field)) {
      const cardinality = get(
        aggregations,
        [...aggsPath, `${safeFieldName}_cardinality`, 'value'],
        0
      );
      stats.aggregatableExistsFields.push({
        fieldName: field,
        existsInDocs: true,
        stats: {
          sampleCount,
          count,
          cardinality,
        },
      });
    } else {
      stats.aggregatableNotExistsFields.push({
        fieldName: field,
        existsInDocs: false,
      });
    }
  });

  return stats;
};

/**
 * Reports whether at least one document matching the query / time range
 * contains the given (non-aggregatable) field.
 *
 * An `exists` clause for the field is appended to the base filter criteria and
 * a size-0 search is run as the current user; the result is `true` when the
 * reported total hit count is greater than zero.
 *
 * @param client - Scoped cluster client used to run the search.
 * @param indexPatternTitle - Index (pattern) to search.
 * @param query - Query passed through to `buildBaseFilterCriteria`.
 * @param field - Name of the field to look for.
 * @param timeFieldName - Time field passed through to `buildBaseFilterCriteria`.
 * @param earliestMs - Lower time bound passed through to `buildBaseFilterCriteria`.
 * @param latestMs - Upper time bound passed through to `buildBaseFilterCriteria`.
 * @param runtimeMappings - Optional runtime mappings, sent only when populated.
 * @returns `true` if any matching document has the field, otherwise `false`.
 */
export const checkNonAggregatableFieldExists = async (
  client: IScopedClusterClient,
  indexPatternTitle: string,
  query: any,
  field: string,
  timeFieldName: string | undefined,
  earliestMs: number | undefined,
  latestMs: number | undefined,
  runtimeMappings?: estypes.RuntimeFields
) => {
  // Base criteria plus the requirement that the field is present.
  const filters = buildBaseFilterCriteria(timeFieldName, earliestMs, latestMs, query);
  filters.push({ exists: { field } });

  const runtimeMappingsClause = isPopulatedObject(runtimeMappings)
    ? { runtime_mappings: runtimeMappings }
    : {};

  const requestBody = {
    query: {
      bool: {
        filter: filters,
      },
    },
    ...runtimeMappingsClause,
  };

  // Only the hit count matters, so no documents are requested.
  const { body } = await client.asCurrentUser.search({
    index: indexPatternTitle,
    size: 0,
    body: requestBody,
  });
  // @ts-expect-error incorrect search response type
  return body.hits.total.value > 0;
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
* 2.0.
*/

// NOTE(review): the module path below contains a doubled slash ('../..//security');
// it most likely should read '../../security/server' — confirm and tidy up.
import { SecurityPluginStart } from '../..//security/server';
// NOTE(review): judging by the names, these two tune when/how top-terms
// aggregations are sampled — their exact use is not visible here; confirm.
export const SAMPLER_TOP_TERMS_THRESHOLD = 100000;
export const SAMPLER_TOP_TERMS_SHARD_SIZE = 5000;
// NOTE(review): presumably the number of fields handled per request when
// checking field existence / fetching field stats — verify against callers.
export const AGGREGATABLE_EXISTS_REQUEST_BATCH_SIZE = 200;
export const FIELDS_REQUEST_BATCH_SIZE = 10;

/** Plugin start dependencies; the security plugin's start contract is optional. */
export interface StartDeps {
security?: SecurityPluginStart;
}
// NOTE(review): presumably an upper bound on the number of columns drawn in a
// chart — confirm against usage.
export const MAX_CHART_COLUMNS = 20;
Loading

0 comments on commit 403a8e3

Please sign in to comment.