Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

668 aspect oriented middleware #669

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open

Conversation

ValdemarGr
Copy link
Contributor

@ValdemarGr ValdemarGr commented Sep 7, 2023

Initial draft for #668

Example output for TestServiceFs2Grpc
package hello.world

import _root_.cats.syntax.all._

trait TestServiceFs2Grpc[F[_], A] {
  def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
  def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
  def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
  def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}

object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {
  
  def mkClientFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, Dom, Cod, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  )(implicit
    dom0: Dom[hello.world.TestMessage],
    cod0: Cod[hello.world.TestMessage]
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )
    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )
    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }
  
  def mkClientTrivial[F[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  ) = 
    mkClientFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      channel,
      clientAspect,
      clientOptions
    )
  
  protected def serviceBindingFull[F[_]: _root_.cats.effect.Async, Dom[_], Cod[_], A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, Dom, Cod, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  )(implicit
    dom0: Dom[hello.world.TestMessage],
    cod0: Cod[hello.world.TestMessage]
  ) = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.noStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.clientStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.serverStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, implicitly[Dom[hello.world.TestMessage]], implicitly[Cod[hello.world.TestMessage]]),
            r,
            (r, m) => serviceImpl.bothStreaming(r, m)
          )
        }
      )
      .build()
  }
  
  protected def serviceBindingTrivial[F[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[F],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  ) = 
    serviceBindingFull[F, _root_.fs2.grpc.shared.Trivial, _root_.fs2.grpc.shared.Trivial, A](
      dispatcher,
      serviceImpl,
      serviceAspect,
      serverOptions
    )

}


trait GeneratedCompanion[Service[*[_], _]] {

implicit final def serviceCompanion: GeneratedCompanion[Service] = this

///=== Client ==========================================================================================================

def mkClientTrivial[F[_]: Async, A](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected?

@ahjohannessen
Copy link
Collaborator

@ValdemarGr I think this looks promising. Should mkClientFull and mkServiceFull be on GeneratedCompanion and trivial implemented in GeneratedCompanion in terms of that?

@ValdemarGr
Copy link
Contributor Author

ValdemarGr commented Sep 13, 2023

@ValdemarGr I think this looks promising. Should mkClientFull and mkServiceFull be on GeneratedCompanion and trivial implemented in GeneratedCompanion in terms of that?

I don't think it is possible to declare a generic implementation of mkClientFull and mkServiceFull on GeneratedCompanion, since you need to request Dom and Cod typeclasses for each request/response, which varies by the specific grpc service.

Update

I think it can be achieved by using an abstract type and some typeclass derivation gymnastics. The solution becomes messy however.

//GeneratedCompanion.scala
trait GeneratedCompanion[Service[*[_], _]] {
  type Doms[Dom[_]]
  type Cods[Cod[_]]

  implicit final def serviceCompanion: GeneratedCompanion[Service] = this
...
}

// TestServiceFs2Grpc.scala
object TestServiceFs2Grpc {
  ...
  case class Doms[Dom[_]](
    dom0: Dom[hello.world.TestMessage]
  )
  
  object Doms {
    implicit def typeclassInstance[Dom[_]](implicit 
      dom0: Dom[hello.world.TestMessage]
    ): Doms[Dom] = Doms(
      dom0
    )
  }
  
  case class Cods[Cod[_]](
    cod0: Cod[hello.world.TestMessage]
  )
  
  object Cods {
    implicit def typeclassInstance[Cod[_]](implicit 
      cod0: Cod[hello.world.TestMessage]
    ): Cods[Cod] = Cods(
      cod0
    )
  }
}

Alternatively we could relax the solution by removing Dom and Cod, then we could restore the *Full methods in GeneratedCompanion?

From my observation, users usually parse the protobuf structures into more idiomatic scala datatypes so maybe the Dom and Cod typeclass usecase is not of that much value?

@ValdemarGr
Copy link
Contributor Author

ValdemarGr commented Sep 13, 2023

I have also introduced a G to both server aspect and client aspect to facilitate running the service in another effect than the one that the dispatcher is defined in.

A usecase for this is for instance passing authorization

type UserId = String

type Auth[F[_]] = cats.mtl.Ask[F, UserId]

type Authed[A] = Kleisli[IO, UserId, A]

...

trait MyService[F[_]] {
  def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse]
}

val myServiceAspect = new ServiceAspect[Authed, IO, Trivial, Trivial, Metadata] {
  def visitUnaryToUnary[Req, Res](
    callCtx: ServerCallContext[Req, Res, Dom, Cod],
    req: Req,
    next: (Req, Metadata) => Authed[Res]
  ): IO[Res] =
    extractAuth(callCtx.metadata).flatMap(auth => next(req, callCtx.metadata).run(auth))
}

def makeMyService[F[_]](implicit auth: Auth[F]) = new MyService[F] {
  def myRequest(req: MyRequest, ctx: Metadata): F[MyResponse] =
    auth.ask[UserId].flatMap(userId => handleThings(userId, req))
}

Dispatcher[IO].use{ d =>
  MyService.serviceTrivial[Authed, IO, Metadata](
    d,
    makeMyService[Authed],
    myServiceAspect,
    ServerOption.default
  )
}

@ValdemarGr
Copy link
Contributor Author

ValdemarGr commented Sep 15, 2023

I have also explored a solution that removed the Dom and Cod typeclasses in search of a simpler solution that still solves the issues regarding tracing and auth. What are your thoughts?

Example output for TestServiceFs2Grpc
package hello.world

import _root_.cats.syntax.all._

trait TestServiceFs2Grpc[F[_], A] {
  def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage]
  def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage]
  def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
  def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}

