Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zstd-stream gateway compression #598

Merged
merged 6 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions guides/advanced/gateway_compression.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Gateway Compression

Nostrum supports either the `zlib-stream` or `zstd-stream` gateway compression
methods, as documented
[here](https://discord.com/developers/docs/topics/gateway#encoding-and-compression)

Most users are fine to leave the `gateway_compression` configuration option set
to `:zlib` (default), but users looking for a potential reduction in payload
sizes from the Discord gateway can optionally set `:zstd` here.

## Using `:zstd` compression

Using `:zstd` depends on the [`:ezstd`](https://hex.pm/packages/ezstd) library,
so you will have to add this dependency to your `mix.exs` file:

```elixir
defp deps do
[
{:nostrum, ...},
{:ezstd, "~> 1.1"} # new dependency
]
end
```


> #### `:ezstd` NIFs {: .info}
>
> Some functionality of `:ezstd` depends on Erlang NIFs (Natively Implemented
> Functions). This means that a proper compiler installation as well as other
> build tools like `git` may be necessary at the stage where you compile your
> dependencies.
>
> It may be useful to run `mix deps.compile` in any build systems to ensure that
> your application does not need build utilities in the built application image.
jchristgit marked this conversation as resolved.
Show resolved Hide resolved

Once you have this additional dependency installed in your project, set the
`:nostrum`, `:gateway_compression` configuration option to `:zstd` and Nostrum
should pick up on it.

You will need to run `mix deps.get` and `mix deps.compile` to install and
compile the new `:ezstd` dependency.

> #### Nostrum detection of `:ezstd` {: .tip}
>
> Since the check for `:ezstd` takes place when you compile Nostrum, you might
> need to run `mix deps.compile --force nostrum` to ensure that Nostrum is
> recompiled and recognises the newly installed `:ezstd` dependency.
>
> Not doing this may mean that your compiled Nostrum version is still using
> dummy handlers that will error out even when `:ezstd` is installed.
3 changes: 3 additions & 0 deletions guides/intro/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Apart from the `token` field mentioned above, the following fields are also supp
of members for all guilds at startup. Depending on your [cache
backend](../advanced/pluggable_caching.md), this may increase startup time
and memory usage by quite a bit. Defaults to `false`.
- `gateway_compression` - use either `:zlib` (default) or `:zstd` for compression
of messages from the Discord gateway. See the documentation on
[Gateway Compression](../advanced/gateway_compression.md) for more information.


### Voice-specific
Expand Down
60 changes: 48 additions & 12 deletions lib/nostrum/shard/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,33 @@ defmodule Nostrum.Shard.Session do
alias Nostrum.Shard.{Connector, Event, Payload}
alias Nostrum.Struct.WSState

alias Nostrum.Shard.Session.Compression

require Logger

@behaviour :gen_statem

# Query string to connect to when upgrading the connection.
@gateway_qs "/?compress=zlib-stream&encoding=etf&v=10"
@gateway_qs "/?encoding=etf&v=10"

@gateway_compress Application.compile_env(
:nostrum,
:gateway_compression,
:zlib
)

@compression_module (case @gateway_compress do
:zlib ->
Compression.Zlib

:zstd ->
Compression.Zstd.check_available!()
Compression.Zstd

_ ->
raise ArgumentError,
"Unsupported compression type: #{@gateway_compress}"
end)

# Maximum time the initial connection may take.
@timeout_connect :timer.seconds(5)
Expand Down Expand Up @@ -190,34 +211,44 @@ defmodule Nostrum.Shard.Session do
# end

def connecting_ws(:enter, _from, %{conn: conn} = data) do
Logger.debug("Upgrading connection to websocket")
Logger.debug("Upgrading connection to websocket with #{@gateway_compress} compression")
set_timeout = {:state_timeout, @timeout_ws_upgrade, :upgrade_timeout}
stream = :gun.ws_upgrade(conn, @gateway_qs, [], %{flow: @standard_flow})
stream = :gun.ws_upgrade(conn, @gateway_qs <> compression_qs(), [], %{flow: @standard_flow})
{:keep_state, %{data | stream: stream}, set_timeout}
end

def connecting_ws(
:info,
{:gun_upgrade, _conn, _stream, ["websocket"], _headers},
%{zlib_ctx: nil} = data
%{compress_ctx: nil} = data
) do
zlib_context = :zlib.open()
:zlib.inflateInit(zlib_context)
context = @compression_module.create_context()

{:next_state, :connected,
%{data | zlib_ctx: zlib_context, last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}}
%{
data
| compress_ctx: context,
last_heartbeat_ack: DateTime.utc_now(),
heartbeat_ack: true
}}
end

def connecting_ws(
:info,
{:gun_upgrade, _conn, _stream, ["websocket"], _headers},
%{zlib_ctx: zlib_ctx} = data
%{compress_ctx: compress_ctx} = data
) do
Logger.info("Re-established websocket connection")
:ok = :zlib.inflateReset(zlib_ctx)

compress_ctx = @compression_module.reset_context(compress_ctx)

{:next_state, :connected,
%{data | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}}
%{
data
| last_heartbeat_ack: DateTime.utc_now(),
heartbeat_ack: true,
compress_ctx: compress_ctx
}}
end

