
Connector Resource Sync

Overview

The connector resource sync system enables Floh to maintain a local cache of upstream connector data — users, groups, roles, and other resources — so that workflow steps and UI screens can query connector data without making real-time API calls on every request.

This is particularly important for connectors that manage large directories (e.g. hundreds of thousands of users, hundreds of groups) where live fetches on every action would be prohibitively slow or rate-limited.

Architecture

The sync system uses a hybrid tiered approach:

Tier                       Storage      Use case                                                  Examples
Tier 1 (DB Sync)           PostgreSQL   Small-to-medium reference data that changes infrequently  Groups, roles, documents
Tier 2 (Redis Cache)       Redis (TTL)  Individual high-volume lookups during workflow execution  Single-user lookups
Tier 3 (Opt-in Bulk Sync)  PostgreSQL   Large directories, enabled explicitly with filter rules   Full user directory
┌──────────────────────────────────────────────────────┐
│                    Sync Scheduler                    │
│               (BullMQ repeatable jobs)               │
│                                                      │
│   ┌──────────────┐   ┌──────────────┐                │
│   │  full sync   │   │  incremental │                │
│   │  (cron)      │   │  (cron)      │                │
│   └──────┬───────┘   └──────┬───────┘                │
│          │                  │                        │
│          ▼                  ▼                        │
│   ┌──────────────────────────────────┐               │
│   │      ConnectorSyncService        │               │
│   │                                  │               │
│   │  • Pages through connector       │               │
│   │  • Computes SHA-256 hash         │               │
│   │  • Upserts with hash comparison  │               │
│   │  • Marks unseen records stale    │               │
│   │  • Purges beyond retention       │               │
│   └────────────┬─────────────────────┘               │
│                │                                     │
│   ┌────────────▼─────────────────────┐               │
│   │   connector_resource table (PG)  │               │
│   │   connector_sync_config table    │               │
│   └──────────────────────────────────┘               │
│                                                      │
│   ┌──────────────────────────────────┐               │
│   │   ConnectorCacheService (Redis)  │◄── Tier 2     │
│   │   TTL-based per-resource cache   │               │
│   └──────────────────────────────────┘               │
│                                                      │
│   ┌──────────────────────────────────┐               │
│   │   Webhooks (near-real-time)      │               │
│   │   Invalidates cache + upserts    │               │
│   └──────────────────────────────────┘               │
└──────────────────────────────────────────────────────┘

Database Schema

connector_resource

Stores synced resource records from upstream connectors.

Column         Type          Description
id             UUID PK       Auto-generated
connector_id   UUID FK       References connector_definition
resource_type  VARCHAR(64)   user, group, role, etc.
external_id    VARCHAR(255)  ID from the upstream system
display_name   VARCHAR(255)  Human-readable name
email          VARCHAR(255)  Optional email address
attributes     JSONB         Arbitrary upstream attributes
sync_hash      VARCHAR(64)   SHA-256 of the canonical record
stale_since    TIMESTAMPTZ   Set when a full sync pass did not see the record
synced_at      TIMESTAMPTZ   Last time the record was confirmed upstream
created_at     TIMESTAMPTZ   Row creation time

A unique constraint on (connector_id, resource_type, external_id) prevents duplicates.

connector_sync_config

Stores per-connector, per-resource-type synchronization configuration.

Column            Type         Description
id                UUID PK      Auto-generated
connector_id      UUID FK      References connector_definition
resource_type     VARCHAR(64)  Resource type this config applies to
enabled           BOOLEAN      Whether scheduled sync is active
strategy          VARCHAR(32)  full or incremental
cron_schedule     VARCHAR(64)  Cron expression for scheduled syncs
filter_rules      JSONB        Optional filter rules (see below)
stale_retention   VARCHAR(32)  How long to keep stale records (e.g. 7d, 24h)
last_sync_at      TIMESTAMPTZ  Timestamp of last sync completion
last_sync_cursor  TEXT         Resume cursor on error
last_sync_status  VARCHAR(32)  idle, running, success, error
last_sync_error   TEXT         Error message if last sync failed
last_sync_stats   JSONB        Statistics from the last sync run

Sync Strategies

Full Sync

A full sync fetches all resources from the upstream connector, upserts them into connector_resource, and marks any records that were not seen during this pass as stale. Stale records are purged after the retention period.

Best for: Reference data that should always reflect the upstream truth (groups, roles).

Incremental Sync

An incremental sync requests only records modified since the last successful sync, passing that timestamp to the connector as the modifiedSince parameter. It does not mark unseen records as stale: because it fetches only changes, a record's absence does not imply deletion.

Best for: Large datasets where full enumeration is expensive and the upstream supports modification timestamps.
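The difference between the two strategies comes down to two decisions per pass. The first is whether to send modifiedSince. The second is whether to mark unseen records stale afterwards.

Below is a minimal sketch of that decision. It rests on these assumptions:
- SyncPassInput, SyncPassPlan, planSyncPass and the lastSuccessfulSyncAt field are hypothetical names, not part of ConnectorSyncService.
- When an incremental sync has no earlier successful run, the sketch fetches everything.

```typescript
type SyncStrategy = "full" | "incremental";

// Hypothetical input: the strategy plus when the last successful sync finished.
interface SyncPassInput {
  strategy: SyncStrategy;
  lastSuccessfulSyncAt?: Date;
}

// Hypothetical output: how one pass should call the connector and clean up afterwards.
interface SyncPassPlan {
  modifiedSince?: string;   // ISO timestamp for the list command, if any
  markUnseenStale: boolean; // only a complete enumeration can infer deletions
}

function planSyncPass(input: SyncPassInput): SyncPassPlan {
  if (input.strategy === "full") {
    // Full: fetch everything, then mark whatever this pass did not see as stale.
    return { markUnseenStale: true };
  }
  // Incremental: fetch only changes; a missing record says nothing about deletion.
  // Assumption: with no earlier successful sync, modifiedSince stays undefined and everything is fetched.
  return {
    modifiedSince: input.lastSuccessfulSyncAt?.toISOString(),
    markUnseenStale: false,
  };
}
```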

Filter Rules

Filter rules restrict which upstream records are synced. They are stored in the filter_rules JSONB column and applied in-memory after each page is fetched.

interface SyncFilterRules {
  groupNamePattern?: string;     // Glob-like pattern (e.g. "proj-*")
  groupIds?: string[];           // Whitelist of specific group IDs
  emailDomains?: string[];       // Only sync users from these domains
  memberOfSyncedGroups?: boolean; // Only sync users in already-synced groups
  maxRecords?: number;           // Hard cap on total synced records
}

Group Filters

  • groupNamePattern: matches the displayName using glob syntax (* = any chars, ? = single char). Case-insensitive.
  • groupIds: explicit whitelist; only groups with a matching externalId are synced.
  • Both filters compose with AND: a record must pass every specified filter.
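The group rules can be sketched as a predicate that is built once per sync and applied to each fetched page. This is an illustration, not Floh's filter engine. The names makeGroupFilter and globToRegExp are hypothetical. The sketch also assumes that every character other than * and ? matches literally.

```typescript
// Minimal group shape needed for filtering (a subset of ConnectorResourceRecord).
interface GroupRecord {
  externalId: string;
  displayName: string;
}

// The group-related subset of SyncFilterRules.
interface GroupFilterRules {
  groupNamePattern?: string;
  groupIds?: string[];
}

// Translate a glob into an anchored, case-insensitive RegExp:
// "*" matches any run of characters, "?" exactly one, everything else literally.
function globToRegExp(glob: string): RegExp {
  let source = "";
  for (const ch of glob) {
    if (ch === "*") source += "[\\s\\S]*";
    else if (ch === "?") source += "[\\s\\S]";
    else source += ch.replace(/[.+^${}()|[\]\\]/g, "\\$&"); // escape regex metacharacters
  }
  return new RegExp(`^${source}$`, "i");
}

// Build the predicate once; a group passes only if it satisfies every rule present (AND).
function makeGroupFilter(rules: GroupFilterRules): (group: GroupRecord) => boolean {
  const nameRe = rules.groupNamePattern === undefined ? undefined : globToRegExp(rules.groupNamePattern);
  const allowedIds = rules.groupIds === undefined ? undefined : new Set(rules.groupIds);
  return (group) =>
    (nameRe === undefined || nameRe.test(group.displayName)) &&
    (allowedIds === undefined || allowedIds.has(group.externalId));
}
```

Compiling the pattern once per sync keeps per-record work to a single regex test and a set lookup.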

User Filters

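  • emailDomains: only syncs users whose email domain matches one of the listed domains. Case-insensitive.
  • memberOfSyncedGroups: when true, only syncs users who belong to groups that have already been synced.
  • maxRecords: stops syncing after this many records have been processed (across all pages).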
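Because maxRecords counts across pages, a user filter has to carry state from one page to the next. The following is a minimal sketch; UserPageFilter and emailDomain are hypothetical names. It makes these assumptions:
- Records that fail the domain check do not count toward the cap.
- Users without an email address are dropped whenever emailDomains is set.

memberOfSyncedGroups is left out because the membership data it needs is not described here.

```typescript
// Minimal user shape needed for filtering.
interface UserRecord {
  externalId: string;
  email: string | null;
}

// The user-related subset of SyncFilterRules handled here.
interface UserFilterRules {
  emailDomains?: string[];
  maxRecords?: number;
}

// Lower-cased domain part of an address, or null if there is no usable "@".
function emailDomain(email: string | null): string | null {
  if (email === null) return null;
  const at = email.lastIndexOf("@");
  return at > 0 && at < email.length - 1 ? email.slice(at + 1).toLowerCase() : null;
}

// Stateful per-sync filter: the record cap is counted across every page passed to apply().
class UserPageFilter {
  private readonly domains: Set<string> | undefined;
  private readonly maxRecords: number | undefined;
  private accepted = 0;

  constructor(rules: UserFilterRules) {
    this.domains = rules.emailDomains === undefined
      ? undefined
      : new Set(rules.emailDomains.map((d) => d.toLowerCase()));
    this.maxRecords = rules.maxRecords;
  }

  // True once the cap is reached, so the caller can stop requesting pages.
  get exhausted(): boolean {
    return this.maxRecords !== undefined && this.accepted >= this.maxRecords;
  }

  apply(page: UserRecord[]): UserRecord[] {
    const kept: UserRecord[] = [];
    for (const user of page) {
      if (this.exhausted) break;
      if (this.domains !== undefined) {
        const domain = emailDomain(user.email);
        if (domain === null || !this.domains.has(domain)) continue; // domain not allowed
      }
      kept.push(user);
      this.accepted++;
    }
    return kept;
  }
}
```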

Sync Hash and Change Detection

Each upstream record is serialized into a canonical JSON form:

{
  "externalId": "...",
  "displayName": "...",
  "email": "..." | null,
  "attributes": { ... }
}

A SHA-256 hash of this JSON is computed and stored as sync_hash. On subsequent syncs, the newly computed hash is compared with the stored sync_hash:

  • Match → Record is unchanged; only synced_at is updated.
  • Mismatch → Record is updated with new data and hash.
  • New → Record is inserted.

This minimizes database writes for large datasets where most records haven't changed.
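The hash-and-compare step can be sketched with Node's built-in crypto module. The sketch works in three steps:
1. It builds the canonical object with the four fields in the order shown.
2. It hashes the JSON text with SHA-256 as lowercase hex, which gives 64 characters and matches VARCHAR(64).
3. It classifies the result as new, unchanged or updated.

computeSyncHash and classifyChange are hypothetical names. The source does not say whether nested attribute keys are sorted, so this sketch leaves them in the order the connector returned them.

```typescript
import { createHash } from "node:crypto";

// The four fields of the canonical form shown above.
interface CanonicalRecord {
  externalId: string;
  displayName: string;
  email: string | null;
  attributes: Record<string, unknown>;
}

// Serialize with a fixed top-level key order and hash the JSON text (hex, 64 chars).
function computeSyncHash(record: CanonicalRecord): string {
  const canonical = {
    externalId: record.externalId,
    displayName: record.displayName,
    email: record.email ?? null,   // a missing email is hashed as null, never omitted
    attributes: record.attributes, // nested keys keep the order the connector returned
  };
  return createHash("sha256").update(JSON.stringify(canonical)).digest("hex");
}

type ChangeKind = "new" | "unchanged" | "updated";

// storedHash is undefined when no row exists yet for this external ID.
function classifyChange(storedHash: string | undefined, freshHash: string): ChangeKind {
  if (storedHash === undefined) return "new";       // insert the record
  if (storedHash === freshHash) return "unchanged"; // only refresh synced_at
  return "updated";                                 // write new data and hash
}
```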

Stale Management

After a full sync completes, any records whose synced_at is older than the sync start time (meaning they were not seen upstream) are marked stale by setting stale_since to the current time.

Stale records are retained for a configurable period (default 7d) before being purged. This protects against transient upstream issues — if a group temporarily disappears from an API response, it won't be immediately deleted.
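Stale marking and purging can be sketched over in-memory rows. The sketch makes three simplifications:
- The names are hypothetical.
- Only the d and h retention units from the examples are parsed.
- An already-stale record keeps its original stale_since. Without this rule, every full sync would restart the retention clock and nothing would ever be purged.

```typescript
// Minimal in-memory stand-in for connector_resource rows.
interface ResourceRow {
  externalId: string;
  syncedAt: Date;
  staleSince: Date | null;
}

const HOUR_MS = 60 * 60 * 1000;

// Parse stale_retention values such as "7d" or "24h" into milliseconds.
function parseRetention(value: string): number {
  const match = /^(\d+)([dh])$/.exec(value.trim());
  if (match === null) throw new Error(`Unsupported stale_retention: ${value}`);
  const hours = match[2] === "d" ? Number(match[1]) * 24 : Number(match[1]);
  return hours * HOUR_MS;
}

// After a full sync: rows not refreshed since the pass started become stale.
// Assumption: rows that are already stale keep their original stale_since.
function markStale(rows: ResourceRow[], syncStartedAt: Date, now: Date): number {
  let staled = 0;
  for (const row of rows) {
    if (row.syncedAt.getTime() < syncStartedAt.getTime() && row.staleSince === null) {
      row.staleSince = now;
      staled++;
    }
  }
  return staled;
}

// Keep fresh rows and stale rows still inside the retention window; drop the rest.
function purgeStale(rows: ResourceRow[], retention: string, now: Date): ResourceRow[] {
  const retentionMs = parseRetention(retention);
  return rows.filter(
    (row) => row.staleSince === null || now.getTime() - row.staleSince.getTime() < retentionMs,
  );
}
```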

Redis Cache (Tier 2)

The ConnectorCacheService provides a TTL-based Redis cache for individual resource lookups. This is used during workflow execution to avoid repeated API calls when checking a single user or group.

Key format: cr:{connectorId}:{resourceType}:{externalId}

Default TTL: 300 seconds (5 minutes)

Cache-Aside Pattern

getOrFetch(connectorId, type, externalId, fetcher):
  1. Check Redis → if hit, return cached value
  2. Call fetcher() → if null, return null
  3. Store result in Redis with TTL
  4. Return result

Cache Invalidation

Cache entries are invalidated:

  • On webhook events (created/updated/deleted)
  • On manual cache clear
  • Automatically via TTL expiry
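The following is a runnable sketch of the cache-aside lookup and its invalidation. To keep it self-contained, Redis is replaced by a hypothetical KeyValueStore interface with an in-memory TTL implementation. Against a real Redis server, the store step corresponds to SET with an EX expiry. Only the key format and the 300-second default come from this page; getOrFetch's extra store parameter and the invalidate helper are illustrative.

```typescript
// Hypothetical minimal key-value interface covering GET, SET-with-expiry and DEL.
interface KeyValueStore {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, ttlSeconds: number): Promise<void>;
  del(key: string): Promise<void>;
}

// In-memory stand-in with TTL semantics so the sketch runs without Redis.
class InMemoryTtlStore implements KeyValueStore {
  private readonly entries = new Map<string, { value: string; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  async get(key: string): Promise<string | null> {
    const entry = this.entries.get(key);
    if (entry === undefined) return null;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired: report a miss, as Redis would
      return null;
    }
    return entry.value;
  }

  async set(key: string, value: string, ttlSeconds: number): Promise<void> {
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
  }

  async del(key: string): Promise<void> {
    this.entries.delete(key);
  }
}

const DEFAULT_TTL_SECONDS = 300;

// Key format: cr:{connectorId}:{resourceType}:{externalId}
function cacheKey(connectorId: string, resourceType: string, externalId: string): string {
  return `cr:${connectorId}:${resourceType}:${externalId}`;
}

// Cache-aside lookup following the four steps of getOrFetch.
async function getOrFetch<T>(
  store: KeyValueStore,
  connectorId: string,
  resourceType: string,
  externalId: string,
  fetcher: () => Promise<T | null>,
  ttlSeconds: number = DEFAULT_TTL_SECONDS,
): Promise<T | null> {
  const key = cacheKey(connectorId, resourceType, externalId);
  const cached = await store.get(key);
  if (cached !== null) return JSON.parse(cached) as T;     // 1. hit
  const fresh = await fetcher();                           // 2. miss: go upstream
  if (fresh === null) return null;                         //    nothing to cache
  await store.set(key, JSON.stringify(fresh), ttlSeconds); // 3. store with TTL
  return fresh;                                            // 4. return
}

// Invalidation, as triggered by a webhook event or a manual clear.
async function invalidate(store: KeyValueStore, connectorId: string, resourceType: string, externalId: string): Promise<void> {
  await store.del(cacheKey(connectorId, resourceType, externalId));
}
```

Null results are deliberately not cached in this sketch, so a lookup for a resource that does not exist yet will reach upstream every time.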

Webhook Integration

The webhook handler at POST /api/webhooks/:connectorId also handles resource sync events:

Action   Cache           Database
created  Invalidate key  Upsert record with hash
updated  Invalidate key  Upsert record with hash
deleted  Invalidate key  Delete record

The webhook body must include a resourceType field to trigger resource sync handling. Without it, the event falls through to the existing entitlement reconciliation handler.

{
  "action": "updated",
  "resourceId": "user-123",
  "resourceType": "user",
  "data": {
    "displayName": "Jane Doe",
    "email": "jane@acme.com"
  }
}
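The dispatch in the table above can be sketched as a single function over in-memory stand-ins for Redis and connector_resource. The body type mirrors the example payload and may not match ResourceWebhookEvent exactly. The following are assumptions for illustration:
- handleResourceWebhook and Stores are hypothetical names.
- Extra data fields are treated as attributes.
- A missing displayName falls back to the ID.
- The row is replaced rather than merged.

```typescript
import { createHash } from "node:crypto";

type WebhookAction = "created" | "updated" | "deleted";

// Assumed body shape, mirroring the example payload above.
interface WebhookBody {
  action: WebhookAction;
  resourceId: string;
  resourceType?: string;
  data?: { displayName?: string; email?: string | null; [key: string]: unknown };
}

interface StoredResource {
  externalId: string;
  displayName: string;
  email: string | null;
  attributes: Record<string, unknown>;
  syncHash: string;
}

// Hypothetical in-memory stand-ins for Redis and the connector_resource table.
interface Stores {
  cache: Map<string, string>;        // keyed cr:{connectorId}:{resourceType}:{externalId}
  rows: Map<string, StoredResource>; // keyed by (connector_id, resource_type, external_id)
}

type Outcome = "fallthrough" | "upserted" | "deleted";

function handleResourceWebhook(connectorId: string, body: WebhookBody, stores: Stores): Outcome {
  // Without resourceType the event belongs to entitlement reconciliation, not resource sync.
  if (body.resourceType === undefined) return "fallthrough";

  const cacheKey = `cr:${connectorId}:${body.resourceType}:${body.resourceId}`;
  const rowKey = JSON.stringify([connectorId, body.resourceType, body.resourceId]);
  stores.cache.delete(cacheKey); // every action invalidates the cached entry

  if (body.action === "deleted") {
    stores.rows.delete(rowKey);
    return "deleted";
  }

  // created/updated: rebuild the canonical record, hash it, and upsert the row.
  const data: NonNullable<WebhookBody["data"]> = body.data ?? {};
  const { displayName, email, ...attributes } = data;
  const record = {
    externalId: body.resourceId,
    displayName: displayName ?? body.resourceId, // assumption: fall back to the ID
    email: email ?? null,
    attributes,
  };
  const syncHash = createHash("sha256").update(JSON.stringify(record)).digest("hex");
  stores.rows.set(rowKey, { ...record, syncHash });
  return "upserted";
}
```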

API Reference

All endpoints require authentication and are scoped under /api/connectors.

List Synced Resources

GET /api/connectors/:id/resources?type=group&page=1&pageSize=20&search=eng

Returns paginated resources from the local sync cache. Supports filtering by type, search term, and stale status.
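The following is a small client-side helper that builds the example request URL, using the standard URL and URLSearchParams APIs. resourcesUrl and the base URL in the usage below are hypothetical. The stale-status filter is omitted because its query parameter name is not documented here. Authentication still has to be added when the request is sent.

```typescript
// Query parameters shown in the example request; all optional.
interface ResourceQuery {
  type?: string;
  page?: number;
  pageSize?: number;
  search?: string;
}

// Build the URL for GET /api/connectors/:id/resources.
function resourcesUrl(baseUrl: string, connectorId: string, query: ResourceQuery = {}): string {
  const url = new URL(`/api/connectors/${encodeURIComponent(connectorId)}/resources`, baseUrl);
  if (query.type !== undefined) url.searchParams.set("type", query.type);
  if (query.page !== undefined) url.searchParams.set("page", String(query.page));
  if (query.pageSize !== undefined) url.searchParams.set("pageSize", String(query.pageSize));
  if (query.search !== undefined) url.searchParams.set("search", query.search);
  return url.toString();
}
```

For example, resourcesUrl("https://floh.example", "abc", { type: "group", page: 1, pageSize: 20, search: "eng" }) produces https://floh.example/api/connectors/abc/resources?type=group&page=1&pageSize=20&search=eng.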

List Sync Configurations

GET /api/connectors/:id/sync-config

Returns all sync configurations for a connector.

Get Sync Configuration

GET /api/connectors/:id/sync-config/:type

Returns the sync configuration for a specific resource type.

Create or Update Sync Configuration

PUT /api/connectors/:id/sync-config/:type
Content-Type: application/json

{
  "enabled": true,
  "strategy": "full",
  "cronSchedule": "0 */6 * * *",
  "filterRules": {
    "groupNamePattern": "proj-*"
  },
  "staleRetention": "7d"
}

Creates a new sync configuration or updates an existing one. When enabled with a cron schedule, a BullMQ repeatable job is registered automatically.

Delete Sync Configuration

DELETE /api/connectors/:id/sync-config/:type

Removes the sync configuration and its associated scheduled job.

Trigger Manual Sync

POST /api/connectors/:id/sync-config/:type/trigger

Immediately runs a sync for the specified resource type. Returns the sync statistics on completion.

Response:

{
  "message": "Sync completed",
  "stats": {
    "added": 12,
    "updated": 3,
    "staled": 0,
    "unchanged": 485,
    "removed": 0,
    "durationMs": 8420,
    "pagesProcessed": 1,
    "totalUpstreamRecords": 500
  }
}

Get Sync Status

GET /api/connectors/:id/sync-config/:type/status

Returns the last sync status, error, and statistics without the full configuration.

Connector Implementation

For a connector to support resource sync, it must implement paginated list commands. The connector's configSchema should declare these commands with syncCapable: true.

Required Commands

Resource type  Command     Description
group          listGroups  Paginated group listing
user           listUsers   Paginated user listing
role           listRoles   Paginated role listing

Command Interface

Each list command receives:

{
  command: 'listGroups',
  cursor?: string,       // Resume cursor from previous page
  pageSize?: number,     // Requested page size (default 500)
  filter?: object,       // Optional filter from sync config
  modifiedSince?: string // ISO timestamp for incremental sync
}

And must return:

{
  success: true,
  data: {
    resources: [
      {
        externalId: 'grp-123',
        displayName: 'Engineering',
        email: null,
        attributes: { description: 'Engineering team', memberCount: 45 }
      }
    ],
    nextCursor: 'page-2-token',  // undefined when no more pages
    totalEstimate: 150           // optional, for progress indication
  }
}
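On the sync service's side, consuming this contract is a loop that follows nextCursor until it comes back undefined. The following is a minimal sketch with a fake in-memory connector. pageThrough and fakeListCommand are hypothetical names. The error only reports the failing cursor; it does not persist it to last_sync_cursor.

```typescript
// Shapes mirroring the command contract above (names here are illustrative).
interface ResourceRecord {
  externalId: string;
  displayName: string;
  email: string | null;
  attributes: Record<string, unknown>;
}

interface ListRequest {
  cursor?: string;
  pageSize?: number;
  modifiedSince?: string;
}

interface ListResponse {
  success: boolean;
  data: { resources: ResourceRecord[]; nextCursor?: string; totalEstimate?: number };
}

type ListCommand = (request: ListRequest) => Promise<ListResponse>;

// Follow nextCursor until the connector stops returning one, handing each page to onPage.
async function pageThrough(
  list: ListCommand,
  onPage: (resources: ResourceRecord[]) => void | Promise<void>,
  pageSize = 500,
  modifiedSince?: string,
): Promise<{ pagesProcessed: number; totalUpstreamRecords: number }> {
  let cursor: string | undefined;
  let pagesProcessed = 0;
  let totalUpstreamRecords = 0;
  do {
    const response = await list({ cursor, pageSize, modifiedSince });
    if (!response.success) {
      // The failing cursor is what a caller could persist in order to resume later.
      throw new Error(`list command failed at cursor ${cursor ?? "(first page)"}`);
    }
    await onPage(response.data.resources);
    pagesProcessed++;
    totalUpstreamRecords += response.data.resources.length;
    cursor = response.data.nextCursor;
  } while (cursor !== undefined);
  return { pagesProcessed, totalUpstreamRecords };
}

// Hypothetical fake connector serving an in-memory array with offset-based cursors.
function fakeListCommand(all: ResourceRecord[]): ListCommand {
  return async ({ cursor, pageSize = 500 }) => {
    const start = cursor === undefined ? 0 : Number(cursor);
    const next = start + pageSize;
    return {
      success: true,
      data: {
        resources: all.slice(start, next),
        nextCursor: next < all.length ? String(next) : undefined,
        totalEstimate: all.length,
      },
    };
  };
}
```

With the default page size of 500, a directory of 1,200 groups takes three calls: cursors undefined, "500" and "1000".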

Authifi Example

The Authifi connector implements listGroups and listUsers out of the box. Both commands use cursor-based pagination and map upstream API responses to the standard ConnectorResourceRecord format.

Background Scheduling

When a sync configuration is enabled with a cronSchedule, a BullMQ repeatable job is registered with the id cr-sync-{connectorId}-{resourceType}. The worker handler invokes ConnectorSyncService.runSync().

On application startup, all enabled sync configs are loaded from the database and their corresponding jobs are registered with the scheduler. This ensures schedules survive server restarts.
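Deciding which jobs to register at startup can be sketched as a pure function over the stored configs. The sketch stops before handing the result to BullMQ, for example as a repeatable job whose repeat pattern is the cron expression. planStartupJobs and the camel-cased field names are hypothetical. Only the job id format comes from this page.

```typescript
// Subset of a connector_sync_config row needed for scheduling.
interface SyncConfigRow {
  connectorId: string;
  resourceType: string;
  enabled: boolean;
  cronSchedule: string | null;
}

interface PlannedJob {
  jobId: string;   // cr-sync-{connectorId}-{resourceType}
  pattern: string; // cron expression for the repeatable job
  data: { connectorId: string; resourceType: string };
}

function syncJobId(connectorId: string, resourceType: string): string {
  return `cr-sync-${connectorId}-${resourceType}`;
}

// One repeatable job per enabled config that has a non-empty cron schedule.
function planStartupJobs(configs: SyncConfigRow[]): PlannedJob[] {
  const jobs: PlannedJob[] = [];
  for (const c of configs) {
    if (!c.enabled || !c.cronSchedule) continue; // disabled or unscheduled: nothing to register
    jobs.push({
      jobId: syncJobId(c.connectorId, c.resourceType),
      pattern: c.cronSchedule,
      data: { connectorId: c.connectorId, resourceType: c.resourceType },
    });
  }
  return jobs;
}
```

Deriving the job id from the connector and resource type means re-registering on every restart targets the same id instead of piling up duplicate schedules.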

Permissions

Endpoint                            Required permission
GET .../resources                   connector:read
GET .../sync-config                 connector:read
GET .../sync-config/:type/status    connector:read
PUT .../sync-config/:type           connector:update
DELETE .../sync-config/:type        connector:update
POST .../sync-config/:type/trigger  connector:update

Shared Types

All sync-related types are defined in packages/shared/src/connector-sync.types.ts and exported from @floh/shared. Key interfaces:

  • ConnectorResourceRecord — Upstream resource shape
  • ConnectorListResult — Paginated list response from connectors
  • ConnectorSyncConfigDto — Sync configuration DTO
  • SyncFilterRules — Filter rule schema
  • SyncStats — Sync run statistics
  • ConnectorResourceDto — Resource as returned by the API
  • ResourceWebhookEvent — Webhook event for resource changes

File Layout

packages/shared/src/
  connector-sync.types.ts         # Shared type definitions

packages/server/src/
  db/migrations/
    003_connector_resource_sync.ts # Migration for new tables
  db/schema/
    operational-tables.ts          # Table type definitions
    database.ts                    # Database interface registration
  modules/connectors/
    sync-repository.ts             # ConnectorResourceRepository, ConnectorSyncConfigRepository
    sync-service.ts                # ConnectorSyncService (orchestration)
    sync-routes.ts                 # API routes for sync management
    connector-cache.ts             # ConnectorCacheService (Redis)
    authifi-client.ts              # Extended with listGroupsPaginated, listUsersPaginated
    authifi-connector.ts           # Extended with listGroups, listUsers commands
    seed-connectors.ts             # Updated schema with syncCapable commands
  modules/roles/routes/
    webhook-routes.ts              # Extended for resource webhook events
  route-registry.ts                # Sync routes registration
  app.ts                           # Startup job scheduling
  worker-handlers.ts               # Sync job handler
  worker.ts                        # Worker deps with sync service

packages/server/test/unit/
  connector-cache.test.ts                # Cache service tests
  connector-sync-repository.test.ts      # Repository tests
  connector-sync-service.test.ts         # Sync orchestration tests
  connector-sync-routes.test.ts          # API route tests
  connector-sync-filter.test.ts          # Filter rules engine tests
  authifi-connector-sync.test.ts         # Authifi list commands tests
  connector-resource-webhook.test.ts     # Webhook extension tests