diff --git a/build.sbt b/build.sbt index 521f729..c33ebb6 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ val jacksonOneVersion = "1.9.13" // See https://github.com/druid-io/druid/pull/1669, https://github.com/druid-io/tranquility/pull/81 before upgrading Jackson val jacksonTwoVersion = "2.4.6" val jacksonTwoModuleScalaVersion = "2.4.5" -val druidVersion = "0.9.2" +val druidVersion = "0.10.1-SNAPSHOT" val curatorVersion = "2.12.0" val guiceVersion = "4.0" val flinkVersion = "1.0.3" @@ -48,8 +48,7 @@ val coreDependencies = Seq( exclude("log4j", "log4j") exclude("mysql", "mysql-connector-java") // Not needed, unwanted GPLv2 license force(), - "com.metamx" % "java-util" % "0.28.2" exclude("log4j", "log4j") force(), - "io.netty" % "netty" % "3.10.5.Final" force(), + "io.netty" % "netty" % "3.10.6.Final" force(), "org.apache.curator" % "curator-client" % curatorVersion force(), "org.apache.curator" % "curator-framework" % curatorVersion force(), "org.apache.curator" % "curator-recipes" % curatorVersion force(), diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala index 6597b6b..425d6db 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala @@ -233,7 +233,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1) val intervalToCover = new Interval( timestamp.getMillis, - tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis, + timestamp.plus(tuning.segmentGranularity.getPeriod.multipliedBy(numSegmentsToCover)).getMillis, ISOChronology.getInstanceUTC ) val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeamTuning.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeamTuning.scala index e54e0f1..78b1d04 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeamTuning.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeamTuning.scala @@ -18,12 +18,12 @@ */ package com.metamx.tranquility.beam -import com.metamx.common.Granularity import com.metamx.common.scala.Logging +import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity} import org.joda.time.{DateTime, Period} case class ClusteredBeamTuning( - segmentGranularity: Granularity = Granularity.HOUR, + segmentGranularity: PeriodGranularity = Granularities.HOUR.asInstanceOf[PeriodGranularity], warmingPeriod: Period = new Period(0), windowPeriod: Period = new Period("PT10M"), partitions: Int = 1, @@ -58,7 +58,7 @@ object ClusteredBeamTuning * * Default is Granularity.HOUR. */ - def segmentGranularity(x: Granularity) = new Builder(config.copy(segmentGranularity = x)) + def segmentGranularity(x: PeriodGranularity) = new Builder(config.copy(segmentGranularity = x)) /** * If nonzero, create sub-beams early. This can be useful if sub-beams take a long time to start up. @@ -120,7 +120,7 @@ object ClusteredBeamTuning */ @deprecated("use ClusteredBeamTuning.builder()", "0.2.26") def create( - segmentGranularity: Granularity, + segmentGranularity: PeriodGranularity, warmingPeriod: Period, windowPeriod: Period, partitions: Int, diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala index 60243cd..ecb32ce 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala @@ -20,7 +20,6 @@ package com.metamx.tranquility.druid import com.fasterxml.jackson.databind.ObjectMapper import com.github.nscala_time.time.Imports._ -import com.metamx.common.Granularity import com.metamx.common.scala.untyped._ import com.metamx.common.scala.Jackson import com.metamx.common.scala.Logging @@ -32,9 +31,12 @@ import com.twitter.util.Await import com.twitter.util.Future import io.druid.data.input.impl.TimestampSpec import java.{util => ju} + +import io.druid.java.util.common.granularity.{Granularities, Granularity} import org.joda.time.chrono.ISOChronology import org.joda.time.DateTime import org.joda.time.Interval + import scala.util.Random class DruidBeamMaker[A]( @@ -70,10 +72,6 @@ class DruidBeamMaker[A]( } val taskId = "index_realtime_%s_%s_%s_%s%s" format(dataSource, interval.start, partition, replicant, suffix) val shutoffTime = interval.end + beamTuning.windowPeriod + config.firehoseGracePeriod - val queryGranularityMap = druidObjectMapper.convertValue( - rollup.indexGranularity, - classOf[ju.Map[String, AnyRef]] - ) val dataSchemaMap = Map( "dataSource" -> dataSource, "parser" -> Map( @@ -88,7 +86,7 @@ class DruidBeamMaker[A]( "granularitySpec" -> Map( "type" -> "uniform", "segmentGranularity" -> beamTuning.segmentGranularity, - "queryGranularity" -> queryGranularityMap, + "queryGranularity" -> rollup.indexGranularity, "rollup" -> rollup.isRollup ) ) @@ -149,8 +147,12 @@ class DruidBeamMaker[A]( override def newBeam(interval: Interval, partition: Int) = { require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) + DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) == interval, + "Interval does not match segmentGranularity[%s]: %s != %s" format( + beamTuning.segmentGranularity, + interval, + DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) + ) ) val baseFirehoseId = DruidBeamMaker.generateBaseFirehoseId( location.dataSource, @@ -202,8 +204,12 @@ class DruidBeamMaker[A]( beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC)) } require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) + DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) == interval, + "Interval does not match segmentGranularity[%s]: %s != %s" format( + beamTuning.segmentGranularity, + interval, + DruidBeamMaker.widen(beamTuning.segmentGranularity, interval) + ) ) val partition = int(d("partition")) val tasks = if (d contains "tasks") { @@ -244,20 +250,37 @@ object DruidBeamMaker val tsUtc = new DateTime(ts.getMillis, ISOChronology.getInstanceUTC) val cycleBucket = segmentGranularity match { - case Granularity.SECOND => (tsUtc.minuteOfHour().get * 60 + tsUtc.secondOfMinute().get) % 900 // 900 buckets - case Granularity.MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 180 buckets - case Granularity.FIVE_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 36 buckets - case Granularity.TEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 18 buckets - case Granularity.FIFTEEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 12 buckets - case Granularity.HOUR => tsUtc.hourOfDay().get - case Granularity.SIX_HOUR => tsUtc.hourOfDay().get - case Granularity.DAY => tsUtc.dayOfMonth().get - case Granularity.WEEK => tsUtc.weekOfWeekyear().get - case Granularity.MONTH => tsUtc.monthOfYear().get - case Granularity.YEAR => tsUtc.yearOfCentury().get + case Granularities.SECOND => (tsUtc.minuteOfHour().get * 60 + tsUtc.secondOfMinute().get) % 900 // 900 buckets + case Granularities.MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 180 buckets + case Granularities.FIVE_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 36 buckets + case Granularities.TEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 18 buckets + case Granularities.FIFTEEN_MINUTE => tsUtc.hourOfDay().get % 3 * 60 + tsUtc.minuteOfHour().get // 12 buckets + case Granularities.HOUR => tsUtc.hourOfDay().get + case Granularities.SIX_HOUR => tsUtc.hourOfDay().get + case Granularities.DAY => tsUtc.dayOfMonth().get + case Granularities.WEEK => tsUtc.weekOfWeekyear().get + case Granularities.MONTH => tsUtc.monthOfYear().get + case Granularities.YEAR => tsUtc.yearOfCentury().get case x => throw new IllegalArgumentException("No gross firehose id hack for granularity[%s]" format x) } "%s-%03d-%04d".format(dataSource, cycleBucket, partition) } + + def widen(granularity: Granularity, interval: Interval): Interval = { + val start: DateTime = granularity.bucketStart(interval.getStart) + val end: DateTime = + if (interval.getEnd.equals(start)) { + // Empty with aligned start/end; expand into a granularity-sized interval + granularity.increment(start) + } else if (granularity.bucketStart(interval.getEnd).equals(interval.getEnd)) { + // Non-empty with aligned end; keep the same end + interval.getEnd + } else { + // Non-empty with non-aligned end; push it out + granularity.bucketEnd(interval.getEnd) + } + + new Interval(start, end) + } } diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala index dc9eacd..6b7a76c 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala @@ -60,9 +60,12 @@ import java.nio.ByteBuffer import java.{lang => jl} import java.{util => ju} import javax.ws.rs.core.MediaType + +import io.druid.java.util.common.granularity.PeriodGranularity import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.retry.ExponentialBackoffRetry + import scala.collection.JavaConverters._ import scala.language.reflectiveCalls import scala.reflect.runtime.universe.TypeTag @@ -346,7 +349,8 @@ object DruidBeams .timestampSpec(timestampSpec) .tuning( ClusteredBeamTuning( - segmentGranularity = fireDepartment.getDataSchema.getGranularitySpec.getSegmentGranularity, + segmentGranularity = + fireDepartment.getDataSchema.getGranularitySpec.getSegmentGranularity.asInstanceOf[PeriodGranularity], windowPeriod = fireDepartment.getTuningConfig.getWindowPeriod, warmingPeriod = config.propertiesBasedConfig.taskWarmingPeriod ) diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala index ad61273..a8b25e5 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala @@ -20,9 +20,11 @@ package com.metamx.tranquility.druid import io.druid.data.input.impl.SpatialDimensionSchema import io.druid.data.input.impl.TimestampSpec -import io.druid.granularity.QueryGranularity import io.druid.query.aggregation.AggregatorFactory import java.{util => ju} + +import io.druid.java.util.common.granularity.Granularity + import scala.collection.JavaConverters._ /** @@ -35,7 +37,7 @@ import scala.collection.JavaConverters._ class DruidRollup( val dimensions: DruidDimensions, val aggregators: IndexedSeq[AggregatorFactory], - val indexGranularity: QueryGranularity, + val indexGranularity: Granularity, val isRollup: Boolean = true ) { @@ -177,7 +179,7 @@ object DruidRollup def apply( dimensions: DruidDimensions, aggregators: Seq[AggregatorFactory], - indexGranularity: QueryGranularity, + indexGranularity: Granularity, isRollup: Boolean ) = { @@ -194,7 +196,7 @@ object DruidRollup def create( dimensions: DruidDimensions, aggregators: java.util.List[AggregatorFactory], - indexGranularity: QueryGranularity, + indexGranularity: Granularity, isRollup: Boolean ): DruidRollup = { @@ -212,7 +214,7 @@ object DruidRollup def create( dimensions: java.util.List[String], aggregators: java.util.List[AggregatorFactory], - indexGranularity: QueryGranularity, + indexGranularity: Granularity, isRollup: Boolean ): DruidRollup = { diff --git a/core/src/main/scala/com/metamx/tranquility/druid/input/InputRowPartitioner.scala b/core/src/main/scala/com/metamx/tranquility/druid/input/InputRowPartitioner.scala index 1d84bae..a86acb5 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/input/InputRowPartitioner.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/input/InputRowPartitioner.scala @@ -22,17 +22,19 @@ package com.metamx.tranquility.druid.input import com.google.common.hash.Hashing import com.metamx.tranquility.partition.Partitioner import io.druid.data.input.InputRow -import io.druid.granularity.QueryGranularity +import io.druid.java.util.common.granularity.Granularity +import org.joda.time.DateTime + import scala.collection.JavaConverters._ /** * Partitioner that partitions Druid InputRows by their truncated timestamp and dimensions. */ -class InputRowPartitioner(queryGranularity: QueryGranularity) extends Partitioner[InputRow] +class InputRowPartitioner(queryGranularity: Granularity) extends Partitioner[InputRow] { override def partition(row: InputRow, numPartitions: Int): Int = { val partitionHashCode = Partitioner.timeAndDimsHashCode( - queryGranularity.truncate(row.getTimestampFromEpoch), + queryGranularity.bucketStart(new DateTime(row.getTimestampFromEpoch)).getMillis, row.getDimensions.asScala.view map { dim => dim -> row.getRaw(dim) } diff --git a/core/src/main/scala/com/metamx/tranquility/partition/MapPartitioner.scala b/core/src/main/scala/com/metamx/tranquility/partition/MapPartitioner.scala index 596da8e..323c408 100644 --- a/core/src/main/scala/com/metamx/tranquility/partition/MapPartitioner.scala +++ b/core/src/main/scala/com/metamx/tranquility/partition/MapPartitioner.scala @@ -81,7 +81,7 @@ class MapPartitioner[A]( } val partitionHashCode = if (dimensions.nonEmpty) { - val truncatedTimestamp = rollup.indexGranularity.truncate(timestamper.timestamp(thing).getMillis) + val truncatedTimestamp = rollup.indexGranularity.bucketStart(timestamper.timestamp(thing)).getMillis Partitioner.timeAndDimsHashCode(truncatedTimestamp, dimensions) } else { thing.hashCode() diff --git a/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java b/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java index 4aec2ea..9df96f0 100644 --- a/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java +++ b/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java @@ -19,6 +19,8 @@ package com.metamx.tranquility.javatests; +import java.util.List; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.tranquility.druid.DruidBeamConfig; @@ -28,15 +30,13 @@ import com.metamx.tranquility.druid.SchemalessDruidDimensions; import com.metamx.tranquility.druid.SpecificDruidDimensions; import com.metamx.tranquility.finagle.FinagleRegistryConfig; -import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; -import java.util.List; - public class JavaApiTest { private static final List dimensions = ImmutableList.of("column"); @@ -52,7 +52,7 @@ public void testSpecificDimensionsRollupConfiguration() throws Exception final DruidRollup rollup = DruidRollup.create( DruidDimensions.specific(dimensions), aggregators, - QueryGranularities.MINUTE, + Granularities.MINUTE, true ); Assert.assertTrue(rollup.dimensions() instanceof SpecificDruidDimensions); @@ -65,7 +65,7 @@ public void testSchemalessDimensionsRollupConfiguration() throws Exception final DruidRollup rollup = DruidRollup.create( DruidDimensions.schemaless(), aggregators, - QueryGranularities.MINUTE, + Granularities.MINUTE, true ); Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions); @@ -78,7 +78,7 @@ public void testSchemalessDimensionsWithExclusionsRollupConfiguration() throws E final DruidRollup rollup = DruidRollup.create( DruidDimensions.schemalessWithExclusions(dimensions), aggregators, - QueryGranularities.MINUTE, + Granularities.MINUTE, true ); Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions); @@ -99,7 +99,7 @@ public void testSchemalessDimensionsWithExclusionsAndSpatialDimensionsRollupConf ) ), aggregators, - QueryGranularities.MINUTE, + Granularities.MINUTE, true ); Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions); diff --git a/core/src/test/scala/com/metamx/tranquility/druid/input/InputRowPartitionerTest.scala b/core/src/test/scala/com/metamx/tranquility/druid/input/InputRowPartitionerTest.scala index dbdb9cb..222f0a5 100644 --- a/core/src/test/scala/com/metamx/tranquility/druid/input/InputRowPartitionerTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/druid/input/InputRowPartitionerTest.scala @@ -25,10 +25,11 @@ import io.druid.data.input.impl.DimensionsSpec import io.druid.data.input.impl.MapInputRowParser import io.druid.data.input.impl.TimeAndDimsParseSpec import io.druid.data.input.impl.TimestampSpec -import io.druid.granularity.QueryGranularities +import io.druid.java.util.common.granularity.Granularities import org.joda.time.DateTime import org.scalatest.FunSuite import org.scalatest.ShouldMatchers + import scala.collection.JavaConverters._ class InputRowPartitionerTest extends FunSuite with ShouldMatchers @@ -51,7 +52,7 @@ class InputRowPartitionerTest extends FunSuite with ShouldMatchers ) ) - val partitioner = new InputRowPartitioner(QueryGranularities.MINUTE) + val partitioner = new InputRowPartitioner(Granularities.MINUTE) val same = Seq( Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("y", "z")), diff --git a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala index af754ea..9a1be15 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala @@ -19,11 +19,12 @@ package com.metamx.tranquility.test +import java.util.UUID + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.github.nscala_time.time.Imports._ import com.google.common.base.Charsets -import com.metamx.common.Granularity import com.metamx.common.logger.Logger import com.metamx.common.scala.Jackson import com.metamx.common.scala.Predef._ @@ -34,17 +35,12 @@ import com.metamx.emitter.service.ServiceEmitter import com.metamx.tranquility.beam._ import com.metamx.tranquility.test.common.CuratorRequiringSuite import com.metamx.tranquility.typeclass.Timestamper -import com.twitter.util.Await -import com.twitter.util.Future -import com.twitter.util.Promise -import java.util.UUID +import com.twitter.util.{Await, Future, Promise} +import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity} import org.apache.curator.framework.CuratorFramework -import org.joda.time.DateTime -import org.joda.time.DateTimeZone -import org.joda.time.Interval -import org.scalatest.BeforeAndAfter -import org.scalatest.FunSuite -import org.scalatest.Matchers +import org.joda.time.{DateTime, DateTimeZone, Interval} +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + import scala.collection.immutable.BitSet import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -200,7 +196,7 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA } val defaultTuning = ClusteredBeamTuning( - segmentGranularity = Granularity.HOUR, + segmentGranularity = Granularities.HOUR.asInstanceOf[PeriodGranularity], warmingPeriod = 0.minutes, windowPeriod = 10.minutes, partitions = 2, diff --git a/core/src/test/scala/com/metamx/tranquility/test/DirectDruidTest.scala b/core/src/test/scala/com/metamx/tranquility/test/DirectDruidTest.scala index 33553c6..b2be68b 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/DirectDruidTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/DirectDruidTest.scala @@ -19,43 +19,36 @@ package com.metamx.tranquility.test +import java.io.ByteArrayInputStream +import java.nio.ByteBuffer +import java.{util => ju} +import javax.ws.rs.core.MediaType + import _root_.io.druid.data.input.impl.TimestampSpec -import _root_.io.druid.granularity.QueryGranularities import _root_.io.druid.query.aggregation.LongSumAggregatorFactory -import _root_.scala.collection.JavaConverters._ -import _root_.scala.reflect.runtime.universe.typeTag import com.github.nscala_time.time.Imports._ import com.google.common.base.Charsets import com.google.common.io.ByteStreams -import com.metamx.common.parsers.ParseException -import com.metamx.common.scala.timekeeper.TestingTimekeeper -import com.metamx.common.scala.timekeeper.Timekeeper -import com.metamx.common.scala.Jackson -import com.metamx.common.scala.Logging -import com.metamx.common.Granularity import com.metamx.common.ISE -import com.metamx.tranquility.beam.ClusteredBeamTuning -import com.metamx.tranquility.beam.RoundRobinBeam -import com.metamx.tranquility.config.DataSourceConfig -import com.metamx.tranquility.config.PropertiesBasedConfig -import com.metamx.tranquility.config.TranquilityConfig +import com.metamx.common.parsers.ParseException +import com.metamx.common.scala.{Jackson, Logging} +import com.metamx.common.scala.timekeeper.{TestingTimekeeper, Timekeeper} +import com.metamx.tranquility.beam.{ClusteredBeamTuning, RoundRobinBeam} +import com.metamx.tranquility.config.{DataSourceConfig, PropertiesBasedConfig, TranquilityConfig} import com.metamx.tranquility.druid._ import com.metamx.tranquility.test.DirectDruidTest._ import com.metamx.tranquility.test.common._ -import com.metamx.tranquility.tranquilizer.MessageDroppedException -import com.metamx.tranquility.tranquilizer.Tranquilizer -import com.metamx.tranquility.typeclass.DefaultJsonWriter -import com.metamx.tranquility.typeclass.JavaObjectWriter -import com.metamx.tranquility.typeclass.Timestamper +import com.metamx.tranquility.tranquilizer.{MessageDroppedException, Tranquilizer} +import com.metamx.tranquility.typeclass.{DefaultJsonWriter, JavaObjectWriter, Timestamper} import com.twitter.util._ -import java.io.ByteArrayInputStream -import java.nio.ByteBuffer -import java.{util => ju} -import javax.ws.rs.core.MediaType +import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity} import org.apache.curator.framework.CuratorFramework import org.joda.time.DateTime import org.scalatest.FunSuite +import _root_.scala.collection.JavaConverters._ +import _root_.scala.reflect.runtime.universe.typeTag + object DirectDruidTest { val TimeColumn = "ts" @@ -78,14 +71,14 @@ object DirectDruidTest def newBuilder(curator: CuratorFramework, timekeeper: Timekeeper): DruidBeams.Builder[SimpleEvent, SimpleEvent] = { val dataSource = "xxx" - val tuning = ClusteredBeamTuning(Granularity.HOUR, 0.minutes, 10.minutes, 1, 1, 1, 1) + val tuning = ClusteredBeamTuning(Granularities.HOUR.asInstanceOf[PeriodGranularity], 0.minutes, 10.minutes, 1, 1, 1, 1) val rollup = DruidRollup( SpecificDruidDimensions( Vector("foo"), Vector(MultipleFieldDruidSpatialDimension("coord.geo", Seq("lat", "lon"))) ), IndexedSeq(new LongSumAggregatorFactory("barr", "bar")), - QueryGranularities.MINUTE, + Granularities.MINUTE, true ) val druidEnvironment = new DruidEnvironment( diff --git a/core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala index 5f0a9ba..3ffa6c9 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala @@ -20,32 +20,25 @@ package com.metamx.tranquility.test import _root_.io.druid.data.input.impl.TimestampSpec -import _root_.io.druid.granularity.QueryGranularities -import _root_.io.druid.indexing.common.task.RealtimeIndexTask -import _root_.io.druid.indexing.common.task.Task +import _root_.io.druid.indexing.common.task.{RealtimeIndexTask, Task} import _root_.io.druid.query.aggregation.LongSumAggregatorFactory -import _root_.io.druid.segment.realtime.firehose.ChatHandlerProvider -import _root_.io.druid.segment.realtime.firehose.ClippedFirehoseFactory -import _root_.io.druid.segment.realtime.firehose.NoopChatHandlerProvider +import _root_.io.druid.segment.realtime.firehose.{ChatHandlerProvider, ClippedFirehoseFactory, NoopChatHandlerProvider} import _root_.io.druid.server.metrics.EventReceiverFirehoseRegister import _root_.io.druid.timeline.partition.LinearShardSpec -import _root_.scala.collection.JavaConverters._ import com.fasterxml.jackson.databind -import com.fasterxml.jackson.databind.DeserializationContext -import com.fasterxml.jackson.databind.InjectableValues -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.ObjectReader +import com.fasterxml.jackson.databind.{DeserializationContext, InjectableValues, ObjectMapper, ObjectReader} import com.github.nscala_time.time.Imports._ import com.google.inject.Key -import com.metamx.common.Granularity import com.metamx.common.scala.untyped.Dict import com.metamx.emitter.core.NoopEmitter import com.metamx.emitter.service.ServiceEmitter import com.metamx.tranquility.beam.ClusteredBeamTuning import com.metamx.tranquility.druid._ +import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity} import org.joda.time.chrono.ISOChronology -import org.scalatest.FunSuite -import org.scalatest.Matchers +import org.scalatest.{FunSuite, Matchers} + +import _root_.scala.collection.JavaConverters._ class DruidBeamTest extends FunSuite with Matchers { @@ -80,47 +73,47 @@ class DruidBeamTest extends FunSuite with Matchers test("GenerateFirehoseId: H=00") { val dt = new DateTime("2010-02-03T00:34:56.789Z") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SECOND, dt, 1) === "x-296-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIVE_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.TEN_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIFTEEN_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.HOUR, dt, 1) === "x-000-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SIX_HOUR, dt, 1) === "x-000-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.DAY, dt, 1) === "x-003-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.WEEK, dt, 1) === "x-005-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MONTH, dt, 1) === "x-002-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.YEAR, dt, 1) === "x-010-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SECOND, dt, 1) === "x-296-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIVE_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.TEN_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIFTEEN_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.HOUR, dt, 1) === "x-000-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SIX_HOUR, dt, 1) === "x-000-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.DAY, dt, 1) === "x-003-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.WEEK, dt, 1) === "x-005-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MONTH, dt, 1) === "x-002-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.YEAR, dt, 1) === "x-010-0001") } test("GenerateFirehoseId: H=12") { val dt = new DateTime("2010-02-03T12:34:56.789Z") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SECOND, dt, 1) === "x-296-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIVE_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.TEN_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIFTEEN_MINUTE, dt, 1) === "x-034-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.HOUR, dt, 1) === "x-012-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SIX_HOUR, dt, 1) === "x-012-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.DAY, dt, 1) === "x-003-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.WEEK, dt, 1) === "x-005-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MONTH, dt, 1) === "x-002-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.YEAR, dt, 1) === "x-010-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SECOND, dt, 1) === "x-296-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIVE_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.TEN_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIFTEEN_MINUTE, dt, 1) === "x-034-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.HOUR, dt, 1) === "x-012-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SIX_HOUR, dt, 1) === "x-012-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.DAY, dt, 1) === "x-003-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.WEEK, dt, 1) === "x-005-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MONTH, dt, 1) === "x-002-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.YEAR, dt, 1) === "x-010-0001") } test("GenerateFirehoseId: H=23") { val dt = new DateTime("2010-02-03T23:34:56.789Z") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SECOND, dt, 1) === "x-296-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MINUTE, dt, 1) === "x-154-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIVE_MINUTE, dt, 1) === "x-154-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.TEN_MINUTE, dt, 1) === "x-154-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.FIFTEEN_MINUTE, dt, 1) === "x-154-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.HOUR, dt, 1) === "x-023-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.SIX_HOUR, dt, 1) === "x-023-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.DAY, dt, 1) === "x-003-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.WEEK, dt, 1) === "x-005-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.MONTH, dt, 1) === "x-002-0001") - assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularity.YEAR, dt, 1) === "x-010-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SECOND, dt, 1) === "x-296-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MINUTE, dt, 1) === "x-154-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIVE_MINUTE, dt, 1) === "x-154-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.TEN_MINUTE, dt, 1) === "x-154-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.FIFTEEN_MINUTE, dt, 1) === "x-154-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.HOUR, dt, 1) === "x-023-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.SIX_HOUR, dt, 1) === "x-023-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.DAY, dt, 1) === "x-003-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.WEEK, dt, 1) === "x-005-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.MONTH, dt, 1) === "x-002-0001") + assert(DruidBeamMaker.generateBaseFirehoseId("x", Granularities.YEAR, dt, 1) === "x-010-0001") } test("Task JSON") { @@ -128,7 +121,7 @@ class DruidBeamTest extends FunSuite with Matchers DruidBeamConfig(), DruidLocation.create("druid/overlord", "mydatasource"), ClusteredBeamTuning( - segmentGranularity = Granularity.HOUR, + segmentGranularity = Granularities.HOUR.asInstanceOf[PeriodGranularity], warmingPeriod = 0.minutes, windowPeriod = 4.minutes ), @@ -140,7 +133,7 @@ class DruidBeamTest extends FunSuite with Matchers DruidRollup( dimensions = SpecificDruidDimensions(Seq("dim1", "dim2"), Seq(DruidSpatialDimension.singleField("spatial1"))), aggregators = Seq(new LongSumAggregatorFactory("met1", "met1")), - indexGranularity = QueryGranularities.MINUTE, + indexGranularity = Granularities.MINUTE, true ), new TimestampSpec("ts", "iso", null), @@ -180,8 +173,8 @@ class DruidBeamTest extends FunSuite with Matchers val dataSchema = task.getRealtimeIngestionSchema.getDataSchema dataSchema.getDataSource should be("mydatasource") dataSchema.getAggregators.deep should be(Array(new LongSumAggregatorFactory("met1", "met1")).deep) - dataSchema.getGranularitySpec.getSegmentGranularity should be(Granularity.HOUR) - dataSchema.getGranularitySpec.getQueryGranularity should be(QueryGranularities.MINUTE) + dataSchema.getGranularitySpec.getSegmentGranularity should be(Granularities.HOUR) + dataSchema.getGranularitySpec.getQueryGranularity should be(Granularities.MINUTE) val parseSpec = dataSchema.getParser.getParseSpec parseSpec.getTimestampSpec.getTimestampColumn should be("ts") @@ -200,7 +193,7 @@ class DruidBeamTest extends FunSuite with Matchers DruidRollup( dimensions = SpecificDruidDimensions(Seq(), Seq()), aggregators = Seq(), - indexGranularity = QueryGranularities.NONE, + indexGranularity = Granularities.NONE, // isRollup is set for test. isRollup ), diff --git a/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala b/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala index 754bd41..0be0f45 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala @@ -23,11 +23,12 @@ import com.metamx.tranquility.druid.DruidRollup import com.metamx.tranquility.druid.SchemalessDruidDimensions import com.metamx.tranquility.druid.SpecificDruidDimensions import io.druid.data.input.impl.TimestampSpec -import io.druid.granularity.QueryGranularities +import io.druid.java.util.common.granularity.Granularities import io.druid.query.aggregation.CountAggregatorFactory import io.druid.query.aggregation.LongSumAggregatorFactory import org.scalatest.FunSuite import org.scalatest.Matchers + import scala.collection.JavaConverters._ class DruidRollupTest extends FunSuite with Matchers @@ -36,7 +37,7 @@ class DruidRollupTest extends FunSuite with Matchers val rollup = DruidRollup( SpecificDruidDimensions(Vector("hey", "what"), Vector.empty), Seq(new CountAggregatorFactory("heyyo")), - QueryGranularities.NONE, + Granularities.NONE, true ) rollup.validate() @@ -47,7 +48,7 @@ class DruidRollupTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Vector("hey", "what"), Vector.empty), Seq(new CountAggregatorFactory("hey")), - QueryGranularities.NONE, + Granularities.NONE, true ) } @@ -59,7 +60,7 @@ class DruidRollupTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Vector("what"), Vector.empty), Seq(new CountAggregatorFactory("hey"), new LongSumAggregatorFactory("hey", "blah")), - QueryGranularities.NONE, + Granularities.NONE, true ) } @@ -71,7 +72,7 @@ class DruidRollupTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Vector("what", "what"), Vector.empty), Seq(new CountAggregatorFactory("hey")), - QueryGranularities.NONE, + Granularities.NONE, true ) } @@ -82,7 +83,7 @@ class DruidRollupTest extends FunSuite with Matchers val rollup = DruidRollup( SpecificDruidDimensions(Vector("e", "f", "a", "b", "z", "t"), Vector.empty), Seq(new CountAggregatorFactory("hey")), - QueryGranularities.NONE, + Granularities.NONE, true ) rollup.dimensions.specMap.get("dimensions").asInstanceOf[java.util.List[String]].asScala should @@ -93,7 +94,7 @@ class DruidRollupTest extends FunSuite with Matchers val rollup = DruidRollup( SpecificDruidDimensions(Seq("foo", "bar")), Seq(new LongSumAggregatorFactory("hey", "there")), - QueryGranularities.NONE, + Granularities.NONE, true ) val timestampSpec = new TimestampSpec("t", "auto", null) @@ -110,7 +111,7 @@ class DruidRollupTest extends FunSuite with Matchers val rollup = DruidRollup( SchemalessDruidDimensions(Set("qux")), Seq(new LongSumAggregatorFactory("hey", "there")), - QueryGranularities.NONE, + Granularities.NONE, true ) val timestampSpec = new TimestampSpec("t", "auto", null) diff --git a/core/src/test/scala/com/metamx/tranquility/test/MapPartitionerTest.scala b/core/src/test/scala/com/metamx/tranquility/test/MapPartitionerTest.scala index c8a84db..09cb487 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/MapPartitionerTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/MapPartitionerTest.scala @@ -28,9 +28,10 @@ import com.metamx.tranquility.druid.SpecificDruidDimensions import com.metamx.tranquility.partition.MapPartitioner import com.metamx.tranquility.typeclass.Timestamper import io.druid.data.input.impl.TimestampSpec -import io.druid.granularity.QueryGranularities import io.druid.query.aggregation.DoubleSumAggregatorFactory import java.{util => ju} + +import io.druid.java.util.common.granularity.Granularities import org.joda.time.DateTime import org.scalatest.FunSuite import org.scalatest.Matchers @@ -69,7 +70,7 @@ class MapPartitionerTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Seq("foo", "bar", "baz")), Seq(new DoubleSumAggregatorFactory("x", "xSum")), - QueryGranularities.MINUTE, + Granularities.MINUTE, true ) ) @@ -107,7 +108,7 @@ class MapPartitionerTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Seq("foo", "bar", "baz")), Seq(new DoubleSumAggregatorFactory("x", "xSum")), - QueryGranularities.MINUTE, + Granularities.MINUTE, true ) ) @@ -148,7 +149,7 @@ class MapPartitionerTest extends FunSuite with Matchers DruidRollup( SpecificDruidDimensions(Seq("foo", "bar", "baz")), Seq(new DoubleSumAggregatorFactory("x", "xSum")), - QueryGranularities.MINUTE, + Granularities.MINUTE, true ) ) diff --git a/core/src/test/scala/com/metamx/tranquility/test/common/DruidIntegrationSuite.scala b/core/src/test/scala/com/metamx/tranquility/test/common/DruidIntegrationSuite.scala index 1b3d492..06d1106 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/common/DruidIntegrationSuite.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/common/DruidIntegrationSuite.scala @@ -25,7 +25,6 @@ import com.google.common.base.Charsets import com.google.common.io.CharStreams import com.google.common.io.Files import com.google.inject.Injector -import com.metamx.collections.spatial.search.RectangularBound import com.metamx.common.lifecycle.Lifecycle import com.metamx.common.scala.concurrent._ import com.metamx.common.scala.control._ @@ -37,7 +36,6 @@ import io.druid.cli.CliBroker import io.druid.cli.CliCoordinator import io.druid.cli.CliOverlord import io.druid.cli.GuiceRunnable -import io.druid.granularity.QueryGranularities import io.druid.guice.GuiceInjectors import io.druid.query.aggregation.AggregatorFactory import io.druid.query.aggregation.LongSumAggregatorFactory @@ -50,8 +48,12 @@ import java.io.InputStreamReader import java.net.BindException import java.net.URLClassLoader import java.util.concurrent.atomic.AtomicInteger + +import io.druid.collections.spatial.search.RectangularBound +import io.druid.java.util.common.granularity.Granularities import org.apache.curator.framework.CuratorFramework import org.scalatest.FunSuite + import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.classTag @@ -256,7 +258,7 @@ trait DruidIntegrationSuite extends Logging with CuratorRequiringSuite (Druids .newTimeseriesQueryBuilder() .dataSource("xxx") - .granularity(QueryGranularities.MINUTE) + .granularity(Granularities.MINUTE) .intervals("0000/3000") .aggregators(Seq[AggregatorFactory](new LongSumAggregatorFactory("barr", "barr")).asJava) .build(), @@ -273,7 +275,7 @@ trait DruidIntegrationSuite extends Logging with CuratorRequiringSuite (Druids .newTimeseriesQueryBuilder() .dataSource("xxx") - .granularity(QueryGranularities.MINUTE) + .granularity(Granularities.MINUTE) .intervals("0000/3000") .aggregators(Seq[AggregatorFactory](new LongSumAggregatorFactory("barr", "barr")).asJava) .filters(new SpatialDimFilter("coord.geo", new RectangularBound(Array(35f, 120f), Array(40f, 125f)))) diff --git a/docs/samza.md b/docs/samza.md index 5767e3c..22bd6e7 100644 --- a/docs/samza.md +++ b/docs/samza.md @@ -24,7 +24,6 @@ For example: ```java import com.google.common.collect.ImmutableList; -import com.metamx.common.Granularity; import com.metamx.tranquility.beam.Beam; import com.metamx.tranquility.beam.ClusteredBeamTuning; import com.metamx.tranquility.druid.DruidBeams; @@ -32,7 +31,8 @@ import com.metamx.tranquility.druid.DruidLocation; import com.metamx.tranquility.druid.DruidRollup; import com.metamx.tranquility.samza.BeamFactory; import com.metamx.tranquility.typeclass.Timestamper; -import io.druid.granularity.QueryGranularity; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.apache.curator.framework.CuratorFramework; @@ -84,10 +84,10 @@ public class MyBeamFactory implements BeamFactory .curator(curator) .discoveryPath("/druid/discovery") .location(DruidLocation.create("druid/overlord", "druid:firehose:%s", dataSource)) - .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE, isRollup)) + .rollup(DruidRollup.create(dimensions, aggregators, Granularities.MINUTE, isRollup)) .tuning( ClusteredBeamTuning.builder() - .segmentGranularity(Granularity.HOUR) + .segmentGranularity((PeriodGranularity) Granularities.HOUR) .windowPeriod(new Period("PT10M")) .build() ) diff --git a/samza/src/test/java/com/metamx/tranquility/javatests/TestSamzaBeamFactory.java b/samza/src/test/java/com/metamx/tranquility/javatests/TestSamzaBeamFactory.java index 1364c2f..63329b6 100644 --- a/samza/src/test/java/com/metamx/tranquility/javatests/TestSamzaBeamFactory.java +++ b/samza/src/test/java/com/metamx/tranquility/javatests/TestSamzaBeamFactory.java @@ -19,8 +19,10 @@ package com.metamx.tranquility.javatests; +import java.util.List; +import java.util.Map; + import com.google.common.collect.ImmutableList; -import com.metamx.common.Granularity; import com.metamx.tranquility.beam.Beam; import com.metamx.tranquility.beam.ClusteredBeamTuning; import com.metamx.tranquility.druid.DruidBeams; @@ -28,7 +30,8 @@ import com.metamx.tranquility.druid.DruidRollup; import com.metamx.tranquility.samza.BeamFactory; import com.metamx.tranquility.typeclass.Timestamper; -import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.apache.curator.framework.CuratorFramework; @@ -39,9 +42,6 @@ import org.joda.time.DateTime; import org.joda.time.Period; -import java.util.List; -import java.util.Map; - public class TestSamzaBeamFactory implements BeamFactory { @@ -79,10 +79,10 @@ public DateTime timestamp(Object obj) .curator(curator) .discoveryPath("/druid/discovery") .location(DruidLocation.create("overlord", "druid:firehose:%s", dataSource)) - .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE, true)) + .rollup(DruidRollup.create(dimensions, aggregators, Granularities.MINUTE, true)) .tuning( ClusteredBeamTuning.builder() - .segmentGranularity(Granularity.HOUR) + .segmentGranularity(((PeriodGranularity) Granularities.HOUR)) .windowPeriod(new Period("PT10M")) .build() ) diff --git a/server/src/test/scala/com/metamx/tranquility/server/ServerDruidTest.scala b/server/src/test/scala/com/metamx/tranquility/server/ServerDruidTest.scala index 52ebc40..0c147ab 100644 --- a/server/src/test/scala/com/metamx/tranquility/server/ServerDruidTest.scala +++ b/server/src/test/scala/com/metamx/tranquility/server/ServerDruidTest.scala @@ -21,32 +21,25 @@ package com.metamx.tranquility.server import _root_.io.druid.data.input.InputRow import _root_.io.druid.data.input.impl.TimestampSpec -import _root_.io.druid.granularity.QueryGranularities import _root_.io.druid.query.aggregation.LongSumAggregatorFactory -import _root_.scala.reflect.runtime.universe.typeTag import com.github.nscala_time.time.Imports._ import com.google.common.base.Charsets -import com.metamx.common.Granularity +import com.metamx.common.scala.{Jackson, Logging} import com.metamx.common.scala.Predef._ -import com.metamx.common.scala.timekeeper.TestingTimekeeper -import com.metamx.common.scala.timekeeper.Timekeeper -import com.metamx.common.scala.untyped.Dict -import com.metamx.common.scala.untyped.long -import com.metamx.common.scala.Jackson -import com.metamx.common.scala.Logging -import com.metamx.tranquility.beam.Beam -import com.metamx.tranquility.beam.ClusteredBeamTuning -import com.metamx.tranquility.beam.RoundRobinBeam +import com.metamx.common.scala.timekeeper.{TestingTimekeeper, Timekeeper} +import com.metamx.common.scala.untyped.{Dict, long} +import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning, RoundRobinBeam} import com.metamx.tranquility.druid._ import com.metamx.tranquility.server.ServerDruidTest._ import com.metamx.tranquility.server.ServerTestUtil.withTester import com.metamx.tranquility.test.DirectDruidTest -import com.metamx.tranquility.test.common.CuratorRequiringSuite -import com.metamx.tranquility.test.common.DruidIntegrationSuite +import com.metamx.tranquility.test.common.{CuratorRequiringSuite, DruidIntegrationSuite} +import io.druid.java.util.common.granularity.{Granularities, PeriodGranularity} import org.apache.curator.framework.CuratorFramework import org.joda.time.DateTime -import org.scalatest.FunSuite -import org.scalatest.ShouldMatchers +import org.scalatest.{FunSuite, ShouldMatchers} + +import _root_.scala.reflect.runtime.universe.typeTag class ServerDruidTest extends FunSuite with DruidIntegrationSuite with CuratorRequiringSuite with ShouldMatchers with Logging @@ -117,14 +110,22 @@ object ServerDruidTest val TimeFormat = "posix" def newDruidBeam(curator: CuratorFramework, timekeeper: Timekeeper): Beam[Dict] = { - val tuning = ClusteredBeamTuning(Granularity.HOUR, 0.minutes, 10.minutes, 1, 1, 1, 1) + val tuning = ClusteredBeamTuning( + Granularities.HOUR.asInstanceOf[PeriodGranularity], + 0.minutes, + 10.minutes, + 1, + 1, + 1, + 1 + ) val rollup = DruidRollup( SpecificDruidDimensions( Vector("foo"), Vector(MultipleFieldDruidSpatialDimension("coord.geo", Seq("lat", "lon"))) ), IndexedSeq(new LongSumAggregatorFactory("barr", "bar")), - QueryGranularities.MINUTE, + Granularities.MINUTE, true ) val druidEnvironment = DruidEnvironment.create( diff --git a/storm/src/test/java/com/metamx/tranquility/javatests/StormJavaApiTest.java b/storm/src/test/java/com/metamx/tranquility/javatests/StormJavaApiTest.java index 25cad5b..5006f5d 100644 --- a/storm/src/test/java/com/metamx/tranquility/javatests/StormJavaApiTest.java +++ b/storm/src/test/java/com/metamx/tranquility/javatests/StormJavaApiTest.java @@ -19,12 +19,18 @@ package com.metamx.tranquility.javatests; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.ws.rs.core.MediaType; + import backtype.storm.task.IMetricsContext; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.metamx.common.Granularity; import com.metamx.tranquility.beam.Beam; import com.metamx.tranquility.beam.ClusteredBeamTuning; import com.metamx.tranquility.druid.DruidBeams; @@ -35,7 +41,9 @@ import com.metamx.tranquility.typeclass.JavaObjectWriter; import com.metamx.tranquility.typeclass.Timestamper; import com.twitter.finagle.Service; -import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.granularity.DurationGranularity; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.apache.curator.framework.CuratorFramework; @@ -47,13 +55,6 @@ import org.junit.Assert; import org.junit.Test; -import javax.ws.rs.core.MediaType; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - public class StormJavaApiTest { private static final List dimensions = ImmutableList.of("column"); @@ -102,10 +103,10 @@ public DateTime timestamp(Map theMap) dataSource ) ) - .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularities.MINUTE, true)) + .rollup(DruidRollup.create(dimensions, aggregators, Granularities.MINUTE, true)) .tuning( ClusteredBeamTuning.builder() - .segmentGranularity(Granularity.HOUR) + .segmentGranularity(((PeriodGranularity) Granularities.HOUR)) .windowPeriod(new Period("PT10M")) .build() )