suspendOnTopic method

@override
Future<void> suspendOnTopic(
  String runId,
  String stepName,
  String topic, {
  DateTime? deadline,
  Map<String, Object?>? data,
})

Suspends the run identified by `runId` while it awaits an event on `topic`.

If `deadline` is provided, the run should be considered due at that time even if an event is never received.

Implementation

/// Suspends the run identified by [runId] while it awaits an event on [topic].
///
/// Inside a single transaction, the run matching [runId] in the current
/// `namespace` is looked up and, if present, updated with:
///
/// * a status of [WorkflowStatus.suspended],
/// * [topic] as its wait topic,
/// * [deadline] as its resume time (which may be `null`),
/// * the JSON-encoded suspension metadata built from [data], and
/// * the current UTC time as its last-updated timestamp.
///
/// If no matching run exists, nothing is written and the call completes
/// normally.
///
/// [stepName] is accepted as part of the overridden signature but is not
/// read by this implementation.
@override
Future<void> suspendOnTopic(
  String runId,
  String stepName,
  String topic, {
  DateTime? deadline,
  Map<String, Object?>? data,
}) async {
  final timestamp = _clock.now().toUtc();

  // The deadline doubles as the resume time: both are handed to the helper
  // that assembles the suspension metadata.
  final suspensionPayload = _prepareSuspensionData(
    data,
    resumeAt: deadline,
    deadline: deadline,
    topic: topic,
  );

  await _connections.runInTransaction((ctx) async {
    final existing = await ctx
        .query<StemWorkflowRun>()
        .whereEquals('id', runId)
        .whereEquals('namespace', namespace)
        .first();

    // Unknown run: leave the store untouched.
    if (existing == null) {
      return;
    }

    // NOTE(review): `resume_at` is assigned explicitly on top of the DTO's
    // `resumeAt` — presumably so that a null deadline is still written to the
    // column rather than being dropped by `toMap()`. Confirm against the DTO.
    final changes = StemWorkflowRunUpdateDto(
      status: WorkflowStatus.suspended.name,
      waitTopic: topic,
      resumeAt: deadline,
      suspensionData: jsonEncode(suspensionPayload),
      updatedAt: timestamp,
    ).toMap()
      ..['resume_at'] = deadline;

    final runs = ctx.repository<StemWorkflowRun>();
    await runs.update(
      changes,
      where: StemWorkflowRunPartial(id: runId, namespace: namespace),
    );
  });
}