Connector Resource Sync¶
Overview¶
The connector resource sync system enables Floh to maintain a local cache of upstream connector data — users, groups, roles, and other resources — so that workflow steps and UI screens can query connector data without making real-time API calls on every request.
This is particularly important for connectors that manage large directories (e.g. hundreds of thousands of users, hundreds of groups) where live fetches on every action would be prohibitively slow or rate-limited.
Architecture¶
The sync system uses a hybrid tiered approach:
| Tier | Storage | Use Case | Examples |
|---|---|---|---|
| Tier 1 — DB Sync | PostgreSQL | Small-to-medium reference data that changes infrequently | Groups, roles, documents |
| Tier 2 — Redis Cache | Redis (TTL) | Individual high-volume lookups during workflow execution | Single-user lookups |
| Tier 3 — Opt-in Bulk Sync | PostgreSQL | Large directories, enabled explicitly with filter rules | Full user directory |
┌─────────────────────────────────────────────────────┐
│                   Sync Scheduler                    │
│              (BullMQ repeatable jobs)               │
│                                                     │
│   ┌──────────────┐       ┌──────────────┐           │
│   │  full sync   │       │ incremental  │           │
│   │    (cron)    │       │    (cron)    │           │
│   └──────┬───────┘       └──────┬───────┘           │
│          │                      │                   │
│          ▼                      ▼                   │
│   ┌──────────────────────────────────┐              │
│   │       ConnectorSyncService       │              │
│   │                                  │              │
│   │  • Pages through connector       │              │
│   │  • Computes SHA-256 hash         │              │
│   │  • Upserts with hash comparison  │              │
│   │  • Marks unseen records stale    │              │
│   │  • Purges beyond retention       │              │
│   └────────────┬─────────────────────┘              │
│                │                                    │
│   ┌────────────▼─────────────────────┐              │
│   │  connector_resource table (PG)   │              │
│   │  connector_sync_config table     │              │
│   └──────────────────────────────────┘              │
│                                                     │
│   ┌──────────────────────────────────┐              │
│   │  ConnectorCacheService (Redis)   │◄── Tier 2    │
│   │  TTL-based per-resource cache    │              │
│   └──────────────────────────────────┘              │
│                                                     │
│   ┌──────────────────────────────────┐              │
│   │  Webhooks (near-real-time)       │              │
│   │  Invalidates cache + upserts     │              │
│   └──────────────────────────────────┘              │
└─────────────────────────────────────────────────────┘
Database Schema¶
connector_resource¶
Stores synced resource records from upstream connectors.
| Column | Type | Description |
|---|---|---|
| `id` | UUID PK | Auto-generated |
| `connector_id` | UUID FK | References `connector_definition` |
| `resource_type` | VARCHAR(64) | `user`, `group`, `role`, etc. |
| `external_id` | VARCHAR(255) | ID from the upstream system |
| `display_name` | VARCHAR(255) | Human-readable name |
| `email` | VARCHAR(255) | Optional email address |
| `attributes` | JSONB | Arbitrary upstream attributes |
| `sync_hash` | VARCHAR(64) | SHA-256 of the canonical record |
| `stale_since` | TIMESTAMPTZ | Set when the record was not refreshed during a sync pass |
| `synced_at` | TIMESTAMPTZ | Last time the record was confirmed upstream |
| `created_at` | TIMESTAMPTZ | Row creation time |
A unique constraint on (connector_id, resource_type, external_id) prevents duplicates.
connector_sync_config¶
Stores per-connector, per-resource-type synchronization configuration.
| Column | Type | Description |
|---|---|---|
| `id` | UUID PK | Auto-generated |
| `connector_id` | UUID FK | References `connector_definition` |
| `resource_type` | VARCHAR(64) | Resource type this config applies to |
| `enabled` | BOOLEAN | Whether scheduled sync is active |
| `strategy` | VARCHAR(32) | `full` or `incremental` |
| `cron_schedule` | VARCHAR(64) | Cron expression for scheduled syncs |
| `filter_rules` | JSONB | Optional filter rules (see below) |
| `stale_retention` | VARCHAR(32) | How long to keep stale records (e.g. `7d`, `24h`) |
| `last_sync_at` | TIMESTAMPTZ | Timestamp of last sync completion |
| `last_sync_cursor` | TEXT | Resume cursor on error |
| `last_sync_status` | VARCHAR(32) | `idle`, `running`, `success`, `error` |
| `last_sync_error` | TEXT | Error message if last sync failed |
| `last_sync_stats` | JSONB | Statistics from the last sync run |
Sync Strategies¶
Full Sync¶
A full sync fetches all resources from the upstream connector, upserts them into connector_resource, and marks any records that were not seen during this pass as stale. Stale records are purged after the retention period.
Best for: Reference data that should always reflect the upstream truth (groups, roles).
Incremental Sync¶
An incremental sync requests only records modified since the last successful sync (modifiedSince parameter). It does not mark unseen records as stale — since it only fetches changes, absence does not imply deletion.
Best for: Large datasets where full enumeration is expensive and the upstream supports modification timestamps.
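The difference between the two strategies can be summarized as a small planning function. This is a minimal sketch, not the actual `ConnectorSyncService` logic: the names `planSyncRun`, `SyncRunPlan`, and `lastSuccessfulSyncAt` are hypothetical, and the behavior of an incremental run with no previous sync (no `modifiedSince` is sent) is an assumption.

```typescript
type SyncStrategy = "full" | "incremental";

// Hypothetical plan shape: what to send upstream and what to do afterwards.
interface SyncRunPlan {
  modifiedSince?: string; // ISO timestamp, only set for incremental runs
  markUnseenStale: boolean; // only a full enumeration can infer absence
}

function planSyncRun(
  strategy: SyncStrategy,
  lastSuccessfulSyncAt: string | null,
): SyncRunPlan {
  if (strategy === "full") {
    // Full sync sees every upstream record, so unseen rows can be marked stale.
    return { markUnseenStale: true };
  }
  // Incremental sync only sees changes, so absence does not imply deletion.
  const plan: SyncRunPlan = { markUnseenStale: false };
  if (lastSuccessfulSyncAt !== null) {
    plan.modifiedSince = lastSuccessfulSyncAt;
  }
  // Assumption: with no previous sync there is no timestamp to send.
  return plan;
}
```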
Filter Rules¶
Filter rules restrict which upstream records are synced. They are stored in the filter_rules JSONB column and applied in-memory after each page is fetched.
interface SyncFilterRules {
  groupNamePattern?: string;       // Glob-like pattern (e.g. "proj-*")
  groupIds?: string[];             // Whitelist of specific group IDs
  emailDomains?: string[];         // Only sync users from these domains
  memberOfSyncedGroups?: boolean;  // Only sync users in already-synced groups
  maxRecords?: number;             // Hard cap on total synced records
}
Group Filters¶
- `groupNamePattern`: Matches the `displayName` using glob syntax (`*` = any characters, `?` = a single character). Case-insensitive.
- `groupIds`: Explicit whitelist; only groups with a matching `externalId` are synced.
- Both filters compose with AND: a record must pass all specified filters.
User Filters¶
- `emailDomains`: Only syncs users whose email domain matches one of the listed domains. Case-insensitive.
- `maxRecords`: Stops syncing after this many records have been processed (across all pages).
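The group and user filters could be applied to each fetched page roughly as follows. This is a sketch under assumptions rather than the actual filter engine: the helper names (`globToRegExp`, `matchesGroup`, `matchesUser`, `capToMaxRecords`) are hypothetical, an empty `groupIds` or `emailDomains` array is treated as "no filter", and `maxRecords` is interpreted as a cap on records kept across pages.

```typescript
// Hypothetical record shape, following the connector response fields.
interface ResourceRecord {
  externalId: string;
  displayName: string;
  email?: string | null;
}

interface SyncFilterRules {
  groupNamePattern?: string;
  groupIds?: string[];
  emailDomains?: string[];
  maxRecords?: number;
}

// Convert a glob ("*" = any chars, "?" = one char) into a case-insensitive RegExp.
function globToRegExp(pattern: string): RegExp {
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, "\\$&");
  const source = escaped.replace(/\*/g, ".*").replace(/\?/g, ".");
  return new RegExp(`^${source}$`, "i");
}

// Group filters compose with AND: every specified filter must pass.
function matchesGroup(record: ResourceRecord, rules: SyncFilterRules): boolean {
  if (rules.groupNamePattern && !globToRegExp(rules.groupNamePattern).test(record.displayName)) {
    return false;
  }
  if (rules.groupIds && rules.groupIds.length > 0 && !rules.groupIds.includes(record.externalId)) {
    return false;
  }
  return true;
}

// User filter: compare the part after the last "@" against the allowed domains.
function matchesUser(record: ResourceRecord, rules: SyncFilterRules): boolean {
  if (!rules.emailDomains || rules.emailDomains.length === 0) return true;
  const email = record.email ?? "";
  const at = email.lastIndexOf("@");
  if (at < 0) return false;
  const domain = email.slice(at + 1).toLowerCase();
  return rules.emailDomains.some((d) => d.toLowerCase() === domain);
}

// Trim a filtered page so the running total never exceeds maxRecords.
function capToMaxRecords<T>(page: T[], alreadySynced: number, rules: SyncFilterRules): T[] {
  if (rules.maxRecords === undefined) return page;
  return page.slice(0, Math.max(0, rules.maxRecords - alreadySynced));
}
```

For example, with `{ groupNamePattern: "proj-*", groupIds: ["g1"] }` a group named `PROJ-alpha` with `externalId` `g1` passes, while `proj-beta` with `externalId` `g3` is rejected by the ID whitelist.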
Sync Hash and Change Detection¶
Each upstream record is serialized into a canonical JSON form. A SHA-256 hash of this JSON is computed and stored as `sync_hash`. On subsequent syncs, the hash is compared:
- Match → Record is unchanged; only `synced_at` is updated.
- Mismatch → Record is updated with new data and hash.
- New → Record is inserted.
This minimizes database writes for large datasets where most records haven't changed.
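The hash comparison can be illustrated with a short sketch. It assumes that the canonical form consists of `externalId`, `displayName`, `email`, and `attributes` with object keys sorted recursively; the field set and ordering actually used by `ConnectorSyncService` may differ. The names `canonicalize`, `computeSyncHash`, and `decideAction` are hypothetical.

```typescript
import { createHash } from "node:crypto";

interface ConnectorResourceRecord {
  externalId: string;
  displayName: string;
  email?: string | null;
  attributes?: Record<string, unknown>;
}

// Recursively sort object keys so that key order never affects the hash.
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const sorted: Record<string, unknown> = {};
    for (const key of Object.keys(obj).sort()) {
      sorted[key] = canonicalize(obj[key]);
    }
    return sorted;
  }
  return value;
}

// Assumed canonical form: the four record fields, with defaults for missing ones.
function computeSyncHash(record: ConnectorResourceRecord): string {
  const canonical = JSON.stringify(
    canonicalize({
      externalId: record.externalId,
      displayName: record.displayName,
      email: record.email ?? null,
      attributes: record.attributes ?? {},
    }),
  );
  return createHash("sha256").update(canonical).digest("hex");
}

type SyncAction = "insert" | "update" | "touch";

// "touch" means only synced_at is refreshed; the row data is left alone.
function decideAction(storedHash: string | undefined, incomingHash: string): SyncAction {
  if (storedHash === undefined) return "insert";
  return storedHash === incomingHash ? "touch" : "update";
}
```

The hex digest is 64 characters long, which fits the `VARCHAR(64)` `sync_hash` column.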
Stale Management¶
After a full sync completes, any records whose synced_at is older than the sync start time (meaning they were not seen upstream) are marked stale by setting stale_since to the current time.
Stale records are retained for a configurable period (default 7d) before being purged. This protects against transient upstream issues — if a group temporarily disappears from an API response, it won't be immediately deleted.
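As an illustration, the mark-and-purge cycle could look like the sketch below, which operates on an in-memory array instead of the `connector_resource` table. The helper names are hypothetical, and only the `h` and `d` retention units shown in the examples (`24h`, `7d`) are handled; whether other units are supported is not covered here.

```typescript
const UNIT_MS: Record<string, number> = { h: 3_600_000, d: 86_400_000 };

// Parse a retention string such as "7d" or "24h" into milliseconds.
function parseRetention(value: string): number {
  const match = /^(\d+)([hd])$/.exec(value.trim());
  if (!match) throw new Error(`Unsupported retention value: ${value}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

// Hypothetical in-memory stand-in for a connector_resource row.
interface StoredResource {
  externalId: string;
  syncedAt: Date;
  staleSince: Date | null;
}

// After a full sync: rows not confirmed since syncStart become stale.
function markStale(rows: StoredResource[], syncStart: Date, now: Date): number {
  let staled = 0;
  for (const row of rows) {
    if (row.staleSince === null && row.syncedAt.getTime() < syncStart.getTime()) {
      row.staleSince = now;
      staled++;
    }
  }
  return staled;
}

// Keep fresh rows and stale rows that are still inside the retention window.
function purgeExpired(rows: StoredResource[], retention: string, now: Date): StoredResource[] {
  const cutoff = now.getTime() - parseRetention(retention);
  return rows.filter((row) => row.staleSince === null || row.staleSince.getTime() > cutoff);
}
```

With a `7d` retention, a record marked stale on 1 January survives a purge on 5 January and is removed by a purge on 10 January.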
Redis Cache (Tier 2)¶
The ConnectorCacheService provides a TTL-based Redis cache for individual resource lookups. This is used during workflow execution to avoid repeated API calls when checking a single user or group.
Key format: cr:{connectorId}:{resourceType}:{externalId}
Default TTL: 300 seconds (5 minutes)
Cache-Aside Pattern¶
getOrFetch(connectorId, type, externalId, fetcher):
1. Check Redis → if hit, return cached value
2. Call fetcher() → if null, return null
3. Store result in Redis with TTL
4. Return result
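The same steps can be sketched in TypeScript. To stay self-contained, this sketch uses a hypothetical `KeyValueStore` interface with an in-memory implementation in place of a real Redis client, and takes the store as an explicit first argument; the actual `ConnectorCacheService` signature may differ.

```typescript
// Minimal store interface; a Redis client would sit behind this in practice.
interface KeyValueStore {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, ttlSeconds: number): Promise<void>;
  del(key: string): Promise<void>;
}

// In-memory stand-in with TTL expiry, used only for illustration.
class InMemoryStore implements KeyValueStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();

  async get(key: string): Promise<string | null> {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (entry.expiresAt <= Date.now()) {
      this.entries.delete(key);
      return null;
    }
    return entry.value;
  }

  async set(key: string, value: string, ttlSeconds: number): Promise<void> {
    this.entries.set(key, { value, expiresAt: Date.now() + ttlSeconds * 1000 });
  }

  async del(key: string): Promise<void> {
    this.entries.delete(key);
  }
}

const DEFAULT_TTL_SECONDS = 300;

function cacheKey(connectorId: string, resourceType: string, externalId: string): string {
  return `cr:${connectorId}:${resourceType}:${externalId}`;
}

async function getOrFetch<T>(
  store: KeyValueStore,
  connectorId: string,
  resourceType: string,
  externalId: string,
  fetcher: () => Promise<T | null>,
  ttlSeconds: number = DEFAULT_TTL_SECONDS,
): Promise<T | null> {
  const key = cacheKey(connectorId, resourceType, externalId);
  const cached = await store.get(key); // 1. check the cache
  if (cached !== null) return JSON.parse(cached) as T;
  const fresh = await fetcher(); // 2. fall back to the upstream call
  if (fresh === null) return null; // misses are not cached
  await store.set(key, JSON.stringify(fresh), ttlSeconds); // 3. store with TTL
  return fresh; // 4. return the result
}
```

Because null results are not stored, repeated lookups for a record that does not exist upstream call the fetcher every time.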
Cache Invalidation¶
Cache entries are invalidated:

- On webhook events (created/updated/deleted)
- On manual cache clear
- Automatically via TTL expiry
Webhook Integration¶
The webhook handler at POST /api/webhooks/:connectorId has been extended to handle resource sync events:
| Action | Cache | DB |
|---|---|---|
| `created` | Invalidate key | Upsert record with hash |
| `updated` | Invalidate key | Upsert record with hash |
| `deleted` | Invalidate key | Delete record |
The webhook body must include a resourceType field to trigger resource sync handling. Without it, the event falls through to the existing entitlement reconciliation handler.
{
  "action": "updated",
  "resourceId": "user-123",
  "resourceType": "user",
  "data": {
    "displayName": "Jane Doe",
    "email": "jane@acme.com"
  }
}
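The routing described above can be sketched as a pure function that turns an incoming event into a plan of actions. The names `planWebhook` and `WebhookPlan` are hypothetical, and the sketch assumes that the webhook's `resourceId` corresponds to the `externalId` used in the cache key.

```typescript
type WebhookAction = "created" | "updated" | "deleted";

// Event shape based on the example payload; resourceType is optional
// because older entitlement events do not carry it.
interface ResourceWebhookEvent {
  action: WebhookAction;
  resourceId: string;
  resourceType?: string;
  data?: Record<string, unknown>;
}

type WebhookPlan =
  | { handler: "entitlement" }
  | { handler: "resource"; invalidateKey: string; db: "upsert" | "delete" };

function planWebhook(connectorId: string, event: ResourceWebhookEvent): WebhookPlan {
  // Without resourceType, fall through to entitlement reconciliation.
  if (!event.resourceType) return { handler: "entitlement" };
  return {
    handler: "resource",
    // Assumption: resourceId maps to externalId in the cache key.
    invalidateKey: `cr:${connectorId}:${event.resourceType}:${event.resourceId}`,
    db: event.action === "deleted" ? "delete" : "upsert",
  };
}
```

Keeping the decision separate from the side effects makes the table of actions easy to unit test without Redis or PostgreSQL.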
API Reference¶
All endpoints require authentication and are scoped under /api/connectors.
List Synced Resources¶
Returns paginated resources from the local sync cache. Supports filtering by type, search term, and stale status.
List Sync Configurations¶
Returns all sync configurations for a connector.
Get Sync Configuration¶
Returns the sync configuration for a specific resource type.
Create or Update Sync Configuration¶
PUT /api/connectors/:id/sync-config/:type
Content-Type: application/json
{
  "enabled": true,
  "strategy": "full",
  "cronSchedule": "0 */6 * * *",
  "filterRules": {
    "groupNamePattern": "proj-*"
  },
  "staleRetention": "7d"
}
Creates a new sync configuration or updates an existing one. When enabled with a cron schedule, a BullMQ repeatable job is registered automatically.
Delete Sync Configuration¶
Removes the sync configuration and its associated scheduled job.
Trigger Manual Sync¶
Immediately runs a sync for the specified resource type. Returns the sync statistics on completion.
Response:
{
  "message": "Sync completed",
  "stats": {
    "added": 12,
    "updated": 3,
    "staled": 0,
    "unchanged": 485,
    "removed": 0,
    "durationMs": 8420,
    "pagesProcessed": 1,
    "totalUpstreamRecords": 500
  }
}
Get Sync Status¶
Returns the last sync status, error, and statistics without the full configuration.
Connector Implementation¶
For a connector to support resource sync, it must implement paginated list commands. The connector's configSchema should declare these commands with syncCapable: true.
Required Commands¶
| Resource Type | Command | Description |
|---|---|---|
| `group` | `listGroups` | Paginated group listing |
| `user` | `listUsers` | Paginated user listing |
| `role` | `listRoles` | Paginated role listing |
Command Interface¶
Each list command receives:
{
  command: 'listGroups',
  cursor?: string,         // Resume cursor from previous page
  pageSize?: number,       // Requested page size (default 500)
  filter?: object,         // Optional filter from sync config
  modifiedSince?: string   // ISO timestamp for incremental sync
}
And must return:
{
  success: true,
  data: {
    resources: [
      {
        externalId: 'grp-123',
        displayName: 'Engineering',
        email: null,
        attributes: { description: 'Engineering team', memberCount: 45 }
      }
    ],
    nextCursor: 'page-2-token',  // undefined when no more pages
    totalEstimate: 150           // optional, for progress indication
  }
}
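On the consuming side, a sync pass can follow `nextCursor` until it is absent. The sketch below shows one way to drive such a command; `forEachPage` and `ExecuteCommand` are hypothetical names, and the way commands are actually dispatched to a connector is an assumption.

```typescript
interface ConnectorResourceRecord {
  externalId: string;
  displayName: string;
  email?: string | null;
  attributes?: Record<string, unknown>;
}

interface ListCommandInput {
  command: string;
  cursor?: string;
  pageSize?: number;
  modifiedSince?: string;
}

interface ListCommandResult {
  success: boolean;
  data: {
    resources: ConnectorResourceRecord[];
    nextCursor?: string;
    totalEstimate?: number;
  };
}

// Hypothetical dispatcher: sends one list command to the connector.
type ExecuteCommand = (input: ListCommandInput) => Promise<ListCommandResult>;

// Page through a list command, handing each page to onPage.
// Returns the number of pages processed.
async function forEachPage(
  execute: ExecuteCommand,
  command: string,
  onPage: (records: ConnectorResourceRecord[]) => void,
  pageSize: number = 500,
): Promise<number> {
  let cursor: string | undefined;
  let pages = 0;
  do {
    const input: ListCommandInput = { command, pageSize };
    if (cursor !== undefined) input.cursor = cursor;
    const result = await execute(input);
    if (!result.success) {
      throw new Error(`${command} failed on page ${pages + 1}`);
    }
    onPage(result.data.resources);
    pages++;
    // An absent or empty cursor ends the loop.
    cursor = result.data.nextCursor || undefined;
  } while (cursor !== undefined);
  return pages;
}
```

Processing one page at a time, rather than collecting every record first, keeps memory use bounded for large directories.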
Authifi Example¶
The Authifi connector implements listGroups and listUsers out of the box. Both commands use cursor-based pagination and map upstream API responses to the standard ConnectorResourceRecord format.
Background Scheduling¶
When a sync configuration is enabled with a cronSchedule, a BullMQ repeatable job is registered with the id cr-sync-{connectorId}-{resourceType}. The worker handler invokes ConnectorSyncService.runSync().
On application startup, all enabled sync configs are loaded from the database and their corresponding jobs are registered with the scheduler. This ensures schedules survive server restarts.
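The startup registration can be sketched as follows. To avoid guessing at scheduler internals, the sketch uses a hypothetical `Scheduler` interface instead of calling BullMQ directly; `registerSyncJobs` and `syncJobId` are also illustrative names.

```typescript
// Subset of connector_sync_config needed for scheduling.
interface SyncConfigRow {
  connectorId: string;
  resourceType: string;
  enabled: boolean;
  cronSchedule: string | null;
}

// Hypothetical scheduler abstraction; the real system registers
// BullMQ repeatable jobs behind something like this.
interface Scheduler {
  upsertRepeatable(
    jobId: string,
    cron: string,
    payload: { connectorId: string; resourceType: string },
  ): void;
}

function syncJobId(connectorId: string, resourceType: string): string {
  return `cr-sync-${connectorId}-${resourceType}`;
}

// Register one repeatable job per enabled config that has a cron schedule.
// Returns the job ids that were registered.
function registerSyncJobs(configs: SyncConfigRow[], scheduler: Scheduler): string[] {
  const registered: string[] = [];
  for (const config of configs) {
    if (!config.enabled || !config.cronSchedule) continue;
    const jobId = syncJobId(config.connectorId, config.resourceType);
    scheduler.upsertRepeatable(jobId, config.cronSchedule, {
      connectorId: config.connectorId,
      resourceType: config.resourceType,
    });
    registered.push(jobId);
  }
  return registered;
}
```

Because the job id is derived from the connector and resource type, re-registering on every startup is idempotent as long as the scheduler treats the id as a key.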
Permissions¶
| Endpoint | Required Permission |
|---|---|
| `GET .../resources` | `connector:read` |
| `GET .../sync-config` | `connector:read` |
| `GET .../sync-config/:type/status` | `connector:read` |
| `PUT .../sync-config/:type` | `connector:update` |
| `DELETE .../sync-config/:type` | `connector:update` |
| `POST .../sync-config/:type/trigger` | `connector:update` |
Shared Types¶
All sync-related types are defined in packages/shared/src/connector-sync.types.ts and exported from @floh/shared. Key interfaces:
- `ConnectorResourceRecord`: Upstream resource shape
- `ConnectorListResult`: Paginated list response from connectors
- `ConnectorSyncConfigDto`: Sync configuration DTO
- `SyncFilterRules`: Filter rule schema
- `SyncStats`: Sync run statistics
- `ConnectorResourceDto`: Resource as returned by the API
- `ResourceWebhookEvent`: Webhook event for resource changes
File Layout¶
packages/shared/src/
connector-sync.types.ts # Shared type definitions
packages/server/src/
db/migrations/
003_connector_resource_sync.ts # Migration for new tables
db/schema/
operational-tables.ts # Table type definitions
database.ts # Database interface registration
modules/connectors/
sync-repository.ts # ConnectorResourceRepository, ConnectorSyncConfigRepository
sync-service.ts # ConnectorSyncService (orchestration)
sync-routes.ts # API routes for sync management
connector-cache.ts # ConnectorCacheService (Redis)
authifi-client.ts # Extended with listGroupsPaginated, listUsersPaginated
authifi-connector.ts # Extended with listGroups, listUsers commands
seed-connectors.ts # Updated schema with syncCapable commands
modules/roles/routes/
webhook-routes.ts # Extended for resource webhook events
route-registry.ts # Sync routes registration
app.ts # Startup job scheduling
worker-handlers.ts # Sync job handler
worker.ts # Worker deps with sync service
packages/server/test/unit/
connector-cache.test.ts # Cache service tests
connector-sync-repository.test.ts # Repository tests
connector-sync-service.test.ts # Sync orchestration tests
connector-sync-routes.test.ts # API route tests
connector-sync-filter.test.ts # Filter rules engine tests
authifi-connector-sync.test.ts # Authifi list commands tests
connector-resource-webhook.test.ts # Webhook extension tests