fire_rag 1.0.0

Firebase RAG for AI Agents

fire_rag #

fire_rag is a Firebase-first ingestion pipeline for retrieval-augmented generation.

It is built for the case where you already have:

  • a Firestore-backed application
  • a Cloud Tasks queue
  • a text source arriving in Cloud Storage
  • an embedding model
  • optionally, a chat model for recursive summarization

Instead of indexing everything in one request, fire_rag turns ingestion into resumable Cloud Tasks:

  1. download a source file
  2. split it into base chunks
  3. persist chunk records to Firestore
  4. embed those records
  5. optionally distill them into higher levels of detail
  6. embed the distilled records too

The result is a Firestore collection containing chunk text, vectors, and chunk relationships that can later be queried with the rag package.
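The split-with-overlap step (2 above) can be sketched in plain Dart. In the real pipeline this is delegated to agentic's IChunker; the Chunk class and splitText function below are illustrative stand-ins, not fire_rag API:

```dart
import 'dart:math';

// Illustrative stand-in for the base-chunking step. fire_rag itself
// delegates this work to agentic's IChunker.
class Chunk {
  final String content;     // main body of the chunk
  final String postContent; // overlap borrowed from the following chunk
  final int index;          // position within the source
  final int charStart;      // offset of content within the source text
  final int charEnd;
  Chunk(this.content, this.postContent, this.index, this.charStart, this.charEnd);
}

List<Chunk> splitText(String text,
    {int maxChunkSize = 500, int maxPostOverlap = 100}) {
  List<Chunk> chunks = [];
  for (int start = 0, i = 0; start < text.length; start += maxChunkSize, i++) {
    int end = min(start + maxChunkSize, text.length);
    int postEnd = min(end + maxPostOverlap, text.length);
    chunks.add(Chunk(
        text.substring(start, end), text.substring(end, postEnd), i, start, end));
  }
  return chunks;
}
```

Each stored chunk therefore carries a little of its successor, which keeps sentence boundaries from being lost at chunk edges.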

Concept #

This package is not a chat agent and it is not a vector query layer by itself.

It is the ingestion half of a Firebase RAG stack.

The core idea is:

  • use arcane_admin to run resumable task work in Cloud Tasks
  • use agentic to chunk text and call models
  • store chunk documents in Firestore
  • use rag later to retrieve those embedded records during answering

If you already have documents landing in Cloud Storage, fire_rag gives you a straightforward path from uploaded file to vectorized Firestore records.

How It Works #

fire_rag wires three task types into an arcane_admin TaskManager.

1. TaskChunk #

TaskChunk is the entry task.

It:

  • downloads a source file from Cloud Storage to a local temp path
  • uses agentic.IChunker to split the file into base chunks
  • writes each chunk into Firestore
  • batches the chunk document IDs into TaskEmbed jobs
  • optionally schedules TaskDistill if recursive summarization is enabled

Each base chunk document includes fields like:

  • content
  • postContent
  • index
  • lod
  • charStart
  • charEnd
  • record
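For example, a base chunk document might look like this (all field values here are illustrative):

```json
{
  "content": "Refunds are processed within 14 days...",
  "postContent": "Contact support if a refund",
  "index": 3,
  "lod": 0,
  "charStart": 1500,
  "charEnd": 2000,
  "record": "customer-handbook.txt"
}
```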

2. TaskEmbed #

TaskEmbed reads stored chunk text from Firestore, calls your connected embedding model, and writes the resulting vector back onto the same document.

By default it embeds:

content + postContent

and stores the result in:

vector
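In other words, the text sent to the embedder includes the trailing overlap, so the stored vector also covers the boundary region between chunks. A minimal sketch of that default (simple concatenation is assumed here; the exact join is an implementation detail of TaskEmbed):

```dart
// Sketch of TaskEmbed's default embedding input: the chunk body plus its
// trailing overlap. Plain concatenation is an assumption, not confirmed API.
String embedInput(String content, String postContent) => content + postContent;
```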

3. TaskDistill #

TaskDistill is the recursive summarization stage.

It:

  • reads groups of factor chunks from one level of detail
  • sends them to your connected chat model
  • writes a distilled chunk into the next lod
  • links source and distilled chunks with down and up
  • schedules embedding for the newly created distilled chunks
  • continues level-by-level until only one distilled output remains

This gives you a hierarchy of chunks:

  • L0: original chunked source text
  • L1: distilled groups of L0
  • L2: distilled groups of L1
  • and so on until a single higher-level summary remains
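The chunk count per level shrinks geometrically with distillationFactor. A quick sketch of the arithmetic (not part of the fire_rag API):

```dart
// Chunk counts per level of detail for a given distillationFactor.
// Pure arithmetic sketch; not part of the fire_rag API.
List<int> levelSizes(int baseChunks, int factor) {
  List<int> sizes = [baseChunks];
  while (sizes.last > 1) {
    sizes.add((sizes.last / factor).ceil());
  }
  return sizes;
}
```

With 64 base chunks and a factor of 4, the levels hold 64, 16, 4, and 1 chunks.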

Package Surface #

The public bootstrap is small:

FireRag.init(...)

It registers task executors for:

  • TaskChunk
  • TaskDistill
  • TaskEmbed

and exposes the configured TaskManager, embedding model, and chat model through FireRag.instance.

Installation #

Add the package:

dart pub add fire_rag

Typical companion packages are:

dart pub add arcane_admin
dart pub add agentic
dart pub add rag

If you are developing this package or changing artifact-backed task models, keep generated code up to date:

dart run build_runner build --delete-conflicting-outputs

Getting Started #

At startup you usually do two things:

  1. initialize ArcaneAdmin
  2. initialize FireRag

import 'package:agentic/agentic.dart';
import 'package:arcane_admin/arcane_admin.dart';
import 'package:fire_rag/fire_rag.dart';

Future<void> main() async {
  await ArcaneAdmin.initialize(
    projectId: 'my-project-id',
    defaultStorageBucket: 'my-project-id.firebasestorage.app',
  );

  ConnectedEmbeddingModel embedder = OpenAIConnector(
    apiKey: const String.fromEnvironment('OPENAI_API_KEY'),
  ).asEmbedder('text-embedding-3-small');

  ConnectedChatModel llm = OpenAIConnector(
    apiKey: const String.fromEnvironment('OPENAI_API_KEY'),
  ).connect(ChatModel.openai4_1Mini);

  FireRag.init(
    embed: embedder,
    llm: llm,
    taskQueue: 'rag-ingest',
    endpointUrl: 'https://your-service.run.app/event/executeJob',
  );
}

Usage #

The usual deployment shape is:

  • one endpoint that receives a Cloud Storage finalization event
  • one endpoint that executes scheduled tasks

Minimal Server Wiring #

import 'package:arcane_admin/arcane_admin.dart';
import 'package:fire_rag/fire_rag.dart';
import 'package:fire_rag/task/task_chunk.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_router/shelf_router.dart';

Future<void> main() async {
  await ArcaneAdmin.initialize(
    projectId: 'my-project-id',
    defaultStorageBucket: 'my-project-id.firebasestorage.app',
  );

  FireRag.init(
    embed: /* your ConnectedEmbeddingModel */,
    llm: /* your ConnectedChatModel */,
    taskQueue: 'rag-ingest',
    endpointUrl: 'https://your-service.run.app/event/executeJob',
  );

  Router router = Router();

  router.taskManager(FireRag.instance.taskManager);

  router.post('/storageFinalized', (Request request) {
    return request.storageEvent((ArcaneStorageEvent event) async {
      if (!event.path.endsWith('.txt')) {
        return Response.ok('');
      }

      await FireRag.instance.taskManager.schedule(
        TaskChunk(
          taskId: 'ingest.${event.bucket}.${event.path}',
          sourceBucket: event.bucket,
          sourcePath: event.path,
          destinationCollection: 'rag_chunks',
          record: event.path,
          maxChunkSize: 500,
          maxPostOverlap: 100,
          embedBatchSize: 25,
          chunkBatchSize: 100,
          distillationFactor: 4,
          destinationMetadata: {
            'sourceBucket': event.bucket,
            'sourcePath': event.path,
          },
        ),
      );

      return Response.ok('');
    });
  });

  await io.serve(router.call, '0.0.0.0', 8080);
}

What The Example Does #

  • /storageFinalized receives a storage event from Eventarc or your own webhook bridge
  • a new TaskChunk is scheduled
  • /event/executeJob is automatically handled by router.taskManager(...)
  • the task manager keeps re-queuing work until the current task is complete

Data Model #

Chunk document IDs follow this pattern:

{record}.{index}L{lod}

Examples:

customer-handbook.pdf.0L0
customer-handbook.pdf.1L0
customer-handbook.pdf.0L1
customer-handbook.pdf.0L2
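If you need to build or parse these IDs client-side, helpers are easy to write. The ones below are illustrative, not part of fire_rag's public API:

```dart
// Illustrative helpers for the {record}.{index}L{lod} ID pattern;
// not part of fire_rag's public API.
String chunkId(String record, int index, int lod) => '$record.${index}L$lod';

({String record, int index, int lod}) parseChunkId(String id) {
  int dot = id.lastIndexOf('.');
  List<String> tail = id.substring(dot + 1).split('L'); // e.g. "0L2" -> ["0", "2"]
  return (
    record: id.substring(0, dot),
    index: int.parse(tail[0]),
    lod: int.parse(tail[1]),
  );
}
```

Because record itself may contain dots (as in the examples above), parsing works from the last dot backwards.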

Useful stored fields include:

  • content: the main body of the chunk
  • postContent: overlap from the following chunk
  • index: the chunk index within that level
  • lod: level of detail
  • charStart
  • charEnd
  • record: logical record identifier
  • vector: embedding written by TaskEmbed
  • down: indexes of the lower-lod chunks that were combined to create this distilled chunk
  • up: index of the higher-lod chunk this chunk was distilled into

Choosing Distillation Settings #

The most important knobs are:

  • maxChunkSize: target size of each stored chunk
  • maxPostOverlap: overlap appended from the next chunk
  • chunkBatchSize: how many chunks are persisted before scheduling embed work
  • embedBatchSize: how many document IDs are sent per embedding task
  • distillationFactor: how many chunks are combined into one higher-LOD chunk

Rules of thumb:

  • start with maxChunkSize: 500 and maxPostOverlap: 100
  • use distillationFactor: 4 if you want a compact hierarchy
  • omit distillationFactor if you only want base chunks plus embeddings
  • increase embedBatchSize only if your embedding provider comfortably supports it
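These knobs interact: distillationFactor determines how many extra chunk documents the hierarchy adds, and embedBatchSize determines how many embedding tasks those documents turn into. A back-of-envelope sketch (illustrative arithmetic only, not fire_rag API):

```dart
// Total chunk documents across all levels for a given distillationFactor.
int totalChunks(int baseChunks, int factor) {
  int total = baseChunks;
  int level = baseChunks;
  while (level > 1) {
    level = (level / factor).ceil();
    total += level;
  }
  return total;
}

// How many TaskEmbed jobs a given chunk count produces.
int embedTasks(int chunkCount, int embedBatchSize) =>
    (chunkCount / embedBatchSize).ceil();
```

For example, a 2,000-chunk source with distillationFactor: 4 yields 2,668 chunk documents in total, and at embedBatchSize: 25 that comes to 107 embedding tasks.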

Relationship To Other Packages #

fire_rag is intentionally small because it leans on a few other packages:

  • agentic: used for IChunker, chunk text splitting, connected chat models, and connected embedding models. TaskChunk uses the chunker. TaskDistill and TaskEmbed use the connected models.

  • arcane_admin: used for Firebase admin initialization, Cloud Storage download access, Firestore access, Eventarc helpers, and the resumable TaskManager / TaskExecutor system that drives ingestion.

  • rag: not used to ingest documents, but intended as the retrieval-side companion package once your Firestore chunk collection has vectors.

  • fire_api: used indirectly through arcane_admin for Firestore and Storage abstractions, including document reads, writes, and VectorValue.

  • artifact: used for serializable task objects so task state can be preserved between Cloud Task executions.

You can think of the stack like this:

  • agentic handles model calls and chunking
  • arcane_admin handles Firebase admin and task orchestration
  • fire_rag turns those pieces into a resumable ingestion pipeline
  • rag consumes the resulting embedded records for retrieval

Typical Flow In Production #

  1. A text file is uploaded to Cloud Storage.
  2. A storage event schedules TaskChunk.
  3. TaskChunk writes L0 chunks and schedules TaskEmbed.
  4. If enabled, TaskChunk schedules TaskDistill.
  5. TaskDistill writes L1 chunks, schedules embeds for those chunks, and recursively schedules higher levels.
  6. Firestore ends up containing both raw and distilled chunk records plus vectors.
  7. Your retrieval layer queries that collection later with rag.

Notes #

  • This package currently assumes text-file ingestion. If your upstream source is PDF, OCR, HTML, or something else, convert it to text before scheduling TaskChunk.
  • TaskDistill requires a chat model. If you do not want summarization, leave distillationFactor unset.
  • Query-time schema expectations are up to your retrieval layer. fire_rag focuses on ingestion and vectorization, not retrieval policy.

Contributing #

If you change task models or artifact-backed state fields, regenerate code before publishing:

dart run build_runner build --delete-conflicting-outputs