def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
Expand Down Expand Up @@ -248,9 +279,10 @@ defmodule Nostrum.Shard.Session do
end

def connected(:info, {:gun_ws, _worker, stream, {:binary, frame}}, data) do
payload_decompressed = @compression_module.inflate(data.compress_ctx, frame)

payload =
data.zlib_ctx
|> :zlib.inflate(frame)
payload_decompressed
|> :erlang.iolist_to_binary()
|> :erlang.binary_to_term()

Expand Down Expand Up @@ -375,4 +407,8 @@ defmodule Nostrum.Shard.Session do
:timeout
end
end

defp compression_qs do
"&compress=" <> Atom.to_string(@gateway_compress) <> "-stream"
end
end
32 changes: 32 additions & 0 deletions lib/nostrum/shard/session/compression.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Nostrum.Shard.Session.Compression do
@moduledoc """
A behaviour for compression methods supported by the Discord gateway to implement.

See the modules nested under this behaviour for reference implementations.

> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
jchristgit marked this conversation as resolved.
Show resolved Hide resolved
"""

@doc """
Create a new compression context that can be passed as an argument to other
methods within the behaviour to inflate data or reset the context to a
blank state.
"""
@callback create_context() :: reference()

@doc """
Decompress a frame received from Discord over the gateway. Should return an
iolist of the decompressed data.
"""
@callback inflate(reference(), iodata()) :: iolist()

@doc """
Reset a decompression context to a blank slate, this is useful after a websocket
resume has taken place or something similar requiring the reset of the state
for a shard.
"""
@callback reset_context(reference()) :: reference()
end
32 changes: 32 additions & 0 deletions lib/nostrum/shard/session/compression/zlib.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Nostrum.Shard.Session.Compression.Zlib do
@moduledoc """
Implementation of compression methods for the `zlib` compression algorithm.

> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
"""

@behaviour Nostrum.Shard.Session.Compression

@spec create_context() :: :zlib.zstream()
def create_context do
context = :zlib.open()
:zlib.inflateInit(context)

context
end

@spec inflate(:zlib.zstream(), iodata()) :: iolist()
def inflate(ctx, frame) do
:zlib.inflate(ctx, frame)
end

@spec reset_context(:zlib.zstream()) :: :zlib.zstream()
def reset_context(ctx) do
:zlib.inflateReset(ctx)

ctx
end
end
61 changes: 61 additions & 0 deletions lib/nostrum/shard/session/compression/zstd.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Nostrum.Shard.Session.Compression.Zstd do
@moduledoc """
Implementation of compression methods for the `zstd` compression algorithm.

> ### Internal module {: .info}
>
> This module is intended for exclusive usage inside of nostrum, and is
> documented for completeness and people curious to look behind the covers.
"""

@behaviour Nostrum.Shard.Session.Compression

def check_available! do
if not Code.ensure_loaded?(:ezstd) do
zstd_missing()
end
end

@spec zstd_missing() :: no_return
defp zstd_missing do
raise ArgumentError, """
Cannot use the :zstd gateway compression option without optional dependency :ezstd.

See https://kraigie.github.io/nostrum/gateway_compression.html for more information.
"""
end

