Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/two-singers-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix the issue where transactions that had exactly max_batch_size changes weren't written to the shape log.
2 changes: 2 additions & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ config :electric,
env!("ELECTRIC_TEMPORARY_REPLICATION_SLOT_USE_RANDOM_NAME", :boolean, nil),
# The ELECTRIC_EXPERIMENTAL_MAX_TXN_SIZE is undocumented and will be removed in future versions.
max_txn_size: env!("ELECTRIC_EXPERIMENTAL_MAX_TXN_SIZE", :integer, nil),
# The ELECTRIC_EXPERIMENTAL_MAX_BATCH_SIZE is undocumented and used for testing only.
max_batch_size: env!("ELECTRIC_EXPERIMENTAL_MAX_BATCH_SIZE", :integer, nil),
service_port: env!("ELECTRIC_PORT", :integer, nil),
shape_hibernate_after: shape_hibernate_after,
shape_enable_suspend?: shape_enable_suspend?,
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ defmodule Electric.Application do
slot_name: slot_name,
slot_temporary?: get_env(opts, :replication_slot_temporary?),
max_txn_size: get_env(opts, :max_txn_size),
max_batch_size: get_env(opts, :max_batch_size),
replication_idle_timeout: replication_idle_timeout
],
pool_opts:
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ defmodule Electric.Config do
replication_slot_temporary?: false,
replication_slot_temporary_random_name?: false,
max_txn_size: 250 * 1024 * 1024,
max_batch_size: 100,
# Scaling down on idle is disabled by default
replication_idle_timeout: 0,
manual_table_publishing?: false,
Expand Down
21 changes: 10 additions & 11 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,24 @@ defmodule Electric.Postgres.ReplicationClient do
# we can handle, above which we would exit as we run the risk of running
# out of memory.
# TODO: stream out transactions and collect on disk to avoid this
max_txn_size: [type: {:or, [:non_neg_integer, nil]}, default: nil]
max_txn_size: [type: {:or, [:non_neg_integer, nil]}, default: nil],
# Maximum number of changes to buffer before flushing a transaction fragment.
# Smaller values result in more message passing overhead but lower memory usage.
# The minimum allowed value is 2.
max_batch_size: [type: :non_neg_integer, default: 100]
)

# Making the batch size small results in more message passing which
# can have a performance impact. The larger the batch size the more memory
# is used to hold the operations in memory before sending them off to be processed.
# For local testing batch sizes of 3 and above overcome the performance hit of message
# passing, but as we're not currently worried about the memory consumption of the
# replication client and don't want to risk any performance degradation in production
# it has been set arbitrarily high to 100. We can tune this figure later if needed.
@max_change_batch_size 100

@spec new(Access.t()) :: t()
def new(opts) do
opts = NimbleOptions.validate!(opts, @opts_schema)
settings = [display_settings: Electric.Postgres.display_settings()]
opts = settings ++ opts

{max_txn_size, opts} = Keyword.pop!(opts, :max_txn_size)
{max_batch_size, opts} = Keyword.pop!(opts, :max_batch_size)

# Assert the implicit requirement
true = max_batch_size >= 2

