Skip to content

Commit

Permalink
Add support for querying HLLSketch types (Netflix-Skunkworks#454)
Browse files Browse the repository at this point in the history
These represent approximate distinct counts. We can treat them as
longsum/counters. Usage will be nuanced: we'll probably need to
ensure callers are performing per-step queries for the results to make
sense.
  • Loading branch information
bsyk authored and manolama committed Oct 25, 2023
1 parent 80c0ab6 commit fa37f5a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ object DruidClient {

case class Metric(name: String, dataType: String = "DOUBLE") {

def isCounter: Boolean = dataType == "DOUBLE" || dataType == "FLOAT" || dataType == "LONG"
/** True when this metric's data type is `HLLSketch`, i.e. an approximate distinct count. */
def isSketch: Boolean = dataType == "HLLSketch"

/**
  * True when this metric can be queried as a counter: the plain numeric types
  * (`DOUBLE`, `FLOAT`, `LONG`) as well as sketch metrics, which are treated as counters.
  */
def isCounter: Boolean = dataType match {
  case "DOUBLE" | "FLOAT" | "LONG" => true
  case _                           => isSketch
}

def isTimer: Boolean = {
dataType == "spectatorHistogramTimer"
Expand Down Expand Up @@ -512,6 +517,7 @@ object DruidClient {
// Factory helpers: each one pairs a fixed aggregation type string with the
// field name that the aggregation should be applied to.

// Aggregation of type "doubleSum" over `fieldName`.
def sum(fieldName: String): Aggregation = Aggregation("doubleSum", fieldName)
// Aggregation of type "doubleMin" over `fieldName`.
def min(fieldName: String): Aggregation = Aggregation("doubleMin", fieldName)
// Aggregation of type "doubleMax" over `fieldName`.
def max(fieldName: String): Aggregation = Aggregation("doubleMax", fieldName)
// Aggregation of type "HLLSketchMerge" over `fieldName`; used for HLLSketch
// (approximate distinct count) metrics.
def distinct(fieldName: String): Aggregation = Aggregation("HLLSketchMerge", fieldName)
// Aggregation of type "timer" over `fieldName`.
def timer(fieldName: String): Aggregation = Aggregation("timer", fieldName)
// Aggregation of type "dist-summary" over `fieldName`.
def distSummary(fieldName: String): Aggregation = Aggregation("dist-summary", fieldName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ object DruidDatabaseActor {
case Histogram(_) if metric.isDistSummary => Aggregation.distSummary(metric.name)
case DataExpr.GroupBy(e, _) => toAggregation(metric, e)
case DataExpr.Consolidation(e, _) => toAggregation(metric, e)
case _ if metric.isSketch => Aggregation.distinct(metric.name)
case _: DataExpr.Sum => Aggregation.sum(metric.name)
case _: DataExpr.Max => Aggregation.max(metric.name)
case _: DataExpr.Min => Aggregation.min(metric.name)
Expand Down Expand Up @@ -563,7 +564,8 @@ object DruidDatabaseActor {
// performance. This filter cannot be used with the spectatorHistogram type in druid and
// is also skipped for HLLSketch metrics.
val metricValueFilter = Query.Not(Query.Equal(name, "0")).and(Query.HasKey(name))
val finalQueryWithFilter =
if (m.metric.isHistogram) finalQuery else finalQuery.and(metricValueFilter)
if (m.metric.isHistogram || m.metric.isSketch) finalQuery
else finalQuery.and(metricValueFilter)

val groupByQuery = GroupByQuery(
dataSource = datasource,
Expand Down
11 changes: 11 additions & 0 deletions atlas-druid/src/test/resources/segmentMetadataResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@
"minValue": null,
"maxValue": null,
"errorMessage": null
},
"test.metric.hllsketch": {
"typeSignature": "COMPLEX<HLLSketch>",
"type": "HLLSketch",
"hasMultipleValues": false,
"hasNulls": true,
"size": 0,
"cardinality": null,
"minValue": null,
"maxValue": null,
"errorMessage": null
}
},
"size": 158732338,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ class DruidClientSuite extends FunSuite {
"test.dim.2",
"test.metric.histogram.dist.1",
"test.metric.histogram.dist.2",
"test.metric.histogram.timer"
"test.metric.histogram.timer",
"test.metric.hllsketch"
)
assertEquals(columns.keySet, expected)
}
Expand All @@ -165,6 +166,7 @@ class DruidClientSuite extends FunSuite {
assertEquals(columns("test.metric.histogram.dist.1").`type`, "spectatorHistogram")
assertEquals(columns("test.metric.histogram.dist.2").`type`, "spectatorHistogramDistribution")
assertEquals(columns("test.metric.histogram.timer").`type`, "spectatorHistogramTimer")
assertEquals(columns("test.metric.hllsketch").`type`, "HLLSketch")
assertEquals(columns("test.dim.1").`type`, "STRING")
// The preceding assertion already covers test.dim.1; check the second dimension here
// instead of repeating it.
assertEquals(columns("test.dim.2").`type`, "STRING")
}
Expand All @@ -177,6 +179,7 @@ class DruidClientSuite extends FunSuite {
case "test.metric.histogram.dist.1" => assert(m.isDistSummary)
case "test.metric.histogram.dist.2" => assert(m.isDistSummary)
case "test.metric.histogram.timer" => assert(m.isTimer)
case "test.metric.hllsketch" => assert(m.isSketch)
case name => throw new MatchError(name)
}
}
Expand Down Expand Up @@ -243,6 +246,13 @@ class DruidClientSuite extends FunSuite {
assert(!json.contains("aggrType"))
}

test("aggregation encode, distinct type") {
  // The encoded distinct aggregation must carry the HLLSketchMerge type string and
  // must not contain the text "aggrType".
  val encoded = Json.encode(Aggregation.distinct("foo"))
  assert(encoded.contains("HLLSketchMerge"))
  assert(!encoded.contains("aggrType"))
}

test("aggregation encode, doubleSum type") {
val aggr = Aggregation.sum("foo")
val json = Json.encode(aggr)
Expand Down

0 comments on commit fa37f5a

Please sign in to comment.