recursiveDistillChunks method
Implementation
Stream<Chunk> recursiveDistillChunks({
required Stream<Chunk> chunks,
required IDistiller distiller,
int parallelism = 8,
int factor = 3,
bool isBase = true,
}) async* {
Queue<Chunk> buffer = Queue<Chunk>();
Stream<Chunk> distilled =
distillChunks(
chunks: chunks.map((i) {
if (isBase) buffer.add(i);
return i;
}),
distiller: distiller,
factor: factor,
parallelism: parallelism,
).asBroadcastStream();
StreamController<Chunk> nextLevelController = StreamController<Chunk>();
StreamSubscription sub = distilled.listen(nextLevelController.add);
int c = 0;
await for (Chunk i in distilled) {
c++;
if (isBase) {
while (buffer.isNotEmpty) {
yield buffer.removeFirst();
}
}
yield i;
}
sub.cancel();
nextLevelController.close();
if (c <= factor) {
return;
}
yield* recursiveDistillChunks(
chunks: nextLevelController.stream,
distiller: distiller,
factor: factor,
parallelism: parallelism,
isBase: false,
);
}