struct!(
__MODULE__,
Expand All @@ -130,7 +129,7 @@ defmodule Electric.Postgres.ReplicationClient do
message_converter:
MessageConverter.new(
max_tx_size: max_txn_size,
max_batch_size: @max_change_batch_size
max_batch_size: max_batch_size
)
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
tx_size: 0,
max_tx_size: nil,
max_batch_size: nil,
last_log_offset: nil,
txn_fragment: nil,
current_xid: nil
txn_fragment: nil

@type t() :: %__MODULE__{
relations: %{optional(LR.relation_id()) => LR.Relation.t()},
Expand All @@ -45,9 +43,7 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
tx_size: non_neg_integer(),
max_tx_size: non_neg_integer() | nil,
max_batch_size: non_neg_integer(),
last_log_offset: LogOffset.t() | nil,
txn_fragment: TransactionFragment.t() | nil,
current_xid: Electric.Replication.Changes.xid() | nil
txn_fragment: TransactionFragment.t() | nil
}

def new(opts \\ []) do
Expand Down Expand Up @@ -83,8 +79,6 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
| tx_op_index: 0,
tx_size: 0,
tx_change_count: 0,
last_log_offset: nil,
current_xid: msg.xid,
txn_fragment: %TransactionFragment{
xid: msg.xid,
lsn: msg.final_lsn,
Expand Down Expand Up @@ -227,22 +221,16 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
txn_change_count: state.tx_change_count
}

last_log_offset = state.last_log_offset || LogOffset.new(Lsn.to_integer(fragment.lsn), 0)
returned_txn_fragment =
%{fragment | commit: commit}
|> finalize_txn_fragment()

{:ok,
%{
fragment
| commit: commit,
last_log_offset: last_log_offset,
changes: Enum.reverse(fragment.changes)
},
{:ok, returned_txn_fragment,
%{
state
| tx_op_index: nil,
tx_size: 0,
tx_change_count: 0,
last_log_offset: nil,
current_xid: nil,
txn_fragment: nil
}}
end
Expand Down Expand Up @@ -276,8 +264,6 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
state
| tx_size: state.tx_size + bytes,
tx_change_count: state.tx_change_count + 1,
last_log_offset: current_offset(state),

# We're adding 2 to the op index because it's possible we're splitting some of the operations before storage.
# This gives us headroom for splitting any operation into 2.
tx_op_index: state.tx_op_index + 2
Expand Down Expand Up @@ -312,13 +298,47 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverter do
} = state
)
when change_count >= max_batch_size do
{:ok,
%{
fragment
| last_log_offset: state.last_log_offset,
changes: Enum.reverse(fragment.changes)
}, %{state | txn_fragment: %TransactionFragment{xid: state.current_xid, lsn: fragment.lsn}}}
# Keep the most recent change in the state so that, if the next message is Commit, the last
# txn fragment has at least one change.
#
# Before this safeguard got introduced, it was possible to observe a scenario where a txn
# fragment was returned due to reaching the max_batch_size but the next message was Commit
# and, as a result, a txn fragment with an empty list of changes and the same
# last_log_offset as the preceding fragment would be returned. This last fragment would then get
# skipped by ShapeLogCollector (due to the already seen offset) and the shape consumer
# process would never see the Commit change for the transaction.

[last_change | fragment_changes] = fragment.changes

returned_txn_fragment =
%{fragment | changes: fragment_changes, change_count: fragment.change_count - 1}
|> finalize_txn_fragment()

state =
%{
state
| txn_fragment: %TransactionFragment{
xid: fragment.xid,
lsn: fragment.lsn
}
}
|> add_change(last_change)
|> add_affected_relation(last_change.relation)

{:ok, returned_txn_fragment, state}
end

defp maybe_flush(state), do: {:buffering, state}

# Empty transaction: there are no buffered changes to take an offset from, so
# derive last_log_offset from the commit LSN with op index 0.
defp finalize_txn_fragment(%TransactionFragment{changes: []} = fragment) do
%{fragment | last_log_offset: LogOffset.new(Lsn.to_integer(fragment.lsn), 0)}
end

# Changes are accumulated in reverse order, so hd(changes) is the most recent one.
# We use its log_offset to populate the fragment's last_log_offset, and restore
# the changes to chronological order before the fragment is returned.
defp finalize_txn_fragment(%TransactionFragment{changes: changes} = fragment) do
[%{log_offset: last_log_offset} | _] = changes
%{fragment | last_log_offset: last_log_offset, changes: Enum.reverse(changes)}
end
end
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ defmodule Electric.StackSupervisor do
slot_temporary?: [type: :boolean, default: false],
try_creating_publication?: [type: :boolean, default: true],
max_txn_size: [type: {:or, [:non_neg_integer, nil]}, default: nil],
max_batch_size: [
type: :non_neg_integer,
default: Electric.Config.default(:max_batch_size)
],
replication_idle_timeout: [
type: :non_neg_integer,
default: Electric.Config.default(:replication_idle_timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,15 +498,15 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
{:buffering, converter} = MessageConverter.convert(insert_msg, converter)
{:buffering, converter} = MessageConverter.convert(insert_msg, converter)

# Third change triggers flush (3 inserts = max_batch_size)
# Third change triggers flush (3 inserts = max_batch_size). All changes except the last one are included in the txn fragment.
assert {:ok,
%TransactionFragment{
xid: 456,
lsn: @test_lsn,
last_log_offset: %LogOffset{tx_offset: 123, op_offset: 4},
last_log_offset: %LogOffset{tx_offset: 123, op_offset: 2},
has_begin?: true,
commit: nil,
changes: [%NewRecord{}, %NewRecord{}, %NewRecord{}],
changes: [%NewRecord{}, %NewRecord{}],
affected_relations: affected
}, converter} = MessageConverter.convert(insert_msg, converter)

Expand All @@ -523,7 +523,8 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
last_log_offset: %LogOffset{tx_offset: 123, op_offset: 6},
has_begin?: false,
commit: %Commit{},
changes: [%NewRecord{}]
changes: [%NewRecord{}, %NewRecord{}],
affected_relations: affected
}, _converter} =
MessageConverter.convert(
%LR.Commit{
Expand All @@ -533,6 +534,8 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
},
converter
)

