Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Small code improvement on Postgres Replication #1157

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions lib/extensions/postgres/adapters/postgres/protocol.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Realtime.Adapters.Postgres.Protocol do
alias Realtime.Adapters.Postgres.Protocol.Write
alias Realtime.Adapters.Postgres.Protocol.KeepAlive

defguard is_write(value) when binary_part(value, 0, 1) == <<?w>>
defguard is_keep_alive(value) when binary_part(value, 0, 1) == <<?k>>

def parse(
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>>
) do
%Write{
server_wal_start: server_wal_start,
server_wal_end: server_wal_end,
server_system_clock: server_system_clock,
message: message
}
end

def parse(<<?k, wal_end::64, clock::64, reply::8>>) do
reply =
case reply do
0 -> :later
1 -> :now
end

%KeepAlive{wal_end: wal_end, clock: clock, reply: reply}
end

@doc """
Message to send to the server to request a standby status update.

Check https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE for more information
"""
@spec standby_status(integer(), integer(), integer(), :now | :later, integer() | nil) :: [
binary()
]
def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock \\ nil)

def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, nil) do
standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, current_time())
end

def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock) do
reply =
case reply do
:now -> 1
:later -> 0
end

[
<<?r, last_wal_received::64, last_wal_flushed::64, last_wal_applied::64, clock::64,
reply::8>>
]
end

@doc """
Message to send the server to not do any operation since the server can wait
"""
def hold(), do: []

@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
def current_time(), do: System.os_time(:microsecond) - @epoch
end
24 changes: 24 additions & 0 deletions lib/extensions/postgres/adapters/postgres/protocol/keep_alive.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Realtime.Adapters.Postgres.Protocol.KeepAlive do
@moduledoc """
Primary keepalive message (B)
Byte1('k')
Identifies the message as a sender keepalive.

Int64
The current end of WAL on the server.

Int64
The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.

Byte1
1 means that the client should reply to this message as soon as possible, to avoid a timeout disconnect. 0 otherwise.

The receiving process can send replies back to the sender at any time, using one of the following message formats (also in the payload of a CopyData message):
"""
@type t :: %__MODULE__{
wal_end: integer(),
clock: integer(),
reply: :now | :await
}
defstruct [:wal_end, :clock, :reply]
end
22 changes: 22 additions & 0 deletions lib/extensions/postgres/adapters/postgres/protocol/write.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Realtime.Adapters.Postgres.Protocol.Write do
@moduledoc """
XLogData (B)
Byte1('w')
Identifies the message as WAL data.

Int64
The starting point of the WAL data in this message.

Int64
The current end of WAL on the server.

Int64
The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.

Byten
A section of the WAL data stream.

A single WAL record is never split across two XLogData messages. When a WAL record crosses a WAL page boundary, and is therefore already split using continuation records, it can be split at the page boundary. In other words, the first main WAL record and its continuation records can be sent in different XLogData messages.
"""
defstruct [:server_wal_start, :server_wal_end, :server_system_clock, :message]
end
22 changes: 13 additions & 9 deletions lib/extensions/postgres_cdc_stream/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ defmodule Extensions.PostgresCdcStream.Replication do
require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Adapters.Postgres.Protocol

alias Extensions.PostgresCdcStream

alias Realtime.Adapters.Changes.DeletedRecord
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Adapters.Postgres.Decoder
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
alias Realtime.Adapters.Postgres.Protocol.Write
alias Realtime.Crypto
alias Realtime.Database

Expand Down Expand Up @@ -74,21 +77,25 @@ defmodule Extensions.PostgresCdcStream.Replication do
end

@impl true
def handle_data(<<?w, _header::192, msg::binary>>, state) do
def handle_data(data, state) when is_write(data) do
%Write{message: message} = parse(data)

new_state =
msg
message
|> Decoder.decode_message()
|> process_message(state)

{:noreply, new_state}
end

# keepalive
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
def handle_data(data, state) when is_keep_alive(data) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
wal_end = wal_end + 1

messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
0 -> []
:now -> standby_status(wal_end, wal_end, wal_end, :now)
:later -> hold()
end

{:noreply, messages, state}
Expand Down Expand Up @@ -260,9 +267,6 @@ defmodule Extensions.PostgresCdcStream.Replication do
record
end

@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

def connection_opts(args) do
{host, port, name, user, pass} =
Crypto.decrypt_creds(
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.32.10",
version: "2.32.11",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading