Reduce XPUB fanout lock pressure #283
Open
rgbkrk wants to merge 2 commits into
XPUB fanout still acquired a per-subscriber async writer mutex and walked the backend scc map on every send, the same hot-path cost the PUB queue stack already removed for PUB. Move XPUB connection and subscription state into socket-owned FanoutState updated by backend events, so steady-state sends iterate a plain map and try_send into a bounded per-peer queue with no async mutex and no scc lookup.

The backend now owns only I/O lifecycle. On connect it hands the socket a write half through a FanoutEvent; on disconnect it forwards the peer id. A dedicated writer task drains each peer's bounded queue via the shared write_message_queue, matching PUB. Subscription changes are applied by the socket from the recv path, where it already owns the fair queue, so they stay lock-free and never touch the backend.

PUB already reached this shape through the local fanout queue and shared write queue, so its send path is unchanged. The shared subscription parser and FanoutState live in pub_fanout for both socket types.

Adds PUB/SUB and XPUB/XSUB behavior coverage for late-subscriber filtering and unsubscribe-then-resubscribe semantics.
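The send path described in this commit can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in the diff: the `Peer` struct, the method names, and the `(queued, dropped)` return value are hypothetical, and `std::sync::mpsc::sync_channel` stands in for the async bounded `mpsc` so the sketch runs without a runtime.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical per-peer entry: a bounded queue plus subscribed prefixes.
struct Peer {
    queue: SyncSender<Vec<u8>>,
    prefixes: Vec<Vec<u8>>,
}

/// Hypothetical stand-in for the socket-owned fanout state.
#[derive(Default)]
struct FanoutState {
    peers: HashMap<u64, Peer>,
}

impl FanoutState {
    /// Registers a peer with a bounded queue and returns the receiving end,
    /// which a per-peer writer task would drain.
    fn connect(&mut self, id: u64, bound: usize) -> Receiver<Vec<u8>> {
        let (tx, rx) = sync_channel(bound);
        self.peers.insert(id, Peer { queue: tx, prefixes: Vec::new() });
        rx
    }

    /// Records a subscription prefix for a known peer.
    fn subscribe(&mut self, id: u64, prefix: &[u8]) {
        if let Some(peer) = self.peers.get_mut(&id) {
            peer.prefixes.push(prefix.to_vec());
        }
    }

    /// Fans one message out to matching peers without blocking.
    /// Returns (queued, dropped); a full or closed queue drops the message.
    fn send(&self, msg: &[u8]) -> (usize, usize) {
        let (mut queued, mut dropped) = (0, 0);
        for peer in self.peers.values() {
            if !peer.prefixes.iter().any(|p| msg.starts_with(p)) {
                continue;
            }
            match peer.queue.try_send(msg.to_vec()) {
                Ok(()) => queued += 1,
                Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => dropped += 1,
            }
        }
        (queued, dropped)
    }
}

fn main() {
    let mut state = FanoutState::default();
    let fast = state.connect(1, 4);
    let _slow = state.connect(2, 1); // never drained, so it fills after one message
    state.subscribe(1, b"a");
    state.subscribe(2, b"a");
    println!("{:?}", state.send(b"a1"));
    println!("{:?}", state.send(b"a2"));
    println!("fast peer has {} queued", fast.try_iter().count());
}
```

The point of the shape is that `send` touches only a plain `HashMap` and a non-blocking `try_send`, so a slow peer costs the publisher one dropped message rather than a wait.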
The per-peer writer task only logged on write_message_queue failure, so a half-open peer (write dead, read still open) leaked its FanoutState slot and every later send was cloned and dropped forever. Hand the writer a clone of the fanout event sender so a write failure emits PeerDisconnected, evicting the peer on the next drain. This mirrors the cleanup the PUB writer does on its backend. Found by independent Codex and Claude reviews. Add a regression test that a dead peer does not disrupt healthy delivery.
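A minimal sketch of the fix described in this commit, assuming a simplified event enum and a closure in place of the real framed write half. The names `writer_task` and `drain_events` are hypothetical, and std threads and channels stand in for the async tasks and queues.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::thread;

/// Hypothetical event type; only the disconnect variant is modeled here.
enum FanoutEvent {
    PeerDisconnected(u64),
}

/// Drains one peer's queue. `write` stands in for the framed write half.
/// On the first write error it reports the peer as disconnected and exits,
/// instead of only logging the failure.
fn writer_task(
    id: u64,
    queue: Receiver<Vec<u8>>,
    events: Sender<FanoutEvent>,
    mut write: impl FnMut(&[u8]) -> Result<(), String>,
) {
    for msg in queue {
        if write(&msg).is_err() {
            let _ = events.send(FanoutEvent::PeerDisconnected(id));
            return;
        }
    }
}

/// Applies pending events to the peer map, as the socket would do at the
/// top of send and recv.
fn drain_events(events: &Receiver<FanoutEvent>, peers: &mut HashMap<u64, SyncSender<Vec<u8>>>) {
    for event in events.try_iter() {
        match event {
            FanoutEvent::PeerDisconnected(id) => {
                peers.remove(&id);
            }
        }
    }
}

fn main() {
    let (event_tx, event_rx) = channel();
    let mut peers = HashMap::new();
    let (tx, rx) = sync_channel(8);
    let id: u64 = 7;
    peers.insert(id, tx);

    // A half-open peer: every write fails.
    let writer = thread::spawn(move || {
        writer_task(id, rx, event_tx, |_| Err(String::from("broken pipe")))
    });
    peers[&id].try_send(b"hello".to_vec()).unwrap();
    writer.join().unwrap();

    drain_events(&event_rx, &mut peers);
    println!("peers left: {}", peers.len());
}
```

Without the event, the dead peer's slot would stay in the map and every later message would be cloned into a queue nobody drains.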
XPUB fanout still acquired a per-subscriber async writer mutex and walked the backend scc map on every send, the same hot-path cost the PUB queue stack already removed for PUB. This moves XPUB connection and subscription state into socket-owned `FanoutState` updated by backend events, so steady-state sends iterate a plain map and `try_send` into a bounded per-peer queue with no async mutex and no scc lookup.

Design decision
The win is fewer locks and atomics per published message, not just more concurrency. The old XPUB send path did two expensive things per message: it walked `scc::HashMap` with `begin_async`/`next_async`, and for every matching subscriber it acquired `Arc<AsyncMutex<Pin<Box<ZmqFramedWrite>>>>` and awaited a framed write inline.

`FanoutState` is owned by the socket, not the backend. The backend keeps only the I/O lifecycle. On connect it hands the socket a write half through a `FanoutEvent`; on disconnect it forwards the peer id. The socket drains those events at the top of `send` and `recv`, then the send path iterates a plain `HashMap` and `try_send`s into a bounded per-peer `mpsc`. A dedicated writer task per peer drains that queue through the shared `write_message_queue`. A full or closed queue drops one message rather than backpressuring the publisher, matching PUB's slow-subscriber behavior.

Subscription state lives in `FanoutState` and is applied by the socket from the `recv` path, where the socket already owns the fair queue. So subscribe/unsubscribe never touch the backend and never take a lock. Events are drained before a subscription is applied so the change always lands on a peer the map already knows about.

This is the deliberate delta from the original change. PUB already reached the same shape through the local fanout queue (#274) and the shared write queue (#272). Re-introducing the original send-path design for PUB would have regressed that lock-free queue, so PUB's send path is unchanged here and the work targets XPUB, which had not yet been converted. The shared subscription parser and `FanoutState` live in `pub_fanout` for reuse.

Behavioral change to call out
XPUB sends are now fire-and-forget into bounded per-peer queues drained by writer tasks, where they were previously awaited inline. A slow XPUB subscriber now drops messages once its queue fills instead of backpressuring the publisher. This matches PUB semantics and the ZeroMQ publisher contract, but it is a real observable change for XPUB: callers that relied on `send()` surfacing an error for a broken XPUB peer no longer see it on the send path. Transport errors and disconnect cleanup are detected by the writer task and the fair-queue on-disconnect hook instead. This wants explicit reviewer sign-off.

Behavioral coverage
`\x01<prefix>`, fanout stays isolated
`\x00<prefix>`, delivery stops for it

Benchmark evidence
From the original measurement run on PR #257, not re-measured here. Locked quick Criterion PUB fanout runs with `tokio-runtime`, sample size 10, 1s measurement, 0.5s warmup:

These numbers were taken before the PUB queue stack merged, when both PUB and XPUB shared the old async-mutex fanout path. PUB has since moved to the local fanout queue plus shared write queue, so the PUB column reflects gains that are already on master. This change brings XPUB onto the same lock-free fanout shape, so the same class of improvement now applies to the XPUB send path.
Recreates the XPUB portion of #257.
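
For reviewers unfamiliar with the `\x01<prefix>` and `\x00<prefix>` frames named under behavioral coverage: in ZeroMQ, a subscription message is one byte (1 to subscribe, 0 to unsubscribe) followed by the topic prefix. The sketch below shows that format and the unsubscribe-then-resubscribe behavior. It is an assumption-laden illustration, not the shared parser in `pub_fanout`; `Subscription`, `parse_subscription`, `apply`, and `is_match` are hypothetical names.

```rust
/// Hypothetical result of parsing one subscription frame.
#[derive(Debug, PartialEq)]
enum Subscription<'a> {
    Subscribe(&'a [u8]),
    Unsubscribe(&'a [u8]),
}

/// Parses `\x01<prefix>` as subscribe and `\x00<prefix>` as unsubscribe.
/// Empty frames and any other leading byte yield `None`.
fn parse_subscription(frame: &[u8]) -> Option<Subscription<'_>> {
    match frame.split_first()? {
        (&1, prefix) => Some(Subscription::Subscribe(prefix)),
        (&0, prefix) => Some(Subscription::Unsubscribe(prefix)),
        _ => None,
    }
}

/// Applies a change to one peer's prefix list. Duplicate subscriptions are
/// kept, and an unsubscribe removes a single matching entry.
fn apply(prefixes: &mut Vec<Vec<u8>>, sub: Subscription<'_>) {
    match sub {
        Subscription::Subscribe(p) => prefixes.push(p.to_vec()),
        Subscription::Unsubscribe(p) => {
            if let Some(i) = prefixes.iter().position(|q| q.as_slice() == p) {
                prefixes.remove(i);
            }
        }
    }
}

/// A message is delivered when it starts with any subscribed prefix.
fn is_match(prefixes: &[Vec<u8>], msg: &[u8]) -> bool {
    prefixes.iter().any(|p| msg.starts_with(p))
}

fn main() {
    let mut prefixes = Vec::new();
    apply(&mut prefixes, parse_subscription(b"\x01news").unwrap());
    println!("subscribed: {}", is_match(&prefixes, b"news.tech"));
    apply(&mut prefixes, parse_subscription(b"\x00news").unwrap());
    println!("after unsubscribe: {}", is_match(&prefixes, b"news.tech"));
    apply(&mut prefixes, parse_subscription(b"\x01news").unwrap());
    println!("after resubscribe: {}", is_match(&prefixes, b"news.tech"));
}
```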