Compression support #200

Closed
gufoe opened this issue Oct 3, 2018 · 3 comments

Comments

@gufoe

gufoe commented Oct 3, 2018

Hello, thanks for the library, it works like a charm!
I'd like to know whether there is any built-in support for message compression in both directions and, if not, whether there is a particular reason it has not been implemented.
Regards, keep up the great work!

@tikue
Collaborator

tikue commented Oct 3, 2018

Hey, thanks for the kind words!

Right now development is focused on making the library transport-agnostic (see #199). When that work is completed, it should be possible to plug in a compressing layer.

Hopefully that work will be done in reasonably short order!

@tikue
Collaborator

tikue commented Oct 17, 2018

@gufoe I've released the latest version of tarpc. If you're interested, I encourage you to try building a compression transport. I can try to help you get started.

Let's assume we already have a Sink + Stream that performs compression for us, using your preferred algorithm; let's say it's Gzip. It looks something like this:

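struct GzipTransport<Inner> {
    inner: Inner,
    gzip: Gzip,
}
struct Compressed<T> {
    is_compressed: bool,
    payload: T,
}
// Compresses the payload when `is_compressed` is set, then forwards it to `inner`.
impl<Inner> Sink for GzipTransport<Inner> where Inner: Sink<SinkItem=Compressed<Bytes>> {
    type SinkItem = Compressed<Bytes>;
    type SinkError = Inner::SinkError;
    ...
}
// Decompresses flagged payloads coming from `inner` and yields the raw bytes.
impl<Inner> Stream for GzipTransport<Inner> where Inner: Stream<Item=Compressed<Bytes>> {
    type Item = Bytes;
    ...
}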
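
To make the flag handling concrete, here is a minimal synchronous sketch of what such a layer does in each direction, leaving out Sink, Stream, and Pin entirely. The `Codec` trait, the `RunLength` stand-in, and the `encode_outgoing` / `decode_incoming` helpers are all hypothetical names invented for this illustration; run-length encoding is used only so the example needs no external crates and is not a substitute for gzip.

```rust
/// Payload tagged with a flag saying whether it is (or should be) compressed.
#[derive(Debug, PartialEq)]
struct Compressed<T> {
    is_compressed: bool,
    payload: T,
}

/// Hypothetical codec interface; a real implementation would wrap gzip.
trait Codec {
    fn compress(&self, input: &[u8]) -> Vec<u8>;
    fn decompress(&self, input: &[u8]) -> Option<Vec<u8>>;
}

/// Stand-in codec: run-length encoding as (count, byte) pairs. NOT gzip.
struct RunLength;

impl Codec for RunLength {
    fn compress(&self, input: &[u8]) -> Vec<u8> {
        let mut out: Vec<u8> = Vec::new();
        for &byte in input {
            let n = out.len();
            // Extend the current run if the byte repeats and the count fits in a u8.
            if n >= 2 && out[n - 1] == byte && out[n - 2] < u8::MAX {
                out[n - 2] += 1;
            } else {
                out.push(1);
                out.push(byte);
            }
        }
        out
    }

    fn decompress(&self, input: &[u8]) -> Option<Vec<u8>> {
        if input.len() % 2 != 0 {
            return None; // a (count, byte) pair was truncated
        }
        let mut out = Vec::new();
        for pair in input.chunks(2) {
            out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
        }
        Some(out)
    }
}

/// Outgoing direction: compress only when the flag asks for it.
fn encode_outgoing<C: Codec>(codec: &C, item: Compressed<Vec<u8>>) -> Compressed<Vec<u8>> {
    if item.is_compressed {
        Compressed { is_compressed: true, payload: codec.compress(&item.payload) }
    } else {
        item
    }
}

/// Incoming direction: undo compression when the flag is set and yield raw bytes.
fn decode_incoming<C: Codec>(codec: &C, item: Compressed<Vec<u8>>) -> Option<Vec<u8>> {
    if item.is_compressed {
        codec.decompress(&item.payload)
    } else {
        Some(item.payload)
    }
}
```

Keeping the flag next to the payload lets a sender skip compression for small or already-compressed messages while the receiver still knows how to treat each one.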

First, you'll need an inner transport that implements whatever wire protocol you're using. Let's assume your protocol looks just like gRPC's message framing:

Length-Prefixed-Message → Compressed-Flag Message-Length Message
Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer
Message-Length → {length of Message} # encoded as 4 byte unsigned integer
Message → *{binary octet}
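
As a rough illustration of that grammar, the sketch below encodes and decodes a single frame using only the standard library. The function names `encode_frame` and `decode_frame` are made up for this example, and the length is assumed to be big-endian (the grammar above does not state a byte order; gRPC uses big-endian).

```rust
use std::convert::TryFrom;

/// Size of the frame header: 1-byte compressed flag + 4-byte big-endian length.
const HEADER_LEN: usize = 5;

/// Encodes one frame. Returns `None` if the payload length does not fit in a u32.
fn encode_frame(is_compressed: bool, payload: &[u8]) -> Option<Vec<u8>> {
    let len = u32::try_from(payload.len()).ok()?;
    let mut frame = Vec::with_capacity(HEADER_LEN + payload.len());
    frame.push(is_compressed as u8);
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(payload);
    Some(frame)
}

/// Decodes one frame from the front of `buf`.
/// Returns the flag, the payload, and the unread remainder,
/// or `None` if the buffer is incomplete or the flag byte is not 0 or 1.
fn decode_frame(buf: &[u8]) -> Option<(bool, &[u8], &[u8])> {
    if buf.len() < HEADER_LEN {
        return None;
    }
    let is_compressed = match buf[0] {
        0 => false,
        1 => true,
        _ => return None,
    };
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    let rest = &buf[HEADER_LEN..];
    if rest.len() < len {
        return None;
    }
    let (payload, remainder) = rest.split_at(len);
    Some((is_compressed, payload, remainder))
}
```

For example, `encode_frame(true, b"hi")` produces the seven bytes `01 00 00 00 02 68 69`: the flag, the length 2, then the payload.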

Like the compression transport, you don't really care what kind of IO transport is backing this, so it can be generic over the inner IO as well:

struct CompressionProtocol<Inner>(Inner);
impl<Inner> Sink for CompressionProtocol<Inner> where Inner: Sink<SinkItem=Bytes> {
    type SinkItem = Compressed<Bytes>;
    type SinkError = Inner::SinkError;

    fn start_send(
        self: PinMut<Self>,
        item: Compressed<Bytes>,
    ) -> Result<(), Self::SinkError> {
        // The payload was already compressed (or left alone) by GzipTransport;
        // this layer only prepends the frame header.
        let payload = item.payload;
        let mut bytes = BytesMut::with_capacity(1 /* compression flag */ + 4 /* payload len */ + payload.len());
        bytes.put_u8(item.is_compressed as u8);
        // The protocol specifies a 4-byte length, so write a u32 rather than a u64.
        bytes.put_u32_be(payload.len() as u32);
        bytes.put(payload);
        self.0.start_send(bytes.freeze())
    }

    // Implement the other Sink fns ....
}

impl<Inner> Stream for CompressionProtocol<Inner> where Inner: Stream<Item=Bytes> {
    type Item = Compressed<Bytes>;

    fn poll_next(
        self: PinMut<Self>,
        waker: &LocalWaker,
    ) -> Poll<Option<Compressed<Bytes>>> {
        // Assumes each item from the inner stream is exactly one frame.
        let mut packet = match ready!(self.0.poll_next(waker)) {
            Some(packet) => packet,
            None => return Poll::Ready(None),
        };
        // Split off the 5-byte header; what remains in `packet` is the payload.
        let mut header = packet.split_to(5).into_buf();
        let is_compressed = header.get_u8() != 0;
        let len = header.get_u32_be() as usize;
        debug_assert_eq!(len, packet.len());
        // Decompression is left to GzipTransport, which inspects the flag.
        Poll::Ready(Some(Compressed { is_compressed, payload: packet }))
    }
}
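
The Stream side above treats every item from the inner stream as exactly one frame. If the inner transport is a raw byte stream such as TCP, chunks are not guaranteed to line up with frame boundaries, so some buffering would probably be needed. The following is a sketch of that idea under the same 1-byte flag plus 4-byte big-endian length assumption; `FrameBuffer` is a hypothetical helper, not part of tarpc or the bytes crate.

```rust
/// Accumulates raw chunks and yields complete (flag, payload) frames.
#[derive(Default)]
struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    /// Appends a chunk read from the underlying transport.
    fn push(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }

    /// Removes and returns the next complete frame,
    /// or `None` if more bytes are needed first.
    fn next_frame(&mut self) -> Option<(bool, Vec<u8>)> {
        const HEADER_LEN: usize = 5; // 1-byte flag + 4-byte length
        if self.buf.len() < HEADER_LEN {
            return None;
        }
        let len =
            u32::from_be_bytes([self.buf[1], self.buf[2], self.buf[3], self.buf[4]]) as usize;
        if self.buf.len() < HEADER_LEN + len {
            return None;
        }
        let is_compressed = self.buf[0] != 0;
        let payload = self.buf[HEADER_LEN..HEADER_LEN + len].to_vec();
        // Drop the consumed frame, keeping any bytes that belong to the next one.
        self.buf.drain(..HEADER_LEN + len);
        Some((is_compressed, payload))
    }
}
```

A `poll_next` implementation could push each chunk it receives and loop on `next_frame` until it returns `None`, which also covers the case where one chunk carries several frames.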

Then we want a tarpc::Transport that delegates to it. You'll need to choose how to serialize messages; let's use bincode for this example. Suppose there's already a BincodeTransport that allows plugging in different underlying transports (the bincode-transport crate doesn't currently allow this as it's hardcoded to TCP, but it wouldn't be hard to modify).

fn make_client_transport<Req, Resp, Inner>(inner: Inner)
        -> impl Transport<SinkItem = ClientMessage<Req>, Item = Response<Resp>>
        where Req: serde::Serialize,
              Resp: serde::de::DeserializeOwned,
              Inner: Stream<Item=Bytes> + Sink<SinkItem=Bytes>,
{
    let inner = CompressionProtocol(inner);
    let gzip = Gzip::new();
    // `gzip_transport` is a Stream<Item=Bytes> + Sink<SinkItem=Bytes>.
    let gzip_transport =
        GzipTransport { inner, gzip }.with(|serialized_request: Bytes| {
            // `with` expects a future that resolves to the mapped item.
            future::ready(Ok(Compressed {
                // Tells the compression layer to compress this payload
                is_compressed: true,
                payload: serialized_request,
            }))
        });
    BincodeTransport::new(gzip_transport)
}

@tikue
Collaborator

tikue commented Oct 17, 2018

Note that I ignored all the details with Pin and such. I haven't tried compiling this, so treat it as pseudocode. My hope is it's enough to get you, or someone else interested in this, started. I'd love to flesh out the story here.

@tikue tikue closed this as completed in 4987094 Aug 1, 2020