
rpc_dart_data #

rpc_dart_data is the high-level data layer (CRUD + queries + change streams + offline sync) that sits on top of rpc_dart. It gives you a transport-agnostic contract, ready-to-use storage adapters, and utilities for building offline-friendly backends and clients.

Feature highlights #

  • Unified DataService contract with helpers for create/get/list/update/patch/delete/deleteCollection.
  • Bulk workflows via bulkUpsert, bulkUpsertStream, and bulkDelete to move large batches atomically.
  • Search & aggregations (search, aggregate) delegated to the storage adapter, including backend pagination.
  • Streaming snapshots: export/import the full database as NDJSON, stream it through payloadStream, or set includePayloadString: false to skip building large strings.
  • Change streams & offline sync: watchChanges with cursors, bidirectional syncChanges, and OfflineCommandQueue for durable command queues.
  • SQLite adapter with SQLCipher support (auto-loads libsqlcipher when a key is provided, uses sqlite3mc.wasm on web) and a SqliteSetupHook so you can register custom pragmas before the database is exposed to your code.
  • Ready-made environments (DataServiceFactory.inMemory) for tests, demos, and local prototyping.
  • Collection discovery: RPC listCollections() queries the storage adapter for the current list of collection names so clients can introspect available datasets without enumerating all records.

This package now depends directly only on rpc_dart, sqlite3, and small helpers such as licensify and args. The former drift and rpc_dart_transports dependencies were removed so the core data layer stays lean; plug in whatever transport or storage adapter fits your architecture.

Architecture in seven layers #

  1. Transport (WebSocket / HTTP/2 / isolates / TURN / in-memory), provided by rpc_dart implementations. You can plug in any IRpcTransport; this package relies on the transports shipped by rpc_dart instead of bundling rpc_dart_transports directly.
  2. Endpoint (RpcCallerEndpoint / RpcResponderEndpoint).
  3. Contract + codecs (IDataServiceContract and RpcCodec<...>).
  4. Low-level plumbing (DataServiceCaller / DataServiceResponder).
  5. Repository + storage adapter (BaseDataRepository, change journal, and the concrete adapter: in-memory or SQLite).
  6. Facade (DataServiceClient, DataServiceServer, DataServiceFactory, InMemoryDataServiceEnvironment).
  7. Offline utilities such as OfflineCommandQueue and reconnection helpers.

Consumers usually interact only with layer 6 while everything below stays private.

Quick start #

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_data/rpc_dart_data.dart';

Future<void> main() async {
  final env = await DataServiceFactory.inMemory();
  final client = env.client;
  final ctx = RpcContext.withHeaders({'authorization': 'Bearer dev'});

  final created = await client.create(
    collection: 'notes',
    payload: {'title': 'Hello', 'done': false},
    context: ctx,
  );

  final sub = client
      .watchChanges(collection: 'notes', context: ctx)
      .listen((event) => print('change=${event.type} id=${event.id} v=${event.version}'));

  await client.patch(
    collection: 'notes',
    id: created.id,
    expectedVersion: created.version,
    patch: const RecordPatch(set: {'done': true}),
    context: ctx,
  );

  await Future<void>.delayed(const Duration(milliseconds: 50));
  await sub.cancel();
  await env.dispose();
}

See example/extended_demo.dart for a larger walkthrough.

Streaming export and import #

DataRepository.exportDatabase writes every snapshot as newline-delimited JSON (NDJSON). Each line contains a header, collection, record, collectionEnd, or footer entry. You can either:

  • Work with the whole payload as a string (default), or
  • Set ExportDatabaseRequest(includePayloadString: false) and consume the payloadStream to keep memory flat while sending it over the wire.

Imports accept the same format. When replaceExisting is true, missing collections are removed and re-created so the target repository matches the snapshot exactly. Legacy JSON (format version 1.0.0) is still accepted for backward compatibility, but it requires loading the entire blob in memory first.

ImportDatabaseRequest always validates the snapshot stream before mutating any collections, and the importer processes records in databaseImportBatchSize chunks, so very large dumps no longer spike RAM usage.

Example: piping exported data #

final export = await repository.exportDatabase(
  const ExportDatabaseRequest(includePayloadString: false),
);
await for (final chunk in export.payloadStream!) {
  sink.add(chunk); // write to file/socket/etc
}
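
A matching import sketch: importDatabase and replaceExisting come from this README, while the payloadStream field name is an assumption borrowed from the export response, so verify it against ImportDatabaseRequest before copying.

// ndjsonChunks is whatever NDJSON stream you captured earlier (file, socket, ...).
await repository.importDatabase(
  ImportDatabaseRequest(
    payloadStream: ndjsonChunks, // assumed field name
    replaceExisting: true,       // drop and recreate collections missing from the snapshot
  ),
);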

Offline queue and sync commands #

final env = await DataServiceFactory.inMemory();
final client = env.client;
final ctx = RpcContext.withHeaders({'authorization': 'Bearer x'});
final queue = client.createOfflineQueue(sessionId: 'device-1');

final command = queue.buildCreateCommand(
  const CreateRecordRequest(collection: 'tasks', payload: {'title': 'Draft'}),
);
final persistedJson = command.toJson();

final ackFuture = queue.enqueueCommand(
  DataCommand.fromJson(persistedJson),
  autoStart: false,
  context: ctx,
);
await queue.start(context: ctx);
await queue.flushPending();
final ack = await ackFuture;
print('applied=${ack.applied} id=${ack.record?.id}');

Use enqueueCommand(..., resolveConflicts: false) if you prefer the queue to stop on conflicts instead of retrying automatically.
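
Continuing the queue example above, a strict variant of the same call; everything except resolveConflicts reuses the variables defined earlier:

final strictAck = queue.enqueueCommand(
  DataCommand.fromJson(persistedJson),
  autoStart: false,
  resolveConflicts: false, // stop the queue on a version conflict instead of retrying
  context: ctx,
);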

DataServiceClient now includes the following helpers (a short usage sketch follows the list):

  • listAllRecords – paginates through list() until the entire collection is in memory.
  • bulkUpsertStream – streams large batches directly to the repository.
  • pushAndAwaitAck – sends a single sync command and waits for the response.
  • createOfflineQueue – a convenience constructor for OfflineCommandQueue.
  • close – shuts down the underlying RPC endpoint.
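
A hedged sketch of the first two helpers, reusing the quick-start client and ctx; the parameter names mirror the other calls in this README and are assumptions rather than verified signatures:

// Pages through list() under the hood until the whole collection is loaded.
final allNotes = await client.listAllRecords(collection: 'notes', context: ctx);
print('loaded ${allNotes.length} records');

// Streams a large batch straight to the repository instead of one big request.
await client.bulkUpsertStream(
  collection: 'notes',
  records: Stream.fromIterable(drafts), // drafts: a hypothetical list of payloads
  context: ctx,
);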

Aggregations #

final metrics = await client.aggregate(
  collection: 'orders',
  metrics: {
    'countAll': 'count',
    'sumPrice': 'sum:price',
    'avgPrice': 'avg:price',
    'minPrice': 'min:price',
    'maxPrice': 'max:price',
  },
  context: ctx,
);
print(metrics.metrics);

The repository validates metric expressions and delegates supported work to the storage adapter.

Change streams and optimistic concurrency #

  • watchChanges accepts an optional cursor, so clients can resume after a reconnect.
  • SQLite keeps a persistent s_change_journal table, allowing cursors to survive restarts.
  • update and patch require an expectedVersion; a mismatch triggers RpcDataError.conflict(...) (or a transport RpcException), as sketched below.
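
A minimal conflict-handling sketch, continuing the quick-start example; the catch-all below stands in for whichever error type your transport surfaces:

try {
  await client.patch(
    collection: 'notes',
    id: created.id,
    expectedVersion: created.version, // stale if another writer already bumped it
    patch: const RecordPatch(set: {'done': true}),
    context: ctx,
  );
} catch (error) {
  // RpcDataError.conflict(...) or a transport RpcException lands here:
  // re-read the record for its current version, then retry the patch.
}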

Listing collections #

Call DataServiceClient.listCollections() to retrieve the current catalog of collection names without enumerating records.

final collections = await client.listCollections(context: ctx);
print('collections: ${collections.join(', ')}');

The RPC call simply forwards to DataStorageAdapter.listCollections(), so the result reflects whatever tables your adapter created.

Extending the storage layer #

Implement DataStorageAdapter to plug in any backend (PostgreSQL, Elastic, Firestore, ...):

class PostgresAdapter implements DataStorageAdapter {
  @override
  Future<DataRecord?> readRecord(String collection, String id) async {
    // Query your store and return a DataRecord when a row exists,
    // or null when it does not.
    return null;
  }

  @override
  Future<ListRecordsResponse> queryCollection(ListRecordsRequest request) async {
    // Apply filters + pagination server-side and map rows to the response.
    throw UnimplementedError();
  }

  // Implement searchCollection, aggregateCollection, writeRecords, deleteCollection, etc.
}

Adapters participate in streaming export/import by overriding readCollectionChunks, and they can expose custom indices via CollectionIndexStorageAdapter if needed.

SQLite profile #

SqliteDataStorageAdapter ships as the default single-node implementation (a construction sketch follows this list):

  • Creates per-collection tables on demand plus indexes on version, created_at, and updated_at.
  • Delegates filtering, sorting, and pagination to SQL for predictable O(log N) plans.
  • Uses batched UPSERT statements to keep writeRecords fast even when thousands of rows are updated.
  • SQLCipher:
    • Pass a SqlCipherKey to enable encryption; the runtime will auto-load libsqlcipher on macOS/Linux (checks SQLITE3_LIB_DIR/SQLITE3_LIB_NAME or common Homebrew paths) before opening the database and will fail fast if cipher pragmas are missing.
    • On the web, the adapter uses the sqlite3mc.wasm build (SQLite3MultipleCiphers) by default; set webSqliteWasmUri to a custom location if you host your own WASM.
    • You can still register extra PRAGMA via SqliteSetupHook for fine-tuning.
  • Stores watch() / sync() events in a durable journal, so cursor-based clients recover after restarts.
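
A hypothetical construction sketch: the parameter names and types below are assumptions based on the feature list above, not a verified signature, so check the SqliteDataStorageAdapter API docs before relying on it.

final adapter = SqliteDataStorageAdapter(
  path: 'data/app.db',                            // assumed parameter name
  cipherKey: mySqlCipherKey,                      // a SqlCipherKey enables SQLCipher
  setupHook: myPragmaHook,                        // a SqliteSetupHook for extra PRAGMAs
  webSqliteWasmUri: Uri.parse('/sqlite3mc.wasm'), // only consulted on web
);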

Testing #

Smoke tests are located in test/data_service_facade_test.dart. Run the whole suite with:

dart test --concurrency=1 -r compact

Examples #

  • example/quick_start.dart
  • example/offline_sync.dart
  • example/extended_demo.dart

Production checklist #

  • Schedule regular backups using exportDatabase; periodically restore them into a staging environment with importDatabase.
  • Restrict RPC access through a reverse proxy with authentication and rate limiting.
  • Monitor the SQLite database and change-journal size (e.g., via sqlite3 CLI or metrics collectors).
  • Keep an eye on the CHANGELOG for breaking changes and new capabilities.

License #

MIT
