Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spans created by TracedThriftServer should be Server spans by default #210

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package com.dwolla.util.async.finagle

import cats._
import cats.effect._
import cats.*
import cats.effect.*
import cats.effect.std.{Dispatcher, Env}
import cats.mtl._
import cats.syntax.all._
import cats.tagless._
import cats.tagless.aop._
import cats.tagless.implicits._
import cats.mtl.*
import cats.syntax.all.*
import cats.tagless.aop.*
import cats.tagless.implicits.*
import com.comcast.ip4s.{IpAddress, SocketAddress}
import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint._
import com.dwolla.util.async.twitter._
import com.twitter.finagle._
import com.dwolla.util.async.finagle.HigherKindedToMethodPerEndpoint.*
import com.dwolla.util.async.twitter.*
import com.twitter.finagle.*
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.{Future, Promise}
import natchez._
import natchez.*

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -44,39 +43,35 @@ object TracedThriftServer {
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] =
TracedThriftServer(addr, label, iface, entryPoint, Span.Options.Defaults.withSpanKind(Span.SpanKind.Server))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding Span.Options.Defaults.withSpanKind(Span.SpanKind.Server) is the key change in this PR; the rest is really just refactoring.


/**
* Builds a `ListeningServer` with Zipkin and Natchez tracing enabled. The `Thrift[F]` implementation will
*
* @param addr the socket address at which to listen for connections. (Typically `0.0.0.0:port`)
* @param label the name to assign the service in the Zipkin traces
* @param iface the Thrift method-per-endpoint implementation. Must be implemented in `Kleisli[F, Span[F], *]` so the span continued from Zipkin can be injected into the program.
* @param entryPoint the Natchez `EntryPoint` responsible for creating `Span` instances based on the Trace IDs coming from Finagle/Zipkin
* @param spanOptions allows the caller to set specific Span options that should be set on each new trace
* @param ec `ExecutionContext` where Twitter Futures will be completed when the Scala Future output by `Dispatcher.unsafeToFuture` completes
* @param LocalSpan `Local[F, Span[F]]` used to continue or start a new root span when a Thrift request is received
* @tparam F the effect in which to operate, which must have `Async[F]` and `Env[F]` instances available
* @tparam Thrift the higher-kinded MethodPerEndpoint Thrift algebra generated by scrooge and modified by the `AddCatsTaglessInstances` scalafix
* @return a `Resource[F, ListeningServer]` managing the lifecycle of the underlying Finagle server
*/
def apply[F[_] : Async : Env, Thrift[_[_]] : HigherKindedToMethodPerEndpoint : Instrument](addr: SocketAddress[IpAddress],
label: String,
iface: Thrift[F],
entryPoint: EntryPoint[F],
spanOptions: Span.Options,
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Resource[F, ListeningServer] =
Dispatcher.parallel[F]
.map(unsafeMapKToFuture(_, iface.instrument, entryPoint))
.map(new UnsafeInstrumentationToFuture[F, Thrift](_, entryPoint, spanOptions))
.map(iface.instrument.mapK(_))
.flatMap(t => Resource.make(acquire(addr, label, t))(release[F]))

private def unsafeMapKToFuture[F[_] : Async, Thrift[_[_]] : FunctorK](dispatcher: Dispatcher[F],
iface: Thrift[Instrumentation[F, *]],
entryPoint: EntryPoint[F],
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]): Thrift[Future] =
iface.mapK(new (Instrumentation[F, *] ~> Future) {
override def apply[A](fa: Instrumentation[F, A]): Future[A] =
currentTraceId().flatMap { maybeTraceId =>
val p = Promise[A]()

dispatcher.unsafeToFuture {
entryPoint.continueOrElseRoot(
s"${fa.algebraName}.${fa.methodName}",
maybeTraceId
.map(ZipkinKernel.asKernel)
.getOrElse(Kernel(Map.empty))
)
.use(Local[F, Span[F]].scope(fa.value))
}
.onComplete {
case Success(a) => p.setValue(a)
case Failure(ex) => p.setException(ex)
}

p
}
})

private def currentTraceId(): Future[Option[TraceId]] =
Future(com.twitter.finagle.tracing.Trace.idOption)

Expand All @@ -96,4 +91,34 @@ object TracedThriftServer {

private def release[F[_] : Async](s: ListeningServer): F[Unit] =
liftFuture(Sync[F].delay(s.close()))

private class UnsafeInstrumentationToFuture[F[_] : MonadCancelThrow, Thrift[_[_]]](dispatcher: Dispatcher[F],
entryPoint: EntryPoint[F],
spanOptions: Span.Options,
)
(implicit ec: ExecutionContext,
LocalSpan: Local[F, Span[F]]) extends (Instrumentation[F, *] ~> Future) {
override def apply[A](instrumentation: Instrumentation[F, A]): Future[A] =
currentTraceId().flatMap { maybeTraceId =>
val p = Promise[A]()

val fa = entryPoint.continueOrElseRoot(
name = s"${instrumentation.algebraName}.${instrumentation.methodName}",
kernel = maybeTraceId
.map(ZipkinKernel.asKernel)
.getOrElse(Kernel(Map.empty)),
options = spanOptions
)
.use(Local[F, Span[F]].scope(instrumentation.value))

dispatcher.unsafeToFuture(fa)
.onComplete {
case Success(a) => p.setValue(a)
case Failure(ex) => p.setException(ex)
}

p
}
}

}
6 changes: 3 additions & 3 deletions project/AsyncUtilsBuildPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import org.typelevel.sbt.TypelevelMimaPlugin.autoImport.*
import org.typelevel.sbt.TypelevelSettingsPlugin
import org.typelevel.sbt.TypelevelSettingsPlugin.autoImport.*
import org.typelevel.sbt.TypelevelSonatypeCiReleasePlugin.autoImport.*
import org.typelevel.sbt.TypelevelSonatypePlugin.autoImport.*
import org.typelevel.sbt.TypelevelVersioningPlugin.autoImport.*
import org.typelevel.sbt.gha.GenerativePlugin.autoImport.*
import org.typelevel.sbt.gha.GitHubActionsPlugin.autoImport.*
Expand All @@ -22,6 +21,7 @@ import sbt.librarymanagement.DependencyBuilders.OrganizationArtifactName
import sbtprojectmatrix.ProjectMatrixPlugin
import sbtprojectmatrix.ProjectMatrixPlugin.autoImport.*
import scalafix.sbt.ScalafixPlugin.autoImport.*
import xerial.sbt.Sonatype.autoImport.*

object AsyncUtilsBuildPlugin extends AutoPlugin {
override def trigger = noTrigger
Expand Down Expand Up @@ -338,8 +338,8 @@ object AsyncUtilsBuildPlugin extends AutoPlugin {
),
),
startYear := Option(2021),
tlSonatypeUseLegacyHost := true,
tlBaseVersion := "1.1",
sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeLegacy,
tlBaseVersion := "1.2",
tlCiReleaseBranches := Seq("main"),
mergifyRequiredJobs ++= Seq("validate-steward"),
mergifyStewardConfig ~= { _.map {
Expand Down