# rpc_dart_data
`rpc_dart_data` is the high-level data layer (CRUD + queries + change streams + offline sync) that sits on top of `rpc_dart`.
It gives you a transport-agnostic contract, ready-to-use storage adapters, and utilities for building offline-friendly backends and clients.
## Feature highlights
- Unified `DataService` contract with helpers for create/get/list/update/patch/delete/deleteCollection.
- Bulk workflows via `bulkUpsert`, `bulkUpsertStream`, and `bulkDelete` to move large batches atomically.
- Search & aggregations (`search`, `aggregate`) delegated to the storage adapter, including backend pagination.
- Streaming snapshots: export/import the full database as NDJSON, stream it through `payloadStream`, or set `includePayloadString: false` to skip building large strings.
- Change streams & offline sync: `watchChanges` with cursors, bidirectional `syncChanges`, and `OfflineCommandQueue` for durable command queues.
- SQLite/Drift adapter with SQLCipher support and a `SqliteSetupHook` so you can register custom pragmas before the database is exposed to your code.
- Ready-made environments (`DataServiceFactory.inMemory`) for tests, demos, and local prototyping.
## Architecture in seven layers
1. Transport (WebSocket / HTTP/2 / isolates / TURN / in-memory) provided by `rpc_dart_transports`.
2. Endpoint (`RpcCallerEndpoint` / `RpcResponderEndpoint`).
3. Contract + codecs (`IDataServiceContract` and `RpcCodec<...>`).
4. Low-level plumbing (`DataServiceCaller` / `DataServiceResponder`).
5. Repository + storage adapter (`BaseDataRepository`, change journal, and the concrete adapter: in-memory or Drift).
6. Facade (`DataServiceClient`, `DataServiceServer`, `DataServiceFactory`, `InMemoryDataServiceEnvironment`).
7. Offline utilities such as `OfflineCommandQueue` and reconnection helpers.
Consumers usually interact only with layer 6 while everything below stays private.
## Quick start
```dart
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_data/rpc_dart_data.dart';

Future<void> main() async {
  // Spin up an in-memory server + client pair (great for tests and demos).
  final env = await DataServiceFactory.inMemory();
  final client = env.client;
  final ctx = RpcContext.withHeaders({'authorization': 'Bearer dev'});

  // Create a record and remember its id/version for later calls.
  final created = await client.create(
    collection: 'notes',
    payload: {'title': 'Hello', 'done': false},
    context: ctx,
  );

  // Subscribe to change events for the collection.
  final sub = client
      .watchChanges(collection: 'notes', context: ctx)
      .listen((event) => print('change=${event.type} id=${event.id} v=${event.version}'));

  // Patch with optimistic concurrency: expectedVersion must match the stored version.
  await client.patch(
    collection: 'notes',
    id: created.id,
    expectedVersion: created.version,
    patch: const RecordPatch(set: {'done': true}),
    context: ctx,
  );

  await Future<void>.delayed(const Duration(milliseconds: 50));
  await sub.cancel();
  await env.dispose();
}
```
See `example/extended_demo.dart` for a larger walkthrough.
## Streaming export and import
`DataRepository.exportDatabase` writes every snapshot as newline-delimited JSON (NDJSON).
Each line contains a `header`, `collection`, `record`, `collectionEnd`, or `footer` entry. You can either:
- Work with the whole payload as a string (default), or
- Set `ExportDatabaseRequest(includePayloadString: false)` and consume the `payloadStream` to keep memory flat while sending it over the wire.
Imports accept the same format. When `replaceExisting` is true, missing collections are removed and re-created so the target repository matches the snapshot exactly. Legacy JSON (format version 1.0.0) is still accepted for backward compatibility, but it requires loading the entire blob into memory first.
`ImportDatabaseRequest` always validates the snapshot stream before mutating any collections, and the importer processes records in `databaseImportBatchSize` chunks, so very large dumps no longer spike RAM usage.
Example: piping exported data
```dart
final export = await repository.exportDatabase(
  const ExportDatabaseRequest(includePayloadString: false),
);

await for (final chunk in export.payloadStream!) {
  sink.add(chunk); // write to a file, socket, etc.
}
```
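The import direction looks similar. A minimal sketch follows; apart from `replaceExisting`, the `ImportDatabaseRequest` field names used here are assumptions, so check the request class for the actual ones:

```dart
import 'dart:io';

// Sketch only: assumes ImportDatabaseRequest accepts the exported NDJSON
// payload as a string alongside the replaceExisting flag described above.
Future<void> restoreSnapshot(DataRepository stagingRepository) async {
  final snapshot = await File('backup.ndjson').readAsString();

  await stagingRepository.importDatabase(
    ImportDatabaseRequest(
      payload: snapshot,     // assumed field name for the NDJSON blob
      replaceExisting: true, // drop collections that are absent from the snapshot
    ),
  );
}
```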
## Offline queue and sync commands
```dart
final env = await DataServiceFactory.inMemory();
final client = env.client;
final ctx = RpcContext.withHeaders({'authorization': 'Bearer x'});

// A durable per-device queue: commands survive offline periods and replay later.
final queue = client.createOfflineQueue(sessionId: 'device-1');

final command = queue.buildCreateCommand(
  const CreateRecordRequest(collection: 'tasks', payload: {'title': 'Draft'}),
);

// Commands round-trip through JSON, so they can be persisted while offline.
final persistedJson = command.toJson();

final ackFuture = queue.enqueueCommand(
  DataCommand.fromJson(persistedJson),
  autoStart: false,
  context: ctx,
);

await queue.start(context: ctx);
await queue.flushPending();

final ack = await ackFuture;
print('applied=${ack.applied} id=${ack.record?.id}');
```
Use `enqueueCommand(..., resolveConflicts: false)` if you prefer the queue to stop on conflicts instead of retrying automatically.
`DataServiceClient` now includes:

- `listAllRecords` – paginates through `list()` until the entire collection is in memory (sketched below).
- `bulkUpsertStream` – streams large batches directly to the repository.
- `pushAndAwaitAck` – sends a single sync command and waits for the response.
- `createOfflineQueue` – a convenience constructor for `OfflineCommandQueue`.
- `close` – shuts down the underlying RPC endpoint.
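As an example, a hedged sketch of the pagination helper; the named parameters and the returned list are assumptions mirroring `list()`:

```dart
// Sketch: assumes listAllRecords mirrors list()'s collection/context parameters
// and returns the accumulated records as a list.
final allNotes = await client.listAllRecords(
  collection: 'notes',
  context: ctx,
);
print('loaded ${allNotes.length} records into memory');
```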
## Aggregations
```dart
// Metric expressions take the form '<op>' or '<op>:<field>'.
// `ctx` is the RpcContext created earlier.
final metrics = await client.aggregate(
  collection: 'orders',
  metrics: {
    'countAll': 'count',
    'sumPrice': 'sum:price',
    'avgPrice': 'avg:price',
    'minPrice': 'min:price',
    'maxPrice': 'max:price',
  },
  context: ctx,
);

print(metrics.metrics);
```
The repository validates metric expressions and delegates supported work to the storage adapter.
## Change streams and optimistic concurrency
- `watchChanges` accepts an optional `cursor`, so clients can resume after a reconnect.
- Drift/SQLite keeps a persistent `change_journal` table, allowing cursors to survive restarts.
- `update` and `patch` require an `expectedVersion`. A mismatch triggers `RpcDataError.conflict(...)` (or a transport `RpcException`); see the retry sketch below.
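A sketch of a retry-on-conflict loop; the `get()` parameters, the `.version` field on its result, and catching `RpcDataError` directly are assumptions:

```dart
// Sketch only: assumes get() takes collection/id/context like the other calls,
// returns a record exposing version, and that a version mismatch surfaces as
// RpcDataError (it may also arrive as a transport RpcException).
Future<void> markDone(DataServiceClient client, String id, RpcContext ctx) async {
  for (var attempt = 0; attempt < 3; attempt++) {
    final current = await client.get(collection: 'notes', id: id, context: ctx);
    try {
      await client.patch(
        collection: 'notes',
        id: id,
        expectedVersion: current.version, // optimistic concurrency check
        patch: const RecordPatch(set: {'done': true}),
        context: ctx,
      );
      return; // patch applied
    } on RpcDataError {
      // Another writer bumped the version first: re-read and retry.
    }
  }
  throw StateError('could not apply patch after 3 attempts');
}
```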
## Extending the storage layer
Implement `DataStorageAdapter` to plug in any backend (PostgreSQL, Elastic, Firestore, ...):
```dart
class PostgresAdapter implements DataStorageAdapter {
  @override
  Future<DataRecord?> readRecord(String collection, String id) async {
    // Query your store and return a DataRecord when a row exists, null otherwise.
    throw UnimplementedError();
  }

  @override
  Future<ListRecordsResponse> queryCollection(ListRecordsRequest request) async {
    // Apply filters + pagination server-side and map rows to ListRecordsResponse.
    throw UnimplementedError();
  }

  // Implement searchCollection, aggregateCollection, writeRecords,
  // deleteCollection, etc.
}
```
Adapters participate in streaming export/import by overriding `readCollectionChunks`, and they can expose custom indices via `CollectionIndexStorageAdapter` if needed.
## Drift/SQLite profile
`DriftDataStorageAdapter` ships as the default single-node implementation:
- Creates per-collection tables on demand plus indexes on `version`, `created_at`, and `updated_at`.
- Delegates filtering, sorting, and pagination to SQL for predictable O(log N) plans.
- Uses batched UPSERT statements to keep `writeRecords` fast even when thousands of rows are updated.
- Supports SQLCipher by passing a `SqlCipherKey` or a `SqliteSetupHook` that registers additional pragmas (see the sketch after this list).
- Stores `watch()`/`sync()` events in a durable journal, so cursor-based clients recover after restarts.
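The wiring below is purely illustrative: the constructor parameters and the hook signature are assumptions meant to show where the key and extra pragmas plug in, not the actual API.

```dart
// Illustrative sketch only: parameter names and the hook signature are assumed.
final adapter = DriftDataStorageAdapter(
  databasePath: 'app.db',            // hypothetical parameter
  cipherKey: SqlCipherKey('s3cret'), // enables SQLCipher encryption
  setupHook: (db) {
    // SqliteSetupHook runs before the database is exposed to your code,
    // so additional pragmas belong here.
    db.execute('PRAGMA journal_mode = WAL;');
  },
);
```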
## Testing
Smoke tests are located in `test/data_service_facade_test.dart`. Run the whole suite with:
```bash
dart test --concurrency=1 -r compact
```
## Examples
- `example/quick_start.dart`
- `example/offline_sync.dart`
- `example/extended_demo.dart`
## Production checklist
- Schedule regular backups using `exportDatabase`; periodically restore them into a staging environment with `importDatabase`.
- Restrict RPC access through a reverse proxy with authentication and rate limiting.
- Monitor the SQLite database and change-journal size (e.g., via the `sqlite3` CLI or metrics collectors).
- Keep an eye on the CHANGELOG for breaking changes and new capabilities.