From 2b55fbc7469cbb72b0ad496ee4ccd4abc2ea58d9 Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 01:29:12 +0100 Subject: [PATCH 1/6] Add new optional :ezstd dependency --- mix.exs | 2 ++ mix.lock | 1 + 2 files changed, 3 insertions(+) diff --git a/mix.exs b/mix.exs index 198f5cb73..02fb5dddd 100644 --- a/mix.exs +++ b/mix.exs @@ -81,6 +81,7 @@ defmodule Nostrum.Mixfile do "guides/advanced/pluggable_caching.md", "guides/advanced/multi_node.md", "guides/advanced/hot_code_upgrade.md", + "guides/advanced/gateway_compression.md", "guides/cheat-sheets/api.cheatmd", "guides/cheat-sheets/qlc.cheatmd", "guides/cheat-sheets/voice.cheatmd" @@ -159,6 +160,7 @@ defmodule Nostrum.Mixfile do {:certifi, "~> 2.13"}, {:kcl, "~> 1.4"}, {:mime, "~> 1.6 or ~> 2.0"}, + {:ezstd, "~> 1.1", optional: true}, {:castle, "~> 0.3.0", runtime: false}, {:ex_doc, "~> 0.32", only: :dev, runtime: false}, {:credo, "~> 1.7.5", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 2f0d182d5..416c9e68d 100644 --- a/mix.lock +++ b/mix.lock @@ -15,6 +15,7 @@ "equivalex": {:hex, :equivalex, "1.0.3", "170d9a82ae066e0020dfe1cf7811381669565922eb3359f6c91d7e9a1124ff74", [:mix], [], "hexpm", "46fa311adb855117d36e461b9c0ad2598f72110ad17ad73d7533c78020e045fc"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.32.1", "21e40f939515373bcdc9cffe65f3b3543f05015ac6c3d01d991874129d173420", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5142c9db521f106d61ff33250f779807ed2a88620e472ac95dc7d59c380113da"}, + "ezstd": {:hex, 
:ezstd, "1.1.0", "d3b483d6acfadfb65dba4015371e6d54526dbf3d9ef0941b5add8bf5890731f4", [:rebar3], [], "hexpm", "28cfa0ed6cc3922095ad5ba0f23392a1664273358b17184baa909868361184e7"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "forecastle": {:hex, :forecastle, "0.1.1", "89dcfaccbfffe866cbd8a4c41ade55f62f00f1b5d0528bec787b1e6631004b98", [:mix], [], "hexpm", "f6f4d297224a22ac4387d305249aed7b8b02e85b4a03e83225af4536812c4079"}, "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, From 220b49f497d34b00f1ccb0e765126400e330c304 Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 02:16:05 +0100 Subject: [PATCH 2/6] Add new gateway compression behaviour --- lib/nostrum/shard/session/compression.ex | 32 ++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 lib/nostrum/shard/session/compression.ex diff --git a/lib/nostrum/shard/session/compression.ex b/lib/nostrum/shard/session/compression.ex new file mode 100644 index 000000000..7a3e1741f --- /dev/null +++ b/lib/nostrum/shard/session/compression.ex @@ -0,0 +1,32 @@ +defmodule Nostrum.Shard.Session.Compression do + @moduledoc """ + A behaviour for compression methods supported by the Discord gateway to implement. + + See the modules nested under this behaviour for reference implementations. + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. 
+ """ + + @doc """ + Create a new compression context that can be passed as an argument to other + methods within the behaviour to inflate data or reset the context to a + blank state. + """ + @callback create_context() :: reference() + + @doc """ + Decompress a frame received from Discord over the gateway. Should return an + iolist of the decompressed data. + """ + @callback inflate(reference(), iodata()) :: iolist() + + @doc """ + Reset a decompression context to a blank slate, this is useful after a websocket + resume has taken place or something similar requiring the reset of the state + for a shard. + """ + @callback reset_context(reference()) :: reference() +end From 39c9baa326fab9eee3da5af00ea43f2344674427 Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 02:16:20 +0100 Subject: [PATCH 3/6] Add compression implementations for zlib and zstd --- lib/nostrum/shard/session/compression/zlib.ex | 35 ++++++++++ lib/nostrum/shard/session/compression/zstd.ex | 89 +++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 lib/nostrum/shard/session/compression/zlib.ex create mode 100644 lib/nostrum/shard/session/compression/zstd.ex diff --git a/lib/nostrum/shard/session/compression/zlib.ex b/lib/nostrum/shard/session/compression/zlib.ex new file mode 100644 index 000000000..184f81a9e --- /dev/null +++ b/lib/nostrum/shard/session/compression/zlib.ex @@ -0,0 +1,35 @@ +defmodule Nostrum.Shard.Session.Compression.Zlib do + @moduledoc """ + Implementation of compression methods for the `zlib` compression algorithm. + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. 
+ """ + + @behaviour Nostrum.Shard.Session.Compression + + @impl true + @spec create_context() :: :zlib.zstream() + def create_context do + context = :zlib.open() + :zlib.inflateInit(context) + + context + end + + @impl true + @spec inflate(:zlib.zstream(), iodata()) :: iolist() + def inflate(ctx, frame) do + :zlib.inflate(ctx, frame) + end + + @impl true + @spec reset_context(:zlib.zstream()) :: :zlib.zstream() + def reset_context(ctx) do + :ok = :zlib.inflateReset(ctx) + + ctx + end +end diff --git a/lib/nostrum/shard/session/compression/zstd.ex b/lib/nostrum/shard/session/compression/zstd.ex new file mode 100644 index 000000000..d125df79e --- /dev/null +++ b/lib/nostrum/shard/session/compression/zstd.ex @@ -0,0 +1,89 @@ +defmodule Nostrum.Shard.Session.Compression.Zstd do + @moduledoc """ + Implementation of compression methods for the `zstd` compression algorithm. + + > ### Internal module {: .info} + > + > This module is intended for exclusive usage inside of nostrum, and is + > documented for completeness and people curious to look behind the covers. + """ + + @behaviour Nostrum.Shard.Session.Compression + + @doc """ + Raises an `ArgumentError` if the optional `:ezstd` dependency cannot be loaded. + """ + @spec check_available!() :: nil + def check_available! do + if not Code.ensure_loaded?(:ezstd) do + zstd_missing() + end + end + + @spec zstd_missing() :: no_return + defp zstd_missing do + raise ArgumentError, """ + Cannot use the :zstd gateway compression option without optional dependency :ezstd. + + See https://kraigie.github.io/nostrum/gateway_compression.html for more information. 
+ """ + end + + if Code.ensure_loaded?(:ezstd) do + # Buffer size handed to :ezstd.create_decompression_context/1. + @zstd_buffer_size 16_000 + + # :ezstd can return an {:error, reason} tuple instead of a result. Raise here so + # the tuple never reaches the shard session in place of a context or an iolist. + @impl true + @spec create_context() :: reference() + def create_context do + case :ezstd.create_decompression_context(@zstd_buffer_size) do + {:error, reason} -> + raise "Failed to create zstd decompression context: #{inspect(reason)}" + + ctx -> + ctx + end + end + + @impl true + @spec inflate(reference(), iodata()) :: iolist() + def inflate(ctx, frame) do + case :ezstd.decompress_streaming(ctx, frame) do + {:error, reason} -> + raise "Failed to decompress zstd gateway frame: #{inspect(reason)}" + + data -> + data + end + end + + # Resetting discards the old context and creates a fresh one in its place. + @impl true + @spec reset_context(reference()) :: reference() + def reset_context(_ctx) do + create_context() + end + else + # :ezstd could not be loaded when this module was compiled, so every + # callback raises the missing-dependency error. + @impl true + @spec create_context() :: no_return() + def create_context do + zstd_missing() + end + + @impl true + @spec inflate(reference(), iodata()) :: no_return() + def inflate(_ctx, _frame) do + zstd_missing() + end + + @impl true + @spec reset_context(reference()) :: no_return() + def reset_context(_ctx) do + zstd_missing() + end + end +end From 1f233146e666ee14eff41cb68b309d6e8a95e80d Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 01:29:26 +0100 Subject: [PATCH 4/6] Add support for zstd-stream compression This adds support for zstd-stream compression through configuration of the :gateway_compression configuration option. This will optionally use the :ezstd library to decompress received payloads from Discord allowing for some potential savings on payload size. 
This also moves the ws_state to be agnostic and references the compression context as `compress_ctx` instead of the previous `zlib_ctx`. 
--- lib/nostrum/shard/session.ex | 60 +++++++++++++++++++++++++++------- lib/nostrum/struct/ws_state.ex | 8 ++--- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/lib/nostrum/shard/session.ex b/lib/nostrum/shard/session.ex index 3af892bde..6e3de6243 100644 --- a/lib/nostrum/shard/session.ex +++ b/lib/nostrum/shard/session.ex @@ -46,12 +46,33 @@ defmodule Nostrum.Shard.Session do alias Nostrum.Shard.{Connector, Event, Payload} alias Nostrum.Struct.WSState + alias Nostrum.Shard.Session.Compression + require Logger @behaviour :gen_statem # Query string to connect to when upgrading the connection. - @gateway_qs "/?compress=zlib-stream&encoding=etf&v=10" + @gateway_qs "/?encoding=etf&v=10" + + @gateway_compress Application.compile_env( + :nostrum, + :gateway_compression, + :zlib + ) + + @compression_module (case @gateway_compress do + :zlib -> + Compression.Zlib + + :zstd -> + Compression.Zstd.check_available!() + Compression.Zstd + + _ -> + raise ArgumentError, + "Unsupported compression type: #{inspect(@gateway_compress)}, expected :zlib or :zstd" + end) # Maximum time the initial connection may take. 
@timeout_connect :timer.seconds(5) @@ -190,34 +211,44 @@ defmodule Nostrum.Shard.Session do # end def connecting_ws(:enter, _from, %{conn: conn} = data) do - Logger.debug("Upgrading connection to websocket") + Logger.debug("Upgrading connection to websocket with #{@gateway_compress} compression") set_timeout = {:state_timeout, @timeout_ws_upgrade, :upgrade_timeout} - stream = :gun.ws_upgrade(conn, @gateway_qs, [], %{flow: @standard_flow}) + stream = :gun.ws_upgrade(conn, @gateway_qs <> compression_qs(), [], %{flow: @standard_flow}) {:keep_state, %{data | stream: stream}, set_timeout} end def connecting_ws( :info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, - %{zlib_ctx: nil} = data + %{compress_ctx: nil} = data ) do - zlib_context = :zlib.open() - :zlib.inflateInit(zlib_context) + context = @compression_module.create_context() {:next_state, :connected, - %{data | zlib_ctx: zlib_context, last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}} + %{ + data + | compress_ctx: context, + last_heartbeat_ack: DateTime.utc_now(), + heartbeat_ack: true + }} end def connecting_ws( :info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, - %{zlib_ctx: zlib_ctx} = data + %{compress_ctx: compress_ctx} = data ) do Logger.info("Re-established websocket connection") - :ok = :zlib.inflateReset(zlib_ctx) + + compress_ctx = @compression_module.reset_context(compress_ctx) {:next_state, :connected, - %{data | last_heartbeat_ack: DateTime.utc_now(), heartbeat_ack: true}} + %{ + data + | last_heartbeat_ack: DateTime.utc_now(), + heartbeat_ack: true, + compress_ctx: compress_ctx + }} end def connecting_ws(:state_timeout, :upgrade_timeout, _data) do @@ -248,9 +279,10 @@ defmodule Nostrum.Shard.Session do end def connected(:info, {:gun_ws, _worker, stream, {:binary, frame}}, data) do + payload_decompressed = @compression_module.inflate(data.compress_ctx, frame) + payload = - data.zlib_ctx - |> :zlib.inflate(frame) + payload_decompressed |> :erlang.iolist_to_binary() 
|> :erlang.binary_to_term() @@ -375,4 +407,8 @@ defmodule Nostrum.Shard.Session do :timeout end end + + defp compression_qs do + "&compress=" <> Atom.to_string(@gateway_compress) <> "-stream" + end end diff --git a/lib/nostrum/struct/ws_state.ex b/lib/nostrum/struct/ws_state.ex index ead4bc5cf..93c889de9 100644 --- a/lib/nostrum/struct/ws_state.ex +++ b/lib/nostrum/struct/ws_state.ex @@ -17,7 +17,7 @@ defmodule Nostrum.Struct.WSState do :last_heartbeat_ack, :heartbeat_ack, :heartbeat_interval, - :zlib_ctx + :compress_ctx ] @typedoc "The shard number" @@ -73,8 +73,8 @@ defmodule Nostrum.Struct.WSState do @typedoc "Interval at which heartbeats are sent" @type heartbeat_interval :: pos_integer() | nil - @typedoc "Reference to the current zlib context" - @type zlib_ctx :: reference | nil + @typedoc "Reference to the current compression context" + @type compress_ctx :: reference | nil @type t :: %__MODULE__{ shard_num: shard_num, @@ -90,6 +90,6 @@ defmodule Nostrum.Struct.WSState do last_heartbeat_ack: last_heartbeat_ack, heartbeat_ack: heartbeat_ack, heartbeat_interval: heartbeat_interval, - zlib_ctx: zlib_ctx + compress_ctx: compress_ctx } end From 5214ecf5726f4cdf34a8743e001199755a00ac37 Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 01:30:58 +0100 Subject: [PATCH 5/6] Document gateway compression configuration option --- guides/intro/intro.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/guides/intro/intro.md b/guides/intro/intro.md index e6d121b29..9e53e4651 100644 --- a/guides/intro/intro.md +++ b/guides/intro/intro.md @@ -52,6 +52,9 @@ Apart from the `token` field mentioned above, the following fields are also supp of members for all guilds at startup. Depending on your [cache backend](../advanced/pluggable_caching.md), this may increase startup time and memory usage by quite a bit. Defaults to `false`. +- `gateway_compression` - use either `:zlib` (default) or `:zstd` for compression + of messages from the Discord gateway. 
See the documentation on + [Gateway Compression](../advanced/gateway_compression.md) for more information. ### Voice-specific From 4e5672b9e19951228478daf0583bb4e170f5df0d Mon Sep 17 00:00:00 2001 From: Joe Banks Date: Sat, 18 May 2024 01:31:21 +0100 Subject: [PATCH 6/6] Add extended documentation on gateway compression --- guides/advanced/gateway_compression.md | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 guides/advanced/gateway_compression.md diff --git a/guides/advanced/gateway_compression.md b/guides/advanced/gateway_compression.md new file mode 100644 index 000000000..67100981e --- /dev/null +++ b/guides/advanced/gateway_compression.md @@ -0,0 +1,50 @@ +# Gateway Compression + +Nostrum supports either the `zlib-stream` or `zstd-stream` gateway compression +methods, as documented in the +[Discord gateway documentation](https://discord.com/developers/docs/topics/gateway#encoding-and-compression). + +Most users are fine to leave the `gateway_compression` configuration option set +to `:zlib` (default), but users looking for a potential reduction in payload +sizes from the Discord gateway can optionally set `:zstd` here. + +## Using `:zstd` compression + +Using `:zstd` depends on the [`:ezstd`](https://hex.pm/packages/ezstd) library, +so you will have to add this dependency to your `mix.exs` file: + +```elixir + defp deps do + [ + {:nostrum, ...}, + {:ezstd, "~> 1.1"} # new dependency + ] + end +``` + + +> #### `:ezstd` NIFs {: .info} +> +> Some functionality of `:ezstd` depends on Erlang NIFs (Natively Implemented +> Functions). This means that a proper compiler installation as well as other +> build tools like `git` may be necessary at the stage where you compile your +> dependencies. +> +> It may be useful to run `mix deps.compile` in your build system to ensure that +> your application does not need build utilities in the built application image. 
+ +Once you have this additional dependency installed in your project, set the +`:nostrum`, `:gateway_compression` configuration option to `:zstd` and Nostrum +should pick up on it. + +You will need to run `mix deps.get` and `mix deps.compile` to install and +compile the new `:ezstd` dependency. + +> #### Nostrum detection of `:ezstd` {: .tip} +> +> Since the check for `:ezstd` takes place when you compile Nostrum, you might +> need to run `mix deps.compile --force nostrum` to ensure that Nostrum is +> recompiled and recognises the newly installed `:ezstd` dependency. +> +> Not doing this may mean that your compiled Nostrum version is still using +> dummy handlers that will error out even when `:ezstd` is installed.