if Code.ensure_loaded?(:ezstd) do
@zstd_buffer_size 16_000

@spec create_context() :: reference() | {:error, any()}
def create_context do
:ezstd.create_decompression_context(@zstd_buffer_size)
end

@spec inflate(reference(), iodata()) :: iolist() | {:error, any()}
def inflate(ctx, frame) do
:ezstd.decompress_streaming(ctx, frame)
end

@spec reset_context(reference()) :: reference()
def reset_context(_ctx) do
create_context()
end
jchristgit marked this conversation as resolved.
Show resolved Hide resolved
else
@spec create_context() :: reference() | {:error, any()}
def create_context do
zstd_missing()
end

@spec inflate(reference(), iodata()) :: iodata() | {:error, any()}
def inflate(_ctx, _frame) do
zstd_missing()
end

@spec reset_context(reference()) :: reference()
def reset_context(_ctx) do
zstd_missing()
end
end
end
8 changes: 4 additions & 4 deletions lib/nostrum/struct/ws_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Nostrum.Struct.WSState do
:last_heartbeat_ack,
:heartbeat_ack,
:heartbeat_interval,
:zlib_ctx
:compress_ctx
]

@typedoc "The shard number"
Expand Down Expand Up @@ -73,8 +73,8 @@ defmodule Nostrum.Struct.WSState do
@typedoc "Interval at which heartbeats are sent"
@type heartbeat_interval :: pos_integer() | nil

@typedoc "Reference to the current zlib context"
@type zlib_ctx :: reference | nil
@typedoc "Reference to the current compression context"
@type compress_ctx :: reference | nil

@type t :: %__MODULE__{
shard_num: shard_num,
Expand All @@ -90,6 +90,6 @@ defmodule Nostrum.Struct.WSState do
last_heartbeat_ack: last_heartbeat_ack,
heartbeat_ack: heartbeat_ack,
heartbeat_interval: heartbeat_interval,
zlib_ctx: zlib_ctx
compress_ctx: compress_ctx
}
end
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ defmodule Nostrum.Mixfile do
"guides/advanced/pluggable_caching.md",
"guides/advanced/multi_node.md",
"guides/advanced/hot_code_upgrade.md",
"guides/advanced/gateway_compression.md",
"guides/cheat-sheets/api.cheatmd",
"guides/cheat-sheets/qlc.cheatmd",
"guides/cheat-sheets/voice.cheatmd"
Expand Down Expand Up @@ -159,6 +160,7 @@ defmodule Nostrum.Mixfile do
{:certifi, "~> 2.13"},
{:kcl, "~> 1.4"},
{:mime, "~> 1.6 or ~> 2.0"},
{:ezstd, "~> 1.1", optional: true},
{:castle, "~> 0.3.0", runtime: false},
{:ex_doc, "~> 0.32", only: :dev, runtime: false},
{:credo, "~> 1.7.5", only: [:dev, :test], runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"equivalex": {:hex, :equivalex, "1.0.3", "170d9a82ae066e0020dfe1cf7811381669565922eb3359f6c91d7e9a1124ff74", [:mix], [], "hexpm", "46fa311adb855117d36e461b9c0ad2598f72110ad17ad73d7533c78020e045fc"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.32.1", "21e40f939515373bcdc9cffe65f3b3543f05015ac6c3d01d991874129d173420", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5142c9db521f106d61ff33250f779807ed2a88620e472ac95dc7d59c380113da"},
"ezstd": {:hex, :ezstd, "1.1.0", "d3b483d6acfadfb65dba4015371e6d54526dbf3d9ef0941b5add8bf5890731f4", [:rebar3], [], "hexpm", "28cfa0ed6cc3922095ad5ba0f23392a1664273358b17184baa909868361184e7"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"forecastle": {:hex, :forecastle, "0.1.1", "89dcfaccbfffe866cbd8a4c41ade55f62f00f1b5d0528bec787b1e6631004b98", [:mix], [], "hexpm", "f6f4d297224a22ac4387d305249aed7b8b02e85b4a03e83225af4536812c4079"},
"gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"},
Expand Down