handleRequest<O, R> method

Stream<R> handleRequest<O, R>(
  ServiceCall call,
  ServiceMethod<O, R> method,
  Stream<O> requests,
  ServerStreamingInvoker<O, R> invoker,
)

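Handles a gRPC request: reads the service and method names from the `:path` entry of the call's client metadata, dispatches the invocation to the gRPC message resolver, and returns a stream that forwards the handler's responses. Throws `GrpcError.unimplemented` if the path has fewer than three `/`-separated segments.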

Implementation

/// Handles a gRPC request by dispatching it through the message resolver.
///
/// The service and method names are read from the second and third
/// `/`-separated segments of the `:path` entry in [call]'s client metadata
/// (for example `/my.Service/MyMethod`). The invocation is wrapped in a
/// [RequestPacket] carrying a [GrpcInvocationPayload] and handed to
/// [GrpcMessageResolver.handleRpcCall]; the `Stream<R>` found in the response
/// packet's payload is then piped into the returned stream.
///
/// Failures that occur while dispatching are delivered as error events on the
/// returned stream, which is closed afterwards:
///
/// * no message resolver is configured, the handler returns no packet, or the
///   packet is flagged as an error: a [GrpcError.internal];
/// * the handler throws an [RpcException]: a [GrpcError.custom] with status
///   code 14 and the exception's message;
/// * anything else: the original error, with its stack trace.
///
/// Cancelling or pausing the returned stream cancels or pauses the handler's
/// result stream as well.
///
/// Throws a [GrpcError.unimplemented] synchronously if the path has fewer
/// than three segments (this includes a missing `:path` entry).
Stream<R> handleRequest<O, R>(
  ServiceCall call,
  ServiceMethod<O, R> method,
  Stream<O> requests,
  ServerStreamingInvoker<O, R> invoker,
) {
  final path = call.clientMetadata?[':path'] ?? 'unknown';
  // A well-formed path starts with '/', so segment 0 is the empty string.
  final pathSegments = path.split('/');
  if (pathSegments.length < 3) {
    throw GrpcError.unimplemented('Invalid gRPC path');
  }
  final serviceName = pathSegments[1];
  final methodName = pathSegments[2];
  final service = server?.lookupService(serviceName);

  final controller = StreamController<R>();
  final grpcResolver = messagesResolver as GrpcMessageResolver?;
  if (grpcResolver == null) {
    // Without a resolver nothing would ever complete the stream; fail the
    // call explicitly instead of leaving the client hanging.
    controller
      ..addError(GrpcError.internal('No gRPC message resolver configured'))
      ..close();
    return controller.stream;
  }

  // Awaits the handler's response and pipes its stream into [controller].
  Future<void> forwardResponse() async {
    try {
      final responsePacket = await grpcResolver.handleRpcCall<O, R>(
        RequestPacket(
          // If the service lookup yields null this interpolates to
          // 'null.<methodName>'.
          pattern: '${service?.runtimeType}.$methodName',
          id: serviceName,
          payload: GrpcInvocationPayload<O, R>(
            call: call,
            requests: requests,
            method: method,
            invoker: invoker,
          ),
        ),
        this,
      );
      if (responsePacket == null) {
        throw GrpcError.internal('No response received from handler');
      }
      if (responsePacket.isError) {
        throw GrpcError.internal(
          'Error from handler: ${responsePacket.payload}',
        );
      }
      // addStream forwards data and error events, and ties the source
      // subscription to the controller: a cancel or pause from the consumer
      // propagates to the handler's stream instead of leaking it.
      await controller.addStream(responsePacket.payload as Stream<R>);
    } on RpcException catch (error) {
      // 14 is the gRPC UNAVAILABLE status code.
      controller.addError(GrpcError.custom(14, error.message));
    } catch (error, stackTrace) {
      controller.addError(error, stackTrace);
    }
  }

  // Close on every path (success or failure) once forwarding has finished.
  forwardResponse().whenComplete(controller.close);
  return controller.stream;
}