sendMessageStreamRaw method

Stream<String> sendMessageStreamRaw(
  String messageJson, {
  String? extraContext,
})

Sends a raw JSON message and returns the response as a stream of text chunks.

Implementation

/// Sends a raw JSON message and streams the response as text chunks.
///
/// [messageJson] is handed to the native conversation as a UTF-8 string
/// without being parsed or validated here. [extraContext], when non-null, is
/// passed alongside it; otherwise a null pointer is passed.
///
/// The returned single-subscription stream:
///  * emits every non-empty chunk delivered by the native callback;
///  * closes when the native side flags a chunk as final;
///  * closes without an error when the native error message starts with
///    `CANCELLED` (caller-initiated cancellation);
///  * emits an [Exception] and then closes on any other native error, or
///    when the native call returns a non-zero code.
Stream<String> sendMessageStreamRaw(String messageJson,
    {String? extraContext}) {
  _assertInitialized();
  _assertConversation();
  final b = _bindings!;

  final controller = StreamController<String>();

  // Allocate with the same allocator that later frees these buffers.
  final messagePtr = messageJson.toNativeUtf8(allocator: calloc);
  final extraPtr = extraContext != null
      ? extraContext.toNativeUtf8(allocator: calloc)
      : nullptr;

  // NativeCallable.listener may be invoked from the native background
  // thread used for streaming; Dart delivers the call on this isolate.
  late final NativeCallable<_StreamCallbackNative> callable;

  // Single teardown path for every way the stream can end. The flag makes
  // it idempotent so that a repeated terminal event can neither double-free
  // the request buffers nor touch the controller after it was closed.
  var finished = false;
  void finish([Object? error]) {
    if (finished) return;
    finished = true;
    if (error != null) controller.addError(error);
    controller.close();
    callable.close();
    calloc.free(messagePtr);
    if (extraPtr != nullptr) calloc.free(extraPtr);
  }

  // The callback receives heap-copied strings from the proxy, so every
  // non-null string it is given must be released with _proxyFreeString.
  callable = NativeCallable<_StreamCallbackNative>.listener(
    (Pointer<Void> data, Pointer<Char> chunk, int isFinal,
        Pointer<Char> errorMsg) {
      if (errorMsg != nullptr) {
        final error = errorMsg.cast<Utf8>().toDartString();
        _proxyFreeString!(errorMsg);
        // A chunk delivered together with an error is discarded, but its
        // native copy still has to be released.
        if (chunk != nullptr) _proxyFreeString!(chunk);
        // stopGeneration() (and any other caller-initiated cancel) surfaces
        // here as a CANCELLED error from native. For the API consumer that
        // is not a failure: the stream simply ends after the last token.
        finish(error.startsWith('CANCELLED')
            ? null
            : Exception('Stream error: $error'));
        return;
      }

      if (chunk != nullptr) {
        final text = chunk.cast<Utf8>().toDartString();
        _proxyFreeString!(chunk);
        // Skip empty chunks, and never add to an already finished stream.
        if (text.isNotEmpty && !finished) controller.add(text);
      }

      if (isFinal != 0) finish();
    },
  );

  // Create the proxy that copies strings before forwarding them to the Dart
  // callback. The out-parameter is only needed until its value is read.
  final outProxyFn = calloc<Pointer<NativeFunction<_StreamCallbackNative>>>();
  // NOTE(review): proxyData is never released in this method — confirm that
  // the native side owns it and frees it once streaming ends.
  final proxyData = _proxyCreate!(
    callable.nativeFunction,
    nullptr,
    outProxyFn,
  );
  final proxyFn = outProxyFn.value;
  calloc.free(outProxyFn);

  final result = b.litert_lm_conversation_send_message_stream(
    _conversation!,
    messagePtr.cast(),
    // Casting nullptr yields a null pointer, so no extra null check needed.
    extraPtr.cast(),
    proxyFn.cast(),
    proxyData,
  );

  if (result != 0) {
    // Streaming never started: report the failure and tear down now.
    finish(Exception('Failed to start streaming (code: $result)'));
  }

  return controller.stream;
}