
Reduce XPUB fanout lock pressure #283

Open
rgbkrk wants to merge 2 commits into master from perf/pub-fanout-state


@rgbkrk rgbkrk commented May 29, 2026

XPUB fanout still acquired a per-subscriber async writer mutex and walked the backend scc map on every send, the same hot-path cost the PUB queue stack already removed for PUB. This moves XPUB connection and subscription state into socket-owned FanoutState updated by backend events, so steady-state sends iterate a plain map and try_send into a bounded per-peer queue with no async mutex and no scc lookup.

Design decision

The win is fewer locks and atomics per published message, not just more concurrency. The old XPUB send path did two expensive things per message: it walked scc::HashMap with begin_async/next_async, and for every matching subscriber it acquired Arc<AsyncMutex<Pin<Box<ZmqFramedWrite>>>> and awaited a framed write inline.

FanoutState is owned by the socket, not the backend. The backend keeps only the I/O lifecycle. On connect it hands the socket a write half through a FanoutEvent; on disconnect it forwards the peer id. The socket drains those events at the top of send and recv, then the send path iterates a plain HashMap and try_sends into a bounded per-peer mpsc. A dedicated writer task per peer drains that queue through the shared write_message_queue. A full or closed queue drops one message rather than backpressuring the publisher, matching PUB's slow-subscriber behavior.
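The send path described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it uses the standard library's bounded `sync_channel` as a stand-in for the async bounded mpsc, the event carries the queue's sending end rather than a write half, and the writer task is omitted. The `PeerId` alias, the field names, and the exact `FanoutEvent` variants shown here are assumptions made for the example.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender, TrySendError};

// Hypothetical peer identifier; the real type is not shown in this PR text.
type PeerId = u32;

// Assumed event shape: the backend reports lifecycle changes only.
enum FanoutEvent {
    PeerConnected { peer: PeerId, queue: SyncSender<Vec<u8>> },
    PeerDisconnected(PeerId),
}

// Socket-owned state: a plain map, no lock around it.
struct FanoutState {
    events: Receiver<FanoutEvent>,
    peers: HashMap<PeerId, SyncSender<Vec<u8>>>,
}

impl FanoutState {
    fn new(events: Receiver<FanoutEvent>) -> Self {
        FanoutState { events, peers: HashMap::new() }
    }

    // Apply every pending backend event without blocking.
    fn drain_events(&mut self) {
        while let Ok(event) = self.events.try_recv() {
            match event {
                FanoutEvent::PeerConnected { peer, queue } => {
                    self.peers.insert(peer, queue);
                }
                FanoutEvent::PeerDisconnected(peer) => {
                    self.peers.remove(&peer);
                }
            }
        }
    }

    // Fan a message out to every peer; returns how many queues accepted it.
    // A full or closed queue drops this one message for that peer only.
    fn send(&mut self, msg: &[u8]) -> usize {
        self.drain_events();
        let mut queued = 0;
        for queue in self.peers.values() {
            match queue.try_send(msg.to_vec()) {
                Ok(()) => queued += 1,
                Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {}
            }
        }
        queued
    }
}
```

In this sketch a peer whose queue is full simply misses the message while other peers still receive it, which is the slow-subscriber behavior the paragraph describes.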

Subscription state lives in FanoutState and is applied by the socket from the recv path, where the socket already owns the fair queue. So subscribe/unsubscribe never touch the backend and never take a lock. Events are drained before a subscription is applied so the change always lands on a peer the map already knows about.
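As a rough illustration of the subscription handling above, the sketch below parses an XPUB subscription frame (a leading `\x01` byte for subscribe, `\x00` for unsubscribe, followed by the prefix) and applies it only to a peer the map already knows about. The type and method names (`SubscriptionChange`, `parse_subscription`, `add_peer`, `apply`, `matches`) are hypothetical and are not taken from `pub_fanout`.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum SubscriptionChange {
    Subscribe(Vec<u8>),
    Unsubscribe(Vec<u8>),
}

// Parse a subscription frame: first byte selects the operation,
// the remaining bytes are the topic prefix (possibly empty).
fn parse_subscription(frame: &[u8]) -> Option<SubscriptionChange> {
    match frame.split_first() {
        Some((&0x01, prefix)) => Some(SubscriptionChange::Subscribe(prefix.to_vec())),
        Some((&0x00, prefix)) => Some(SubscriptionChange::Unsubscribe(prefix.to_vec())),
        _ => None,
    }
}

// Assumed shape: per-peer list of subscribed prefixes, owned by the socket.
#[derive(Default)]
struct FanoutState {
    subscriptions: HashMap<u32, Vec<Vec<u8>>>,
}

impl FanoutState {
    // Stands in for handling a connect event from the backend.
    fn add_peer(&mut self, peer: u32) {
        self.subscriptions.entry(peer).or_default();
    }

    // Apply a change; returns false if the peer is not in the map yet,
    // which is why events are drained before subscriptions are applied.
    fn apply(&mut self, peer: u32, change: SubscriptionChange) -> bool {
        let prefixes = match self.subscriptions.get_mut(&peer) {
            Some(prefixes) => prefixes,
            None => return false,
        };
        match change {
            SubscriptionChange::Subscribe(prefix) => prefixes.push(prefix),
            SubscriptionChange::Unsubscribe(prefix) => {
                // Remove a single matching entry, if any.
                if let Some(pos) = prefixes.iter().position(|p| *p == prefix) {
                    prefixes.remove(pos);
                }
            }
        }
        true
    }

    // A message is delivered to a peer if any subscribed prefix matches.
    fn matches(&self, peer: u32, topic: &[u8]) -> bool {
        self.subscriptions
            .get(&peer)
            .map_or(false, |prefixes| prefixes.iter().any(|p| topic.starts_with(p)))
    }
}
```

Because `apply` takes `&mut self` on socket-owned state, no lock is needed in this sketch; the ordering requirement shows up as the `false` return for an unknown peer.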

This is the deliberate delta from the original change (#257). PUB already reached the same shape through the local fanout queue (#274) and the shared write queue (#272). Re-introducing the original send-path design for PUB would have regressed that lock-free queue, so PUB's send path is unchanged here and the work targets XPUB, which had not yet been converted. The shared subscription parser and FanoutState live in pub_fanout for reuse.

Behavioral change to call out

XPUB sends are now fire-and-forget into bounded per-peer queues drained by writer tasks, where they were previously awaited inline. A slow XPUB subscriber now drops messages once its queue fills instead of backpressuring the publisher. This matches PUB semantics and the ZeroMQ publisher contract, but it is a real observable change for XPUB: callers that relied on send() surfacing an error for a broken XPUB peer no longer see it on the send path. Transport errors and disconnect cleanup are detected by the writer task and the fair-queue on-disconnect hook instead. This wants explicit reviewer sign-off.
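To make the new error path concrete, here is a minimal sketch of a per-peer writer that reports a transport failure as an event instead of returning it from send(). It is an approximation under stated assumptions: a thread and a std `sync_channel` stand in for the async writer task and bounded mpsc, `spawn_writer` and `BrokenPipe` are hypothetical names, and the real writer goes through write_message_queue rather than `write_all`.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Sender, SyncSender};
use std::thread;

// Assumed event: the only variant this sketch needs.
enum FanoutEvent {
    PeerDisconnected(u32),
}

// Spawn a writer that drains a bounded queue into `sink`.
// On the first write error it emits PeerDisconnected and stops,
// so the socket can evict the peer on its next event drain.
fn spawn_writer<W: Write + Send + 'static>(
    peer: u32,
    mut sink: W,
    capacity: usize,
    events: Sender<FanoutEvent>,
) -> (SyncSender<Vec<u8>>, thread::JoinHandle<()>) {
    let (queue, messages) = sync_channel::<Vec<u8>>(capacity);
    let handle = thread::spawn(move || {
        for msg in messages {
            if sink.write_all(&msg).is_err() {
                // The socket may already be gone; ignore a failed report.
                let _ = events.send(FanoutEvent::PeerDisconnected(peer));
                return;
            }
        }
    });
    (queue, handle)
}

// Test double for a half-open peer: every write fails.
struct BrokenPipe;

impl Write for BrokenPipe {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::from(io::ErrorKind::BrokenPipe))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Note that enqueueing succeeds even though the peer is broken; in this sketch the failure surfaces only as a later PeerDisconnected event, which is the observable change for callers.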

Behavioral coverage

| socket | scenario | expected |
| --- | --- | --- |
| PUB/SUB | late subscriber joins with a different filter | each subscriber gets only its own prefix |
| PUB/SUB | unsubscribe then subscribe a new prefix | old prefix stops delivering, new one delivers |
| XPUB/SUB | late subscriber joins | XPUB sees each `\x01<prefix>`, fanout stays isolated |
| XPUB/SUB | unsubscribe updates fanout state | XPUB sees `\x00<prefix>`, delivery stops for it |

Benchmark evidence

From the original measurement run on PR #257, not re-measured here. Locked quick Criterion PUB fanout runs with tokio-runtime, sample size 10, 1s measurement, 0.5s warmup:

| transport | subscribers | 256B | 4096B |
| --- | --- | --- | --- |
| tcp | 1 | +8.5% | +11.5% |
| tcp | 8 | +8.3% | +6.4% |
| tcp | 64 | +5.7% | +5.2% |
| ipc | 1 | +4.3% | +0.1% |
| ipc | 8 | +17.1% | +13.3% |
| ipc | 64 | +18.1% | +10.9% |

These numbers were taken before the PUB queue stack merged, when both PUB and XPUB shared the old async-mutex fanout path. PUB has since moved to the local fanout queue plus shared write queue, so the PUB figures above reflect gains that are already on master. This change brings XPUB onto the same lock-free fanout shape, so the same class of improvement is expected to apply to the XPUB send path, though it has not been re-measured here.

Recreates the XPUB portion of #257.

XPUB fanout still acquired a per-subscriber async writer mutex and walked
the backend scc map on every send, the same hot-path cost the PUB queue
stack already removed for PUB. Move XPUB connection and subscription state
into socket-owned FanoutState updated by backend events, so steady-state
sends iterate a plain map and try_send into a bounded per-peer queue with
no async mutex and no scc lookup.

The backend now owns only I/O lifecycle. On connect it hands the socket a
write half through a FanoutEvent; on disconnect it forwards the peer id.
A dedicated writer task drains each peer's bounded queue via the shared
write_message_queue, matching PUB. Subscription changes are applied by the
socket from the recv path, where it already owns the fair queue, so they
stay lock-free and never touch the backend.

PUB already reached this shape through the local fanout queue and shared
write queue, so its send path is unchanged. The shared subscription parser
and FanoutState live in pub_fanout for both socket types.

Adds PUB/SUB and XPUB/XSUB behavior coverage for late-subscriber filtering
and unsubscribe-then-resubscribe semantics.

The per-peer writer task only logged on write_message_queue failure, so a
half-open peer (write dead, read still open) leaked its FanoutState slot and
every later send was cloned and dropped forever. Hand the writer a clone of
the fanout event sender so a write failure emits PeerDisconnected, evicting
the peer on the next drain. This mirrors the cleanup the PUB writer does on
its backend. Found by independent Codex and Claude reviews.

Add a regression test that a dead peer does not disrupt healthy delivery.
@rgbkrk rgbkrk marked this pull request as ready for review May 29, 2026 18:00