
KalamDB client SDK for Dart and Flutter — queries, live subscriptions, and authentication powered by flutter_rust_bridge.

kalam_link #

Official Dart and Flutter SDK for KalamDB — private, realtime storage for AI agents.

Status: Alpha — the API surface is still evolving and may include breaking changes between releases.

KalamDB is a SQL-first realtime database. Every user or tenant gets a private namespace. Subscribe to any SQL query live over WebSocket. Publish and consume events via Topics. Store recent data fast and move cold history to object storage.

kalamdb.org · Docs · GitHub

Features #

  • SQL Queries — execute parameterized SQL with $1, $2 placeholders
  • Live Subscriptions — subscribe to any SQL query; receive inserts, updates, and deletes in real-time over WebSocket
  • Per-tenant isolation — each user gets a private namespace; no app-side WHERE user_id = ? filters needed
  • Topics & Pub/Sub — publish events to topics and consume them from any client or agent
  • Authentication — HTTP Basic Auth, JWT tokens, dynamic authProvider callback, or anonymous access
  • Cross-platform — iOS, Android, macOS, Windows, Linux, and Web (WASM)

Installation #

Add the dependency to pubspec.yaml:

dependencies:
  kalam_link: ^0.1.3-alpha.1

Or install it from the command line:

flutter pub add kalam_link

Initialization #

KalamClient.init() must be called once at app startup before any other SDK call. It initializes the underlying Rust runtime.

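import 'package:flutter/material.dart';
import 'package:kalam_link/kalam_link.dart';

Future<void> main() async {
  // Ensure the Flutter binding exists before doing async work ahead of runApp().
  WidgetsFlutterBinding.ensureInitialized();
  // Start the Rust runtime once, before any other SDK call.
  await KalamClient.init();
  runApp(MyApp());
}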

Connecting #

import 'package:kalam_link/kalam_link.dart';

final client = await KalamClient.connect(
  url: 'https://db.example.com',
  authProvider: () async {
    final token = await myApp.getOrRefreshJwt();
    return Auth.jwt(token);
  },
);

connect() options #

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| url | String | required | Server URL |
| authProvider | AuthProvider? | null | Recommended. Async callback invoked before each connection for fresh credentials. Takes precedence over auth. |
| auth | Auth | Auth.none() | Static credentials. Use authProvider for tokens that can expire. |
| timeout | Duration | 30 s | HTTP request timeout |
| maxRetries | int | 3 | Retry count for idempotent (SELECT) queries |
| disableCompression | bool | false | Disable gzip on WebSocket (useful during development) |
| connectionHandlers | ConnectionHandlers? | null | Lifecycle event callbacks (see Connection Lifecycle Handlers) |

Authentication #

Use authProvider for OAuth flows, refresh tokens, or any credentials fetched from secure storage. The callback is invoked before every connection attempt, so tokens are always fresh.

final client = await KalamClient.connect(
  url: 'https://db.example.com',
  authProvider: () async {
    // fetch from keychain, refresh if expired, etc.
    final token = await myApp.getOrRefreshJwt();
    return Auth.jwt(token);
  },
);

Re-invoke the authProvider on demand (e.g. after a 401 or on a schedule):

await client.refreshAuth();

// Periodic refresh example (Timer comes from dart:async):
Timer.periodic(const Duration(minutes: 55), (_) => client.refreshAuth());

Static auth #

For simpler scenarios where tokens do not expire:

// HTTP Basic Auth
final client = await KalamClient.connect(
  url: 'https://db.example.com',
  auth: Auth.basic('alice', 'secret123'),
);

// JWT token
final client = await KalamClient.connect(
  url: 'https://db.example.com',
  auth: Auth.jwt('eyJhbGci...'),
);

// No authentication (localhost bypass)
final client = await KalamClient.connect(
  url: 'http://localhost:8080',
);

Login and token refresh #

Exchange Basic credentials for a JWT and use it for subsequent connections:

final bootstrap = await KalamClient.connect(url: serverUrl);
final tokens = await bootstrap.login('alice', 'secret123');
await bootstrap.dispose();

final client = await KalamClient.connect(
  url: serverUrl,
  auth: Auth.jwt(tokens.accessToken),
);

Refresh an expiring token:

final fresh = await client.refreshToken(tokens.refreshToken!);
// use fresh.accessToken for the next connection
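
To decide when a refresh is due, one option is to read the standard `exp` claim from the access token. The sketch below is not part of kalam_link: `jwtExpiry` and `shouldRefresh` are hypothetical helpers, and they assume the server issues JWTs that carry a numeric `exp` claim (seconds since the Unix epoch, per RFC 7519). The payload is only decoded, never verified.

```dart
import 'dart:convert';

/// Returns the expiry time encoded in a JWT's `exp` claim, or null if the
/// token is malformed or has no numeric `exp`. Does NOT verify the signature.
DateTime? jwtExpiry(String token) {
  final parts = token.split('.');
  if (parts.length != 3) return null;
  try {
    // JWT segments are base64url without padding; normalize() restores it.
    final payload =
        utf8.decode(base64Url.decode(base64Url.normalize(parts[1])));
    final claims = jsonDecode(payload);
    if (claims is! Map) return null;
    final exp = claims['exp'];
    if (exp is! num) return null;
    // `exp` is expressed in seconds since the Unix epoch.
    return DateTime.fromMillisecondsSinceEpoch((exp * 1000).round(),
        isUtc: true);
  } on FormatException {
    return null;
  }
}

/// True when the token expires within [margin] of [now], or cannot be read.
bool shouldRefresh(String token, DateTime now,
    {Duration margin = const Duration(minutes: 5)}) {
  final expiry = jwtExpiry(token);
  return expiry == null || !expiry.isAfter(now.add(margin));
}
```

A caller could check `shouldRefresh(tokens.accessToken, DateTime.now().toUtc())` before reconnecting and call `refreshToken` only when it returns true.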

Executing Queries #

// Simple query
final result = await client.query('SELECT * FROM users LIMIT 10');
for (final row in result.rows) {
  print(row); // List<dynamic>
}

// Rows as maps keyed by column name
final maps = result.results.first.toMaps();

// Parameterized query (the raw string keeps $1 and $2 literal)
final orders = await client.query(
  r'SELECT * FROM orders WHERE user_id = $1 AND status = $2',
  params: ['user-uuid-123', 'pending'],
);

// Query scoped to a user namespace
final messages = await client.query(
  'SELECT * FROM messages LIMIT 5',
  namespace: 'alice',
);
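
When the number of parameters varies, for example in an IN list, the `$n` placeholders can be generated rather than typed by hand. The helper below is a hypothetical sketch (`placeholders` is not part of kalam_link), and it assumes the server accepts positional placeholders inside `IN (...)`, which this README does not confirm.

```dart
/// Builds "$1, $2, ..., $n" for [count] positional parameters,
/// numbering from [start]. The dollar sign is escaped so Dart does not
/// treat it as string interpolation.
String placeholders(int count, {int start = 1}) =>
    [for (var i = 0; i < count; i++) '\$${start + i}'].join(', ');
```

For instance, `'SELECT * FROM orders WHERE status IN (${placeholders(2)})'` produces a query ending in `IN ($1, $2)`, to be sent with a two-element `params` list.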

QueryResponse fields #

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether the query succeeded |
| results | List<QueryResult> | Result sets (one per statement) |
| tookMs | double? | Execution time in milliseconds |
| error | ErrorDetail? | Error details when success is false |

QueryResult fields #

| Field | Type | Description |
| --- | --- | --- |
| columns | List<SchemaField> | Column metadata |
| rows | List<List<dynamic>> | Parsed row values |
| rowCount | int | Rows affected or returned |
| message | String? | DDL message (e.g. "Table created") |
| toMaps() | List<Map<String, dynamic>> | Rows keyed by column name |
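
To make the relationship between positional `rows` and `toMaps()` concrete, the sketch below pairs column names with row values. It is an illustration of the resulting shape only, not the SDK's implementation: `rowsToMaps` is a hypothetical function, and it takes plain column names rather than `SchemaField` objects, whose fields are not described here.

```dart
/// Pairs each row's positional values with the column names, producing one
/// map per row. Values beyond the known columns are ignored.
List<Map<String, dynamic>> rowsToMaps(
  List<String> columnNames,
  List<List<dynamic>> rows,
) {
  return [
    for (final row in rows)
      {
        for (var i = 0; i < columnNames.length && i < row.length; i++)
          columnNames[i]: row[i],
      },
  ];
}
```

Calling `rowsToMaps(['id', 'name'], [[1, 'alice']])` gives a single map with the keys `id` and `name`.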

Live Subscriptions #

Subscribe to any SQL query. The returned Stream<ChangeEvent> emits real-time events:

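final stream = client.subscribe(
  // No parameter list is documented for subscribe(), so the value is inlined.
  // 'general' is a placeholder room id.
  "SELECT * FROM chat.messages WHERE room_id = 'general' ORDER BY created_at ASC",
  batchSize: 100,   // rows per initial-data batch
  lastRows: 50,     // rewind: include last N rows before live changes
);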

await for (final event in stream) {
  switch (event) {
    case AckEvent(:final subscriptionId, :final totalRows):
      print('Subscribed $subscriptionId. Snapshot rows: $totalRows');
    case InitialDataBatch(:final rowsJson, :final hasMore):
      print('Snapshot batch, hasMore=$hasMore');
    case InsertEvent(:final row):
      print('New row: $row');
    case UpdateEvent(:final row, :final oldRow):
      print('Updated: $oldRow → $row');
    case DeleteEvent(:final row):
      print('Deleted: $row');
    case SubscriptionError(:final message):
      print('Error: $message');
  }
}

Cancel the subscription by cancelling the StreamSubscription:

final sub = stream.listen((_) {});
// later:
await sub.cancel();

subscribe() options #

| Parameter | Type | Description |
| --- | --- | --- |
| sql | String | SQL query to watch |
| batchSize | int? | Max rows per initial snapshot batch |
| lastRows | int? | Include the last N rows before live changes begin |
| subscriptionId | String? | Custom subscription ID |
ChangeEvent variants #

| Variant | Fields |
| --- | --- |
| AckEvent | subscriptionId, totalRows, schema, batchNum, hasMore, status |
| InitialDataBatch | subscriptionId, rowsJson, batchNum, hasMore, status |
| InsertEvent | subscriptionId, rowsJson, row |
| UpdateEvent | subscriptionId, rowsJson, oldRowsJson, row, oldRow |
| DeleteEvent | subscriptionId, oldRowsJson, row |
| SubscriptionError | subscriptionId, code, message |
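
A common use of these events is keeping a local cache in step with the server. The sketch below shows the idea with simplified stand-in types so that it runs without the package: `ChangeKind`, `Change`, and `applyChange` are hypothetical and are not SDK classes. It assumes each row is available as a map containing a primary-key column, here called `id`; the exact type of `row` on the SDK events is not stated in this README.

```dart
/// Simplified stand-ins for insert, update, and delete events.
enum ChangeKind { insert, update, delete }

class Change {
  const Change(this.kind, this.row);
  final ChangeKind kind;
  final Map<String, dynamic> row;
}

/// Applies one change to a cache keyed by the row's primary key and
/// returns the same map for chaining. Assumes every row has [keyColumn].
Map<Object?, Map<String, dynamic>> applyChange(
  Map<Object?, Map<String, dynamic>> cache,
  Change change, {
  String keyColumn = 'id',
}) {
  final key = change.row[keyColumn];
  switch (change.kind) {
    case ChangeKind.insert:
    case ChangeKind.update:
      cache[key] = change.row; // upsert: an update replaces the old row
    case ChangeKind.delete:
      cache.remove(key);
  }
  return cache;
}
```

Inside the `await for` loop, each `InsertEvent`, `UpdateEvent`, or `DeleteEvent` would be translated into the matching call, so the cache always reflects the latest rows seen.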

Connection Lifecycle Handlers #

Hook into connection events for logging, UI state, or diagnostics:

final client = await KalamClient.connect(
  url: 'https://db.example.com',
  authProvider: () async => Auth.jwt(await getToken()),
  connectionHandlers: ConnectionHandlers(
    onConnect: () => print('connected'),
    onDisconnect: (reason) => print('disconnected: ${reason.message}'),
    onError: (error) => print('error: ${error.message}'),
    onReceive: (message) => print('[recv] $message'),
    onSend: (message) => print('[send] $message'),
  ),
);
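
For UI state, the handlers can feed a small status holder instead of printing. The following is a minimal sketch in plain Dart: `ConnectionStatus` and `ConnectionStatusTracker` are hypothetical names, not SDK types.

```dart
import 'dart:async';

enum ConnectionStatus { disconnected, connected, error }

/// Holds the latest connection status and broadcasts every transition.
class ConnectionStatusTracker {
  final _controller = StreamController<ConnectionStatus>.broadcast();
  ConnectionStatus _current = ConnectionStatus.disconnected;

  /// The most recently reported status.
  ConnectionStatus get current => _current;

  /// Emits a value each time the status actually changes.
  Stream<ConnectionStatus> get changes => _controller.stream;

  void update(ConnectionStatus next) {
    if (next == _current) return; // ignore duplicate notifications
    _current = next;
    _controller.add(next);
  }

  Future<void> close() => _controller.close();
}
```

With a tracker in scope, the handlers above could become `onConnect: () => tracker.update(ConnectionStatus.connected)` and `onDisconnect: (_) => tracker.update(ConnectionStatus.disconnected)`, while widgets listen to `tracker.changes`.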

Server Health & Setup #

// Check server health (version, status)
final health = await client.healthCheck();
print('${health.status} — v${health.version}');

// Check if first-time setup is required
final setup = await client.checkSetupStatus();
if (setup.needsSetup) {
  await client.serverSetup(ServerSetupRequest(
    username: 'admin',
    password: 'AdminPass123!',
    rootPassword: 'RootPass456!',
  ));
}

Disposing #

Always dispose the client when done to release resources:

await client.dispose();
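
One way to make sure disposal happens even when a call throws is to wrap the work in try/finally. The generic helper below is a sketch of that pattern: `withResource` is a hypothetical name and is not provided by the SDK.

```dart
/// Opens a resource, runs [body], and always runs [close] afterwards,
/// even if [body] throws.
Future<T> withResource<R, T>(
  Future<R> Function() open,
  Future<T> Function(R resource) body,
  Future<void> Function(R resource) close,
) async {
  final resource = await open();
  try {
    return await body(resource);
  } finally {
    await close(resource);
  }
}
```

With the client, this might look like `withResource(() => KalamClient.connect(url: serverUrl), (c) => c.query('SELECT 1'), (c) => c.dispose())`, assuming `dispose()` returns a Future as the `await` above suggests.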

Full API Reference #

| Method | Description |
| --- | --- |
| KalamClient.init() | Required. Initialize the Rust runtime once at app startup |
| KalamClient.connect(url, {authProvider, auth, timeout, maxRetries, disableCompression, connectionHandlers}) | Create a connected client |
| query(sql, {params, namespace}) | Execute parameterized SQL |
| subscribe(sql, {batchSize, lastRows, subscriptionId}) | Subscribe to live changes; returns Stream<ChangeEvent> |
| login(username, password) | Exchange Basic credentials for JWT tokens |
| refreshToken(refreshToken) | Refresh an expiring access token |
| refreshAuth() | Re-invoke authProvider and update credentials in-place |
| healthCheck() | Check server health and version |
| checkSetupStatus() | Check if first-time setup is needed |
| serverSetup(request) | Perform initial server setup |
| dispose() | Release resources |

License #

Apache-2.0


Native performance on iOS and Android is powered by the excellent flutter_rust_bridge project — thank you to its maintainers and contributors.