assert MapSet.equal?(affected, MapSet.new([{"public", "users"}]))
end

test "maintains correct log offsets across batches", %{converter: _converter} do
Expand All @@ -558,8 +561,7 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
commit: nil,
changes: [
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 0}},
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 2}},
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 4}}
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 2}}
]
}, converter} = MessageConverter.convert(insert_msg, converter)

Expand All @@ -572,6 +574,7 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
has_begin?: false,
commit: %Commit{},
changes: [
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 4}},
%NewRecord{log_offset: %LogOffset{tx_offset: 123, op_offset: 6}}
]
}, _converter} =
Expand All @@ -585,6 +588,66 @@ defmodule Electric.Postgres.ReplicationClient.MessageConverterTest do
)
end

# Regression test: a transaction with exactly max_batch_size changes used to
# produce a commit fragment with an empty change list and a duplicate
# last_log_offset, which downstream consumers would skip — losing the Commit.
# The converter now holds back the most recent change when flushing a full
# batch, so the commit fragment always advances the offset.
test "commit fragment has greater offset than the preceding fragment when txn has exactly max_batch_size changes",
%{converter: _converter} do
# Use a small batch size for this test
converter = MessageConverter.new(max_batch_size: 3)
{:ok, %Relation{}, converter} = MessageConverter.convert(@relation, converter)

{:buffering, converter} =
MessageConverter.convert(
%LR.Begin{final_lsn: @test_lsn, commit_timestamp: DateTime.utc_now(), xid: 456},
converter
)

insert_msg = %LR.Insert{relation_id: 1, tuple_data: ["123"], bytes: 3}

# Insert exactly 3 changes (= max_batch_size)
{:buffering, converter} = MessageConverter.convert(insert_msg, converter)
{:buffering, converter} = MessageConverter.convert(insert_msg, converter)

# Third change triggers flush, but only two changes are returned.
assert {:ok,
%TransactionFragment{
xid: 456,
has_begin?: true,
commit: nil,
last_log_offset: data_fragment_offset,
changes: [_, _],
affected_relations: affected
}, converter} = MessageConverter.convert(insert_msg, converter)

assert affected == MapSet.new([{"public", "users"}])

# The previous change is immediately followed by a Commit
assert {:ok,
%TransactionFragment{
xid: 456,
has_begin?: false,
commit: %Commit{},
last_log_offset: commit_fragment_offset,
changes: [_],
affected_relations: affected
}, _converter} =
MessageConverter.convert(
%LR.Commit{
lsn: @test_lsn,
end_lsn: @test_end_lsn,
commit_timestamp: ~U[2024-01-01 00:00:00Z]
},
converter
)

assert affected == MapSet.new([{"public", "users"}])

# The commit fragment's offset must be strictly greater than the previous fragment's offset
assert LogOffset.compare(commit_fragment_offset, data_fragment_offset) == :gt

# Commit fragment offset is based on the same final_lsn as the preceding changes. It's
# basically the log offset of the most recent change.
assert commit_fragment_offset == LogOffset.new(Lsn.to_integer(@test_lsn), 4)
end

test "returns Relation immediately without flushing buffered operations", %{
converter: converter
} do
Expand Down
Loading