Integration · 3 min read

Integrating HappyHorse into a Postgres-Backed Render Queue

At real volume, fire-and-forget breaks down. You need to know which request belonged to which user, what it cost, whether it finished, and where the output lives after the signed URL expires. Postgres handles this with one table and a webhook handler.

The table

All generations across all endpoints live here. Do not split by model; one place answers "how much did user X spend this month."

SQL
create type generation_status as enum (
  'submitted', 'in_progress', 'completed', 'failed', 'canceled'
);

create table generations (
  id          bigserial primary key,
  request_id  text not null,
  user_id     bigint not null,
  endpoint    text not null,
  inputs      jsonb not null,
  outputs     jsonb,
  status      generation_status not null default 'submitted',
  cost_cents  integer not null default 0,
  error       text,
  created_at  timestamptz not null default now(),
  finished_at timestamptz
);

create unique index generations_request_id_key
  on generations (request_id);
create index generations_user_created_idx
  on generations (user_id, created_at desc);
create index generations_endpoint_status_idx
  on generations (endpoint, status);
create index generations_inputs_gin_idx
  on generations using gin (inputs);

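Each of the four indexes earns its keep: (user_id, created_at desc) for dashboards, unique request_id for webhook idempotency, (endpoint, status) for reconciliation, and GIN on inputs for containment lookups on input fields (exact matches via @>). Substring or fuzzy prompt search would need a trigram or full-text index instead.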
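As a rough illustration of the question this table is meant to answer, the sketch below builds the "how much did user X spend this month" query as a parameterized query config of the kind node-postgres accepts. The helper name monthlySpendQuery is an assumption made for this example, and it sums every row regardless of status, which you may want to narrow.

```typescript
// Hypothetical helper (not part of the original code): builds a parameterized
// query for one user's spend in the current calendar month.
// Pass the returned object straight to pool.query(...).
export function monthlySpendQuery(userId: number): { text: string; values: number[] } {
  return {
    // sum(integer) yields bigint, which node-postgres returns as a string;
    // the ::int cast keeps the result a plain number.
    text: `select coalesce(sum(cost_cents), 0)::int as spent_cents
             from generations
            where user_id = $1
              and created_at >= date_trunc('month', now())`,
    values: [userId],
  };
}
```

The filter on user_id plus a created_at range is the access pattern the (user_id, created_at desc) index is built for.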

Figure: Table shape

The submit path

Write the row as soon as the submit call returns a request_id, so that the webhook handler's update has a row to match.

TS
import { fal } from "@fal-ai/client";
import { pool } from "./db";
// Pricing helper assumed to live in your own codebase; the path is illustrative.
import { estimateCostCents } from "./pricing";

// or fal-ai/happyhorse/v1/text-to-video once available
const ENDPOINT = "fal-ai/seedance-2.0/text-to-video";

export async function submitGeneration(userId: number, prompt: string) {
  const inputs = { prompt, resolution: "1080p", duration: 5 };
  const { request_id } = await fal.queue.submit(ENDPOINT, {
    input: inputs,
    webhookUrl: process.env.WEBHOOK_URL,
  });
  const cost = estimateCostCents(ENDPOINT, inputs);

  // Record the job immediately; the unique index on request_id makes a retry harmless.
  await pool.query(
    `insert into generations
       (request_id, user_id, endpoint, inputs, cost_cents, status)
     values ($1, $2, $3, $4, $5, 'submitted')
     on conflict (request_id) do nothing`,
    [request_id, userId, ENDPOINT, inputs, cost]
  );
  return request_id;
}

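The on conflict do nothing clause makes the insert safe to retry. It does not rescue a webhook that arrives before the insert commits: the handler's update matches no row in that case, and the reconciler picks the job up later. For HappyHorse the per-second price is still TBD on the public pricing page, so record a conservative estimate and reconcile it when the webhook arrives. For Seedance 2.0 you can compute a close estimate up front.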
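One possible shape for the estimateCostCents helper is sketched below. The rate table, the idea that price varies by resolution, and the fallback rate are all assumptions for illustration. The numbers are placeholders, not published prices, so substitute the real per-second rates before relying on it.

```typescript
type VideoInputs = { prompt: string; resolution: string; duration: number };

// PLACEHOLDER rates in cents per second of output video. These values are
// invented for the example; replace them with the provider's published prices.
const CENTS_PER_SECOND: Record<string, Record<string, number>> = {
  "fal-ai/seedance-2.0/text-to-video": { "720p": 10, "1080p": 20 },
};

// Deliberately high fallback for endpoints or resolutions with no known price yet,
// so the stored estimate errs on the expensive side until it is reconciled.
const FALLBACK_CENTS_PER_SECOND = 30;

export function estimateCostCents(endpoint: string, inputs: VideoInputs): number {
  const rate = CENTS_PER_SECOND[endpoint]?.[inputs.resolution] ?? FALLBACK_CENTS_PER_SECOND;
  // Round up so a fractional cent is never undercounted.
  return Math.ceil(rate * inputs.duration);
}
```

Keeping the estimate in integer cents matches the cost_cents column and avoids floating-point drift when summing per user.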

The webhook handler

One endpoint, idempotent by request_id. fal.ai may retry webhooks; treat duplicates as no-ops.

TS
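import { pool } from "./db";
// Storage helper assumed to live in your own codebase; the path is illustrative.
import { copyToStorage } from "./storage";

// Express-style (req, res) handler. fal posts { request_id, status, payload, error }.
export async function handleWebhook(req, res) {
  const { request_id, status, payload, error } = req.body;

  let outputs = null;
  if (status === "OK" && payload?.video?.url) {
    outputs = {
      // Copy first: the signed URL expires, the stored copy does not.
      video_url: await copyToStorage(payload.video.url),
      original_url: payload.video.url,
      duration: payload.video.duration,
      seed: payload.seed,
    };
  }

  // The status guard means a retried webhook leaves an already-finished row untouched.
  await pool.query(
    `update generations
        set status = $2, outputs = $3,
            error = $4, finished_at = now()
      where request_id = $1
        and status in ('submitted', 'in_progress')`,
    [request_id, status === "OK" ? "completed" : "failed",
     outputs, error ?? null]
  );
  res.status(200).json({ ok: true });
}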

HappyHorse output URLs are time-limited. Store only the signed URL and a two-week-old generation returns a 403. Copy the video to storage you control before handing the link back.
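A minimal sketch of what a copyToStorage helper could look like follows. It assumes a runtime with a global fetch (Node 18 or later) and takes the actual storage write as an injected function, since the article does not say which object store is in use. The names makeCopyToStorage and PutObject are hypothetical.

```typescript
// Hypothetical storage writer: receives a key, the bytes, and a content type,
// and resolves to the permanent URL of the stored object.
type PutObject = (key: string, body: Uint8Array, contentType: string) => Promise<string>;

export function makeCopyToStorage(put: PutObject, fetchImpl: typeof fetch = fetch) {
  return async function copyToStorage(signedUrl: string): Promise<string> {
    const res = await fetchImpl(signedUrl);
    if (!res.ok) throw new Error(`download failed with HTTP ${res.status}`);

    const body = new Uint8Array(await res.arrayBuffer());
    const contentType = res.headers.get("content-type") ?? "video/mp4";

    // Key on the path only, ignoring the signature in the query string, so a
    // retried webhook overwrites the same object instead of creating a new one.
    const key = new URL(signedUrl).pathname.replace(/^\/+/, "");
    return put(key, body, contentType);
  };
}
```

This buffers the whole file in memory, which is fine for short clips; for long videos a streaming upload would be the safer choice.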

Figure: Webhook flow

The reconciler

Webhooks fail. Run a cron every 5 minutes that picks up submitted jobs older than 10 minutes.

TS
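import { fal } from "@fal-ai/client";
import { pool } from "./db";

export async function reconcileStuck() {
  // Oldest first, so a backlog larger than the limit still drains in order.
  const { rows } = await pool.query(
    `select request_id, endpoint from generations
      where status = 'submitted'
        and created_at < now() - interval '10 minutes'
      order by created_at
      limit 100`
  );
  for (const row of rows) {
    const s = await fal.queue.status(row.endpoint, {
      requestId: row.request_id,
    });
    if (s.status === "COMPLETED") {
      // Fetch the output with fal.queue.result(row.endpoint, { requestId: row.request_id })
      // and apply the same logic as handleWebhook.
    }
  }
}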

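The (endpoint, status) index helps keep this query off the table's millions of completed rows, but with endpoint as the leading column a status-only filter cannot seek directly to the matching entries. If the reconciler shows up in slow-query logs, a partial index on created_at restricted to status = 'submitted' is a tighter fit.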
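If the five-minute cadence is driven from inside the Node process rather than by an external cron, one thing worth guarding against is a slow run overlapping the next tick. The wrapper below is a sketch of that guard; the name nonOverlapping is an assumption for this example.

```typescript
// Wraps an async job so that a call made while a previous run is still in
// flight is skipped instead of stacking up. Resolves to true if the job ran.
export function nonOverlapping(job: () => Promise<void>): () => Promise<boolean> {
  let running = false;
  return async () => {
    if (running) return false; // previous run has not finished yet
    running = true;
    try {
      await job();
      return true;
    } finally {
      running = false; // always release, even if the job throws
    }
  };
}

// Usage sketch, assuming reconcileStuck from the block above:
//   const tick = nonOverlapping(reconcileStuck);
//   setInterval(() => { tick().catch(console.error); }, 5 * 60 * 1000);
```

The guard only covers a single process. With several workers, a database-level lock or select ... for update skip locked would be needed instead.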

Figure: Queue architecture
