sendMessageStreamRaw method
Send a raw JSON message and get streaming response.
Implementation
Stream<String> sendMessageStreamRaw(String messageJson,
{String? extraContext}) {
_assertInitialized();
_assertConversation();
final b = _bindings!;
final controller = StreamController<String>();
final messagePtr = messageJson.toNativeUtf8();
final extraPtr =
extraContext != null ? extraContext.toNativeUtf8() : nullptr;
// NativeCallable.listener is thread-safe — the callback can be
// invoked from the native background thread that LiteRT-LM uses
// for streaming, and Dart will marshal it to the right isolate.
// Dart callback — receives heap-copied strings from proxy
late final NativeCallable<_StreamCallbackNative> callable;
callable = NativeCallable<_StreamCallbackNative>.listener(
(Pointer<Void> data, Pointer<Char> chunk, int isFinal,
Pointer<Char> errorMsg) {
if (errorMsg != nullptr && errorMsg.address != 0) {
final error = errorMsg.cast<Utf8>().toDartString();
_proxyFreeString!(errorMsg); // free strdup'd string
// stopGeneration() (and any other caller-initiated cancel) surfaces
// here as a CANCELLED error from native. That's not an error from
// the API consumer's perspective — the stream just stops cleanly
// at whatever token was last delivered.
if (error.startsWith('CANCELLED')) {
controller.close();
} else {
controller.addError(Exception('Stream error: $error'));
controller.close();
}
callable.close();
calloc.free(messagePtr);
if (extraPtr != nullptr) calloc.free(extraPtr);
return;
}
if (chunk != nullptr && chunk.address != 0) {
final text = chunk.cast<Utf8>().toDartString();
_proxyFreeString!(chunk); // free strdup'd string
if (text.isNotEmpty) {
controller.add(text);
}
}
if (isFinal != 0) {
controller.close();
callable.close();
calloc.free(messagePtr);
if (extraPtr != nullptr) calloc.free(extraPtr);
}
},
);
// Create proxy that strdup's strings before forwarding to Dart callback
final outProxyFn = calloc<Pointer<NativeFunction<_StreamCallbackNative>>>();
final proxyData = _proxyCreate!(
callable.nativeFunction,
nullptr,
outProxyFn,
);
final proxyFn = outProxyFn.value;
calloc.free(outProxyFn);
final result = b.litert_lm_conversation_send_message_stream(
_conversation!,
messagePtr.cast(),
extraPtr == nullptr ? nullptr : extraPtr.cast(),
proxyFn.cast(),
proxyData,
);
if (result != 0) {
controller
.addError(Exception('Failed to start streaming (code: $result)'));
controller.close();
callable.close();
calloc.free(messagePtr);
if (extraPtr != nullptr) calloc.free(extraPtr);
}
return controller.stream;
}