Skip to content

Commit

Permalink
spans created by TracedThriftServer should be Server spans by default
Browse files Browse the repository at this point in the history
  • Loading branch information
bpholt committed Oct 3, 2024
1 parent fc1e082 commit 7da1a2e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package com.dwolla.util.async.finagle

import cats._
import cats.effect._
import cats.*
import cats.effect.*
import cats.effect.std.{Dispatcher, Env}
import cats.mtl._
import cats.syntax.all._
import cats.tagless._
import cats.tagless.aop._
import cats.tagless.implicits._
import cats.mtl.*
import cats.syntax.all.*
import cats.tagless.aop.*
import cats.tagless.implicits.*
import com.comcast.ip4s.{IpAddress, SocketAddress}
import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint._
import com.dwolla.util.async.twitter._
import com.twitter.finagle._
import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint.*
import com.dwolla.util.async.twitter.*
import com.twitter.finagle.*
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.{Future, Promise}
import natchez._
import natchez.*

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -44,39 +43,35 @@ object TracedThriftServer {
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] =
TracedThriftServer(addr, label, iface, entryPoint, Span.Options.Defaults.withSpanKind(Span.SpanKind.Server))

/**
* Builds a `ListeningServer` with Zipkin and Natchez tracing enabled. The `Thrift[F]` implementation will
*
* @param addr the socket address at which to listen for connections. (Typically `0.0.0.0:port`)
* @param label the name to assign the service in the Zipkin traces
* @param iface the Thrift method-per-endpoint implementation. Must be implemented in `Kleisli[F, Span[F], *]` so the span continued from Zipkin can be injected into the program.
* @param entryPoint the Natchez `EntryPoint` responsible for creating `Span` instances based on the Trace IDs coming from Finagle/Zipkin
* @param spanOptions allows the caller to set specific Span options that should be set on each new trace
* @param ec `ExecutionContext` where Twitter Futures will be completed when the Scala Future output by `Dispatcher.unsafeToFuture` completes
* @param LocalSpan `Local[F, Span[F]]` used to continue or start a new root span when a Thrift request is received
* @tparam F the effect in which to operate, which must have `Async[F]` and `Env[F]` instances available
* @tparam Thrift the higher-kinded MethodPerEndpoint Thrift algebra generated by scrooge and modified by the `AddCatsTaglessInstances` scalafix
* @return a `Resource[F, ListeningServer]` managing the lifecycle of the underlying Finagle server
*/
def apply[F[_] : Async : Env, Thrift[_[_]] : HigherKindedToMethodPerEndpoint : Instrument](addr: SocketAddress[IpAddress],
label: String,
iface: Thrift[F],
entryPoint: EntryPoint[F],
spanOptions: Span.Options,
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] =
Dispatcher.parallel[F]
.map(unsafeMapKToFuture(_, iface.instrument, entryPoint))
.map(new UnsafeInstrumentationToFuture[F, Thrift](_, entryPoint, spanOptions))
.map(iface.instrument.mapK(_))
.flatMap(t => Resource.make(acquire(addr, label, t))(release[F]))

private def unsafeMapKToFuture[F[_] : Async, Thrift[_[_]] : FunctorK](dispatcher: Dispatcher[F],
iface: Thrift[Instrumentation[F, *]],
entryPoint: EntryPoint[F],
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Thrift[Future] =
iface.mapK(new (Instrumentation[F, *] ~> Future) {
override def apply[A](fa: Instrumentation[F, A]): Future[A] =
currentTraceId().flatMap { maybeTraceId =>
val p = Promise[A]()

dispatcher.unsafeToFuture {
entryPoint.continueOrElseRoot(
s"${fa.algebraName}.${fa.methodName}",
maybeTraceId
.map(ZipkinKernel.asKernel)
.getOrElse(Kernel(Map.empty))
)
.use(Local[F, Span[F]].scope(fa.value))
}
.onComplete {
case Success(a) => p.setValue(a)
case Failure(ex) => p.setException(ex)
}

p
}
})

private def currentTraceId(): Future[Option[TraceId]] =
Future(com.twitter.finagle.tracing.Trace.idOption)

Expand All @@ -96,4 +91,34 @@ object TracedThriftServer {

private def release[F[_] : Async](s: ListeningServer): F[Unit] =
liftFuture(Sync[F].delay(s.close()))

private class UnsafeInstrumentationToFuture[F[_] : MonadCancelThrow, Thrift[_[_]]](dispatcher: Dispatcher[F],
entryPoint: EntryPoint[F],
spanOptions: Span.Options,
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]) extends (Instrumentation[F, *] ~> Future) {
override def apply[A](instrumentation: Instrumentation[F, A]): Future[A] =
currentTraceId().flatMap { maybeTraceId =>
val p = Promise[A]()

val fa = entryPoint.continueOrElseRoot(
name = s"${instrumentation.algebraName}.${instrumentation.methodName}",
kernel = maybeTraceId
.map(ZipkinKernel.asKernel)
.getOrElse(Kernel(Map.empty)),
options = spanOptions
)
.use(Local[F, Span[F]].scope(instrumentation.value))

dispatcher.unsafeToFuture(fa)
.onComplete {
case Success(a) => p.setValue(a)
case Failure(ex) => p.setException(ex)
}

p
}
}

}
2 changes: 1 addition & 1 deletion project/AsyncUtilsBuildPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ object AsyncUtilsBuildPlugin extends AutoPlugin {
),
startYear := Option(2021),
sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeLegacy,
tlBaseVersion := "1.1",
tlBaseVersion := "1.2",
tlCiReleaseBranches := Seq("main"),
mergifyRequiredJobs ++= Seq("validate-steward"),
mergifyStewardConfig ~= { _.map {
Expand Down

0 comments on commit 7da1a2e

Please sign in to comment.