isolated_stream 2.1.0-dev.1
A Dart package for running CPU-intensive stream transformations in separate isolates to prevent blocking the main thread.
isolated_stream #
Make isolates as simple as streams, or as simple as a function call. Offload CPU-heavy work to background isolates with either a stream API or a long-lived request/response worker.
Features #
- Non-blocking processing - Transform streams in separate isolates
- Request/response API - IsolatedWorker for awaiting individual calls on long-lived isolates
- Configurable concurrency - Cap in-flight work with a single parameter
- Isolate pooling - Distribute work across multiple isolates
- Idle offloading - Automatically release isolates after inactivity and transparently re-spawn on demand
- Simple API - Easy-to-use extension method on streams
When to use #
This package is perfect for:
- CPU-intensive computations on stream data (image processing, mathematical calculations, parsing)
- Background workers that handle repeated ad-hoc requests (parsers, encoders, solvers)
- Preventing UI freezes in Flutter apps during heavy processing
- Maintaining responsiveness while processing large datasets
Quick start #
- Install this package.
dependencies:
  isolated_stream: ^2.1.0-dev.1
- Import it in your Dart file.
import 'package:isolated_stream/isolated_stream.dart';
- Create a handler and transform your stream.
// Simple handler that doubles numbers
class DoubleHandler extends IsolatedHandler<int, int> {
  @override
  int compute(int value) => value * 2; // Runs in separate isolate!
}

final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
final transformed = stream.isolatedMap(DoubleHandler());
// Results: [2, 4, 6, 8, 10] - all calculated in separate isolate
- Enjoy non-blocking stream processing!
Less time waiting for frames, more time enjoying fluent animations!
Features #
Basic transformation #
Transform stream elements in separate isolates without blocking your main thread:
// Define your computation logic
class MultiplyHandler extends IsolatedHandler<int, int> {
  final int factor;

  MultiplyHandler(this.factor) : super(debugName: 'Multiply by $factor');

  @override
  int compute(int value) => value * factor; // Runs in isolate!
}

// Use it like any other stream transformation
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
final transformed = stream.isolatedMap(MultiplyHandler(10)); // Sequential by default
await for (final value in transformed) {
  print(value); // Output: 10, 20, 30, 40, 50 (processed one at a time)
}
⥠Processing strategies #
Choose the right strategy for your use case:
final stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]);
// Sequential: process one at a time (default)
stream.isolatedMap(handler); // or .sequential()
// Concurrent: process multiple simultaneously, preserving order
stream.isolatedMap(handler,
strategy: IsolatedProcessingStrategy.concurrent(concurrency: 3));
// Droppable: drop new events when at max concurrency
stream.isolatedMap(handler,
strategy: IsolatedProcessingStrategy.droppable(concurrency: 2));
// Restartable: cancel previous work when new events arrive
stream.isolatedMap(handler,
strategy: IsolatedProcessingStrategy.restartable());
Isolate pooling #
Distribute work across multiple isolates for higher throughput:
final bigStream = Stream.fromIterable(List.generate(1000, (i) => i));

// Use multiple isolates for high-throughput processing
final processed = bigStream.isolatedMap(
  HeavyComputationHandler(),
  strategy: IsolatedProcessingStrategy.concurrent(
    concurrency: 8, // 8 total concurrent operations
    isolates: 4, // Work distributed among 4 isolates
  ),
);

// Scales with the number of available CPU cores
print('Processed ${await processed.length} items');
Request/response with IsolatedWorker #
When you don't have a stream of data, just individual values you want to process on demand, use IsolatedWorker. It keeps isolates alive across calls and returns a Future<E> for each compute(value).
import 'dart:convert';

class ParseJsonHandler extends IsolatedHandler<String, Map<String, dynamic>> {
  @override
  Map<String, dynamic> compute(String raw) =>
      json.decode(raw) as Map<String, dynamic>;
}

final worker = IsolatedWorker<String, Map<String, dynamic>>(
  ParseJsonHandler(),
  isolates: 2, // optional: spread calls across 2 isolates
  concurrency: 4, // optional: cap in-flight calls to 4 (queue the rest)
);

// Call it like a function, as many times as you like.
// Inputs must decode to a JSON object, since the handler returns a Map.
final a = await worker.compute('{"hello": "world"}');
final b = await worker.compute('{"items": [1, 2, 3]}');

// Many calls in parallel, bounded by `concurrency`.
// `payloads` is any Iterable<String> of JSON object strings.
final results = await Future.wait([
  for (final raw in payloads) worker.compute(raw),
]);

worker.dispose(); // kill isolates when done
Concurrency modes #
- concurrency: null (default) - unbounded; calls run in parallel, distributed round-robin across the isolate pool.
- concurrency: 1 - serialized (FIFO); one call at a time.
- concurrency: N - at most N calls in flight; extras queue until a slot frees.
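To make the bounded mode concrete, here is a minimal sketch of "at most N in flight, extras queue until a slot frees" in plain Dart. BoundedRunner is a hypothetical helper written for illustration only; it is not part of isolated_stream and may differ from how the package schedules calls internally.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical helper (not part of isolated_stream): runs at most
/// [limit] tasks at a time and queues the rest in FIFO order.
class BoundedRunner {
  BoundedRunner(this.limit);

  final int limit;
  int _inFlight = 0;
  final Queue<Completer<void>> _waiting = Queue<Completer<void>>();

  Future<T> run<T>(Future<T> Function() task) async {
    if (_inFlight >= limit) {
      // No free slot: wait until a finishing task hands its slot over.
      final slot = Completer<void>();
      _waiting.add(slot);
      await slot.future;
    } else {
      _inFlight++;
    }
    try {
      return await task();
    } finally {
      if (_waiting.isNotEmpty) {
        // Pass the slot to the oldest waiter; the in-flight count is unchanged.
        _waiting.removeFirst().complete();
      } else {
        _inFlight--;
      }
    }
  }
}

Future<void> main() async {
  final runner = BoundedRunner(2);
  var active = 0;
  var peak = 0;

  Future<int> job(int n) async {
    active++;
    if (active > peak) peak = active;
    await Future<void>.delayed(const Duration(milliseconds: 10));
    active--;
    return n * 2;
  }

  final results = await Future.wait([
    for (var i = 1; i <= 5; i++) runner.run(() => job(i)),
  ]);
  print(results); // [2, 4, 6, 8, 10]
  print(peak); // 2
}
```

With limit set to 1 the same helper behaves like the serialized FIFO mode; the unbounded default corresponds to skipping the queue entirely.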
Idle offloading #
Both IsolatedWorker and every IsolatedProcessingStrategy accept an idleTimeout. If no work has been in flight for that long, the pool releases its isolates; the next call transparently re-spawns them. Handy for workers that might sit idle for a long time.
// Request/response worker that naps when unused
final worker = IsolatedWorker<int, int>(
  MultiplyHandler(10),
  idleTimeout: const Duration(minutes: 1),
  onIdleShutdown: () => print('isolates released'),
);

// Stream strategy with idle offloading
stream.isolatedMap(
  handler,
  strategy: IsolatedProcessingStrategy.concurrent(
    concurrency: 3,
    idleTimeout: const Duration(seconds: 30),
  ),
);
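The release-and-re-spawn behavior can be pictured with a small timer-based sketch. IdleReleasingResource is a hypothetical class invented for this illustration, and a String stands in for an isolate; the package's real pool may be implemented quite differently.

```dart
import 'dart:async';

/// Hypothetical sketch (not the package's implementation): a lazily created
/// resource that is released after [idleTimeout] with no work in flight.
class IdleReleasingResource<R extends Object> {
  IdleReleasingResource({
    required this.create,
    required this.release,
    required this.idleTimeout,
  });

  final R Function() create;
  final void Function(R resource) release;
  final Duration idleTimeout;

  R? _resource;
  Timer? _idleTimer;
  int _inFlight = 0;

  Future<T> use<T>(Future<T> Function(R resource) work) async {
    _idleTimer?.cancel(); // New work arrived, so we are no longer idle.
    _inFlight++;
    final resource = _resource ??= create(); // Re-create on demand.
    try {
      return await work(resource);
    } finally {
      _inFlight--;
      if (_inFlight == 0) {
        // Nothing in flight: start counting down to release.
        _idleTimer = Timer(idleTimeout, _releaseNow);
      }
    }
  }

  void _releaseNow() {
    final resource = _resource;
    if (resource != null) {
      _resource = null;
      release(resource);
    }
  }
}

Future<void> main() async {
  var spawned = 0;
  var released = 0;
  final pool = IdleReleasingResource<String>(
    create: () => 'worker-${++spawned}',
    release: (_) => released++,
    idleTimeout: const Duration(milliseconds: 50),
  );

  print(await pool.use((w) async => w)); // worker-1
  await Future<void>.delayed(const Duration(milliseconds: 200));
  print(released); // 1
  print(await pool.use((w) async => w)); // worker-2
}
```

The release callback plays the role that onIdleShutdown plays in the examples above: it fires once the idle timer expires, and the next call creates a fresh resource.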
Async operations #
Handlers can also return a Future, so asynchronous work runs inside the isolate too:
import 'dart:convert';

import 'package:http/http.dart' as http;

class HeavyApiDataParser extends IsolatedHandler<String, Map<String, dynamic>> {
  @override
  Future<Map<String, dynamic>> compute(String endpoint) async {
    final response = await http.get(Uri.parse(endpoint));
    return json.decode(response.body) as Map<String, dynamic>;
  }
}

// Fetch and parse data from multiple APIs concurrently
final apis = Stream.fromIterable([
  'https://example.com/data-1',
  'https://example.com/data-2',
  'https://example.com/data-3',
]);
final responses = apis.isolatedMap(
  HeavyApiDataParser(),
  strategy: IsolatedProcessingStrategy.concurrent(concurrency: 3),
);
Important notes #
Serialization requirements #
Data must be sendable through Dart's SendPort mechanism for inter-isolate communication. Since isolates created with Isolate.spawn share the same code, most objects are sendable:
Always sendable:
- Primitives: null, true, false, int, double, String
- Collections: List, Map, LinkedHashMap, Set, LinkedHashSet (with sendable contents)
- Typed data: TransferableTypedData and related types
- Special types: Capability, SendPort, Type instances
- Custom objects: Any class instance (with exceptions below)
Cannot be sent:
- Objects with native resources: Socket, File, HttpClient, etc.
- Isolate infrastructure: ReceivePort, DynamicLibrary
- Finalizers: Finalizable, Finalizer, NativeFinalizer
- VM internals: UserTag, MirrorReference
- Classes marked with @pragma('vm:isolate-unsendable')
Closures limitation: Functions and closures may capture more state than needed due to the VM implementation, potentially causing larger object graphs than expected (https://github.com/dart-lang/sdk/issues/36983).
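The sendability rules can be checked with dart:isolate alone, without this package. The sketch below assumes Dart 2.19 or later (for Isolate.run); Point, sumCoordinates, and sumInIsolate are hypothetical names used only for illustration. The closure is created inside a small helper function so that it captures nothing but its argument, which sidesteps the over-capturing issue mentioned above.

```dart
import 'dart:isolate';

/// A plain class instance: sendable because both isolates share the same code.
class Point {
  const Point(this.x, this.y);
  final int x;
  final int y;
}

int sumCoordinates(Point p) => p.x + p.y;

/// Keeps the closure in its own small scope so it only captures [p].
Future<int> sumInIsolate(Point p) => Isolate.run(() => sumCoordinates(p));

Future<void> main() async {
  // The Point is copied into the new isolate; the int result is sent back.
  final sum = await sumInIsolate(const Point(3, 4));
  print(sum); // 7

  // A ReceivePort is on the "cannot be sent" list. On the Dart VM the send
  // call is expected to throw (an ArgumentError in current SDKs).
  final port = ReceivePort();
  try {
    port.sendPort.send(port);
    print('unexpectedly sent');
  } catch (error) {
    print('rejected: ${error.runtimeType}');
  } finally {
    port.close(); // An open ReceivePort keeps the isolate alive.
  }
}
```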
Performance tips #
- Use for CPU-heavy work: Sending data to and from an isolate has overhead, so reserve this package for computations expensive enough to outweigh it
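A rough way to see this overhead is to time the same trivial function inline and through an isolate. This sketch uses only dart:isolate (Isolate.run, Dart 2.19 or later) and spawns a fresh isolate per call, so it overstates the cost compared with a long-lived pool; tinyWork, heavyWork, and timeIt are hypothetical helpers, and the timings depend entirely on your machine.

```dart
import 'dart:isolate';

/// Trivial work: far cheaper than handing a value to another isolate.
int tinyWork(int n) => n * 2;

/// Deliberately slow loop standing in for real CPU-bound work.
int heavyWork(int n) {
  var acc = 0;
  for (var i = 0; i < n; i++) {
    acc = (acc + i * i) % 1000003;
  }
  return acc;
}

/// Runs [body] and returns how long it took.
Future<Duration> timeIt(Future<Object?> Function() body) async {
  final stopwatch = Stopwatch()..start();
  await body();
  return stopwatch.elapsed;
}

Future<void> main() async {
  final inline = await timeIt(() async => tinyWork(21));
  final isolated = await timeIt(() => Isolate.run(() => tinyWork(21)));
  print('tiny work inline: $inline, in isolate: $isolated');

  final heavy = await timeIt(() => Isolate.run(() => heavyWork(50000000)));
  print('heavy work in isolate: $heavy');
}
```

For the tiny function the isolate round trip typically dominates the measurement, while for the heavy loop it becomes a small fraction of the total.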
Overview (API reference) #
Creating handlers #
class MyHandler extends IsolatedHandler<InputType, OutputType> {
  @override
  OutputType compute(InputType input) {
    // Your transformation logic here
    return transformedOutput;
  }
}
Stream API: isolatedMap #
stream.isolatedMap(
  handler, // Your IsolatedHandler instance
  strategy: IsolatedProcessingStrategy.concurrent(
    concurrency: 4, // How many operations to run in parallel
    isolates: 2, // How many isolates to distribute work across
    idleTimeout: const Duration(minutes: 1), // optional
  ),
)
Strategies:
- sequential() - Process one at a time (default)
- concurrent(concurrency, isolates) - Process multiple simultaneously, preserving input order
- droppable(concurrency, isolates) - Drop events when busy
- restartable(concurrency, isolates) - Cancel previous work for new events
All four accept optional idleTimeout and onIdleShutdown.
Returns: Stream<E> with transformed elements in input order.
Request/response API: IsolatedWorker #
final worker = IsolatedWorker<T, E>(
  handler, // Your IsolatedHandler instance
  isolates: 1, // How many background isolates
  concurrency: null, // null = unbounded; 1 = sequential; N = bounded
  idleTimeout: null, // optional: release isolates after inactivity
  onIdleShutdown: null,
);
final result = await worker.compute(value); // Future<E>
worker.dispose();
When to pick which: use isolatedMap when you already have a Stream<T> to transform; use IsolatedWorker when you have individual values to process on demand and want a long-lived worker you can call like a function.
License #
MIT License - see the LICENSE file for details.