object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] {
  
  def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[G],
    channel: _root_.io.grpc.Channel,
    clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A],
    clientOptions: _root_.fs2.grpc.client.ClientOptions
  ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
    def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
      )
    def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
      clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
        request,
        (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
      )
    def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
      )
    def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
      clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
        _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
        request,
        (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
      )
  }
  
  protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A](
    dispatcher: _root_.cats.effect.std.Dispatcher[G],
    serviceImpl: TestServiceFs2Grpc[F, A],
    serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A],
    serverOptions: _root_.fs2.grpc.server.ServerOptions
  ) = {
    _root_.io.grpc.ServerServiceDefinition
      .builder(hello.world.TestServiceGrpc.SERVICE)
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
            r,
            (r, m) => serviceImpl.noStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
            r,
            (r, m) => serviceImpl.clientStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
            r,
            (r, m) => serviceImpl.serverStreaming(r, m)
          )
        }
      )
      .addMethod(
        hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
        _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => 
          serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage](
            _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
            r,
            (r, m) => serviceImpl.bothStreaming(r, m)
          )
        }
      )
      .build()
  }
}

@ahjohannessen
Copy link
Collaborator

From my observation, users usually parse the protobuf structures into more idiomatic scala datatypes so maybe the Dom and Cod typeclass usecase is not of that much value?

In our codebases we often convert protobuf structures into more idiomatic scala datatypes, either by hand or have typemappers in ScalaPB for request/response payload types.

What does the removal of Dom and Cod impact on missing features?

I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?

@ValdemarGr
Copy link
Contributor Author

What does the removal of Dom and Cod impact on missing features?

I don't think any features that people need (from scanning the related issues) are impacted by removing these.

I think your latest changes are less intrusive and if they solve the issue with regards to tracing and auth, then it is perhaps better with this simpler solution?

Yes let's move forward with this.
I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.

@ahjohannessen
Copy link
Collaborator

Yes let's move forward with this.
I'll make sure the tests are green and construct tests that use the new features to implement tracing and auth.

Sound like a plan 👍

@ahjohannessen
Copy link
Collaborator

@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?

@ValdemarGr
Copy link
Contributor Author

@ValdemarGr Perhaps getting more eyes from typelevel on the PR would be a good idea?

Yes that would be awesome.

@ahjohannessen
Copy link
Collaborator

@rossabaker @armanbilge @fiadliel What do you guys think about this change? :)

@lacarvalho91
Copy link
Contributor

quite excited about this change, hoping it will make reusable otel4s integration easy 🙏

@bcarter97
Copy link

Is there any update on this PR's progress? Is something blocking it or does it need reviewing?

@ValdemarGr
Copy link
Contributor Author

ValdemarGr commented Jan 3, 2024

Is there any update on this PR's progress? Is something blocking it or does it need reviewing?

I think the last event in this PR was tagging some typelevel maintainers for reviewing the code.

@lacarvalho91
Copy link
Contributor

@ahjohannessen is there anyone else that can be asked to review this? Would be great to progress this

@rossabaker
Copy link
Member

I haven't used gRPC in a long time, so I don't have strong opinions here.

@MattLangsenkamp
Copy link

MattLangsenkamp commented May 18, 2024

Has there been any progress on this? This functionality would be very useful for observe-ability functionality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants