cross_channel 0.10.1
High-performance & flexible channels for Dart/Flutter.
Changelog #
[Unreleased] #
Planned #
- Cross-Isolate Event Bus (future optimization of Broadcast)
0.10.1 – 2026-03-05 #
Added #
- Automated Inter-Isolate Metrics Sync:
  - Introduced the MetricsSync backend control message.
  - Channels now automatically merge metrics from remote isolates or Web Workers upon closure, completely eliminating the need for manual MetricsRegistry().merge() calls over background ports.
Fixed #
- Metrics Counting Race Condition: Fixed an off-by-one error in benchmarking utilities where early receiver closures led to silent SendErrorDisconnected drops for final sender metrics.
- Centralized closeRemote() in base Sender and Receiver classes to standardize remote port resource cleanup and final metric synchronization.
0.10.0 – 2026-02-28 #
Added #
- Transferable Handles (toTransferable / fromTransferable):
  - All Sender and Receiver types can now be serialized for transfer across Web Workers (via postMessage) and Isolates.
  - toTransferable() returns a Map<String, Object?> containing the raw platform port + metadata.
  - fromTransferable(data) factory constructors on all handle types: SpscSender, SpscReceiver, MpscSender, MpscReceiver, MpmcSender, MpmcReceiver, BroadcastSender, BroadcastReceiver, OneShotSender, OneShotReceiver.
  - Reconstructed handles use channelId = -1 to always route through the remote path (prevents registry collisions on the worker side).
- Platform Port Serialization (packPort / unpackPort):
  - New platform-agnostic functions following the same conditional-import pattern as createReceiver().
  - On VM: extracts/wraps raw SendPort. On Web: extracts/wraps raw MessagePort.
- Improved Documentation:
  - Added comprehensive {@tool snippet} examples to all channel classes.
  - New standalone examples for each channel flavor in example/.
- Codecov Integration:
  - CI pipeline now automatically generates and uploads test coverage reports.
- Test Coverage (75.9%):
  - Added full suite of unit tests for the metrics module (Registry, Recorders, Exporters, P² Quantiles).
  - Added missing core buffer tests (UnboundedBuffer, LatestOnlyBuffer) and remote protocol tests (RemoteConnection, Flow Control).
Changed #
- Performance & Backpressure (FlowControlledRemoteConnection):
  - Eliminated secondary ListQueue buffering for pending network sends.
  - send() and sendBatch() now yield and await network credits directly, so backpressure is strictly enforced and propagates to the caller with no extra allocation and no double-buffering.
  - DRY refactoring of remote connection setup logic, preventing resource leaks.
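The credit-based backpressure described above can be illustrated with a minimal sketch. The names CreditGate and sendWithCredits are hypothetical and are not part of cross_channel; the sketch only assumes that a sender needs one credit per message and that the receiver hands credits back as it drains.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical credit gate, not part of cross_channel's public API.
/// A sender must hold one credit per message; the receiver grants
/// credits back as it drains its buffer.
class CreditGate {
  CreditGate(this._credits);

  int _credits;
  final Queue<Completer<void>> _waiters = Queue<Completer<void>>();

  int get available => _credits;

  /// Completes once a credit has been reserved for the caller.
  Future<void> acquire() {
    if (_credits > 0) {
      _credits--;
      return Future<void>.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Called when the receiver reports that it consumed [n] messages.
  void grant(int n) {
    for (var i = 0; i < n; i++) {
      if (_waiters.isNotEmpty) {
        // Hand the credit straight to the oldest suspended sender.
        _waiters.removeFirst().complete();
      } else {
        _credits++;
      }
    }
  }
}

/// Sends [items] through [deliver], awaiting one credit per item.
Future<void> sendWithCredits<T>(
  CreditGate gate,
  Iterable<T> items,
  void Function(T) deliver,
) async {
  for (final item in items) {
    await gate.acquire(); // suspends the caller when credits run out
    deliver(item);
  }
}
```

In this sketch the only thing queued is the suspended caller, never the message itself, which is one plausible reading of "without double-buffering".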
Fixed #
- Broadcast Channel Disconnections: Fixed an infinite hang in cross-isolate broadcast pipelines caused by failure to propagate RecvErrorDisconnected when underlying platform connections closed.
- Worker Handshake Protocol: Fixed an edge-case bug where PlatformReceiver listeners failed to decode ControlMessage payloads wrapped inside untyped maps generically transferred from Web Workers.
Usage #
// Main thread — serialize for transfer
final (tx, rx) = XChannel.spsc<String>(capacity: 128);
final transferable = rx.toTransferable();
// Worker/Isolate — reconstruct
final remoteRx = SpscReceiver<String>.fromTransferable(transferable);
await for (final msg in remoteRx.stream()) {
print(msg); // works across context boundary
}
0.9.1 – 2026-02-27 #
Fixed #
- Fixed unawaited futures and static analysis warnings in debug apps and tests
- Fixed markdown exports in lib/broadcast.dart
- Clarified native cross-isolate support (Universal Handles) directly in the README.md examples
0.9.0 – 2026-01-24 #
Added #
- Broadcast Channel (XChannel.broadcast):
  - Single-Producer Multi-Consumer (SPMC) Ring Buffer.
  - Pub/Sub semantics: All subscribers receive all messages.
  - Lag Detection: Slow subscribers detect gaps (skip) instead of blocking producer.
  - History Replay: subscribe(replay: N) to catch up on past events.
- Sender Rate Limiting:
  - sender.throttle(duration): Limit event rate (drops excess).
  - sender.debounce(duration): Stabilize bursty events (sends only last after silence).
- Universal Handles (ChannelRegistry):
  - Zero-overhead local fallback: Channels check their locality (intra-isolate) before serializing.
  - O(1) lookup enables passing "remote" handles that transparently optimize to direct buffer access if local.
- Unified Remote Protocol:
  - RemoteConnection<T> and FlowControlledRemoteConnection<T> for structured cross-context communication.
  - Credit-based flow control protocol to prevent OOM on slow receivers.
- Platform Agnostic Channels (lib/src/platform/):
  - Unified internal abstraction PlatformPort / PlatformReceiver.
  - Modern package:web implementation for future-proof Web support.
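The lag-detection and replay behaviour of a broadcast ring can be sketched as follows. BroadcastRing and RingCursor are hypothetical names chosen for illustration, not the package's classes; the sketch assumes each subscriber tracks its own sequence number and that the producer overwrites the oldest slot without waiting.

```dart
/// Hypothetical single-producer ring, not cross_channel's implementation.
class BroadcastRing<T> {
  BroadcastRing(int capacity) : _slots = List<T?>.filled(capacity, null);

  final List<T?> _slots;
  int _head = 0; // sequence number of the next message to be written

  /// Overwrites the oldest slot; never waits for slow subscribers.
  void publish(T value) {
    _slots[_head % _slots.length] = value;
    _head++;
  }

  /// Creates a cursor that starts up to [replay] messages in the past.
  RingCursor<T> subscribe({int replay = 0}) {
    final retained = _head < _slots.length ? _head : _slots.length;
    final back = replay < retained ? replay : retained;
    return RingCursor<T>._(this, _head - back);
  }
}

class RingCursor<T> {
  RingCursor._(this._ring, this._next);

  final BroadcastRing<T> _ring;
  int _next; // sequence number this cursor will read next

  /// Messages lost to overwrites, as observed by the latest [poll] call.
  int skipped = 0;

  /// Returns the next message, or null when the cursor is caught up.
  T? poll() {
    final oldest = _ring._head - _ring._slots.length;
    skipped = 0;
    if (_next < oldest) {
      skipped = oldest - _next; // gap: the producer lapped this cursor
      _next = oldest;
    }
    if (_next == _ring._head) return null;
    return _ring._slots[_next++ % _ring._slots.length];
  }
}
```

A subscriber that falls more than one ring length behind sees a non-zero skipped count and resumes at the oldest retained message, so the producer is never blocked.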
Changed #
- Performance:
  - Single-waiter optimization: PopWaiterQueue uses a field instead of a queue for the common 1-waiter case (recv path).
  - Fast-path routing: ChannelOps automatically routes to local channel if available.
Fixed #
- Metrics:
  - Improved ActiveMetricsRecorder timestamp sampling accuracy (_nowNs).
0.8.0 – 2025-09-12 #
Added #
- XSelect
  - New branch types (modularized under src/branches/):
    - FutureBranch, StreamBranch, RecvBranch, TimerBranch: each encapsulates its attach/attachSync logic.
    - onNotify and onNotifyOnce arms to integrate Notify into XSelect.
    - onRecvValue now accepts onError and onDisconnected callbacks for fine-grained handling.
  - TimerBranch improvements:
    - Support for both one-shot (once) and periodic (period) timers.
    - Catch-up logic ensures no drift when ticks are delayed.
  - Synchronous fast-path:
    - New attachSync method for branches.
    - XSelect.run probes branches synchronously before arming async listeners.
- ChannelMetrics: global metrics system for channels.
  - New MetricsRecorder interface (ActiveMetricsRecorder / NoopMetricsRecorder).
  - Configurable via MetricsConfig (enable/disable, sample rate, exporters).
  - Built-in exporters: StdExporter, CsvExporter, NoopExporter.
  - Quantiles computed via P² algorithm for p50/p95/p99 latency.
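One common way to get drift-free periodic ticks, which may or may not match what TimerBranch does internally, is to schedule every tick on a fixed grid anchored at the start time instead of adding the period to the current time. The helper name nextDeadline below is hypothetical.

```dart
/// Returns the first deadline strictly after [now] on the grid
/// start + k * period. Illustrative helper, not a cross_channel API.
Duration nextDeadline(Duration start, Duration period, Duration now) {
  if (now < start) return start + period;
  final elapsedTicks =
      (now - start).inMicroseconds ~/ period.inMicroseconds;
  // Anchoring to [start] means a late tick does not shift later ones.
  return start + period * (elapsedTicks + 1);
}
```

With a 100 ms period and a tick handled late at 250 ms, this yields 300 ms, whereas the naive now + period would yield 350 ms and carry the delay into every later tick.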
Changed #
- XSelect
  - Refactored:
    - Split monolithic branch implementations into dedicated classes (FutureBranch, StreamBranch, etc.).
    - Internal resolution now calls attachSync first (zero-latency when possible).
    - Fairness rotation preserved, .ordered() still available to disable it.
  - Timeout handling:
    - Renamed timeout(duration, ...) → onTimeout(duration, ...) for consistency with other branch APIs.
    - Implemented via TimerBranch.once.
- Buffers
  - Internal waiters now use Completer.sync for immediate resolution instead of scheduling via microtask (applies to unbounded, bounded, chunked, and latestOnly).
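The difference between Completer.sync and the default Completer can be seen with plain dart:async, independent of this package. The function name completionOrder is made up for this illustration.

```dart
import 'dart:async';

/// Records the order in which a listener and the completing code run.
List<String> completionOrder({required bool sync}) {
  final log = <String>[];
  final completer = sync ? Completer<int>.sync() : Completer<int>();
  completer.future.then((value) => log.add('listener $value'));
  completer.complete(1);
  // With Completer.sync the listener has already run at this point;
  // with the default Completer it is still waiting for a later microtask.
  log.add('after complete');
  return log;
}
```

The synchronous variant saves a trip through the microtask queue per wake-up, at the cost that listener code runs inside the call to complete().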
Breaking #
- XSelect
  - Removed Ticker/Arm API:
    - Old Ticker.every and Arm<T> types are gone.
    - Replace with onTick(Duration, ...) (periodic timers) or onDelay(Duration, ...) (one-shot).
  - Internals:
    - _Branch / ArmBranch types removed, replaced by SelectBranch interface.
Migration guide (0.7 → 0.8) #
- ..timeout(Duration(seconds: 5), () => 'fallback')
+ ..onTimeout(Duration(seconds: 5), () => 'fallback')
- ..onTick(Ticker.every(Duration(seconds: 1)), () => 'tick')
+ ..onTick(Duration(seconds: 1), () => 'tick')
0.7.3 – 2025-09-07 #
0.7.2 – 2025-09-07 #
Added #
- XSelect
  - Builder API:
    - New onFutureValue, onFutureError, onStreamDone: convenience variants.
    - New onDelay for single-shot timers.
    - New onSend(sender, value, ...) to race a send completion.
- Docs & inline comments: extensive /// API docs and usage examples across the whole package
Changed #
- XSelect
  - Fairness & ordering: default start-index rotation for fairness (prevents starvation); call .ordered() to preserve declaration order.
  - Cancellation: consistent loser cleanup (cancel timers/subs, ignore late futures); clearer error propagation (use of Future.error and Zone).
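Start-index rotation can be sketched as a scan over the branches that begins one position later on every call. RotatingPicker is a hypothetical class written for this illustration; how XSelect actually advances its start index is not stated in this changelog.

```dart
/// Hypothetical selector: picks the first ready branch, starting the scan
/// at an index that advances after every call unless [ordered] is set.
class RotatingPicker {
  RotatingPicker({this.ordered = false});

  final bool ordered;
  int _start = 0;

  /// Returns the index of the chosen branch, or -1 if none is ready.
  int pick(List<bool> ready) {
    final n = ready.length;
    if (n == 0) return -1;
    final first = ordered ? 0 : _start % n;
    if (!ordered) _start = (first + 1) % n;
    for (var i = 0; i < n; i++) {
      final index = (first + i) % n;
      if (ready[index]) return index;
    }
    return -1;
  }
}
```

When every branch is always ready, the rotating picker cycles through all of them, while the ordered one keeps returning the first declared branch.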
Fixed #
- Synchronous resolution path: if an Arm.immediate fires during attach, the selection resolves without over-subscribing other branches.
0.7.0 – 2025-09-06 #
Breaking #
- Select → XSelect
  - Select.any(...) removed. Use:
    - XSelect.run<T>((b) => b ... ): async, waits for first ready arm.
    - XSelect.syncRun<T>((b) => b ... ): non-blocking, returns immediately if an arm is ready.
    - XSelect.race<T>([ (b) => b.onFuture(...), ... ]): race multiple builders.
    - XSelect.syncRace<T>([ ... ]): non-blocking variant.
- Builder API changes:
  - New onRecvValue(rx, onValue, onDisconnected: ...) when you only care about values.
  - New onTick(Ticker, ...) and timeout(duration, ...).
  - onStream, onFuture kept (signatures aligned).
- Loop control:
  - SelectDecision removed. Just return the value you want from the winning arm. If you used continueLoop/breakLoop, return a bool instead and branch on it.
- Property rename
  - Receiver.isClosed → Receiver.isDisconnected (and it’s available on all receivers, not just closable ones).
- Result helpers rename
  - SendResultX.isOk → hasSend
  - RecvResultX.isOk → hasValue
  - (Others kept: isFull, isDisconnected, isTimeout, isFailed, hasError, isEmpty, valueOrNull.)
- Low-level channel factory parameter rename
  - {Mpsc,Mpmc}.channel<T>(..., dropPolicy: ...) → {Mpsc,Mpmc}.channel<T>(..., policy: ...)
  - onDrop unchanged.
- SPSC constructor signature
  - Spsc.channel<T>(capacityPow2: 1024) → Spsc.channel<T>(1024) (positional power-of-two).
- MPMC send semantics clarified
  - Sending with no live receivers now returns SendErrorDisconnected immediately (enforced consistently across bounded/unbounded).
Added #
- Notify primitive:
  - notified() → (Future<void>, cancel) that consumes a permit if available, otherwise waits.
  - notifyOne() wakes one waiter or stores a permit; notifyAll() wakes all current waiters.
  - close() wakes waiters with disconnected.
  - epoch for tracing/tests.
- Uniform receiver capabilities:
  - recvCancelable() on all receivers.
  - isDisconnected property available on all receivers.
- Ticker
  - Ticker.every(Duration) with .reset(...) to integrate timed arms via onTick.
- Examples
  - example/ showcasing MPSC, MPMC, SPSC, OneShot, LatestOnly, and XSelect.
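The permit semantics listed for Notify can be approximated in a few lines. MiniNotify is a hypothetical stand-in, not the package's class: it omits the cancel handle, close(), and epoch, and it assumes that at most one permit is stored and that notifyAll() stores none.

```dart
import 'dart:async';
import 'dart:collection';

/// Minimal permit-based notifier; a sketch, not cross_channel's Notify.
class MiniNotify {
  bool _permit = false;
  final Queue<Completer<void>> _waiters = Queue<Completer<void>>();

  int get waiting => _waiters.length;

  /// Consumes a stored permit if there is one, otherwise waits.
  Future<void> notified() {
    if (_permit) {
      _permit = false;
      return Future<void>.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Wakes the oldest waiter, or stores a single permit if nobody waits.
  void notifyOne() {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete();
    } else {
      _permit = true;
    }
  }

  /// Wakes every current waiter; stores nothing for future callers.
  void notifyAll() {
    while (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete();
    }
  }
}
```

The stored permit is what keeps a notifyOne() issued just before notified() from being lost.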
Changed #
- Select integration
  - XSelect now uses recvCancelable() under the hood for receivers; cancellation is best-effort where an implementation cannot immediately abort a pending wait.
- Unbounded buffers
  - Now chunked by default for throughput & GC friendliness:
    - Hot small ring + chunked overflow.
    - Tuned defaults (internal): hot=8192, chunk=4096, rebalanceBatch=64, threshold=cap/16, gate=chunk/2.
  - Mpsc.unbounded / Mpmc.unbounded accept chunked: true|false (default true).
- LatestOnly
  - Fast-path coalescing tightened; benches at roughly 110–140 Mops/s on common desktop VMs.
- Docs/README updated to XSelect.*, policy parameter, SPSC signature, isDisconnected, and universal recvCancelable().
Migration guide (0.6 → 0.7) #
- while (!rx.isClosed) {
+ while (!rx.isDisconnected) {
// ...
}
- final out = await Select.any((s) => s
- ..onReceiver(rx, ok: (v) => SelectDecision.breakLoop)
- ..onFuture(fut, (_) => SelectDecision.continueLoop));
+ final broke = await XSelect.run<bool>((s) => s
+ ..onRecvValue(rx, (v) => true, onDisconnected: () => true)
+ ..onFuture(fut, (_) => false));
+ if (broke) break;
- final (tx, rx) = Mpmc.channel<int>(capacity: 1024, dropPolicy: DropPolicy.oldest);
+ final (tx, rx) = Mpmc.channel<int>(capacity: 1024, policy: DropPolicy.oldest);
- final (tx, rx) = Spsc.channel<int>(capacityPow2: 1024);
+ final (tx, rx) = Spsc.channel<int>(1024);
- if ((await tx.send(v)).isOk) { ... }
+ if ((await tx.send(v)).hasSend) { ... }
- final r = await rx.recv(); if (r.isOk) use(r.valueOrNull);
+ final r = await rx.recv(); if (r.hasValue) use(r.valueOrNull!);
0.6.0 – 2025-09-01 #
Added #
- Select:
  - Select.any((s) => s.onReceiver(...).onFuture(...).onStream(...).onTimer(...))
  - Each branch executes an async block, first to complete wins
  - Losing branches are properly cancelled (recvCancelable, stream cancel)
  - SelectDecision (continueLoop / breakLoop) for loop control
- Added removeRecvWaiter to all buffers
- New recvCancelable helper in ChannelOps
0.5.0 – 2025-08-31 #
Added #
- Timeout helpers:
  - recvTimeout, sendTimeout
- Batch helpers:
  - sendAll, recvAll, trySendAll, tryRecvAll
0.4.0 – 2025-08-30 #
Added #
- LatestOnly channels:
  - Mpsc and Mpmc latestOnly
- Semantics: new sends overwrite previous values
0.3.0 – 2025-08-29 #
Added #
- Isolate adapters:
  - ReceivePort → Mpsc/Mpmc
  - Typed request/reply via SendPort.request
- Web adapters:
  - MessagePort → Mpsc/Mpmc
  - Typed requests with MessageChannel
- Stream adapters:
  - Receiver.toBroadcastStream, Stream.redirectToSender
0.2.0 – 2025-08-27 #
Added #
- SPSC ring buffer:
  - Power-of-two capacity
  - Ultra-low overhead trySend, tryRecv
- OneShot channel:
  - consumeOnce=true: first receiver consumes and disconnects
  - consumeOnce=false: all receivers observe the same value
- Drop policies: block, oldest, newest
- Result extensions:
  - SendResultX (ok, full, disconnected)
  - RecvResultX (ok, empty, disconnected, valueOrNull)
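A power-of-two capacity lets a ring buffer replace the modulo with a bit mask when it wraps indices. The sketch below shows that technique with a hypothetical Pow2Ring class; it is not the package's SPSC buffer and returns plain bool and nullable values instead of the result types listed above.

```dart
/// Hypothetical bounded ring; a sketch, not the package's SPSC buffer.
class Pow2Ring<T> {
  Pow2Ring(int capacity)
      : _mask = capacity - 1,
        _slots = List<T?>.filled(capacity, null) {
    if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
      throw ArgumentError.value(
          capacity, 'capacity', 'must be a power of two');
    }
  }

  final int _mask;
  final List<T?> _slots;
  int _head = 0; // sequence number of the next slot to read
  int _tail = 0; // sequence number of the next slot to write

  int get length => _tail - _head;

  /// Returns false instead of waiting when the ring is full.
  bool trySend(T value) {
    if (length == _slots.length) return false;
    _slots[_tail & _mask] = value; // the mask replaces the modulo
    _tail++;
    return true;
  }

  /// Returns null when the ring is empty.
  T? tryRecv() {
    if (length == 0) return null;
    final index = _head & _mask;
    final value = _slots[index];
    _slots[index] = null; // drop the reference so it can be collected
    _head++;
    return value;
  }
}
```

Because null doubles as the "empty" signal here, the sketch is only unambiguous for non-nullable element types.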