Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Druid 0.10.1, incomplete #233

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ val jacksonOneVersion = "1.9.13"
// See https://github.com/druid-io/druid/pull/1669, https://github.com/druid-io/tranquility/pull/81 before upgrading Jackson
val jacksonTwoVersion = "2.4.6"
val jacksonTwoModuleScalaVersion = "2.4.5"
val druidVersion = "0.9.2"
val druidVersion = "0.10.1-SNAPSHOT"
val curatorVersion = "2.12.0"
val guiceVersion = "4.0"
val flinkVersion = "1.0.3"
Expand Down Expand Up @@ -48,8 +48,7 @@ val coreDependencies = Seq(
exclude("log4j", "log4j")
exclude("mysql", "mysql-connector-java") // Not needed, unwanted GPLv2 license
force(),
"com.metamx" % "java-util" % "0.28.2" exclude("log4j", "log4j") force(),
"io.netty" % "netty" % "3.10.5.Final" force(),
"io.netty" % "netty" % "3.10.6.Final" force(),
"org.apache.curator" % "curator-client" % curatorVersion force(),
"org.apache.curator" % "curator-framework" % curatorVersion force(),
"org.apache.curator" % "curator-recipes" % curatorVersion force(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1)
val intervalToCover = new Interval(
timestamp.getMillis,
tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis,
timestamp.plus(tuning.segmentGranularity.getPeriod.multipliedBy(numSegmentsToCover)).getMillis,
ISOChronology.getInstanceUTC
)
val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package com.metamx.tranquility.beam

import com.metamx.common.Granularity
import com.metamx.common.scala.Logging
import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity}
import org.joda.time.{DateTime, Period}

case class ClusteredBeamTuning(
segmentGranularity: Granularity = Granularity.HOUR,
segmentGranularity: PeriodGranularity = Granularities.HOUR.asInstanceOf[PeriodGranularity],
warmingPeriod: Period = new Period(0),
windowPeriod: Period = new Period("PT10M"),
partitions: Int = 1,
Expand Down Expand Up @@ -58,7 +58,7 @@ object ClusteredBeamTuning
*
* Default is Granularities.HOUR.
*/
def segmentGranularity(x: Granularity) = new Builder(config.copy(segmentGranularity = x))
def segmentGranularity(x: PeriodGranularity) = new Builder(config.copy(segmentGranularity = x))

/**
* If nonzero, create sub-beams early. This can be useful if sub-beams take a long time to start up.
Expand Down Expand Up @@ -120,7 +120,7 @@ object ClusteredBeamTuning
*/
@deprecated("use ClusteredBeamTuning.builder()", "0.2.26")
def create(
segmentGranularity: Granularity,
segmentGranularity: PeriodGranularity,
warmingPeriod: Period,
windowPeriod: Period,
partitions: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package com.metamx.tranquility.druid

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.nscala_time.time.Imports._
import com.metamx.common.Granularity
import com.metamx.common.scala.untyped._
import com.metamx.common.scala.Jackson
import com.metamx.common.scala.Logging
Expand All @@ -32,9 +31,12 @@ import com.twitter.util.Await
import com.twitter.util.Future
import io.druid.data.input.impl.TimestampSpec
import java.{util => ju}

import io.druid.java.util.common.granularity.{Granularities, Granularity}
import org.joda.time.chrono.ISOChronology
import org.joda.time.DateTime
import org.joda.time.Interval

import scala.util.Random

class DruidBeamMaker[A](
Expand Down Expand Up @@ -70,10 +72,6 @@ class DruidBeamMaker[A](
}
val taskId = "index_realtime_%s_%s_%s_%s%s" format(dataSource, interval.start, partition, replicant, suffix)
val shutoffTime = interval.end + beamTuning.windowPeriod + config.firehoseGracePeriod
val queryGranularityMap = druidObjectMapper.convertValue(
rollup.indexGranularity,
classOf[ju.Map[String, AnyRef]]
)
val dataSchemaMap = Map(
"dataSource" -> dataSource,
"parser" -> Map(
Expand All @@ -88,7 +86,7 @@ class DruidBeamMaker[A](
"granularitySpec" -> Map(
"type" -> "uniform",
"segmentGranularity" -> beamTuning.segmentGranularity,
"queryGranularity" -> queryGranularityMap,
"queryGranularity" -> rollup.indexGranularity,
"rollup" -> rollup.isRollup
)
)
Expand Down Expand Up @@ -149,8 +147,12 @@ class DruidBeamMaker[A](

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) == interval,
"Interval does not match segmentGranularity[%s]: %s != %s" format(
beamTuning.segmentGranularity,
interval,
DruidBeamMaker.widen(beamTuning.segmentGranularity, interval)
)
)
val baseFirehoseId = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
Expand Down Expand Up @@ -202,8 +204,12 @@ class DruidBeamMaker[A](
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) == interval,
"Interval does not match segmentGranularity[%s]: %s != %s" format(
beamTuning.segmentGranularity,
interval,
DruidBeamMaker.widen(beamTuning.segmentGranularity, interval)
)
)
val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
Expand Down Expand Up @@ -244,20 +250,37 @@ object DruidBeamMaker
val tsUtc = new DateTime(ts.getMillis, ISOChronology.getInstanceUTC)

val cycleBucket = segmentGranularity match {
case Granularity.SECOND => (tsUtc.minuteOfHour().get * 60 + tsUtc.secondOfMinute().get) % 900 // 900 buckets
case Granularity.MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 180 buckets
case Granularity.FIVE_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 36 buckets
case Granularity.TEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 18 buckets
case Granularity.FIFTEEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 12 buckets
case Granularity.HOUR => tsUtc.hourOfDay().get
case Granularity.SIX_HOUR => tsUtc.hourOfDay().get
case Granularity.DAY => tsUtc.dayOfMonth().get
case Granularity.WEEK => tsUtc.weekOfWeekyear().get
case Granularity.MONTH => tsUtc.monthOfYear().get
case Granularity.YEAR => tsUtc.yearOfCentury().get
case Granularities.SECOND => (tsUtc.minuteOfHour().get * 60 + tsUtc.secondOfMinute().get) % 900 // 900 buckets
case Granularities.MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 180 buckets
case Granularities.FIVE_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 36 buckets
case Granularities.TEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 18 buckets
case Granularities.FIFTEEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 12 buckets
case Granularities.HOUR => tsUtc.hourOfDay().get
case Granularities.SIX_HOUR => tsUtc.hourOfDay().get
case Granularities.DAY => tsUtc.dayOfMonth().get
case Granularities.WEEK => tsUtc.weekOfWeekyear().get
case Granularities.MONTH => tsUtc.monthOfYear().get
case Granularities.YEAR => tsUtc.yearOfCentury().get
case x => throw new IllegalArgumentException("No gross firehose id hack for granularity[%s]" format x)
}

"%s-%03d-%04d".format(dataSource, cycleBucket, partition)
}

def widen(granularity: Granularity, interval: Interval): Interval = {
  // Expand `interval` outward so both endpoints land on bucket boundaries of `granularity`.
  val alignedStart: DateTime = granularity.bucketStart(interval.getStart)
  val rawEnd = interval.getEnd
  val alignedEnd: DateTime =
    if (rawEnd.equals(alignedStart)) {
      // Empty interval whose end coincides with the aligned start: grow to one full bucket.
      granularity.increment(alignedStart)
    } else if (granularity.bucketStart(rawEnd).equals(rawEnd)) {
      // End already sits on a bucket boundary: keep it as-is.
      rawEnd
    } else {
      // End falls inside a bucket: push it out to that bucket's end.
      granularity.bucketEnd(rawEnd)
    }

  new Interval(alignedStart, alignedEnd)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ import java.nio.ByteBuffer
import java.{lang => jl}
import java.{util => ju}
import javax.ws.rs.core.MediaType

import io.druid.java.util.common.granularity.PeriodGranularity
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
import scala.reflect.runtime.universe.TypeTag
Expand Down Expand Up @@ -346,7 +349,8 @@ object DruidBeams
.timestampSpec(timestampSpec)
.tuning(
ClusteredBeamTuning(
segmentGranularity = fireDepartment.getDataSchema.getGranularitySpec.getSegmentGranularity,
segmentGranularity =
fireDepartment.getDataSchema.getGranularitySpec.getSegmentGranularity.asInstanceOf[PeriodGranularity],
windowPeriod = fireDepartment.getTuningConfig.getWindowPeriod,
warmingPeriod = config.propertiesBasedConfig.taskWarmingPeriod
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package com.metamx.tranquility.druid

import io.druid.data.input.impl.SpatialDimensionSchema
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.AggregatorFactory
import java.{util => ju}

import io.druid.java.util.common.granularity.Granularity

import scala.collection.JavaConverters._

/**
Expand All @@ -35,7 +37,7 @@ import scala.collection.JavaConverters._
class DruidRollup(
val dimensions: DruidDimensions,
val aggregators: IndexedSeq[AggregatorFactory],
val indexGranularity: QueryGranularity,
val indexGranularity: Granularity,
val isRollup: Boolean = true
)
{
Expand Down Expand Up @@ -177,7 +179,7 @@ object DruidRollup
def apply(
dimensions: DruidDimensions,
aggregators: Seq[AggregatorFactory],
indexGranularity: QueryGranularity,
indexGranularity: Granularity,
isRollup: Boolean
) =
{
Expand All @@ -194,7 +196,7 @@ object DruidRollup
def create(
dimensions: DruidDimensions,
aggregators: java.util.List[AggregatorFactory],
indexGranularity: QueryGranularity,
indexGranularity: Granularity,
isRollup: Boolean
): DruidRollup =
{
Expand All @@ -212,7 +214,7 @@ object DruidRollup
def create(
dimensions: java.util.List[String],
aggregators: java.util.List[AggregatorFactory],
indexGranularity: QueryGranularity,
indexGranularity: Granularity,
isRollup: Boolean
): DruidRollup =
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ package com.metamx.tranquility.druid.input
import com.google.common.hash.Hashing
import com.metamx.tranquility.partition.Partitioner
import io.druid.data.input.InputRow
import io.druid.granularity.QueryGranularity
import io.druid.java.util.common.granularity.Granularity
import org.joda.time.DateTime

import scala.collection.JavaConverters._

/**
* Partitioner that partitions Druid InputRows by their truncated timestamp and dimensions.
*/
class InputRowPartitioner(queryGranularity: QueryGranularity) extends Partitioner[InputRow]
class InputRowPartitioner(queryGranularity: Granularity) extends Partitioner[InputRow]
{
override def partition(row: InputRow, numPartitions: Int): Int = {
val partitionHashCode = Partitioner.timeAndDimsHashCode(
queryGranularity.truncate(row.getTimestampFromEpoch),
queryGranularity.bucketStart(new DateTime(row.getTimestampFromEpoch)).getMillis,
row.getDimensions.asScala.view map { dim =>
dim -> row.getRaw(dim)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MapPartitioner[A](
}

val partitionHashCode = if (dimensions.nonEmpty) {
val truncatedTimestamp = rollup.indexGranularity.truncate(timestamper.timestamp(thing).getMillis)
val truncatedTimestamp = rollup.indexGranularity.bucketStart(timestamper.timestamp(thing)).getMillis
Partitioner.timeAndDimsHashCode(truncatedTimestamp, dimensions)
} else {
thing.hashCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.metamx.tranquility.javatests;

import java.util.List;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.tranquility.druid.DruidBeamConfig;
Expand All @@ -28,15 +30,13 @@
import com.metamx.tranquility.druid.SchemalessDruidDimensions;
import com.metamx.tranquility.druid.SpecificDruidDimensions;
import com.metamx.tranquility.finagle.FinagleRegistryConfig;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class JavaApiTest
{
private static final List<String> dimensions = ImmutableList.of("column");
Expand All @@ -52,7 +52,7 @@ public void testSpecificDimensionsRollupConfiguration() throws Exception
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.specific(dimensions),
aggregators,
QueryGranularities.MINUTE,
Granularities.MINUTE,
true
);
Assert.assertTrue(rollup.dimensions() instanceof SpecificDruidDimensions);
Expand All @@ -65,7 +65,7 @@ public void testSchemalessDimensionsRollupConfiguration() throws Exception
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.schemaless(),
aggregators,
QueryGranularities.MINUTE,
Granularities.MINUTE,
true
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Expand All @@ -78,7 +78,7 @@ public void testSchemalessDimensionsWithExclusionsRollupConfiguration() throws E
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.schemalessWithExclusions(dimensions),
aggregators,
QueryGranularities.MINUTE,
Granularities.MINUTE,
true
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Expand All @@ -99,7 +99,7 @@ public void testSchemalessDimensionsWithExclusionsAndSpatialDimensionsRollupConf
)
),
aggregators,
QueryGranularities.MINUTE,
Granularities.MINUTE,
true
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import io.druid.data.input.impl.DimensionsSpec
import io.druid.data.input.impl.MapInputRowParser
import io.druid.data.input.impl.TimeAndDimsParseSpec
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularities
import io.druid.java.util.common.granularity.Granularities
import org.joda.time.DateTime
import org.scalatest.FunSuite
import org.scalatest.ShouldMatchers

import scala.collection.JavaConverters._

class InputRowPartitionerTest extends FunSuite with ShouldMatchers
Expand All @@ -51,7 +52,7 @@ class InputRowPartitionerTest extends FunSuite with ShouldMatchers
)
)

val partitioner = new InputRowPartitioner(QueryGranularities.MINUTE)
val partitioner = new InputRowPartitioner(Granularities.MINUTE)

val same = Seq(
Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("y", "z")),
Expand Down
Loading