cross_channel 0.11.0
cross_channel: ^0.11.0 copied to clipboard
High-performance & flexible channels for Dart/Flutter.
cross_channel #
Fast & flexible channels for Dart/Flutter. A complete concurrency toolkit providing Rust-style synchronization primitives: MPSC, MPMC, SPSC, OneShot, and Broadcast (SPMC).
Bench-tested · ~1.2–2.6 Mops/s
✨ Features #
- Universal Handles: Transparent zero-overhead local optimization with native cross-isolate support (just pass the channel to
Isolate.spawn). - Transferable Handles: Serialize any
Sender/Receiverfor Web WorkerpostMessagetransfer withtoTransferable()/fromTransferable(). - Broadcast Ring: Single-Producer Multi-Consumer (SPMC) ring buffer with publisher history replay and lag detection for slow subscribers.
- Select (
XSelect): Macro-like multiplexing over Futures, Streams, Timers, and Channels with non-blocking fast paths and loser cancellation. - Rich Topology: Multi-producer / multi-consumer topologies with drop policies (
block,oldest,newest) and backpressure (capacity=0rendezvous). - LatestOnly Buffers: Specifically tuned for UI and sensors (always coalesces to the latest value).
- Sender Rate Limiting: Limit noisy events at the source with built-in
throttleanddebouncemodifiers. - Unified Remote Protocol: Credit-based flow control ensuring remote Isolates/Workers never crash on Out-of-Memory.
- Web & Stream Adapters: First-class
MessagePortbridging forpackage:weband richStreamintegration. - Notify Primitive: Ultra-lightweight
notifyOne()/notified()wakeups for control-plane synchronization.
📦 Install #
dart pub add cross_channel
🧭 API #
High-Level (XChannel) #
final (tx, rx) = XChannel.mpsc<T>(capacity: 1024, policy: DropPolicy.block);
final (tx, rx) = XChannel.mpmc<T>(capacity: 1024, policy: DropPolicy.oldest);
final (tx, rx) = XChannel.mpscLatest<T>(); // MPSC latest-only
final (tx, rx) = XChannel.mpmcLatest<T>(); // MPMC latest-only (competitive)
final (tx, rx) = XChannel.spsc<int>(capacity: 1024); // pow2 rounded internally
final (tx, rx) = XChannel.oneshot<T>(consumeOnce: false);
final (tx, broadcast) = XChannel.broadcast<T>(capacity: 1024);
Low-Level #
Low-level flavors are available in mpsc.dart, mpmc.dart, spsc.dart, oneshot.dart.
import 'package:cross_channel/mpsc.dart';
final (tx, rx) = Mpsc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpsc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpsc.bounded<T>( 1024);
final (tx, rx) = Mpsc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpsc.latest<T>();
import 'package:cross_channel/mpmc.dart';
final (tx, rx) = Mpmc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpmc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpmc.bounded<T>(1024);
final (tx, rx) = Mpmc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpmc.latest<T>();
import 'package:cross_channel/spsc.dart';
final (tx, rx) = Spsc.channel<T>(1024); // pow2 rounded internally
import 'package:cross_channel/oneshot.dart';
final (tx, rx) = OneShot.channel<T>(consumeOnce: false);
import 'package:cross_channel/broadcast.dart';
final (tx, broadcast) = Broadcast.channel<T>(1024);
Note #
- Unbounded channels use chunked buffer by default (hot ring and chunked overflow)
- Defaults: hot=8192, chunk=4096, rebalanceBatch=64, threshold=cap/16, gate=chunk/2.
- Streams are single-subscription. Clone receivers for parallel consumers on MPMC.
- Interop available in
isolate_extension.dart,stream_extension.dart,web_extension.dart. - Core traits & buffers available in
src/*.
🚦 Choosing a channel #
| Channel | Producers | Consumers | Use case | Notes |
|---|---|---|---|---|
| MPSC | multi | single | Task queue, async pipeline | Backpressure & drop policies |
| MPMC | multi | multi | Work-sharing worker pool | Consumers compete (no broadcast) |
| SPSC | single | single | Ultra-low-latency hot path | Lock-free ring (pow2 capacity) |
| OneShot | 1 | 1 (or many) | Request/response, once-only signal | consumeOnce option |
| LatestOnly | multi | single (MPSC) / competitive (MPMC) | Progress, sensors, UI signals | Always coalesces to last value |
| Broadcast | single | multi (all see all) | Event bus, monitoring, telemetry | Ring buffer with lag detection |
When to use Notify vs channels #
- Use Notify for control-plane wakeups without payloads: config-changed, flush, shutdown, “poke a waiter”, etc. (notifyOne / notifyAll; waiter calls notified() and awaits.)
- Use channels for data-plane messages with payloads and ordering: tasks, jobs, progress values, events to be processed, etc.
💡 Quick examples #
MPSC with backpressure #
import 'package:cross_channel/cross_channel.dart';
Future<void> producer(MpscSender<int> tx) async {
for (var i = 0; i < 100; i++) {
await tx.send(i); // waits if queue is full
}
tx.close();
}
Future<void> consumer(MpscReceiver<int> rx) async {
await for (final v in rx.stream()) {
// handle v
}
}
void main() {
final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
Future.wait([producer(tx), consumer(rx)])
}
MPMC worker pool (competitive consumption) #
import 'package:cross_channel/cross_channel.dart';
Future<void> worker(int id, MpmcReceiver<String> rx) async {
await for (final task in rx.stream()) {
// process task
}
}
void main() async {
final (tx, rx0) = XChannel.mpmc<String>(capacity: 16);
final rx1 = rx0.clone();
final rx2 = rx0.clone();
final w0 = worker(0, rx0);
final w1 = worker(1, rx1);
final w2 = worker(2, rx2);
for (var i = 0; i < 20; i++) {
await tx.send('task $i');
}
tx.close();
await Future.wait([w0, w1, w2]);
}
LatestOnly signal (coalesced progress) #
import 'package:cross_channel/cross_channel.dart';
Future<void> ui(MpscReceiver<double> rx) async {
await for (final p in rx.stream()) {
// update progress bar with p in [0..1]
}
}
void main() async {
final (tx, rx) = XChannel.mpscLatest<double>();
final _ = ui(rx);
for (var i = 0; i <= 100; i++) {
tx.trySend(i / 100); // overwrites previous value
await Future.delayed(const Duration(milliseconds: 10));
}
tx.close();
}
Sliding queues (drop policies) #
final (tx, rx) = XChannel.mpsc<int>(
capacity: 1024,
policy: DropPolicy.oldest, // or DropPolicy.newest
onDrop: (d) => print('dropped $d'),
);
oldest: evicts the oldest queued item to make room (keeps newest data flowing)newest: drops the incoming item (send “looks ok” but value discarded)block: default (producer waits when full)
OneShot (single vs multi observe) #
// consumeOnce = true: first receiver consumes, then disconnects
final (stx, srx) = XChannel.oneshot<String>(consumeOnce: true);
// consumeOnce = false: every receiver sees the same value (until higher-level teardown)
final (btx, brx) = XChannel.oneshot<String>(consumeOnce: false);
Broadcast (Pub/Sub with Ring) #
final (tx, broadcast) = XChannel.broadcast<int>(capacity: 16);
// Subscriber 1 (starts from now)
final sub1 = broadcast.subscribe();
// Subscriber 2 (replays last 5 items)
final sub2 = broadcast.subscribe(replay: 5);
await tx.send(42);
// Both receive 42
final val1 = await sub1.recv();
final val2 = await sub2.recv();
Rate Limiting (Sender-side) #
Crucial for avoiding cross-isolate overhead on high-frequency events.
final (tx, rx) = XChannel.mpsc<double>();
// Only send at most 1 item every 100ms
final throttled = tx.throttle(const Duration(milliseconds: 100));
// Wait for silence (100ms) before sending last value
final debounced = tx.debounce(const Duration(milliseconds: 100));
// Usage:
onSliderChanged(val) => throttled.send(val);
onSearchQuery(text) => debounced.send(text);
🔔 Notify (lightweight wakeups) #
A tiny synchronization primitive to signal tasks without passing data.
notified()→ returns a(Future<void>, cancel)pair.- If a permit is available, it completes immediately and consumes one permit.
- Otherwise it registers a waiter until notified or canceled.
notifyOne()→ wakes one waiter or stores one permit if none is waiting.notifyAll()→ wakes all current waiters (does not store permits).close()→ wakes everyone withdisconnected.- Integrates with
XSelectviaonFuture.
import 'package:cross_channel/cross_channel.dart';
final n = Notify();
// Waiter
final (f, cancel) = n.notified();
// ... later: cancel(); // optional
// Notifiers
n.notifyOne(); // or
n.notifyAll();
// With XSelect
final (fu, _) = n.notified();
await XSelect.run<void>((s) => s
..onFuture<void>(fu, (_) => null, tag: 'notify')
..onTick(Ticker.every(const Duration(seconds: 1)), () => null, tag: 'tick')
);
🧰 XSelect (futures/streams/receivers/timers) #
XSelect lets you race multiple asynchronous branches and cancel the losers.
Cheat-sheet
-
Channels (Receiver)
-
onRecv(rx, (RecvResult
-
onRecvValue(rx, (T) -> R, {onDisconnected, tag})
-
onRecvError(rx, (Object, StackTrace?) -> R
-
-
Notify
- onNotify(Notify n, R Function() body, {Object? tag})
- onNotifyOnce(Notify n, R Function() body, {Object? tag})
-
Futures
-
onFuture(future, (T) -> R, {tag})
-
onFutureValue(future, (T) -> R, {tag})
-
onFutureError(future, (Object, StackTrace?) -> R, {tag})
-
-
Streams
-
onStream(stream, (T) -> R, {tag})
-
onStreamDone(stream, () -> R, {tag})
-
-
Timers
-
onDelay(Duration, () -> R, {tag}) (one-shot)
-
onTick(Duration, () -> R, {tag}) (every)
-
-
Sending
- onSend(sender, value, {tag}) (races a send; wins when the send completes)
-
Sync fast-path
- XSelect.syncRun(builder) → only immediate, non-blocking arms (e.g., tryRecv, already-due timers).
-
Guards
- if_(() => bool) to enable/disable a branch without rebuilding the selection.
-
Fairness vs order
- Fairness by rotation is default; call .ordered() to preserve declaration order
import 'package:cross_channel/cross_channel.dart';
Future<void> main() async {
final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
// Example: first event wins among a stream, a channel, a timer, and a future.
final result = await XSelect.run<String>((s) => s
..onStream<int>(
Stream.periodic(const Duration(milliseconds: 50), (i) => i),
(i) => 'stream:$i',
tag: 'S',
)
// simple
..onRecvValue<int>(
rx,
(v) => 'recv:$v',
onDisconnected: () => 'disconnected',
tag: 'R',
)
// full control over RecvResult
..onRecv<int>(
rx,
(res) {
if (res is RecvOk<int>) return 'recv:${res.value}';
if (res is RecvErrorDisconnected) return 'disconnected';
return 'unexpected:$res';
},
tag: 'R'?
)
..onTick(
const Duration(milliseconds: 50),
() => 'timer',
tag: 'T',
)
..onFuture<String>(
Future<String>.delayed(const Duration(milliseconds: 80), () => 'future'),
(s) => s,
tag: 'F',
)
);
print('winner -> $result');
}
Notes:
- returns the first resolved branch and cancels the rest.
- ordered = true forces order, otherwise we use fairness rotation
syncRunonly uses immediate arms, aka non blocking- Use
if_to conditionally include a branch without rebuilding the selection.
🔄 Select Stream #
New in 0.11.0, XSelect.stream allows you to repeat the selection logic and yield each winning result as a stream. This is perfect for long-running event loops.
final events = XSelect.stream<AppEvent>((s) => s
..onRecvValue(userCommands, (cmd) => AppEvent.user(cmd))
..onStream(networkData, (data) => AppEvent.network(data))
..onTick(Duration(seconds: 1), () => AppEvent.heartbeat())
// Optional: stop the stream when a specific condition is met
, stopWhen: (event) => event is ShutdownEvent,
);
await for (final event in events) {
process(event);
}
🔗 Interop #
Stream #
import 'package:cross_channel/stream_extension.dart';
// Receiver → broadcast Stream (pause/resume when no listeners)
final broadcast = rx.toBroadcastStream(
waitForListeners: true,
stopWhenNoListeners: true,
closeReceiverOnDone: false,
);
// Stream → Sender (optional drop on full; auto-close sender on done)
await someStream.redirectToSender(tx, dropWhenFull: true);
Isolates #
Channels natively support cross-isolate communication (Universal Handles). Just pass the Sender or Receiver directly to your isolate!
import 'dart:isolate';
import 'package:cross_channel/cross_channel.dart';
void main() async {
final (tx, rx) = XChannel.mpsc<String>();
// Pass the sender directly to the isolate
await Isolate.spawn((Sender<String> s) {
s.send('Hello from isolate!');
}, tx);
final msg = await rx.recv();
print(msg.valueOrNull);
}
You can also use extension methods on raw ports for legacy bridging or SendPort.request for RPC:
import 'dart:isolate';
import 'package:cross_channel/isolate_extension.dart';
// Typed request/reply over a standard SendPort
final reply = await someSendPort.request<Map<String, Object?>>(
'get_user',
data: {'id': 42},
timeout: const Duration(seconds: 3),
);
// Port → channel bridge (legacy integration)
final rp = ReceivePort();
final (tx, rx) = rp.toMpsc<MyEvent>(capacity: 512, strict: true);
Web #
import 'package:web/web.dart';
import 'package:cross_channel/web_extension.dart';
final channel = MessageChannel();
// Typed request/reply
final res = await channel.port1.request<String>('ping');
// Port → channel bridge
final (tx, rx) = channel.port2.toMpmc<JsEvent>(capacity: 512, strict: true);
Transferable Handles (Web Workers) #
Serialize any Sender or Receiver for transfer across Web Workers via postMessage (or Isolates).
The ChannelCore stays on the main thread — the worker gets a lightweight remote handle.
import 'package:cross_channel/cross_channel.dart';
// Main thread — create channel and serialize the receiver
final (tx, rx) = XChannel.spsc<String>(capacity: 128);
final transferable = rx.toTransferable();
// Pass `transferable` to the worker via postMessage/Squadron payload
// Worker side — reconstruct from the transferred data
final remoteRx = SpscReceiver<String>.fromTransferable(transferable);
await for (final msg in remoteRx.stream()) {
print(msg); // receives messages from main thread
}
Works with all channel types: SpscSender/Receiver, MpscSender/Receiver, MpmcSender/Receiver, BroadcastSender/Receiver, OneShotSender/Receiver.
Note: On web, the raw
MessagePortinside the map must be included in thepostMessagetransfer list. Frameworks like Squadron support this viainspectRequest.
🧩 Results & helpers #
// SendResult: SendOk | SendErrorFull | SendErrorDisconnected
// RecvResult: RecvOk(value) | RecvErrorEmpty | RecvErrorDisconnected
// Extensions:
// SendResult
r.hasSend;
r.isFull;
r.isDisconnected;
r.isTimeout;
r.isFailed;
r.hasError;
// RecvResult
rr.hasValue;
rr.isEmpty;
rr.isDisconnected;
rr.isTimeout;
rr.isFailed;
rr.hasError;
rr.valueOrNull;
// Timeouts/batching/draining:
await tx.sendTimeout(v, const Duration(milliseconds: 10));
await tx.sendAll(iterable); // waits on full
tx.trySendAll(iterable); // best-effort
rx.tryRecvAll(max: 128); // burst non-blocking drain
await rx.recvAll(idle: Duration(milliseconds: 1), max: 1024);
// Cancelable receive (all receivers):
final (fut, cancel) = rx.recvCancelable();
cancel(); // attempts to abort the wait if still pending
📊 Benchmarks (Dart VM, i7-8550U, High priority, CPU affinity set) #
- Benches are single-isolate micro-benchmarks.
- Pinning affinity/priority helps stabilize latencies.
MPSC #
| case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
|---|---|---|---|
| ping-pong cap=1 AB (1P/1C) | 1.39 | 720.8 | 23.2 |
| ping-pong cap=1 BA (1P/1C) | 1.39 | 720.8 | 1.0 |
| pipeline cap=1024 (1P/1C) | 1.26 | 793.8 | 19.5 |
| pipeline unbounded (1P/1C) | 1.25 | 798.8 | 19.7 |
| pipeline unbounded (chunked) (1P/1C) | 1.22 | 823.2 | 23.2 |
| multi-producers cap=1024 (4P/1C) | 1.25 | 801.7 | 19.0 |
| pipeline rendezvous cap=0 (1P/1C) | 1.21 | 829.3 | 16.2 |
| sliding oldest cap=1024 (1P/1C) | 1.27 | 788.3 | 17.2 |
| sliding newest cap=1024 (1P/1C) | 1.28 | 783.7 | 21.8 |
| latestOnly (coalesce) (1P/1C) | 1.26 | 796.1 | 22.1 |
MPMC #
| case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
|---|---|---|---|
| ping-pong cap=1 AB (1P/1C) | 1.31 | 761.6 | 30.9 |
| ping-pong cap=1 BA (1P/1C) | 1.31 | 761.6 | 2.7 |
| pipeline cap=1024 (1P/1C) | 1.24 | 808.4 | 26.5 |
| pipeline unbounded (1P/1C) | 1.26 | 795.4 | 27.5 |
| pipeline unbounded (chunked) (1P/1C) | 1.25 | 797.2 | 16.7 |
| multi-producers cap=1024 (4P/1C) | 1.20 | 832.5 | 26.4 |
| multi-producers cap=1024 (4P/4C) | 1.19 | 840.6 | 23.4 |
| pipeline rendezvous cap=0 (1P/1C) | 1.18 | 848.9 | 22.0 |
| sliding oldest cap=1024 (1P/1C) | 1.28 | 780.9 | 18.2 |
| sliding newest cap=1024 (1P/1C) | 1.31 | 763.0 | 17.5 |
| latestOnly (coalesce) (1P/1C) | 1.34 | 749.4 | 17.6 |
| competitive (1P/4C) | 1.30 | 769.2 | 21.0 |
SPSC #
| case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
|---|---|---|---|
| spsc ping-pong cap=1 AB | 1.40 | 715.0 | 33.7 |
| spsc ping-pong cap=1 BA | 1.40 | 715.0 | 1.4 |
| spsc pipeline cap=1024 | 1.25 | 797.3 | 18.9 |
| spsc pipeline cap=4096 | 1.26 | 795.1 | 21.9 |
ONESHOT #
| case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
|---|---|---|---|
| oneshot send+receive | 2.08 | 481.4 | 1.9 |
| oneshot pipeline | 2.10 | 475.7 | 1.2 |
BROADCAST (SPMC Ring) #
| case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
|---|---|---|---|
| broadcast pipeline (1P/1C) | 0.77 | 1299.9 | 36.9 |
| broadcast pipeline (1P/4C) | 1.03 | 975.2 | 106.8 |
| broadcast pipeline (1P/8C) | 1.08 | 924.5 | 140.3 |
How to bench #
This repo ships with two PowerShell scripts to run the micro-benchmarks. They produce consistent, copy-pastable output and (optionally) a CSV for later analysis.
Windows or PowerShell Core (
pwsh) on macOS/Linux is fine. For non-PowerShell usage, see the manual commands at the end.
Lite — fast dev loop
Compiles and runs once per target. No CSV, no CPU pinning, no priority tweaks.
# All suites, 1e6 iterations per case
.\bench_lite.ps1 -Target all -Count 1000000
# Single suite
.\bench_lite.ps1 -Target mpsc -Count 1000000
Full — reproducible runs + CSV
Lets you set CPU affinity, process priority, repeat counts, and append results to a CSV.
# Compile & Run, MPMC only, 5 repeats, High priority, CPU0,
# append CSV lines to bench\out.csv
.\bench_full.ps1 `
-Target mpmc `
-Action cr `
-Count 1000000 `
-Repeat 5 `
-Priority High `
-Affinity 0x1 `
-Csv `
-OutCsv "bench\out.csv" `
-AppendCsv
Parameters (full mode):
- Target: spsc, mpsc, mpmc, oneshot, isolate, inter_isolate, all
- Action: compile (just build), run (run existing exes), cr (compile+run)
- Count: iterations per case (e.g., 1000000)
- Repeat: run the suite multiple times (e.g., -Repeat 5)
- Priority: Idle · BelowNormal · Normal · AboveNormal · High · RealTime
- Affinity: CPU bitmask (e.g., 0x1 = CPU0, 0x3 = CPU0–1)
- Csv: write results to CSV
- OutCsv: CSV path (default bench\out.csv)
- AppendCsv: append instead of overwriting the header
Stability tips: pin CPU with -Affinity, raise -Priority High, do a warm-up repeat, and keep the machine idle.
Benchmark CSV Format
Note: Benchmarks use the same metrics CSV format shown in the Metrics section. When you run benchmarks with
--csv, they output detailed metrics for each test case.
The benchmark scripts automatically enable metrics collection and use StdExporter or CsvExporter to output comprehensive performance data including operation counts, latency percentiles, and throughput measurements.
You can aggregate across repeats (median/mean/p95) in your own tooling.
📊 Metrics (Advanced) #
Built-in metrics system for production monitoring and performance analysis.
import 'package:cross_channel/metrics.dart';
// Enable metrics globally
MetricsConfig.enabled = true;
MetricsConfig.sampleLatency = true;
MetricsConfig.sampleRate = 0.1; // 10% sampling
// Configure exporter
MetricsConfig.exporter = StdExporter(
useColor: true,
compact: false,
);
// OR export to CSV file
final csvFile = File('metrics.csv');
MetricsConfig.exporter = CsvExporter(sink: csvFile.openWrite());
// Channels automatically collect metrics
final (tx, rx) = XChannel.mpsc<String>(capacity: 1000);
Available exporters:
NoopExporter- Default (discards metrics)StdExporter- Formatted console output with colorsCsvExporter- Export to CSV format (stdout or file)- Custom exporters by extending
MetricsExporter
Metrics collected:
- Operation counts:
sent,recv,dropped,closed - Latency percentiles: P50, P95, P99 (via P² algorithm)
- Performance: ops/second, ns/operation, drop rates
- Per-channel and global aggregation
Performance impact: unmeasured.
CSV Metrics Format #
The CsvExporter produces runtime metrics. This is the same format used by the benchmark scripts:
ts,type,id,sent,recv,dropped,closed,trySendOk,trySendFail,tryRecvOk,tryRecvEmpty,send_p50_ns,send_p95_ns,send_p99_ns,recv_p50_ns,recv_p95_ns,recv_p99_ns,mops,ns_per_op,drop_rate,try_send_failure_rate,try_recv_empty_rate,channels_count
2025-09-12T20:26:00.000Z,global,,1000,950,50,0,800,200,900,50,,,,,,,0.95,1052,0.05,0.2,0.05,3
2025-09-12T20:26:00.000Z,channel,my-worker-queue,500,480,20,0,400,100,450,30,1200.5,2100.8,5000.2,950.1,1800.3,4200.1,0.48,2083,0.04,0.2,0.06,
Field Groups:
Metadata:
ts- Timestamp (ISO8601)type- 'global' or 'channel'id- Channel identifier (empty for global)
Core Operations:
sent,recv- Total blocking operation countsdropped,closed- Loss and lifecycle events
Non-blocking Operations:
trySendOk,trySendFail- Non-blocking send resultstryRecvOk,tryRecvEmpty- Non-blocking receive results
Latency Percentiles (nanoseconds):
send_p50_ns,send_p95_ns,send_p99_ns- Send operation latenciesrecv_p50_ns,recv_p95_ns,recv_p99_ns- Receive operation latencies
Performance Metrics:
mops- Million operations per secondns_per_op- Nanoseconds per operation
Quality Metrics:
drop_rate- Percentage of dropped messages (0.0-1.0)try_send_failure_rate- Non-blocking send failure rate (0.0-1.0)try_recv_empty_rate- Non-blocking receive empty rate (0.0-1.0)
System:
channels_count- Total active channels (global snapshots only)
🧪 Testing #
This repo ships with comprehensive unit, stress and integration tests (isolate/web/stream).
dart test
License #
MIT