diff --git a/graphile/graphile-cache/src/__tests__/disposal-and-cap.test.ts b/graphile/graphile-cache/src/__tests__/disposal-and-cap.test.ts new file mode 100644 index 0000000000..56f74fa120 --- /dev/null +++ b/graphile/graphile-cache/src/__tests__/disposal-and-cap.test.ts @@ -0,0 +1,186 @@ +import { EventEmitter } from 'events'; +import { + ensureCacheHeadroom, + getCacheConfig, + graphileCache, + type GraphileCacheEntry, + invokeEntryHandler, +} from '../graphile-cache'; + +/** + * Regression tests for the schema-builder OOM fix. + * + * 1. Disposal guard is entry-scoped, not key-scoped. The previous string-keyed guard + * skipped pgl.release() for a rebuilt entry that shared a key with an entry still + * mid-release (the "same-key disposal race"). These tests pin the corrected behaviour: + * every distinct entry is released exactly once, while the SAME entry is never + * double-disposed. + * 2. getCacheConfig honours GRAPHILE_CACHE_MAX and otherwise returns a heap-aware default + * bounded to a sane range (a count-only default of 50 × ~0.5GB/instance was the OOM). + */ + +const tick = (ms = 60) => new Promise((resolve) => setTimeout(resolve, ms)); + +const makeEntry = (releaseDelayMs = 0): GraphileCacheEntry => { + const release = jest.fn( + () => new Promise((resolve) => setTimeout(resolve, releaseDelayMs)), + ); + return { + pgl: { release } as unknown as GraphileCacheEntry['pgl'], + serv: {} as GraphileCacheEntry['serv'], + handler: {} as GraphileCacheEntry['handler'], + // Never .listen()ed in production; close() invokes its callback synchronously here. 
+ httpServer: { close: (cb: () => void) => cb() } as unknown as GraphileCacheEntry['httpServer'], + cacheKey: 'mock', + createdAt: Date.now(), + }; +}; + +describe('graphile-cache disposal guard (same-key race)', () => { + it('disposes a rebuilt entry on the same key while a prior entry is mid-release', async () => { + const key = 'race-key-1'; + const a = makeEntry(40); // a.release() stays pending while b is disposed + const b = makeEntry(0); + + graphileCache.set(key, a); + graphileCache.delete(key); // -> disposeEntry(a) begins, a.release() in flight + graphileCache.set(key, b); // key was free; inserts b + graphileCache.delete(key); // -> disposeEntry(b) MUST NOT be skipped by the guard + + await tick(120); + + // The bug: the old string-keyed guard left `key` parked while a.release() was pending, + // so b.release() was never called (would be 0 here). + expect((a.pgl as unknown as { release: jest.Mock }).release).toHaveBeenCalledTimes(1); + expect((b.pgl as unknown as { release: jest.Mock }).release).toHaveBeenCalledTimes(1); + }); + + it('never disposes the same entry twice', async () => { + const key = 'race-key-2'; + const a = makeEntry(0); + + graphileCache.set(key, a); + graphileCache.delete(key); // dispose a + graphileCache.set(key, a); // re-insert the SAME entry object + graphileCache.delete(key); // dispose again -> guard must skip (already disposed) + + await tick(); + + expect((a.pgl as unknown as { release: jest.Mock }).release).toHaveBeenCalledTimes(1); + }); +}); + +describe('graphile-cache heap-aware capacity', () => { + const prev = process.env.GRAPHILE_CACHE_MAX; + afterEach(() => { + if (prev === undefined) delete process.env.GRAPHILE_CACHE_MAX; + else process.env.GRAPHILE_CACHE_MAX = prev; + }); + + it('honours an explicit GRAPHILE_CACHE_MAX', () => { + process.env.GRAPHILE_CACHE_MAX = '7'; + expect(getCacheConfig().max).toBe(7); + }); + + it('falls back to a bounded heap-aware default when unset', () => { + delete 
process.env.GRAPHILE_CACHE_MAX; + const { max } = getCacheConfig(); + // Each cached instance retains ~0.5GB, so the default must be far below the legacy 50 + // on a normal heap, but always within [3, 50]. + expect(max).toBeGreaterThanOrEqual(3); + expect(max).toBeLessThanOrEqual(50); + }); + + it('ignores a non-numeric GRAPHILE_CACHE_MAX (falls back to heap-aware default)', () => { + process.env.GRAPHILE_CACHE_MAX = 'not-a-number'; + const { max } = getCacheConfig(); + expect(max).toBeGreaterThanOrEqual(3); + expect(max).toBeLessThanOrEqual(50); + }); +}); + +// A minimal express-like response: an EventEmitter so invokeEntryHandler can hook +// 'finish'/'close', cast to the express type it expects. +const makeRes = () => new EventEmitter() as unknown as Parameters[2]; +const fakeReq = {} as Parameters[1]; +const fakeNext = (() => undefined) as Parameters[3]; + +describe('invokeEntryHandler in-flight refcounting', () => { + it('increments while serving and releases exactly once across finish+close', () => { + const entry = makeEntry(); + const handler = jest.fn(); + entry.handler = handler as unknown as GraphileCacheEntry['handler']; + + const res = makeRes(); + expect(invokeEntryHandler(entry, fakeReq, res, fakeNext)).toBe(true); + expect(handler).toHaveBeenCalledTimes(1); + expect(entry.inflight).toBe(1); + + (res as unknown as EventEmitter).emit('finish'); + expect(entry.inflight).toBe(0); + // 'close' always follows 'finish' in Node; the release must be idempotent. 
+ (res as unknown as EventEmitter).emit('close'); + expect(entry.inflight).toBe(0); + }); + + it('refuses a disposing entry without invoking the handler', () => { + const entry = makeEntry(); + const handler = jest.fn(); + entry.handler = handler as unknown as GraphileCacheEntry['handler']; + entry.disposing = true; + + expect(invokeEntryHandler(entry, fakeReq, makeRes(), fakeNext)).toBe(false); + expect(handler).not.toHaveBeenCalled(); + }); +}); + +describe('disposeEntry drains in-flight requests before release', () => { + it('waits for inflight to reach 0, then releases exactly once', async () => { + const key = 'drain-key-1'; + const entry = makeEntry(0); + entry.inflight = 1; // simulate a request mid-flight + + graphileCache.set(key, entry); + graphileCache.delete(key); // disposal starts; must poll instead of releasing + + await tick(120); // one poll cycle elapsed, request still in flight + expect((entry.pgl as unknown as { release: jest.Mock }).release).not.toHaveBeenCalled(); + + entry.inflight = 0; // request completes + await tick(250); // allow the next poll cycle to observe it + + expect((entry.pgl as unknown as { release: jest.Mock }).release).toHaveBeenCalledTimes(1); + }); +}); + +describe('ensureCacheHeadroom', () => { + const prevMax = process.env.GRAPHILE_CACHE_MAX; + afterEach(() => { + if (prevMax === undefined) delete process.env.GRAPHILE_CACHE_MAX; + else process.env.GRAPHILE_CACHE_MAX = prevMax; + graphileCache.clear(); + }); + + it('evicts least-recently-used entries until a build slot is free', async () => { + process.env.GRAPHILE_CACHE_MAX = '2'; + const a = makeEntry(0); + const b = makeEntry(0); + graphileCache.set('headroom-a', a); + graphileCache.set('headroom-b', b); + graphileCache.get('headroom-a'); // touch a → b becomes LRU-oldest + + const evicted = ensureCacheHeadroom(1); + + expect(evicted).toBe(1); + expect(graphileCache.has('headroom-a')).toBe(true); + expect(graphileCache.has('headroom-b')).toBe(false); + await tick(); // 
let the fire-and-forget disposal settle before clear() + }); + + it('no-ops when under capacity', () => { + process.env.GRAPHILE_CACHE_MAX = '5'; + graphileCache.set('headroom-c', makeEntry(0)); + expect(ensureCacheHeadroom(1)).toBe(0); + expect(graphileCache.has('headroom-c')).toBe(true); + }); +}); diff --git a/graphile/graphile-cache/src/create-instance.ts b/graphile/graphile-cache/src/create-instance.ts index fc4c625ae2..5787020a5a 100644 --- a/graphile/graphile-cache/src/create-instance.ts +++ b/graphile/graphile-cache/src/create-instance.ts @@ -10,6 +10,11 @@ const log = new Logger('graphile-cache:create'); interface GraphileInstanceOptions { preset: any; cacheKey: string; + /** + * Database name backing this instance's pg pool. Stored on the entry so the + * graphile-cache pgCache cleanup callback can evict it when its pool is disposed. + */ + dbname?: string; /** * When true, a RealtimeManager is created and started alongside the * PostGraphile instance. The pool is extracted from the preset's @@ -37,7 +42,7 @@ interface GraphileInstanceOptions { export const createGraphileInstance = async ( opts: GraphileInstanceOptions ): Promise => { - const { preset, cacheKey, enableRealtime = false } = opts; + const { preset, cacheKey, dbname, enableRealtime = false } = opts; const pgl = postgraphile(preset); const serv = pgl.createServ(grafserv); @@ -53,7 +58,9 @@ export const createGraphileInstance = async ( handler, httpServer, cacheKey, + dbname, createdAt: Date.now(), + inflight: 0, }; if (enableRealtime) { diff --git a/graphile/graphile-cache/src/graphile-cache.ts b/graphile/graphile-cache/src/graphile-cache.ts index 26bbb24c0a..a5a5c66d25 100644 --- a/graphile/graphile-cache/src/graphile-cache.ts +++ b/graphile/graphile-cache/src/graphile-cache.ts @@ -1,8 +1,14 @@ import { EventEmitter } from 'events'; +import { getHeapStatistics } from 'node:v8'; import { Logger } from '@pgpmjs/logger'; import { LRUCache } from 'lru-cache'; import { pgCache } from 'pg-cache'; 
-import type { Express } from 'express'; +import type { + Express, + NextFunction as ExpressNextFunction, + Request as ExpressRequest, + Response as ExpressResponse, +} from 'express'; import type { Server as HttpServer } from 'http'; import type { PostGraphileInstance } from 'postgraphile'; import type { GrafservBase } from 'grafserv'; @@ -12,8 +18,7 @@ const log = new Logger('graphile-cache'); // --- Time Constants --- export const ONE_HOUR_MS = 1000 * 60 * 60; export const FIVE_MINUTES_MS = 1000 * 60 * 5; -const ONE_DAY = ONE_HOUR_MS * 24; -const ONE_YEAR = ONE_DAY * 366; +const SIX_HOURS_MS = ONE_HOUR_MS * 6; // --- Eviction Types --- export type EvictionReason = 'lru' | 'ttl' | 'manual'; @@ -43,30 +48,84 @@ export interface CacheConfig { ttl: number; } +/** + * Empirically, a PostGraphile v5 instance that has served at least one GraphQL + * request retains ~0.5 GB of V8 heap (the fully-materialised GraphQL schema plus + * grafast's per-schema plan machinery; a build-only instance is far smaller, but + * every *cached* instance is, by definition, one that serves requests). The cache + * therefore CANNOT be bounded by entry count alone: `max` heavy instances need + * `max * ~0.5GB` of resident heap, and once that exceeds the V8 old-space limit the + * process OOMs as distinct hosts fill the cache. A fixed default of 50 implies + * ~24 GB of resident schemas — far beyond any normal heap — which is the root cause + * of the schema-builder OOM. We derive a heap-aware default that budgets a fraction + * of the heap for cached instances. Override explicitly with GRAPHILE_CACHE_MAX, and + * tune the per-instance estimate with GRAPHILE_CACHE_INSTANCE_HEAP_BYTES. + */ +// ~0.5 GB retained per query-serving instance (measured against real provisioned apps). 
+const DEFAULT_INSTANCE_HEAP_BYTES = 512 * 1024 * 1024; +// Fraction of the V8 heap budgeted for cached instances; the remainder is headroom +// for transient schema builds (each build briefly allocates hundreds of MB) and +// request processing. +const CACHE_HEAP_FRACTION = 0.5; +const MIN_CACHE_MAX = 3; +const MAX_CACHE_MAX = 50; + +const parseEnvInt = (value: string | undefined, fallback: number): number => { + if (!value) return fallback; + const n = parseInt(value, 10); + return Number.isFinite(n) && n > 0 ? n : fallback; +}; + +/** + * Heap-aware default for the maximum number of cached PostGraphile instances. + * cache_heap_budget = heap_size_limit * CACHE_HEAP_FRACTION + * max = clamp(floor(cache_heap_budget / per_instance_bytes), MIN, MAX) + */ +function computeHeapAwareMax(): number { + try { + const heapLimit = getHeapStatistics().heap_size_limit; // reflects --max-old-space-size + const perInstance = parseEnvInt( + process.env.GRAPHILE_CACHE_INSTANCE_HEAP_BYTES, + DEFAULT_INSTANCE_HEAP_BYTES, + ); + const budgeted = Math.floor((heapLimit * CACHE_HEAP_FRACTION) / perInstance); + return Math.min(MAX_CACHE_MAX, Math.max(MIN_CACHE_MAX, budgeted)); + } catch { + return MIN_CACHE_MAX; + } +} + /** * Get cache configuration from environment variables * * Supports: - * - GRAPHILE_CACHE_MAX: Maximum number of entries (default: 50) - * - GRAPHILE_CACHE_TTL_MS: TTL in milliseconds - * - Production default: ONE_YEAR + * - GRAPHILE_CACHE_MAX: Maximum number of entries. Default is heap-aware + * (see computeHeapAwareMax) rather than a fixed 50, because each cached + * instance retains ~0.5 GB and a count-only cap OOMs the process. + * - GRAPHILE_CACHE_INSTANCE_HEAP_BYTES: per-instance heap estimate for the + * heap-aware default (default ~512 MB). 
+ * - GRAPHILE_CACHE_TTL_MS: idle TTL in milliseconds (updateAgeOnGet refreshes it + * on every hit, so this is an idle-expiry, not an absolute lifetime) + * - Production default: SIX_HOURS_MS — an idle tenant's ~0.5 GB instance is + * reclaimed within hours instead of pinned for a year; a later request + * simply rebuilds it * - Development default: FIVE_MINUTES_MS * - * NOTE: This value should be <= PG_CACHE_MAX (also default: 50) so that - * every cached PostGraphile instance has a live pool backing it. + * NOTE: This value should be <= PG_CACHE_MAX so that every cached PostGraphile + * instance has a live pool backing it. */ export function getCacheConfig(): CacheConfig { const isDevelopment = process.env.NODE_ENV === 'development'; const max = process.env.GRAPHILE_CACHE_MAX - ? parseInt(process.env.GRAPHILE_CACHE_MAX, 10) - : 50; + ? parseEnvInt(process.env.GRAPHILE_CACHE_MAX, computeHeapAwareMax()) + : computeHeapAwareMax(); const ttl = process.env.GRAPHILE_CACHE_TTL_MS ? parseInt(process.env.GRAPHILE_CACHE_TTL_MS, 10) : isDevelopment ? FIVE_MINUTES_MS - : ONE_YEAR; + : SIX_HOURS_MS; return { max, ttl }; } @@ -89,12 +148,32 @@ export interface GraphileCacheEntry { httpServer: HttpServer; cacheKey: string; createdAt: number; + /** + * Database name backing this instance's pg pool. Used to evict the entry when its + * pool is disposed (see the pgCache cleanup callback). Distinct from cacheKey, which + * on the public server is the request HOST, not the database. + */ + dbname?: string; /** Optional RealtimeManager for cursor-tracked subscription delivery */ realtimeManager?: { stop(): Promise } | null; + /** + * Number of requests currently executing against this entry's handler. + * Maintained by invokeEntryHandler; disposeEntry drains to 0 (bounded by + * GRAPHILE_CACHE_DRAIN_TIMEOUT_MS) before releasing the instance so eviction + * cannot tear down a schema mid-request. 
+ */ + inflight?: number; + /** Set at the start of disposal; routing must treat the entry as a cache miss. */ + disposing?: boolean; } -// Track disposed entries to prevent double-disposal -const disposedKeys = new Set(); +// Track disposed entries to prevent double-disposal. Keyed by ENTRY IDENTITY (a WeakSet), +// NOT by the cache key. A key-scoped guard caused a same-key disposal race: while entry A +// for key K was mid `pgl.release()` (K parked in the guard), a rebuilt entry B for the SAME +// key K could be evicted and its disposeEntry would short-circuit, silently skipping +// B.pgl.release(). Guarding by entry identity disposes every distinct entry exactly once +// while still allowing a rebuilt entry on the same key to be released. +const disposedEntries = new WeakSet(); // Track keys that are being manually evicted for accurate eviction reason const manualEvictionKeys = new Set(); @@ -106,20 +185,42 @@ const manualEvictionKeys = new Set(); * 1. Closing the HTTP server if listening * 2. Releasing the PostGraphile instance (which internally releases grafserv) * - * Uses disposedKeys set to prevent double-disposal when closeAllCaches() - * explicitly disposes entries and then clear() triggers the dispose callback. + * Uses the disposedEntries WeakSet to prevent double-disposal of the same entry when + * closeAllCaches() explicitly disposes entries and then clear() triggers the dispose + * callback for the same entry. */ const disposeEntry = async (entry: GraphileCacheEntry, key: string): Promise => { - // Prevent double-disposal - if (disposedKeys.has(key)) { + // Prevent double-disposal of the SAME entry (guard by identity, not by key — see the + // disposedEntries declaration for why). + if (disposedEntries.has(entry)) { return; } - disposedKeys.add(key); + disposedEntries.add(entry); + entry.disposing = true; + + // Drain in-flight requests before tearing the instance down. 
The entry is already + // out of the cache (dispose fires post-removal), so no NEW requests can route here — + // invokeEntryHandler also refuses entries with `disposing` set. Bounded wait: a wedged + // request must not pin ~0.5 GB forever. + const drainTimeoutMs = parseEnvInt(process.env.GRAPHILE_CACHE_DRAIN_TIMEOUT_MS, 30_000); + const drainStart = Date.now(); + while ((entry.inflight ?? 0) > 0 && Date.now() - drainStart < drainTimeoutMs) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + if ((entry.inflight ?? 0) > 0) { + log.warn( + `Disposing PostGraphile[${key}] with ${entry.inflight} request(s) still in flight after ${drainTimeoutMs}ms drain timeout`, + ); + } log.debug(`Disposing PostGraphile[${key}]`); try { - // Close HTTP server if it's listening - if (entry.httpServer?.listening) { + // Close the HTTP server. create-instance builds it via createServer() but never + // .listen()s it, so `.listening` is always false — the old guard meant close() never + // ran. Closing unconditionally detaches grafserv's 'upgrade' listener; when the server + // was never listening, close() simply invokes the callback with ERR_SERVER_NOT_RUNNING + // (a harmless no-op here), so the Promise always resolves. + if (entry.httpServer) { await new Promise((resolve) => { entry.httpServer.close(() => resolve()); }); @@ -138,9 +239,8 @@ const disposeEntry = async (entry: GraphileCacheEntry, key: string): Promise({ max: initialConfig.max, @@ -187,6 +301,60 @@ export const graphileCache = new LRUCache({ } }); +/** + * Invoke an entry's Express handler with in-flight refcounting. + * + * Returns false WITHOUT invoking when the entry is already being disposed — + * callers must treat that as a cache miss (rebuild) or retry. On invocation, + * the refcount is incremented and released exactly once when the response + * finishes or the connection closes, which is what disposeEntry drains on. 
+ */ +export function invokeEntryHandler( + entry: GraphileCacheEntry, + req: ExpressRequest, + res: ExpressResponse, + next: ExpressNextFunction, +): boolean { + if (entry.disposing) { + return false; + } + entry.inflight = (entry.inflight ?? 0) + 1; + let released = false; + const release = () => { + if (!released) { + released = true; + entry.inflight = Math.max(0, (entry.inflight ?? 1) - 1); + } + }; + // 'close' fires after 'finish' and also on aborted connections; release is idempotent. + res.once('finish', release); + res.once('close', release); + entry.handler(req, res, next); + return true; +} + +/** + * Evict least-recently-used entries until there is room for `slots` new entries + * under the configured max. Called BEFORE building a new instance so the build's + * transient allocation (hundreds of MB) lands on a cache that has already shed + * an instance, instead of stacking a full cache + a build peak (the OOM shape). + */ +export function ensureCacheHeadroom(slots = 1): number { + const { max } = getCacheConfig(); + let evicted = 0; + while (graphileCache.size > Math.max(0, max - slots)) { + // rkeys() iterates least-recently-used first + const oldestKey = graphileCache.rkeys().next().value as string | undefined; + if (oldestKey === undefined) break; + graphileCache.delete(oldestKey); + evicted++; + } + if (evicted > 0) { + log.info(`Evicted ${evicted} instance(s) before build to keep heap headroom (max=${max})`); + } + return evicted; +} + // --- Cache Stats --- export interface CacheStats { size: number; @@ -235,9 +403,12 @@ export function clearMatchingEntries(pattern: RegExp): number { const unregister = pgCache.registerCleanupCallback((pgPoolKey: string) => { log.debug(`pgPool[${pgPoolKey}] disposed - checking graphile entries`); - // Remove graphile entries that reference this pool key + // Remove graphile entries backed by this pool. 
Match on the entry's dbname (the pool is + // keyed by database name), NOT on cacheKey: on the public server cacheKey is the request + // HOST, which never contains the database name, so the old `cacheKey.includes(pgPoolKey)` + // test never matched and this safety valve was dead. graphileCache.forEach((entry, k) => { - if (entry.cacheKey.includes(pgPoolKey)) { + if (entry.dbname === pgPoolKey) { log.debug(`Removing graphileCache[${k}] due to pgPool[${pgPoolKey}] disposal`); manualEvictionKeys.add(k); graphileCache.delete(k); @@ -281,11 +452,12 @@ export const closeAllCaches = async (verbose = false): Promise => { // Wait for all disposals to complete await Promise.allSettled(disposePromises); - // Clear the cache after disposal (dispose callback will no-op due to disposedKeys) + // Clear the cache after disposal (dispose callback no-ops: each entry is already + // in the disposedEntries WeakSet from the explicit disposeEntry above). graphileCache.clear(); - // Clear disposed keys tracking after full cleanup - disposedKeys.clear(); + // The disposedEntries WeakSet needs no explicit clearing — entries are reclaimed by + // GC once the cache no longer references them. 
manualEvictionKeys.clear(); // Close pg pools diff --git a/graphile/graphile-cache/src/index.ts b/graphile/graphile-cache/src/index.ts index e64be532b9..8c5965a2ea 100644 --- a/graphile/graphile-cache/src/index.ts +++ b/graphile/graphile-cache/src/index.ts @@ -25,6 +25,10 @@ export { CacheStats, getCacheStats, + // In-flight refcounting + pre-build headroom + invokeEntryHandler, + ensureCacheHeadroom, + // Clear matching entries clearMatchingEntries } from './graphile-cache'; diff --git a/graphile/graphile-i18n/src/plugin.ts b/graphile/graphile-i18n/src/plugin.ts index ebe3ab14a4..b53cc9fb62 100644 --- a/graphile/graphile-i18n/src/plugin.ts +++ b/graphile/graphile-i18n/src/plugin.ts @@ -74,7 +74,7 @@ export function createI18nPlugin(options: I18nPluginOptions = {}): GraphileConfi // Closure-scoped state shared between init and field hooks let i18nRegistry: Record = {}; - const localeTypeCache: Record = {}; + let localeTypeCache: Record = {}; return { name: 'I18nPlugin', @@ -85,6 +85,7 @@ export function createI18nPlugin(options: I18nPluginOptions = {}): GraphileConfi init: { callback(_, build) { i18nRegistry = {}; + localeTypeCache = {}; for (const [, codec] of Object.entries(build.input.pgRegistry.pgCodecs)) { const c = codec as PgCodecWithAttributes; @@ -238,10 +239,22 @@ export function createI18nPlugin(options: I18nPluginOptions = {}): GraphileConfi .map(f => `coalesce(v."${f.column}", b."${f.column}") as "${f.column}"`) .join(', '); + // When the instance is built for blueprint pooling + // (schema.constructiveUnqualified), emit search_path-relative table + // references so the per-request search_path resolves the tenant schema. + // Default (flag absent): fully schema-qualified, byte-identical output. + const constructiveUnqualified = !!(build.options as any).constructiveUnqualified; + const baseTableRef = constructiveUnqualified + ? `"${baseTable}"` + : `"${schemaName}"."${baseTable}"`; + const translationTableRef = constructiveUnqualified + ? 
`"${translationTable}"` + : `"${schemaName}"."${translationTable}"`; + // Build the SQL query template const sqlQuery = `SELECT v."${langCodeColumn}" AS "lang_code", ${coalescedCols} - FROM "${schemaName}"."${baseTable}" b - LEFT JOIN "${schemaName}"."${translationTable}" v + FROM ${baseTableRef} b + LEFT JOIN ${translationTableRef} v ON v."${fkColumn}" = b."${pkColumn}" AND array_position($2::text[], v."${langCodeColumn}") IS NOT NULL WHERE b."${pkColumn}" = $1::${pkType} @@ -265,18 +278,20 @@ export function createI18nPlugin(options: I18nPluginOptions = {}): GraphileConfi $baseCols[column] = $parent.get(column); } const $withPgClient = (grafastContext() as any).get('withPgClient'); + const $pgSettings = (grafastContext() as any).get('pgSettings'); const $langCodes = (grafastContext() as any).get('langCodes'); // Combine all inputs into a single step const $input = object({ id: $id, withPgClient: $withPgClient, + pgSettings: $pgSettings, langCodes: $langCodes, ...$baseCols, }); return lambda($input, async (input: any) => { - const { id, withPgClient, langCodes: ctxLangCodes, ...baseCols } = input; + const { id, withPgClient, pgSettings, langCodes: ctxLangCodes, ...baseCols } = input; const langs: string[] = ctxLangCodes ?? defaultLanguages; if (!withPgClient || !id) { @@ -287,7 +302,7 @@ export function createI18nPlugin(options: I18nPluginOptions = {}): GraphileConfi return result; } - const row = await withPgClient(null, async (client: any) => { + const row = await withPgClient(pgSettings, async (client: any) => { const { rows } = await client.query(sqlQuery, [id, langs]); return rows[0] ?? null; }); diff --git a/graphile/graphile-llm/src/__tests__/agent-discovery.test.ts b/graphile/graphile-llm/src/__tests__/agent-discovery.test.ts new file mode 100644 index 0000000000..c0b9e14c7f --- /dev/null +++ b/graphile/graphile-llm/src/__tests__/agent-discovery.test.ts @@ -0,0 +1,148 @@ +/** + * Agent discovery tenant-isolation tests (pure, no DB required). 
+ * + * Verifies the cross-tenant bleed fix: discovery is filtered by the requesting + * tenant's database id (resolved from jwt.claims.database_id in pgSettings and + * passed as $1), and the per-database cache is keyed by that id — never by + * dbname alone — so a shared/pooled instance cannot return another tenant's + * agent config. + */ + +import { clearAgentDiscoveryCache, getAgentDiscovery } from '../plugins/agent-discovery-plugin'; + +// ─── Fake pool ─────────────────────────────────────────────────────────────── + +interface QueryCall { + sql: string; + values: unknown[] | undefined; +} + +/** + * A fake pg Pool whose `query` scopes rows by the $1 database id — mirroring how + * the real (shared) control-plane metaschema tables behave once the WHERE + * clause is applied. Records every call for assertions. + */ +function makeFakePool(rowsByDatabaseId: Record) { + const calls: QueryCall[] = []; + const pool = { + calls, + query: async (sql: string, values?: unknown[]) => { + calls.push({ sql, values }); + const id = values?.[0] as string | null; + const row = id != null ? rowsByDatabaseId[id] : undefined; + return { rows: row ? 
[row] : [] }; + } + }; + return pool; +} + +const TENANT_1 = '11111111-1111-1111-1111-111111111111'; +const TENANT_2 = '22222222-2222-2222-2222-222222222222'; + +const ROWS = { + [TENANT_1]: { + schema_name: 'tenant1_agent', + thread_table_name: 'agent_thread', + message_table_name: 'agent_message', + task_table_name: 'agent_task' + }, + [TENANT_2]: { + schema_name: 'tenant2_agent', + thread_table_name: 't2_thread', + message_table_name: 't2_message', + task_table_name: 't2_task' + } +}; + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +describe('getAgentDiscovery — tenant isolation', () => { + beforeEach(() => { + clearAgentDiscoveryCache(); + }); + + it('filters by the tenant database id from pgSettings, passed as $1', async () => { + const pool = makeFakePool(ROWS); + + const result = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + + expect(pool.calls).toHaveLength(1); + expect(pool.calls[0].sql).toContain('WHERE s.database_id = $1'); + expect(pool.calls[0].values).toEqual([TENANT_1]); + + expect(result).not.toBeNull(); + expect(result!.thread).toEqual({ schemaName: 'tenant1_agent', tableName: 'agent_thread' }); + expect(result!.message).toEqual({ schemaName: 'tenant1_agent', tableName: 'agent_message' }); + expect(result!.task).toEqual({ schemaName: 'tenant1_agent', tableName: 'agent_task' }); + }); + + it('does NOT bleed across tenants sharing one instance/dbname', async () => { + const pool = makeFakePool(ROWS); + + const t1 = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + const t2 = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_2 + }); + + // Each tenant gets its own config despite the same dbname. + expect(t1!.thread!.schemaName).toBe('tenant1_agent'); + expect(t2!.thread!.schemaName).toBe('tenant2_agent'); + + // Two distinct database ids ⇒ two separate cache entries ⇒ two queries. 
+ expect(pool.calls.map((c) => c.values)).toEqual([[TENANT_1], [TENANT_2]]); + }); + + it('caches by database id (same id ⇒ a single query)', async () => { + const pool = makeFakePool(ROWS); + + const a = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + const b = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + + expect(pool.calls).toHaveLength(1); + expect(a).toEqual(b); + }); + + it('fails closed when database id is absent — passes null as $1, keys under :nodb', async () => { + const pool = makeFakePool(ROWS); + + // No pgSettings at all. + const r1 = await getAgentDiscovery(pool as any, 'shared-db'); + // Empty pgSettings (no jwt.claims.database_id). + const r2 = await getAgentDiscovery(pool as any, 'shared-db', {}); + + expect(r1).toBeNull(); + expect(r2).toBeNull(); + + // First call queries with null $1; second is served from the ':nodb' cache. + expect(pool.calls).toHaveLength(1); + expect(pool.calls[0].values).toEqual([null]); + + // A different dbname without an id uses a different ':nodb' key ⇒ new query. + await getAgentDiscovery(pool as any, 'other-db'); + expect(pool.calls).toHaveLength(2); + }); + + it('caches null (unprovisioned tenant) without re-querying', async () => { + // Pool returns no rows for this (valid) tenant id ⇒ discovery is null. 
+ const pool = makeFakePool({}); + + const first = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + const second = await getAgentDiscovery(pool as any, 'shared-db', { + 'jwt.claims.database_id': TENANT_1 + }); + + expect(first).toBeNull(); + expect(second).toBeNull(); + expect(pool.calls).toHaveLength(1); + }); +}); diff --git a/graphile/graphile-llm/src/__tests__/rag-unqualified.test.ts b/graphile/graphile-llm/src/__tests__/rag-unqualified.test.ts new file mode 100644 index 0000000000..93aac8b560 --- /dev/null +++ b/graphile/graphile-llm/src/__tests__/rag-unqualified.test.ts @@ -0,0 +1,130 @@ +/** + * RAG chunk-table qualification tests (pure, no DB / no Ollama required). + * + * Verifies the blueprint-pooling flag `build.options.constructiveUnqualified` + * is threaded into discovered ChunkTableInfo and that the generated chunk + * search SQL references the tenant-data table UNQUALIFIED when the flag is on + * (so the per-request search_path resolves the tenant), while the default + * behavior stays schema-qualified. + */ + +import { buildChunkSearchSql, discoverChunkTables } from '../plugins/rag-plugin'; +import type { ChunkTableInfo } from '../types'; + +// ─── Fixtures ──────────────────────────────────────────────────────────────── + +function baseChunkTable(overrides: Partial = {}): ChunkTableInfo { + return { + parentCodecName: 'articles', + chunksSchema: 'llm_test', + chunksTableName: 'articles_chunks', + parentFkField: 'parent_id', + parentPkField: 'id', + embeddingField: 'embedding', + contentField: 'content', + ...overrides + }; +} + +function makeBuild(constructiveUnqualified?: boolean) { + return { + options: constructiveUnqualified === undefined ? 
{} : { constructiveUnqualified }, + input: { + pgRegistry: { + pgCodecs: { + articles: { + name: 'articles', + attributes: { id: {}, title: {} }, + extensions: { + pg: { schemaName: 'llm_test' }, + tags: { + hasChunks: { + chunksTable: 'articles_chunks', + parentFk: 'parent_id', + parentPk: 'id', + embeddingField: 'embedding', + contentField: 'content' + } + } + } + } + } + } + } + }; +} + +// ─── buildChunkSearchSql ───────────────────────────────────────────────────── + +describe('buildChunkSearchSql — chunk table qualification', () => { + it('emits a SCHEMA-QUALIFIED reference by default (flag off)', () => { + const { text } = buildChunkSearchSql(baseChunkTable(), '[1,0,0]', 5, null); + expect(text).toContain('FROM "llm_test"."articles_chunks"'); + }); + + it('emits an UNQUALIFIED reference when unqualified is true (blueprint pooling)', () => { + const { text } = buildChunkSearchSql( + baseChunkTable({ unqualified: true }), + '[1,0,0]', + 5, + null + ); + expect(text).toContain('FROM "articles_chunks"'); + expect(text).not.toContain('"llm_test"."articles_chunks"'); + }); + + it('emits an UNQUALIFIED reference when no schema is known (existing fallback)', () => { + const { text } = buildChunkSearchSql( + baseChunkTable({ chunksSchema: null }), + '[1,0,0]', + 5, + null + ); + expect(text).toContain('FROM "articles_chunks"'); + }); + + it('preserves parameter order/values (no maxDistance)', () => { + const { values } = buildChunkSearchSql(baseChunkTable(), '[1,0,0]', 7, null); + expect(values).toEqual(['[1,0,0]', 7]); + }); + + it('preserves parameter order/values (with maxDistance)', () => { + const { text, values } = buildChunkSearchSql(baseChunkTable(), '[1,0,0]', 7, 0.4); + expect(values).toEqual(['[1,0,0]', 0.4, 7]); + expect(text).toContain('LIMIT $3'); + }); +}); + +// ─── discoverChunkTables threading ─────────────────────────────────────────── + +describe('discoverChunkTables — threads constructiveUnqualified into ChunkTableInfo', () => { + it('sets 
unqualified=true when build.options.constructiveUnqualified is true', () => { + const tables = discoverChunkTables(makeBuild(true)); + expect(tables).toHaveLength(1); + expect(tables[0].unqualified).toBe(true); + expect(tables[0].chunksSchema).toBe('llm_test'); + + // End-to-end: the discovered table produces a bare reference. + const { text } = buildChunkSearchSql(tables[0], '[1,0,0]', 5, null); + expect(text).toContain('FROM "articles_chunks"'); + expect(text).not.toContain('"llm_test"."articles_chunks"'); + }); + + it('sets unqualified=false when the flag is false', () => { + const tables = discoverChunkTables(makeBuild(false)); + expect(tables).toHaveLength(1); + expect(tables[0].unqualified).toBe(false); + + const { text } = buildChunkSearchSql(tables[0], '[1,0,0]', 5, null); + expect(text).toContain('FROM "llm_test"."articles_chunks"'); + }); + + it('sets unqualified=false when the flag is absent (default behavior unchanged)', () => { + const tables = discoverChunkTables(makeBuild(undefined)); + expect(tables).toHaveLength(1); + expect(tables[0].unqualified).toBe(false); + + const { text } = buildChunkSearchSql(tables[0], '[1,0,0]', 5, null); + expect(text).toContain('FROM "llm_test"."articles_chunks"'); + }); +}); diff --git a/graphile/graphile-llm/src/plugins/agent-discovery-plugin.ts b/graphile/graphile-llm/src/plugins/agent-discovery-plugin.ts index ad020c4dc3..735ca173dc 100644 --- a/graphile/graphile-llm/src/plugins/agent-discovery-plugin.ts +++ b/graphile/graphile-llm/src/plugins/agent-discovery-plugin.ts @@ -41,6 +41,9 @@ export function clearAgentDiscoveryCache(): void { // ─── Discovery Query ──────────────────────────────────────────────────────── +// Control-plane metaschema_* tables stay fully qualified. The schema row is +// filtered by the requesting tenant's database id ($1) so a shared/pooled +// instance never bleeds another tenant's agent config across the join. 
const DISCOVERY_SQL = ` SELECT s.schema_name, @@ -49,18 +52,31 @@ const DISCOVERY_SQL = ` acm.task_table_name FROM metaschema_modules_public.agent_chat_module acm JOIN metaschema_public.schema s ON s.id = acm.schema_id + WHERE s.database_id = $1 LIMIT 1 `; /** * Look up agent table info for a database, querying the module config table. - * Results are cached per-database with a 60s TTL. + * + * The requesting tenant is identified by its database id, resolved from + * `jwt.claims.database_id` in the per-request pgSettings — the same way the + * billing config cache obtains it (see config-cache.ts / metering-plugin.ts). + * The query filters on that id so a shared instance cannot return another + * tenant's config. When the id is absent we fail closed (no discovery) and + * cache under a `:nodb` key to avoid poisoning a real tenant's entry. + * + * Results are cached per database id with a 60s TTL. */ export async function getAgentDiscovery( pool: Pool, - dbname: string + dbname: string, + pgSettings?: Record | null ): Promise { - const cached = agentDiscoveryCache.get(dbname); + const databaseId = pgSettings?.['jwt.claims.database_id'] ?? null; + const cacheKey = databaseId ?? 
`${dbname}:nodb`; + + const cached = agentDiscoveryCache.get(cacheKey); if (cached !== undefined) { return cached; } @@ -68,7 +84,7 @@ export async function getAgentDiscovery( let discovery: AgentDiscovery | null = null; try { - const { rows } = await pool.query(DISCOVERY_SQL); + const { rows } = await pool.query(DISCOVERY_SQL, [databaseId]); if (rows.length > 0) { const row = rows[0]; @@ -90,6 +106,6 @@ export async function getAgentDiscovery( // Module table doesn't exist in this database — not provisioned } - agentDiscoveryCache.set(dbname, discovery); + agentDiscoveryCache.set(cacheKey, discovery); return discovery; } diff --git a/graphile/graphile-llm/src/plugins/rag-plugin.ts b/graphile/graphile-llm/src/plugins/rag-plugin.ts index 3c1a3e15cf..e578982506 100644 --- a/graphile/graphile-llm/src/plugins/rag-plugin.ts +++ b/graphile/graphile-llm/src/plugins/rag-plugin.ts @@ -86,12 +86,19 @@ function parseHasChunksTag(raw: any, codec: any): ChunkTableInfo | null { /** * Discover all chunk-aware tables from the pgRegistry. + * + * Exported for unit testing. */ -function discoverChunkTables(build: any): ChunkTableInfo[] { +export function discoverChunkTables(build: any): ChunkTableInfo[] { const chunkTables: ChunkTableInfo[] = []; const pgRegistry = build.input?.pgRegistry ?? build.pgRegistry; if (!pgRegistry) return chunkTables; + // Blueprint pooling: when the instance is built with unqualified pg + // identifiers, tenant-data references must be search_path-relative (bare + // table name) so the per-request search_path resolves the tenant schema. 
+ const unqualified = (build.options as any)?.constructiveUnqualified === true; + // Scan all codecs for @hasChunks smart tag for (const codec of Object.values(pgRegistry.pgCodecs || {})) { const c = codec as any; @@ -102,6 +109,7 @@ function discoverChunkTables(build: any): ChunkTableInfo[] { const info = parseHasChunksTag(tags.hasChunks, c); if (info) { + info.unqualified = unqualified; chunkTables.push(info); } } @@ -111,17 +119,22 @@ function discoverChunkTables(build: any): ChunkTableInfo[] { /** * Build a SQL query string to search a chunks table for similar embeddings. + * + * Exported for unit testing. */ -function buildChunkSearchSql( +export function buildChunkSearchSql( table: ChunkTableInfo, vectorString: string, limit: number, maxDistance: number | null ): { text: string; values: any[] } { - const schema = table.chunksSchema; - const qualifiedTable = schema - ? `"${schema}"."${table.chunksTableName}"` - : `"${table.chunksTableName}"`; + // Under blueprint pooling (unqualified pg identifiers) the chunk table is a + // tenant-data reference, so emit it bare and let the per-request search_path + // resolve the tenant. Otherwise keep the schema-qualified reference when a + // schema is known (default behavior unchanged). + const qualifiedTable = table.unqualified || !table.chunksSchema + ? 
`"${table.chunksTableName}"` + : `"${table.chunksSchema}"."${table.chunksTableName}"`; const embeddingCol = `"${table.embeddingField}"`; const contentCol = `"${table.contentField}"`; diff --git a/graphile/graphile-llm/src/types.ts b/graphile/graphile-llm/src/types.ts index 21fafa0dff..ff723eb095 100644 --- a/graphile/graphile-llm/src/types.ts +++ b/graphile/graphile-llm/src/types.ts @@ -184,6 +184,14 @@ export interface ChunkTableInfo { embeddingField: string; /** Text content column on chunks table (the actual chunk text) */ contentField: string; + /** + * When true, reference the chunks table UNQUALIFIED (bare table name) at + * runtime so the per-request search_path resolves the tenant schema. + * Threaded from `build.options.constructiveUnqualified` (blueprint pooling) + * during discovery. Defaults to false/undefined — schema-qualified references + * (existing behavior) are used. + */ + unqualified?: boolean; } // ─── Metering Types ───────────────────────────────────────────────────────── diff --git a/graphile/graphile-presigned-url-plugin/src/storage-module-cache.ts b/graphile/graphile-presigned-url-plugin/src/storage-module-cache.ts index 6ea403fbb5..5cf993558e 100644 --- a/graphile/graphile-presigned-url-plugin/src/storage-module-cache.ts +++ b/graphile/graphile-presigned-url-plugin/src/storage-module-cache.ts @@ -354,12 +354,46 @@ export async function loadAllStorageModules( return configs; } +/** + * Strip a tenant hash prefix from a physical schema name, returning the + * logical schema name. + * + * Hashed multi-tenant schemas are named like + * `marketplace-db-tenant1-5e6b13b2-app-public`, where `-5e6b13b2-` is an + * 8-hex-char tenant hash separating the tenant prefix from the logical schema + * suffix (`app-public`). This removes everything up to and including the first + * such hash segment. If no hash segment is present (e.g. 
a plain schema like + * `app_public` or a control-plane schema like `metaschema_public`), the name + * is returned unchanged. + * + * (Duplicated locally rather than imported to avoid a cross-package dependency + * for a one-line helper.) + */ +function stripSchemaHashPrefix(name: string): string { + const match = /-[0-9a-f]{8}-/.exec(name); + return match ? name.slice(match.index + match[0].length) : name; +} + /** * Resolve the storage module config from a PostGraphile pgCodec. * * Matches the codec's schema + table name against cached storage modules. * Works for both files codecs (@storageFiles) and buckets codecs (@storageBuckets). * + * Matching is two-tier: + * 1. Exact physical schema match — used for single-tenant / non-pooled + * instances where the codec's build-time schema equals the request + * tenant's actual schema. Keeps existing behavior unchanged. + * 2. Logical schema match — strip the tenant hash prefix from BOTH the + * codec's schema and each config's schema before comparing. Under + * blueprint pooling a single shared instance serves every tenant of a + * given schema-shape, so the codec's build-time `schemaName` belongs to + * the representative tenant while `allConfigs` carry the request tenant's + * actual (differently-hashed) schema. Both share the same logical suffix + * (e.g. `app-public`). Safe because `allConfigs` is already filtered to a + * single databaseId, so (tableName, logicalSchema) uniquely identifies a + * module. + * * @param pgCodec - The PostGraphile codec (has extensions.pg.schemaName, name) * @param allConfigs - All storage module configs for this database * @returns The matching StorageModuleConfig or null @@ -373,10 +407,24 @@ export function resolveStorageConfigFromCodec( if (!schemaName || !tableName) return null; - return allConfigs.find((c) => + // Priority 1: exact physical schema match (non-pooled / single-tenant). 
+ const exact = allConfigs.find((c) => (c.filesTableName === tableName && c.schemaName === schemaName) || (c.bucketsTableName === tableName && c.schemaName === schemaName), - ) || null; + ); + if (exact) return exact; + + // Priority 2: logical schema match (blueprint pooling — the codec's + // build-time schema belongs to the representative tenant; strip the tenant + // hash from both sides so the shared logical suffix lines up). + const logicalSchema = stripSchemaHashPrefix(schemaName); + return allConfigs.find((c) => { + const configLogicalSchema = stripSchemaHashPrefix(c.schemaName); + return ( + (c.filesTableName === tableName && configLogicalSchema === logicalSchema) || + (c.bucketsTableName === tableName && configLogicalSchema === logicalSchema) + ); + }) || null; } // --- Bucket metadata cache --- diff --git a/graphile/graphile-search/src/__tests__/search-config.test.ts b/graphile/graphile-search/src/__tests__/search-config.test.ts index 1ead0f6b6b..a0572734d4 100644 --- a/graphile/graphile-search/src/__tests__/search-config.test.ts +++ b/graphile/graphile-search/src/__tests__/search-config.test.ts @@ -423,3 +423,139 @@ describe('VectorNearbyInput includeChunks field (Phase E)', () => { expect(fields.includeChunks.description).toContain('chunks'); }); }); + +// ─── Blueprint pooling: build.options.constructiveUnqualified ───────────────── +// +// When the pooled preset sets schema.constructiveUnqualified, tenant-data SQL +// must be emitted search_path-relative (no schema prefix) so one shared instance +// can serve every tenant, routed per request via pgSettings search_path. +// Default (flag off / absent) MUST stay byte-identical (fully qualified). + +describe('constructiveUnqualified option (blueprint pooling)', () => { + // Mock sql object that mimics pg-sql2 behavior (same shape as above). 
+ const mockSql = { + identifier: (name: string) => `"${name}"`, + value: (val: any) => `'${val}'`, + literal: (val: any) => `'${val}'`, + raw: (s: string) => s, + fragment: (strings: TemplateStringsArray, ...values: any[]) => { + let result = ''; + strings.forEach((str, i) => { + result += str; + if (i < values.length) result += String(values[i]); + }); + return result; + }, + join: (parts: any[], sep: string) => parts.join(sep), + parens: (expr: any) => `(${expr})`, + }; + const sql = Object.assign( + (strings: TemplateStringsArray, ...values: any[]) => { + let result = ''; + strings.forEach((str: string, i: number) => { + result += str; + if (i < values.length) result += String(values[i]); + }); + return result; + }, + mockSql, + ); + + const unqualifiedBuild = { options: { constructiveUnqualified: true } }; + + // ── bm25: index name passed to to_bm25query ── + describe('bm25 adapter — index name', () => { + const adapter = createBm25Adapter(); + const bm25Column = { + attributeName: 'body', + adapterData: { + bm25Index: { + schemaName: 'app_private', + tableName: 'documents', + columnName: 'body', + indexName: 'documents_body_bm25_idx', + }, + }, + }; + + it('emits the schema-qualified index name by default (flag off)', () => { + const result = adapter.buildFilterApply( + sql, + 'tbl' as any, + bm25Column, + { query: 'hello' }, + {}, + ); + + expect(result).not.toBeNull(); + const scoreStr = String(result!.scoreExpression); + // Default behavior: fully qualified "schema"."index" + expect(scoreStr).toContain('"app_private"."documents_body_bm25_idx"'); + }); + + it('emits the unqualified index name when constructiveUnqualified is set', () => { + const result = adapter.buildFilterApply( + sql, + 'tbl' as any, + bm25Column, + { query: 'hello' }, + unqualifiedBuild, + ); + + expect(result).not.toBeNull(); + const scoreStr = String(result!.scoreExpression); + // Pooled: index name resolves via search_path — no schema prefix + 
expect(scoreStr).toContain('"documents_body_bm25_idx"'); + expect(scoreStr).not.toContain('app_private'); + }); + }); + + // ── pgvector: chunk table reference ── + describe('pgvector adapter — chunk table reference', () => { + const adapter = createPgvectorAdapter(); + const chunkColumn = { + attributeName: 'embedding', + adapterData: { + chunksInfo: { + chunksSchema: 'app_private', + chunksTableName: 'doc_chunks', + parentFkField: 'document_id', + parentPkField: 'row_id', + embeddingField: 'vec', + }, + }, + }; + + it('emits the schema-qualified chunk table by default (flag off)', () => { + const result = adapter.buildFilterApply( + sql, + 'tbl' as any, + chunkColumn, + { vector: [1, 0, 0], metric: 'COSINE' }, + {}, + ); + + expect(result).not.toBeNull(); + const scoreStr = String(result!.scoreExpression); + expect(scoreStr).toContain('"app_private"."doc_chunks"'); + expect(scoreStr).toContain('LEAST'); + }); + + it('emits the unqualified chunk table when constructiveUnqualified is set', () => { + const result = adapter.buildFilterApply( + sql, + 'tbl' as any, + chunkColumn, + { vector: [1, 0, 0], metric: 'COSINE' }, + unqualifiedBuild, + ); + + expect(result).not.toBeNull(); + const scoreStr = String(result!.scoreExpression); + // Pooled: chunk table resolves via search_path — no schema prefix + expect(scoreStr).toContain('"doc_chunks"'); + expect(scoreStr).not.toContain('app_private'); + expect(scoreStr).toContain('LEAST'); + }); + }); +}); diff --git a/graphile/graphile-search/src/adapters/bm25.ts b/graphile/graphile-search/src/adapters/bm25.ts index 26ea207f38..11907b64ed 100644 --- a/graphile/graphile-search/src/adapters/bm25.ts +++ b/graphile/graphile-search/src/adapters/bm25.ts @@ -169,26 +169,37 @@ export function createBm25Adapter( alias: SQL, column: SearchableColumn, filterValue: any, - _build: any, + build: any, ): FilterApplyResult | null { if (filterValue == null) return null; const { query, threshold, includeChunks } = filterValue; if (!query || 
typeof query !== 'string' || query.trim().length === 0) return null; + // When the instance is built for blueprint pooling + // (schema.constructiveUnqualified), emit search_path-relative references for + // tenant-data tables/indexes so the per-request search_path resolves the + // tenant schema. Default (flag absent): fully schema-qualified, byte-identical. + const constructiveUnqualified = !!((build as any)?.options?.constructiveUnqualified); + const columnData = column.adapterData as Bm25ColumnData; const bm25Index = columnData.bm25Index; const columnExpr = sql`${alias}.${sql.identifier(column.attributeName)}`; - // Use quoteQualifiedIdentifier to produce the qualified index name - const qualifiedIndexName = `"${bm25Index.schemaName}"."${bm25Index.indexName}"`; - const bm25queryExpr = sql`to_bm25query(${sql.value(query)}, ${sql.value(qualifiedIndexName)})`; + // Index names resolve via search_path just like tables, so under blueprint + // pooling we hand to_bm25query the UNQUALIFIED index name. The store lookup + // (getBm25IndexForAttribute) still keys off the build-time codec schema — + // correct, because the store was populated by the same representative gather. + const indexNameArg = constructiveUnqualified + ? `"${bm25Index.indexName}"` + : `"${bm25Index.schemaName}"."${bm25Index.indexName}"`; + const bm25queryExpr = sql`to_bm25query(${sql.value(query)}, ${sql.value(indexNameArg)})`; const scoreExpr = sql`(${columnExpr} <@> ${bm25queryExpr})`; // Check for chunk-aware querying const chunksInfo = columnData.chunksInfo; if (chunksInfo && chunksInfo.searchIndexes.includes('bm25') && (includeChunks !== false)) { - const chunksTableRef = chunksInfo.chunksSchema + const chunksTableRef = (chunksInfo.chunksSchema && !constructiveUnqualified) ? 
sql`${sql.identifier(chunksInfo.chunksSchema)}.${sql.identifier(chunksInfo.chunksTableName)}` : sql`${sql.identifier(chunksInfo.chunksTableName)}`; const parentFk = sql.identifier(chunksInfo.parentFkField); @@ -199,7 +210,9 @@ export function createBm25Adapter( // BM25 on chunks requires an index name on the chunks table. // We construct it from the chunks table schema + a conventional index name. // The BM25 index on chunks is named: {chunks_table}_{content_field}_bm25_idx - const chunksIndexName = `"${chunksInfo.chunksSchema || bm25Index.schemaName}"."${chunksInfo.chunksTableName}_${chunksInfo.contentField}_bm25_idx"`; + const chunksIndexName = constructiveUnqualified + ? `"${chunksInfo.chunksTableName}_${chunksInfo.contentField}_bm25_idx"` + : `"${chunksInfo.chunksSchema || bm25Index.schemaName}"."${chunksInfo.chunksTableName}_${chunksInfo.contentField}_bm25_idx"`; const chunkBm25queryExpr = sql`to_bm25query(${sql.value(query)}, ${sql.value(chunksIndexName)})`; const chunkScoreExpr = sql`(${chunksAlias}.${chunkContentField} <@> ${chunkBm25queryExpr})`; diff --git a/graphile/graphile-search/src/adapters/pgvector.ts b/graphile/graphile-search/src/adapters/pgvector.ts index b36ac10fc4..6c320c5e8d 100644 --- a/graphile/graphile-search/src/adapters/pgvector.ts +++ b/graphile/graphile-search/src/adapters/pgvector.ts @@ -187,13 +187,19 @@ export function createPgvectorAdapter( alias: SQL, column: SearchableColumn, filterValue: any, - _build: any, + build: any, ): FilterApplyResult | null { if (filterValue == null) return null; const { vector, metric, distance, includeChunks } = filterValue; if (!vector || !Array.isArray(vector) || vector.length === 0) return null; + // When the instance is built for blueprint pooling + // (schema.constructiveUnqualified), emit search_path-relative references for + // tenant-data tables so the per-request search_path resolves the tenant + // schema. Default (flag absent): fully schema-qualified, byte-identical. 
+ const constructiveUnqualified = !!((build as any)?.options?.constructiveUnqualified); + const resolvedMetric = metric || defaultMetric; const vectorString = `[${vector.join(',')}]`; const vectorExpr = sql`${sql.value(vectorString)}::vector`; @@ -205,7 +211,7 @@ export function createPgvectorAdapter( if (chunksInfo && (includeChunks !== false)) { // Chunk-aware query: find the closest chunk for each parent row // Uses a lateral subquery to get the minimum distance across all chunks - const chunksTableRef = chunksInfo.chunksSchema + const chunksTableRef = (chunksInfo.chunksSchema && !constructiveUnqualified) ? sql`${sql.identifier(chunksInfo.chunksSchema)}.${sql.identifier(chunksInfo.chunksTableName)}` : sql`${sql.identifier(chunksInfo.chunksTableName)}`; const parentFk = sql.identifier(chunksInfo.parentFkField); diff --git a/graphile/graphile-search/src/adapters/trgm.ts b/graphile/graphile-search/src/adapters/trgm.ts index 67cef51501..dcdb9f5393 100644 --- a/graphile/graphile-search/src/adapters/trgm.ts +++ b/graphile/graphile-search/src/adapters/trgm.ts @@ -144,13 +144,19 @@ export function createTrgmAdapter( alias: SQL, column: SearchableColumn, filterValue: any, - _build: any, + build: any, ): FilterApplyResult | null { if (filterValue == null) return null; const { value, threshold, includeChunks } = filterValue; if (!value || typeof value !== 'string' || value.trim().length === 0) return null; + // When the instance is built for blueprint pooling + // (schema.constructiveUnqualified), emit search_path-relative references for + // tenant-data tables so the per-request search_path resolves the tenant + // schema. Default (flag absent): fully schema-qualified, byte-identical. + const constructiveUnqualified = !!((build as any)?.options?.constructiveUnqualified); + const th = threshold != null ? 
threshold : defaultThreshold; const columnExpr = sql`${alias}.${sql.identifier(column.attributeName)}`; const similarityExpr = sql`similarity(${columnExpr}, ${sql.value(value)})`; @@ -158,7 +164,7 @@ export function createTrgmAdapter( // Check for chunk-aware querying const chunksInfo = column.adapterData as ChunksInfo | undefined; if (chunksInfo && chunksInfo.searchIndexes.includes('trigram') && (includeChunks !== false)) { - const chunksTableRef = chunksInfo.chunksSchema + const chunksTableRef = (chunksInfo.chunksSchema && !constructiveUnqualified) ? sql`${sql.identifier(chunksInfo.chunksSchema)}.${sql.identifier(chunksInfo.chunksTableName)}` : sql`${sql.identifier(chunksInfo.chunksTableName)}`; const parentFk = sql.identifier(chunksInfo.parentFkField); diff --git a/graphile/graphile-search/src/adapters/tsvector.ts b/graphile/graphile-search/src/adapters/tsvector.ts index d9a2ab8c17..78901be37f 100644 --- a/graphile/graphile-search/src/adapters/tsvector.ts +++ b/graphile/graphile-search/src/adapters/tsvector.ts @@ -96,10 +96,16 @@ export function createTsvectorAdapter( alias: SQL, column: SearchableColumn, filterValue: any, - _build: any, + build: any, ): FilterApplyResult | null { if (filterValue == null) return null; + // When the instance is built for blueprint pooling + // (schema.constructiveUnqualified), emit search_path-relative references for + // tenant-data tables so the per-request search_path resolves the tenant + // schema. Default (flag absent): fully schema-qualified, byte-identical. 
+ const constructiveUnqualified = !!((build as any)?.options?.constructiveUnqualified); + // Handle includeChunks option when filter is an object let val: string; let includeChunks: boolean | undefined; @@ -117,7 +123,7 @@ export function createTsvectorAdapter( // Check for chunk-aware querying const chunksInfo = column.adapterData as ChunksInfo | undefined; if (chunksInfo && chunksInfo.searchField && (includeChunks !== false)) { - const chunksTableRef = chunksInfo.chunksSchema + const chunksTableRef = (chunksInfo.chunksSchema && !constructiveUnqualified) ? sql`${sql.identifier(chunksInfo.chunksSchema)}.${sql.identifier(chunksInfo.chunksTableName)}` : sql`${sql.identifier(chunksInfo.chunksTableName)}`; const parentFk = sql.identifier(chunksInfo.parentFkField); diff --git a/graphile/graphile-search/src/codecs/bm25-codec.ts b/graphile/graphile-search/src/codecs/bm25-codec.ts index 117ca88340..6c5767b2b6 100644 --- a/graphile/graphile-search/src/codecs/bm25-codec.ts +++ b/graphile/graphile-search/src/codecs/bm25-codec.ts @@ -36,6 +36,15 @@ export interface Bm25IndexInfo { * * Key: "schemaName.tableName.columnName" * Value: Bm25IndexInfo + * + * NOTE (blueprint pooling): this store is process-global and is cleared + + * repopulated per introspection run (see pgIntrospection_introspection). + * Concurrent gathers are serialized by the server's build semaphore, so a + * single global store is tolerated for now. Under blueprint pooling the store + * is keyed by the representative tenant's schema; the BM25 adapter reads the + * index NAME from these entries and, when schema.constructiveUnqualified is set, + * passes it unqualified to to_bm25query so the per-request search_path resolves + * the correct tenant index at query time. 
*/ export const bm25IndexStore = new Map(); diff --git a/graphile/graphile-settings/src/plugins/meta-schema/cache.ts b/graphile/graphile-settings/src/plugins/meta-schema/cache.ts index 3e9f909b7a..916d6712b0 100644 --- a/graphile/graphile-settings/src/plugins/meta-schema/cache.ts +++ b/graphile/graphile-settings/src/plugins/meta-schema/cache.ts @@ -1,5 +1,41 @@ import type { TableMeta } from './types'; +/** + * Per-build table metadata, keyed by the PostGraphile `build` object. + * + * The server constructs multiple PostGraphile schemas concurrently in a single + * process, so the `init` and `GraphQLObjectType_fields` hooks of different + * builds interleave. Keying the collected metadata on the build object (instead + * of a single shared array) guarantees each schema's `_meta` field serves its + * OWN tables and never bleeds another concurrent build's data. + * + * A WeakMap is used (rather than a property on the build) because graphile-build + * freezes the build object before the `init` hook runs, so it cannot be mutated + * with an own property. The WeakMap also lets entries be garbage-collected once + * a build is discarded. + */ +const tablesMetaByBuild = new WeakMap(); + +export function setTablesMetaForBuild(build: object, tablesMeta: TableMeta[]): void { + tablesMetaByBuild.set(build, tablesMeta); +} + +export function getTablesMetaForBuild(build: object): TableMeta[] { + return tablesMetaByBuild.get(build) ?? []; +} + +/** + * Flat "last build wins" mirror of the most recently collected table metadata. + * + * Retained ONLY for out-of-process codegen consumers — graphile-schema's + * `buildIntrospectionJSON` and graphql/codegen's `DatabaseSchemaSource` — which + * read this (re-exported as `_cachedTablesMeta`) as a side-effect AFTER a single + * `buildSchemaSDL()` call and have no reference to the `build` object. Those + * paths build one schema at a time, so last-write-wins is safe for them. 
+ * + * Do NOT read this from the concurrent schema-serving path (the `_meta` field): + * use getTablesMetaForBuild(build) instead, or concurrent builds will bleed. + */ export let cachedTablesMeta: TableMeta[] = []; export function getCachedTablesMeta(): TableMeta[] { diff --git a/graphile/graphile-settings/src/plugins/meta-schema/plugin.ts b/graphile/graphile-settings/src/plugins/meta-schema/plugin.ts index 2144c323fc..11d66ed0e7 100644 --- a/graphile/graphile-settings/src/plugins/meta-schema/plugin.ts +++ b/graphile/graphile-settings/src/plugins/meta-schema/plugin.ts @@ -1,5 +1,9 @@ import type { GraphileConfig } from 'graphile-config'; -import { getCachedTablesMeta, setCachedTablesMeta } from './cache'; +import { + getTablesMetaForBuild, + setCachedTablesMeta, + setTablesMetaForBuild, +} from './cache'; import { extendQueryWithMetaField } from './graphql-meta-field'; import { collectTablesMeta } from './table-meta-builder'; import type { MetaBuild } from './types'; @@ -18,16 +22,21 @@ export const MetaSchemaPlugin: GraphileConfig.Plugin = { hooks: { init(input, rawBuild) { const build = rawBuild as unknown as MetaBuild; - setCachedTablesMeta(collectTablesMeta(build)); + const tablesMeta = collectTablesMeta(build); + // Keyed by this build so the _meta field resolver reads its own tables, + // even when other builds run concurrently in the same process. + setTablesMetaForBuild(rawBuild, tablesMeta); + // Flat mirror for out-of-process codegen consumers (see cache.ts). 
+ setCachedTablesMeta(tablesMeta); return input; }, - GraphQLObjectType_fields(rawFields, _rawBuild, rawContext) { + GraphQLObjectType_fields(rawFields, rawBuild, rawContext) { const context = rawContext as unknown as QueryTypeContext; if (context.Self?.name !== 'Query') return rawFields; return extendQueryWithMetaField( rawFields as unknown as Record, - getCachedTablesMeta(), + getTablesMetaForBuild(rawBuild), ) as typeof rawFields; }, }, diff --git a/graphile/graphile-settings/src/plugins/meta-schema/table-meta-builder.ts b/graphile/graphile-settings/src/plugins/meta-schema/table-meta-builder.ts index 0b98fccb26..455dfff810 100644 --- a/graphile/graphile-settings/src/plugins/meta-schema/table-meta-builder.ts +++ b/graphile/graphile-settings/src/plugins/meta-schema/table-meta-builder.ts @@ -115,11 +115,27 @@ function buildTableMeta( }; } +/** + * Strip the tenant hash prefix (`-<8hex>-`) from a physical + * schema name, returning the logical name; names without a hash segment are + * returned unchanged. Local duplicate of the server-side helper to avoid a + * cross-package dependency. + */ +function stripSchemaHashPrefix(name: string): string { + const match = /-[0-9a-f]{8}-/.exec(name); + if (!match) return name; + return name.slice(match.index + match[0].length); +} + export function collectTablesMeta(build: MetaBuild): TableMeta[] { const configuredSchemas = getConfiguredSchemas(build); const context = createBuildContext(build); const seenCodecs = new Set(); const tablesMeta: TableMeta[] = []; + // Shared (pooled) instances serve many tenants: reporting the build-time + // PHYSICAL schema name would leak the representative tenant's hashed schema + // identifier to every other tenant via _meta — report the logical name. 
+ const unqualified = !!(build.options as any)?.constructiveUnqualified; for (const resource of Object.values(build.input.pgRegistry.pgResources || {})) { if (!isTableResource(resource)) continue; @@ -137,7 +153,9 @@ export function collectTablesMeta(build: MetaBuild): TableMeta[] { continue; } - tablesMeta.push(buildTableMeta(resource, schemaName, context)); + tablesMeta.push( + buildTableMeta(resource, unqualified ? stripSchemaHashPrefix(schemaName) : schemaName, context) + ); } return tablesMeta; diff --git a/graphql/server/src/middleware/__tests__/blueprint.test.ts b/graphql/server/src/middleware/__tests__/blueprint.test.ts new file mode 100644 index 0000000000..bdfd5882c8 --- /dev/null +++ b/graphql/server/src/middleware/__tests__/blueprint.test.ts @@ -0,0 +1,164 @@ +import { + computeBlueprintKey, + quoteSearchPath, + stripSchemaHashPrefix +} from '../blueprint'; + +describe('stripSchemaHashPrefix', () => { + it('strips a hashed tenant prefix down to the logical schema', () => { + expect(stripSchemaHashPrefix('marketplace-db-tenant1-5e6b13b2-app-public')).toBe( + 'app-public' + ); + }); + + it('returns control-plane / non-hashed schema names unchanged', () => { + expect(stripSchemaHashPrefix('services_public')).toBe('services_public'); + expect(stripSchemaHashPrefix('metaschema_public')).toBe('metaschema_public'); + expect(stripSchemaHashPrefix('app-public')).toBe('app-public'); + }); + + it('handles multi-dash database names in the prefix', () => { + expect(stripSchemaHashPrefix('my-cool-db-name-deadbeef-auth-private')).toBe( + 'auth-private' + ); + expect(stripSchemaHashPrefix('app-5e6b13b2-public')).toBe('public'); + }); + + it('only strips through the FIRST 8-hex segment, keeping later ones intact', () => { + expect( + stripSchemaHashPrefix('marketplace-db-tenant1-5e6b13b2-app-deadbeef-zone') + ).toBe('app-deadbeef-zone'); + }); + + it('does not treat non-hex or wrong-length segments as a hash', () => { + // 'tenant11' is 8 chars but not all hex; 
'abcdef1' is 7 hex chars. + expect(stripSchemaHashPrefix('marketplace-tenant11-app-public')).toBe( + 'marketplace-tenant11-app-public' + ); + expect(stripSchemaHashPrefix('marketplace-abcdef1-app-public')).toBe( + 'marketplace-abcdef1-app-public' + ); + }); +}); + +describe('quoteSearchPath', () => { + it('wraps each name in double quotes and comma-joins', () => { + expect(quoteSearchPath(['a-b-c', 'a-b-d'])).toBe('"a-b-c", "a-b-d"'); + }); + + it('quotes a single element without a trailing comma', () => { + expect(quoteSearchPath(['app-public'])).toBe('"app-public"'); + }); + + it('escapes embedded double quotes by doubling them', () => { + expect(quoteSearchPath(['weird"name', 'x'])).toBe('"weird""name", "x"'); + }); + + it('returns an empty string for an empty list', () => { + expect(quoteSearchPath([])).toBe(''); + }); +}); + +describe('computeBlueprintKey', () => { + const base = { + logicalSchemas: ['app-public', 'auth-public'], + shapeFingerprint: 'fingerprint-a', + flags: { a: 1, b: 2 } as Record, + apiName: 'customer-api', + mode: 'domain-lookup' + }; + + it('produces a stable "bp:"-prefixed sha256 hex key', () => { + const key = computeBlueprintKey(base); + expect(key).toMatch(/^bp:[0-9a-f]{64}$/); + expect(computeBlueprintKey(base)).toBe(key); + }); + + it('is stable across the key order of flags', () => { + const key1 = computeBlueprintKey({ ...base, flags: { a: 1, b: 2 } }); + const key2 = computeBlueprintKey({ ...base, flags: { b: 2, a: 1 } }); + expect(key1).toBe(key2); + }); + + it('is stable across the order of logical schemas', () => { + const key1 = computeBlueprintKey({ + ...base, + logicalSchemas: ['app-public', 'auth-public'] + }); + const key2 = computeBlueprintKey({ + ...base, + logicalSchemas: ['auth-public', 'app-public'] + }); + expect(key1).toBe(key2); + }); + + it('differs when the shape fingerprint differs', () => { + const key1 = computeBlueprintKey({ ...base, shapeFingerprint: 'fingerprint-a' }); + const key2 = computeBlueprintKey({ 
...base, shapeFingerprint: 'fingerprint-b' }); + expect(key1).not.toBe(key2); + }); + + it('differs when flag values differ', () => { + const key1 = computeBlueprintKey({ ...base, flags: { a: 1, b: 2 } }); + const key2 = computeBlueprintKey({ ...base, flags: { a: 1, b: 3 } }); + expect(key1).not.toBe(key2); + }); + + it('treats undefined flags like an empty flag set', () => { + const key1 = computeBlueprintKey({ ...base, flags: undefined }); + const key2 = computeBlueprintKey({ ...base, flags: {} }); + expect(key1).toBe(key2); + }); + + it('differs when the mode or api name differs', () => { + expect(computeBlueprintKey({ ...base, mode: 'domain-lookup' })).not.toBe( + computeBlueprintKey({ ...base, mode: 'api-name-header' }) + ); + expect(computeBlueprintKey({ ...base, apiName: 'customer-api' })).not.toBe( + computeBlueprintKey({ ...base, apiName: 'other-api' }) + ); + }); + + it('treats null and empty api name identically', () => { + expect(computeBlueprintKey({ ...base, apiName: null })).toBe( + computeBlueprintKey({ ...base, apiName: '' }) + ); + }); +}); + +describe('tenantSearchPath (W3 fix: keep shared public on the path)', () => { + const { tenantSearchPath } = require('../blueprint'); + + it('appends public after the tenant schemas', () => { + expect(tenantSearchPath(['t-5e6b13b2-app-public', 't-5e6b13b2-public'])).toBe( + '"t-5e6b13b2-app-public", "t-5e6b13b2-public", "public"' + ); + }); + + it('does not duplicate an explicit public entry and keeps it last', () => { + expect(tenantSearchPath(['public', 'services_public'])).toBe('"services_public", "public"'); + }); +}); + +describe('computeBlueprintKey dbname isolation (W3 fix)', () => { + const { computeBlueprintKey } = require('../blueprint'); + const base = { + logicalSchemas: ['app-public'], + shapeFingerprint: 'f'.repeat(64), + flags: undefined as Record | undefined, + apiName: 'api', + mode: 'public' + }; + + it('same-shape tenants in DIFFERENT physical databases get DIFFERENT keys', () => { + const 
a = computeBlueprintKey({ ...base, dbname: 'db_a' }); + const b = computeBlueprintKey({ ...base, dbname: 'db_b' }); + expect(a).not.toBe(b); + }); + + it('same dbname yields a stable key', () => { + expect(computeBlueprintKey({ ...base, dbname: 'db_a' })).toBe( + computeBlueprintKey({ ...base, dbname: 'db_a' }) + ); + }); +}); diff --git a/graphql/server/src/middleware/__tests__/graphile-pooling.test.ts b/graphql/server/src/middleware/__tests__/graphile-pooling.test.ts new file mode 100644 index 0000000000..a9dd182e03 --- /dev/null +++ b/graphql/server/src/middleware/__tests__/graphile-pooling.test.ts @@ -0,0 +1,223 @@ +import type { Pool } from 'pg'; + +import type { ApiStructure } from '../../types'; +import { isBlueprintPoolingEnabled } from '../blueprint'; +import { clearPoolDecisions, computePoolDecision, resolvePoolDecision } from '../pooling-decision'; + +type QueryResult = { rows: Array> }; + +const makeApi = (overrides: Partial = {}): ApiStructure => + ({ + dbname: 'constructive', + anonRole: 'anon', + roleName: 'authenticated', + schema: ['marketplace-db-tenant1-5e6b13b2-app-public'], + apiModules: [], + ...overrides + } as ApiStructure); + +// A mock Pool whose query() resolves queued results in call order. computePoolDecision +// issues the shape-fingerprint query first, then the collision-probe query. 
+const makePool = (results: QueryResult[]): { pool: Pool; query: jest.Mock } => { + const query = jest.fn(); + results.forEach((r) => query.mockResolvedValueOnce(r)); + return { pool: { query } as unknown as Pool, query }; +}; + +describe('computePoolDecision', () => { + afterEach(() => { + clearPoolDecisions(); + jest.clearAllMocks(); + }); + + it('falls back (pooling=false, key=svc_key) when realtime is enabled, without touching the DB', async () => { + const { pool, query } = makePool([]); + const api = makeApi({ databaseSettings: { enableRealtime: true } as ApiStructure['databaseSettings'] }); + + const decision = await computePoolDecision('svc-1', api, pool); + + expect(decision).toEqual({ key: 'svc-1', pooling: false }); + expect(query).not.toHaveBeenCalled(); + }); + + it('falls back when the API exposes no schemas, without touching the DB', async () => { + const { pool, query } = makePool([]); + const api = makeApi({ schema: [] }); + + const decision = await computePoolDecision('svc-2', api, pool); + + expect(decision).toEqual({ key: 'svc-2', pooling: false }); + expect(query).not.toHaveBeenCalled(); + }); + + it('pools a clean single-tenant shape and returns a stable bp: key', async () => { + const rows = [ + { nspname: 'marketplace-db-tenant1-5e6b13b2-app-public', relname: 'products' }, + { nspname: 'marketplace-db-tenant1-5e6b13b2-app-public', relname: 'orders' } + ]; + const { pool, query } = makePool([{ rows }, { rows: [] }]); + const api = makeApi(); + + const decision = await computePoolDecision('svc-3', api, pool); + + expect(decision.pooling).toBe(true); + expect(decision.key).toMatch(/^bp:[0-9a-f]{64}$/); + expect(query).toHaveBeenCalledTimes(1); // single catalog scan feeds fingerprint + collisions + }); + + it('produces the SAME bp: key for two different physical tenants of the same shape', async () => { + const shapeRows = (hash: string) => [ + { nspname: `marketplace-db-tenant-${hash}-app-public`, relname: 'orders' }, + { nspname: 
`marketplace-db-tenant-${hash}-app-public`, relname: 'products' } + ]; + + const t1 = makePool([{ rows: shapeRows('5e6b13b2') }, { rows: [] }]); + const d1 = await computePoolDecision( + 'svc-t1', + makeApi({ schema: ['marketplace-db-tenant-5e6b13b2-app-public'] }), + t1.pool + ); + + const t2 = makePool([{ rows: shapeRows('deadbeef') }, { rows: [] }]); + const d2 = await computePoolDecision( + 'svc-t2', + makeApi({ schema: ['marketplace-db-tenant-deadbeef-app-public'] }), + t2.pool + ); + + expect(d1.pooling).toBe(true); + expect(d2.pooling).toBe(true); + expect(d1.key).toBe(d2.key); // same logical shape → shared blueprint instance + }); + + it('threads api.logicalSchemas into the key, deriving it from physical schema only when absent', async () => { + // Identical fingerprint rows across all three calls: only logicalSchemas varies, + // so any key difference is attributable solely to the logicalSchemas input. + const fpRows = [{ nspname: 'shop-5e6b13b2-app-public', relname: 'orders' }]; + + // Explicit logicalSchemas that DIFFER from the stripped physical name. + const a = makePool([{ rows: fpRows }, { rows: [] }]); + const dExplicitDiff = await computePoolDecision( + 'svc-a', + makeApi({ schema: ['shop-5e6b13b2-app-public'], logicalSchemas: ['custom-logical'] }), + a.pool + ); + + // No logicalSchemas → derived from schema.map(stripSchemaHashPrefix) = ['app-public']. + const b = makePool([{ rows: fpRows }, { rows: [] }]); + const dDerived = await computePoolDecision( + 'svc-b', + makeApi({ schema: ['shop-5e6b13b2-app-public'], logicalSchemas: undefined }), + b.pool + ); + + // Explicit logicalSchemas EQUAL to the derived value. 
+ const c = makePool([{ rows: fpRows }, { rows: [] }]); + const dExplicitSame = await computePoolDecision( + 'svc-c', + makeApi({ schema: ['shop-5e6b13b2-app-public'], logicalSchemas: ['app-public'] }), + c.pool + ); + + expect(dExplicitDiff.key).not.toBe(dDerived.key); // provided logicalSchemas is honored + expect(dExplicitSame.key).toBe(dDerived.key); // ?? fallback derivation matches + }); + + it('falls back when an unqualified relation-name collision exists across schemas', async () => { + const rows = [ + { nspname: 'shop-5e6b13b2-auth-public', relname: 'identity_providers' }, + { nspname: 'shop-5e6b13b2-auth-private', relname: 'identity_providers' } + ]; + const { pool } = makePool([{ rows }, { rows: [{ relname: 'identity_providers' }] }]); + const api = makeApi({ schema: ['shop-5e6b13b2-auth-public', 'shop-5e6b13b2-auth-private'] }); + + const decision = await computePoolDecision('svc-4', api, pool); + + expect(decision).toEqual({ key: 'svc-4', pooling: false }); + }); + + it('falls back (never throws) when a catalog probe rejects', async () => { + const query = jest.fn().mockRejectedValue(new Error('connection reset')); + const api = makeApi(); + + const decision = await computePoolDecision('svc-5', api, { query } as unknown as Pool); + + expect(decision).toEqual({ key: 'svc-5', pooling: false, transient: true }); + }); +}); + +describe('resolvePoolDecision caching', () => { + afterEach(() => { + clearPoolDecisions(); + jest.clearAllMocks(); + }); + + it('memoizes the decision per svc_key (one probe pair) until cleared', async () => { + const { pool, query } = makePool([ + { rows: [{ nspname: 'shop-5e6b13b2-app-public', relname: 'orders' }] }, + { rows: [] } + ]); + const api = makeApi({ schema: ['shop-5e6b13b2-app-public'] }); + + const first = await resolvePoolDecision('svc-cache', api, pool); + const second = await resolvePoolDecision('svc-cache', api, pool); + + expect(first).toBe(second); // same cached object identity + 
expect(first.pooling).toBe(true); + expect(query).toHaveBeenCalledTimes(1); // NOT 2 — second call served from cache + + // clearPoolDecisions() forces the next request to re-probe. + clearPoolDecisions(); + query.mockResolvedValueOnce({ rows: [{ nspname: 'shop-5e6b13b2-app-public', relname: 'orders' }] }); + await resolvePoolDecision('svc-cache', api, pool); + expect(query).toHaveBeenCalledTimes(2); + }); +}); + +describe('blueprint pooling flag gate (flag-off ⇒ key stays svc_key in the dispatcher)', () => { + const original = process.env.GRAPHILE_BLUEPRINT_POOLING; + + afterEach(() => { + if (original === undefined) delete process.env.GRAPHILE_BLUEPRINT_POOLING; + else process.env.GRAPHILE_BLUEPRINT_POOLING = original; + }); + + it('is disabled by default — the dispatcher never runs a decision and uses req.svc_key', () => { + delete process.env.GRAPHILE_BLUEPRINT_POOLING; + expect(isBlueprintPoolingEnabled()).toBe(false); + }); + + it("enables only on '1' or 'true'", () => { + process.env.GRAPHILE_BLUEPRINT_POOLING = '1'; + expect(isBlueprintPoolingEnabled()).toBe(true); + process.env.GRAPHILE_BLUEPRINT_POOLING = 'true'; + expect(isBlueprintPoolingEnabled()).toBe(true); + process.env.GRAPHILE_BLUEPRINT_POOLING = 'yes'; + expect(isBlueprintPoolingEnabled()).toBe(false); + }); +}); + +describe('transient probe failures are not memoized (W3 fix)', () => { + afterEach(() => { + clearPoolDecisions(); + jest.clearAllMocks(); + }); + + it('re-probes on the next request after a thrown catalog probe', async () => { + const query = jest.fn(); + query.mockRejectedValueOnce(new Error('connection reset')); + query.mockResolvedValueOnce({ + rows: [{ nspname: 'shop-5e6b13b2-app-public', relname: 'orders' }] + }); + const pool = { query } as unknown as Pool; + const api = makeApi({ schema: ['shop-5e6b13b2-app-public'] }); + + const first = await resolvePoolDecision('svc-transient', api, pool); + expect(first.pooling).toBe(false); + expect(first.transient).toBe(true); + + const 
second = await resolvePoolDecision('svc-transient', api, pool); + expect(second.pooling).toBe(true); // re-probed and now poolable + expect(query).toHaveBeenCalledTimes(2); + }); +}); diff --git a/graphql/server/src/middleware/api.ts b/graphql/server/src/middleware/api.ts index 005be581fd..cd20402413 100644 --- a/graphql/server/src/middleware/api.ts +++ b/graphql/server/src/middleware/api.ts @@ -14,6 +14,7 @@ import { getPgPool } from 'pg-cache'; import errorPage50x from '../errors/50x'; import errorPage404Message from '../errors/404-message'; import { ApiConfigResult, ApiError, ApiOptions, ApiStructure, AuthSettings, DatabaseSettings, PubkeyChallengeSettings, RlsModule, WebauthnSettings } from '../types'; +import { stripSchemaHashPrefix } from './blueprint'; import './types'; const log = new Logger('api'); @@ -266,6 +267,7 @@ const toApiStructure = (row: ApiRow, opts: ApiOptions, settings: ResolvedModuleS anonRole: row.anon_role || 'anon', roleName: row.role_name || 'authenticated', schema: row.schemas || [], + logicalSchemas: (row.schemas || []).map(stripSchemaHashPrefix), apiModules: [], rlsModule: settings.rlsModule, domains: [], diff --git a/graphql/server/src/middleware/blueprint.ts b/graphql/server/src/middleware/blueprint.ts new file mode 100644 index 0000000000..3f1c02ef70 --- /dev/null +++ b/graphql/server/src/middleware/blueprint.ts @@ -0,0 +1,182 @@ +import { createHash } from 'node:crypto'; +import type { Pool } from 'pg'; + +// ============================================================================= +// Blueprint pooling identity helpers +// +// OPT-IN "blueprint pooling" shares one PostGraphile instance per schema-shape. +// Tenants are routed per request via a search_path pgSetting. These helpers +// derive the identity of a shared instance (shape fingerprint + blueprint key) +// and provide the search_path plumbing needed to route unqualified SQL. 
+// =============================================================================
+
+/**
+ * Reports whether opt-in blueprint pooling is switched on.
+ *
+ * The GRAPHILE_BLUEPRINT_POOLING environment variable is read on every call.
+ *
+ * @returns `true` only when the variable is exactly '1' or 'true'; `false` for
+ *   every other value, including when it is unset.
+ */
+export const isBlueprintPoolingEnabled = (): boolean => {
+  const flag = process.env.GRAPHILE_BLUEPRINT_POOLING;
+  if (flag === undefined) return false;
+  return ['1', 'true'].includes(flag);
+};
+
+/**
+ * Maps a physical (tenant-prefixed) schema name to its logical name.
+ *
+ * Tenant physical schemas look like `<prefix>-<8hex>-<logical>`, e.g.
+ * `marketplace-db-tenant1-5e6b13b2-app-public` -> `app-public`. Everything up
+ * to and including the FIRST `-<8 lowercase hex>-` segment is dropped. Names
+ * with no such segment (e.g. the control-plane schema `services_public`) are
+ * returned unchanged.
+ *
+ * @param name Physical schema name.
+ * @returns The text after the first hash segment, or `name` itself when no
+ *   hash segment is present.
+ */
+export const stripSchemaHashPrefix = (name: string): string => {
+  const hashSegment = name.match(/-[0-9a-f]{8}-/);
+  if (hashSegment === null || hashSegment.index === undefined) {
+    return name;
+  }
+  const logicalStart = hashSegment.index + hashSegment[0].length;
+  return name.slice(logicalStart);
+};
+
+/**
+ * Formats schema names as a Postgres `search_path` value.
+ *
+ * Each name is wrapped in double quotes, with any embedded double quote
+ * doubled, and the quoted names are joined with a comma and a space, e.g.
+ * `["a-b-c", "a-b-d"]` -> `"a-b-c", "a-b-d"`.
+ *
+ * @param schemas Schema names, in the order they should appear on the path.
+ * @returns The quoted, comma-joined list (empty string for an empty input).
+ */
+export const quoteSearchPath = (schemas: string[]): string => {
+  const quoted: string[] = [];
+  for (const schema of schemas) {
+    const escaped = schema.replace(/"/g, '""');
+    quoted.push(`"${escaped}"`);
+  }
+  return quoted.join(', ');
+};
+
+/**
+ * Render the per-request search_path for a pooled instance: the tenant's
+ * physical schemas FIRST (so unqualified tenant relations resolve to the
+ * tenant), then `public` LAST. `public` must stay on the path because shared
+ * objects live there (e.g. the `email` domain used by SECURITY DEFINER auth
+ * functions without their own `SET search_path`); dropping it breaks sign_in
+ * with "type email does not exist".
+ */
+export const tenantSearchPath = (schemas: string[]): string =>
+  quoteSearchPath([...schemas.filter((s) => s !== 'public'), 'public']);
+
+/** One catalog row: a relation name (`relname`) and the schema that owns it (`nspname`). */
+export interface SchemaRelation {
+  nspname: string;
+  relname: string;
+}
+
+/**
+ * Single catalog scan backing both the shape fingerprint and the collision
+ * check (they previously issued the same pg_class scan twice per decision).
+ *
+ * @param pool Pool used to run the catalog query.
+ * @param physicalSchemas Schema names matched against `pg_namespace.nspname`.
+ * @returns One row per relation of relkind 'r', 'v', 'm' or 'p' found in those
+ *   schemas, ordered by schema name, then relation name.
+ */
+export const fetchSchemaRelations = async (
+  pool: Pool,
+  physicalSchemas: string[]
+): Promise<SchemaRelation[]> => {
+  const result = await pool.query(
+    `SELECT n.nspname, c.relname
+       FROM pg_class c
+       JOIN pg_namespace n ON n.oid = c.relnamespace
+      WHERE n.nspname = ANY($1) AND c.relkind IN ('r','v','m','p')
+      ORDER BY 1, 2`,
+    [physicalSchemas]
+  );
+  return result.rows;
+};
+
+/**
+ * Pure fingerprint over pre-fetched relations: sha256 of the sorted
+ * `[logicalSchema, relname]` pairs (physical names mapped to logical form so
+ * same-shape tenants collapse to the same fingerprint).
+ *
+ * NOTE (v1 granularity): the fingerprint covers relation NAMES only — not
+ * columns, functions or enum labels. Same-relname column drift between tenants
+ * is not detected here; it is bounded by flushService's flush-all-pooled-on-
+ * schema:update semantics. Extending the fingerprint to attributes/procs is a
+ * documented follow-up.
+ *
+ * @param rows Relations as returned by the catalog scan; input order is
+ *   irrelevant because the pairs are sorted before hashing.
+ * @returns Lowercase hex sha256 digest of the JSON-encoded sorted pairs.
+ */
+export const fingerprintFromRelations = (rows: SchemaRelation[]): string => {
+  const pairs: [string, string][] = rows.map((row) => [
+    stripSchemaHashPrefix(row.nspname),
+    row.relname
+  ]);
+  // Code-unit ordering (not locale-aware) keeps the digest stable across hosts.
+  pairs.sort((a, b) => {
+    if (a[0] !== b[0]) return a[0] < b[0] ? -1 : 1;
+    if (a[1] !== b[1]) return a[1] < b[1] ? -1 : 1;
+    return 0;
+  });
+  return createHash('sha256').update(JSON.stringify(pairs)).digest('hex');
+};
+
+/**
+ * Pure collision detection over pre-fetched relations: relation names present
+ * in more than one PHYSICAL schema of the set (would make unqualified SQL
+ * ambiguous under search_path routing).
+ */
+export const collisionsFromRelations = (rows: SchemaRelation[]): string[] => {
+  // relname -> distinct physical schemas that contain a relation of that name.
+  const schemasByRelname = new Map<string, Set<string>>();
+  for (const row of rows) {
+    let set = schemasByRelname.get(row.relname);
+    if (!set) {
+      set = new Set<string>();
+      schemasByRelname.set(row.relname, set);
+    }
+    set.add(row.nspname);
+  }
+  // A name is a collision only when it appears in MORE than one schema; repeated
+  // rows for the same (schema, relname) pair collapse in the Set and do not count.
+  return [...schemasByRelname.entries()]
+    .filter(([, schemas]) => schemas.size > 1)
+    .map(([relname]) => relname)
+    .sort();
+};
+
+/**
+ * Compute a fingerprint of the relational shape shared by a set of physical
+ * schemas. Physical schema names are mapped to their logical form (hash prefix
+ * stripped) so that different tenants of the same shape collapse to the same
+ * fingerprint. The fingerprint is a sha256 hex digest over the sorted
+ * `[logicalSchema, relname]` pairs.
+ *
+ * @param pool Pool used for the catalog scan.
+ * @param physicalSchemas Physical schema names to fingerprint.
+ * @returns The hex digest; rejects if the catalog query rejects.
+ */
+export const computeShapeFingerprint = async (
+  pool: Pool,
+  physicalSchemas: string[]
+): Promise<string> => fingerprintFromRelations(await fetchSchemaRelations(pool, physicalSchemas));
+
+/**
+ * Detect relation-name collisions that would make unqualified SQL ambiguous:
+ * relation names present in more than one of the given schemas. Returns the
+ * offending relation names (sorted). E.g. a tenant with `identity_providers`
+ * as a table in `auth-private` and a view in `auth-public` yields
+ * `['identity_providers']`.
+ *
+ * @param pool Pool used for the catalog scan.
+ * @param physicalSchemas Physical schema names to inspect.
+ * @returns Sorted colliding relation names (empty when there are none);
+ *   rejects if the catalog query rejects.
+ */
+export const checkUnqualifiedCollisions = async (
+  pool: Pool,
+  physicalSchemas: string[]
+): Promise<string[]> => collisionsFromRelations(await fetchSchemaRelations(pool, physicalSchemas));
+
+/**
+ * Compute the stable blueprint key that identifies a poolable PostGraphile
+ * instance. Two requests share an instance iff they agree on logical schemas,
+ * shape fingerprint, gather/schema flags, api name, mode and physical database
+ * name. Inputs are normalized (schemas sorted, flag entries sorted by key) so
+ * the key is independent of ordering.
+ */
+export const computeBlueprintKey = (input: {
+  logicalSchemas: string[];
+  shapeFingerprint: string;
+  flags: Record<string, unknown> | undefined;
+  apiName?: string | null;
+  mode: string;
+  /**
+   * Physical database backing the instance's pool. REQUIRED for correctness in
+   * multi-database fleets: without it, same-shape tenants living in DIFFERENT
+   * physical databases would share one instance whose pool points at only one
+   * of them.
+   */
+  dbname?: string | null;
+}): string => {
+  // Normalize every order-sensitive input before hashing: schemas are sorted on
+  // a copy (the caller's array is not mutated) and flag entries are sorted by
+  // key. Missing flags hash like `{}`; null/undefined/empty apiName and dbname
+  // all hash as ''.
+  const payload = {
+    s: [...input.logicalSchemas].sort(),
+    f: input.shapeFingerprint,
+    g: Object.entries(input.flags || {}).sort((a, b) =>
+      a[0] < b[0] ? -1 : a[0] > b[0] ? 1 : 0
+    ),
+    a: input.apiName || '',
+    m: input.mode,
+    d: input.dbname || ''
+  };
+
+  // 'bp:' prefix + 64 hex chars; the flush paths match pooled entries on /^bp:/.
+  return 'bp:' + createHash('sha256').update(JSON.stringify(payload)).digest('hex');
+};
diff --git a/graphql/server/src/middleware/flush.ts b/graphql/server/src/middleware/flush.ts
index 653d74631c..0b6c75cb66 100644
--- a/graphql/server/src/middleware/flush.ts
+++ b/graphql/server/src/middleware/flush.ts
@@ -2,9 +2,11 @@ import { ConstructiveOptions } from '@constructive-io/graphql-types';
 import { Logger } from '@pgpmjs/logger';
 import { svcCache } from '@pgpmjs/server-utils';
 import { NextFunction, Request, Response } from 'express';
-import { graphileCache } from 'graphile-cache';
+import { clearMatchingEntries, graphileCache } from 'graphile-cache';
 import { getPgPool } from 'pg-cache';
 import './types'; // for Request type
+import { isBlueprintPoolingEnabled } from './blueprint';
+import { clearPoolDecisions } from './pooling-decision';
 
 const log = new Logger('flush');
 
@@ -17,6 +19,12 @@ export const flush = async (
     // TODO: check bearer for a flush / special key
     graphileCache.delete((req as any).svc_key);
     svcCache.delete((req as any).svc_key);
+    // Under pooling the serving instance is stored under a `bp:` key, which the
+    // svc_key delete above cannot reach — mirror flushService's v1 semantics.
+ if (isBlueprintPoolingEnabled()) { + clearMatchingEntries(/^bp:/); + clearPoolDecisions(); + } res.status(200).send('OK'); return; } @@ -30,6 +38,18 @@ export const flushService = async ( const pgPool = getPgPool(opts.pg); log.info('flushing db ' + databaseId); + // Blueprint pooling (v1): a schema change in ANY tenant can alter a shape that + // pooled instances share, so flush ALL pooled (`bp:`) instances — rebuilds are + // cheap — and drop cached pooling decisions so the next request re-derives the + // fingerprint/key. Runs regardless of isPublic since pooling targets the public API. + if (isBlueprintPoolingEnabled()) { + const cleared = clearMatchingEntries(/^bp:/); + clearPoolDecisions(); + if (cleared > 0) { + log.info(`[pooling] flushed ${cleared} pooled instance(s) after change to db ${databaseId}`); + } + } + const api = new RegExp(`^api:${databaseId}:.*`); const schemata = new RegExp(`^schemata:${databaseId}:.*`); const meta = new RegExp(`^metaschema:api:${databaseId}`); diff --git a/graphql/server/src/middleware/graphile.ts b/graphql/server/src/middleware/graphile.ts index 477e61288b..8e149a55d1 100644 --- a/graphql/server/src/middleware/graphile.ts +++ b/graphql/server/src/middleware/graphile.ts @@ -4,18 +4,31 @@ import type { ConstructiveOptions } from '@constructive-io/graphql-types'; import { Logger } from '@pgpmjs/logger'; import type { NextFunction, Request, RequestHandler, Response } from 'express'; import type { GraphQLError, GraphQLFormattedError } from 'grafast/graphql'; -import { createGraphileInstance, type GraphileCacheEntry, graphileCache } from 'graphile-cache'; +import type { Pool } from 'pg'; +import { + createGraphileInstance, + ensureCacheHeadroom, + type GraphileCacheEntry, + graphileCache, + invokeEntryHandler, +} from 'graphile-cache'; import type { GraphileConfig } from 'graphile-config'; import { createConstructivePreset, makePgService } from 'graphile-settings'; import { getPgPool } from 'pg-cache'; import { getPgEnvOptions } 
from 'pg-env'; import './types'; // for Request type +import { isBlueprintPoolingEnabled, tenantSearchPath } from './blueprint'; +import { resolvePoolDecision } from './pooling-decision'; import { isGraphqlObservabilityEnabled } from '../diagnostics/observability'; import { HandlerCreationError } from '../errors/api-errors'; import { observeGraphileBuild } from './observability/graphile-build-stats'; import type { DatabaseSettings } from '../types'; import { AuthCookiePlugin } from '../plugins/auth-cookie-plugin'; +// Re-exported so flush.ts (and other callers) can invalidate pooling decisions +// through the graphile module surface. +export { clearPoolDecisions } from './pooling-decision'; + const maskErrorLog = new Logger('graphile:maskError'); const SAFE_ERROR_CODES = new Set([ @@ -191,6 +204,46 @@ export function clearInFlightMap(): void { creating.clear(); } +// ============================================================================= +// Build Admission Control +// ============================================================================= + +/** + * Bounds how many PostGraphile schema builds run concurrently across ALL cache + * keys (the single-flight map only dedups builds for the SAME key). Each build + * transiently allocates hundreds of MB, so concurrent builds of different + * tenants stack those peaks on top of the resident cache — the OOM shape. + * Default 1: builds queue and run serially. Override with GRAPHILE_BUILD_CONCURRENCY. 
+ */
+class BuildSemaphore {
+  // Number of slots currently held. Invariant: never exceeds `capacity`, and
+  // `waiters` is non-empty only while `active === capacity`.
+  private active = 0;
+  // FIFO queue of resolvers for callers parked in acquire().
+  private waiters: Array<() => void> = [];
+  constructor(private readonly capacity: number) {}
+
+  /**
+   * Resolves once the caller holds a build slot. Takes a free slot immediately
+   * when one exists; otherwise parks until release() hands one over.
+   */
+  async acquire(): Promise<void> {
+    if (this.active < this.capacity) {
+      this.active++;
+      return;
+    }
+    // At capacity: wait for a hand-off. The slot arrives already counted in
+    // `active`, so it must NOT be incremented again on wake-up.
+    await new Promise<void>((resolve) => this.waiters.push(resolve));
+  }
+
+  /**
+   * Gives up the caller's slot. If anyone is queued, the slot is transferred
+   * directly to the oldest waiter and `active` stays unchanged; only when the
+   * queue is empty is the slot actually freed.
+   *
+   * Freeing first and letting the woken waiter re-increment later would leave a
+   * window (the waiter resumes in a later microtask) in which a fresh acquire()
+   * sees a free slot and takes it too, pushing `active` past `capacity`.
+   */
+  release(): void {
+    const wake = this.waiters.shift();
+    if (wake) {
+      wake();
+      return;
+    }
+    this.active = Math.max(0, this.active - 1);
+  }
+}
+
+/**
+ * Reads GRAPHILE_BUILD_CONCURRENCY as a base-10 integer.
+ *
+ * @returns The parsed value when it is a finite integer greater than zero;
+ *   otherwise 1 (also when the variable is unset or empty).
+ */
+const parseBuildConcurrency = (): number => {
+  const raw = process.env.GRAPHILE_BUILD_CONCURRENCY;
+  const n = raw ? parseInt(raw, 10) : 1;
+  return Number.isFinite(n) && n > 0 ? n : 1;
+};
+
+// Process-wide build gate; capacity is fixed when this module is first loaded.
+const buildSemaphore = new BuildSemaphore(parseBuildConcurrency());
+
 const log = new Logger('graphile');
 
 const reqLabel = (req: Request): string => (req.requestId ? `[${req.requestId}]` : '[req]');
@@ -203,13 +256,17 @@ const reqLabel = (req: Request): string => (req.requestId ? `[${req.requestId}]`
  * (everything on except aggregates).
  */
 const buildPreset = (
-  pool: import('pg').Pool,
+  pool: Pool,
   schemas: string[],
   anonRole: string,
   roleName: string,
   databaseSettings?: DatabaseSettings,
+  options?: { pooling?: boolean },
 ): GraphileConfig.Preset => {
-  return {
+  // When pooling, the instance is shared across tenants of the same schema-shape
+  // and routed per request via a search_path pgSetting (see grafast.context below).
+  const pooling = options?.pooling === true;
+  const preset: GraphileConfig.Preset = {
     extends: [createConstructivePreset(databaseSettings)],
     plugins: [AuthCookiePlugin],
     pgServices: [
@@ -221,7 +278,9 @@ const buildPreset = (
     grafserv: {
       graphqlPath: '/graphql',
       graphiqlPath: '/graphiql',
-      graphiql: true,
+      // GraphiQL (ruru) assets/handlers are per-instance overhead and an unnecessary
+      // prod surface — enable only in development, or explicitly via GRAPHILE_GRAPHIQL=true.
+ graphiql: getNodeEnv() === 'development' || process.env.GRAPHILE_GRAPHIQL === 'true', graphiqlOnGraphQLGET: false, maskError, }, @@ -232,6 +291,13 @@ const buildPreset = ( const req = (requestContext as { expressv4?: { req?: Request } })?.expressv4?.req; const context: Record = {}; + // De-closure the role names: read them from the resolved API on the request + // so a shared (pooled) instance uses the REQUESTING tenant's roles rather than + // whichever tenant happened to build it. Falls back to the build-time params. + const api = req?.api; + const role = api?.roleName ?? roleName; + const anon = api?.anonRole ?? anonRole; + if (req) { if (req.databaseId) { context['jwt.claims.database_id'] = req.databaseId; @@ -251,7 +317,7 @@ const buildPreset = ( if (req.token?.user_id) { const pgSettings: Record = { - role: roleName, + role, 'jwt.claims.token_id': req.token.id, 'jwt.claims.user_id': req.token.user_id, ...context, @@ -282,24 +348,47 @@ const buildPreset = ( pgSettings['request.id'] = req.requestId; } + // Pooled instances emit search_path-relative SQL; route this request to + // the REQUESTING tenant's physical schemas (+ shared `public` last — see + // tenantSearchPath). Never set when not pooling. + if (pooling && api?.schema?.length) { + pgSettings['search_path'] = tenantSearchPath(api.schema); + } + return { pgSettings }; } } const anonSettings: Record = { - role: anonRole, + role: anon, ...context, }; if (req?.requestId) { anonSettings['request.id'] = req.requestId; } + // Pooled instances emit search_path-relative SQL; route this request to + // the REQUESTING tenant's physical schemas (+ shared `public` last — see + // tenantSearchPath). Never set when not pooling. 
+ if (pooling && api?.schema?.length) { + anonSettings['search_path'] = tenantSearchPath(api.schema); + } return { pgSettings: anonSettings, }; }, }, -}; + }; + + if (pooling) { + // Stock unqualified-identifier gather + our schema flag so Constructive + // plugins emit search_path-relative SQL for tenant-data references. Cast: + // `constructiveUnqualified` is our custom schema option, absent from base types. + (preset as any).gather = { pgIdentifiers: 'unqualified' }; + (preset as any).schema = { constructiveUnqualified: true }; + } + + return preset; }; export const graphile = (opts: ConstructiveOptions): RequestHandler => { @@ -313,21 +402,49 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { log.error(`${label} Missing API info`); return res.status(500).send('Missing API info'); } - const key = req.svc_key; - if (!key) { + const svcKey = req.svc_key; + if (!svcKey) { log.error(`${label} Missing service cache key`); return res.status(500).send('Missing service cache key'); } const { dbname, anonRole, roleName, schema } = api; const schemaLabel = schema?.join(',') || 'unknown'; + // ========================================================================= + // Blueprint Pooling Decision (opt-in via GRAPHILE_BLUEPRINT_POOLING) + // + // When enabled, resolve a shared blueprint key so tenants of the same + // schema-shape attach to ONE instance, routed per request via search_path + // (set in grafast.context). Decisions are memoized per svc_key and cleared + // on flush. When the flag is OFF this block is skipped entirely: `key` stays + // exactly req.svc_key and no extra pool or queries are touched. + // ========================================================================= + const pgConfig = getPgEnvOptions({ ...opts.pg, database: dbname }); + let key = svcKey; + let pooling = false; + let pool: Pool | undefined; + if (isBlueprintPoolingEnabled() && api) { + // Blueprint keying needs the tenant pool to fingerprint the schema shape. 
+ pool = getPgPool(pgConfig); + const decision = await resolvePoolDecision(svcKey, api, pool); + key = decision.key; + pooling = decision.pooling; + if (pooling) { + log.info(`[pooling] svc=${svcKey} → ${key}`); + } + } + // ========================================================================= // Phase A: Cache Check (fast path) // ========================================================================= const cached = graphileCache.get(key); if (cached) { - log.debug(`${label} PostGraphile cache hit key=${key} db=${dbname} schemas=${schemaLabel}`); - return cached.handler(req, res, next); + if (invokeEntryHandler(cached, req, res, next)) { + log.debug(`${label} PostGraphile cache hit key=${key} db=${dbname} schemas=${schemaLabel}`); + return; + } + // Entry is mid-disposal — fall through and rebuild. + log.debug(`${label} PostGraphile cache hit on disposing entry key=${key}; rebuilding`); } log.debug(`${label} PostGraphile cache miss key=${key} db=${dbname} schemas=${schemaLabel}`); @@ -340,7 +457,11 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { log.debug(`${label} Coalescing request for PostGraphile[${key}] - waiting for in-flight creation`); try { const instance = await inFlight; - return instance.handler(req, res, next); + if (invokeEntryHandler(instance, req, res, next)) { + return; + } + log.debug(`${label} Coalesced instance already disposing for PostGraphile[${key}], retrying`); + // Fall through to Phase C to retry creation } catch (error) { log.warn(`${label} Coalesced request failed for PostGraphile[${key}], retrying`); // Fall through to Phase C to retry creation @@ -353,9 +474,9 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { // Re-check cache after coalesced request failure (another retry may have succeeded) const recheckedCache = graphileCache.get(key); - if (recheckedCache) { + if (recheckedCache && invokeEntryHandler(recheckedCache, req, res, next)) { log.debug(`${label} PostGraphile 
cache hit on re-check key=${key}`); - return recheckedCache.handler(req, res, next); + return; } // Re-check in-flight map (another retry may have started creation) @@ -363,44 +484,71 @@ export const graphile = (opts: ConstructiveOptions): RequestHandler => { if (retryInFlight) { log.debug(`${label} Re-coalescing request for PostGraphile[${key}]`); const retryInstance = await retryInFlight; - return retryInstance.handler(req, res, next); + if (invokeEntryHandler(retryInstance, req, res, next)) { + return; + } + log.warn(`${label} Re-coalesced instance already disposing for PostGraphile[${key}]`); + return res.status(503).json({ + error: { code: 'SERVICE_ROTATING', message: 'Service is restarting, please retry' } + }); } log.info( `${label} Building PostGraphile v5 handler key=${key} db=${dbname} schemas=${schemaLabel} role=${roleName} anon=${anonRole}`, ); - const pgConfig = getPgEnvOptions({ - ...opts.pg, - database: dbname, - }); - // Route through pg-cache so the pool is tracked and can be cleaned up - // properly, preventing leaked connections during database teardown. - const pool = getPgPool(pgConfig); + // properly, preventing leaked connections during database teardown. When the + // pooling decision above already resolved the pool, reuse it (pg-cache memoizes + // regardless); otherwise resolve it here exactly as before (flag-off path). + const activePool = pool ?? getPgPool(pgConfig); // Create promise and store in in-flight map BEFORE try block - const preset = buildPreset(pool, schema || [], anonRole, roleName, api.databaseSettings); - const creationPromise = observeGraphileBuild( - { - cacheKey: key, - serviceKey: key, - databaseId: api.databaseId ?? 
null, - }, - () => createGraphileInstance({ - preset, - cacheKey: key, - enableRealtime: api.databaseSettings?.enableRealtime, - }), - { enabled: observabilityEnabled }, - ); + const preset = buildPreset(activePool, schema || [], anonRole, roleName, api.databaseSettings, { pooling }); + const creationPromise = (async (): Promise => { + // Serialize builds process-wide: each build transiently allocates hundreds of MB, + // and only same-key builds are deduped by the single-flight map above. + await buildSemaphore.acquire(); + try { + // While queued for the slot another request may have finished this key. + const builtMeanwhile = graphileCache.get(key); + if (builtMeanwhile) { + return builtMeanwhile; + } + // Evict the LRU instance BEFORE building so the build peak lands on freed + // headroom instead of stacking on a full cache. + ensureCacheHeadroom(1); + return await observeGraphileBuild( + { + cacheKey: key, + serviceKey: key, + databaseId: api.databaseId ?? null, + }, + () => createGraphileInstance({ + preset, + cacheKey: key, + dbname, + enableRealtime: api.databaseSettings?.enableRealtime, + }), + { enabled: observabilityEnabled }, + ); + } finally { + buildSemaphore.release(); + } + })(); creating.set(key, creationPromise); try { const instance = await creationPromise; graphileCache.set(key, instance); log.info(`${label} Cached PostGraphile v5 handler key=${key} db=${dbname}`); - return instance.handler(req, res, next); + if (invokeEntryHandler(instance, req, res, next)) { + return; + } + log.warn(`${label} Freshly built instance already disposing for PostGraphile[${key}]`); + return res.status(503).json({ + error: { code: 'SERVICE_ROTATING', message: 'Service is restarting, please retry' } + }); } catch (error) { log.error(`${label} Failed to create PostGraphile[${key}]:`, error); throw new HandlerCreationError( diff --git a/graphql/server/src/middleware/pooling-decision.ts b/graphql/server/src/middleware/pooling-decision.ts new file mode 100644 index 
import { Logger } from '@pgpmjs/logger';
import type { Pool } from 'pg';

import type { ApiStructure } from '../types';
import {
  collisionsFromRelations,
  computeBlueprintKey,
  fetchSchemaRelations,
  fingerprintFromRelations,
  stripSchemaHashPrefix
} from './blueprint';

// =============================================================================
// Blueprint Pooling: per-svc decision + decision cache
//
// Decides whether a service (svc_key) may attach to a SHARED, search_path-routed
// PostGraphile instance (blueprint pooling) or must keep a per-tenant instance.
// Kept in its own module — free of the heavy graphile.ts import chain — so the
// decision logic is unit-testable without loading PostGraphile.
// =============================================================================

const log = new Logger('graphile:pooling');

/** Outcome of the blueprint-pooling decision for a single service. */
export interface PoolDecision {
  /** Effective graphileCache key: a `bp:` blueprint key when pooling, else the svc_key. */
  key: string;
  /** Whether this svc attaches to a shared, search_path-routed instance. */
  pooling: boolean;
  /**
   * True when the fallback came from a thrown catalog probe (a possibly
   * transient DB error) rather than a structural opt-out. Transient decisions
   * are NOT memoized, so the next request re-probes instead of permanently
   * pinning the svc to a per-tenant instance until the next flush.
   */
  transient?: boolean;
}

/**
 * Memoized pooling decision per svc_key. Computing a decision runs a catalog
 * probe (feeding both the shape fingerprint and the collision check), so the
 * result is cached and invalidated on flush (see clearPoolDecisions).
 *
 * Explicit type arguments keep reads typed as `PoolDecision | undefined`
 * instead of `any`.
 */
const poolDecisions = new Map<string, PoolDecision>();

/**
 * Clear all cached pooling decisions. Called by flushService when a tenant
 * schema change invalidates pooled instances (v1: any change flushes them all).
 */
export function clearPoolDecisions(): void {
  poolDecisions.clear();
}

/**
 * Compute (WITHOUT caching) the blueprint-pooling decision for a service.
 *
 * Returns `{ pooling: false, key: svcKey }` — a per-tenant instance keyed by the
 * normal svc_key — when the API opts out of pooling: realtime is enabled, there
 * are no schemas, an unqualified relation-name collision would make search_path
 * routing ambiguous, or the catalog probes throw. Otherwise returns
 * `{ pooling: true, key: 'bp:...' }` so same-shape tenants share one instance.
 *
 * The blueprint key is derived from `api.logicalSchemas` (the optional
 * ApiStructure field holding schema names with the tenant hash prefix
 * stripped); when that field is absent the names are derived here by applying
 * `stripSchemaHashPrefix` to each entry of `api.schema`.
 *
 * Exported for unit testing; the dispatcher uses the cached resolvePoolDecision.
 *
 * @param svcKey - Service key; returned unchanged as the cache key on every
 *   non-pooling outcome.
 * @param api - API structure whose schemas and database settings drive the decision.
 * @param pool - pg pool used for the catalog scan.
 * @returns The decision. Never rejects because of a probe failure: such
 *   failures are logged and reported as `{ pooling: false, transient: true }`.
 */
export const computePoolDecision = async (
  svcKey: string,
  api: ApiStructure,
  pool: Pool
): Promise<PoolDecision> => {
  // Realtime instances hold a LISTEN/NOTIFY connection + RealtimeManager per
  // instance and can't be safely shared across tenants — never pool them.
  if (api.databaseSettings?.enableRealtime === true) {
    return { key: svcKey, pooling: false };
  }
  if (!api.schema || api.schema.length === 0) {
    return { key: svcKey, pooling: false };
  }

  try {
    // One catalog scan feeds both the fingerprint and the collision check.
    const relations = await fetchSchemaRelations(pool, api.schema);
    const collisions = collisionsFromRelations(relations);
    if (collisions.length > 0) {
      log.warn(
        `svc=${svcKey} not poolable — unqualified relation collision(s): ${collisions.join(', ')}; using per-tenant instance`
      );
      return { key: svcKey, pooling: false };
    }

    const key = computeBlueprintKey({
      logicalSchemas: api.logicalSchemas ?? api.schema.map(stripSchemaHashPrefix),
      shapeFingerprint: fingerprintFromRelations(relations),
      // NOTE(review): flag bag assumed to be string-keyed with opaque values —
      // confirm against computeBlueprintKey's `flags` parameter type.
      flags: api.databaseSettings as Record<string, unknown> | undefined,
      apiName: (api as { apiName?: string | null }).apiName ?? null,
      mode: 'public',
      dbname: api.dbname
    });
    return { key, pooling: true };
  } catch (err) {
    // Structural opt-outs return above; anything thrown here is treated as
    // possibly transient so the caller does not memoize the fallback.
    log.warn(
      `svc=${svcKey} not poolable — shape/collision probe failed: ${err instanceof Error ? err.message : String(err)}; using per-tenant instance (not memoized)`
    );
    return { key: svcKey, pooling: false, transient: true };
  }
};

/**
 * Resolve the pooling decision for a service, memoized per svc_key. Exported for
 * unit testing of the memoization/invalidation behaviour.
 *
 * @param svcKey - Service key used as the memoization key.
 * @param api - API structure forwarded to computePoolDecision on a cache miss.
 * @param pool - pg pool forwarded to computePoolDecision on a cache miss.
 * @returns The cached decision when present; otherwise a freshly computed one,
 *   which is stored only when it is not marked `transient`.
 */
export const resolvePoolDecision = async (
  svcKey: string,
  api: ApiStructure,
  pool: Pool
): Promise<PoolDecision> => {
  const cached = poolDecisions.get(svcKey);
  if (cached) return cached;
  const decision = await computePoolDecision(svcKey, api, pool);
  if (!decision.transient) {
    poolDecisions.set(svcKey, decision);
  }
  return decision;
};