// scripts/backfill-kofi.mjs
//
// Usage (inside container):
//   node scripts/backfill-kofi.mjs --file /data/kofi.csv [--dry] [--concurrency=5]
//
// Value flags accept both `--key=value` and `--key value`.
//
// CSV expected headers (matched case-insensitively; unknown columns ignored):
//   email, from_name, type, timestamp, is_subscription_payment, is_first_subscription_payment,
//   tier_name, discord_userid, discord_username, amount, currency, kofi_transaction_id
//
// Upsert strategy: find an existing item where provider="kofi" AND
//   (email == email || external_user_id == email ||
//    external_user_id == discord_userid || external_user_id == from_name).
// Rows that carry none of those identifiers are matched on external_user_id == "unknown".

import fs from "node:fs";
import { parse } from "csv-parse/sync";

// --- envs from the running container ---
// Strip every trailing slash so `${DIRECTUS}/items/...` never contains "//".
const DIRECTUS = (process.env.DIRECTUS_URL || "").replace(/\/+$/, "");
const BOT_TOKEN = process.env.DIRECTUS_TOKEN_ADMIN_SUPPORTER || "";
const COLLECTION = "user_memberships";

if (!DIRECTUS || !BOT_TOKEN) {
  console.error("[backfill] Missing DIRECTUS_URL or DIRECTUS_TOKEN_ADMIN_SUPPORTER in env.");
  process.exit(2);
}

// ---- args ----
const USAGE =
  "Usage: node scripts/backfill-kofi.mjs --file /path/to/kofi.csv [--dry] [--concurrency=5]";
const DEFAULT_CONCURRENCY = 5;
// Flags that never take a value; they must not swallow the token that follows them.
const BOOLEAN_FLAGS = new Set(["dry"]);

/**
 * Parse `--key=value`, `--key value` and bare `--flag` tokens.
 *
 * Tokens that do not start with `--` and are not consumed as a value are ignored.
 * A bare flag is stored with the string value "true".
 *
 * @param {string[]} argv - Command-line tokens (without node and script path).
 * @returns {Map<string, string>} Flag name (without dashes) to its raw string value.
 */
function parseCliArgs(argv) {
  const parsed = new Map();
  for (let i = 0; i < argv.length; i += 1) {
    const token = argv[i];
    if (!token.startsWith("--")) continue;

    const flag = token.slice(2);
    const eq = flag.indexOf("=");
    if (eq !== -1) {
      // Split on the first "=" only, so values may themselves contain "=".
      parsed.set(flag.slice(0, eq), flag.slice(eq + 1));
      continue;
    }

    const next = argv[i + 1];
    const takesNext =
      !BOOLEAN_FLAGS.has(flag) && next !== undefined && !next.startsWith("--");
    if (takesNext) {
      parsed.set(flag, next);
      i += 1;
    } else {
      parsed.set(flag, "true");
    }
  }
  return parsed;
}

const args = parseCliArgs(process.argv.slice(2));
const file = args.get("file");
// Presence of the flag is enough: even `--dry=false` keeps the run dry (the safe direction).
const dry = args.has("dry");

// A bare `--file` is stored as "true" by the parser, which is a missing path, not a file name.
if (!file || file === "true") {
  console.error(USAGE);
  process.exit(1);
}

const rawConcurrency = args.get("concurrency");
const concurrency =
  rawConcurrency == null || rawConcurrency === "" ? DEFAULT_CONCURRENCY : Number(rawConcurrency);
// 0 would silently process nothing; NaN, negative or fractional values would crash the pool setup.
if (!Number.isInteger(concurrency) || concurrency < 1) {
  console.error(
    `[backfill] Invalid --concurrency value "${rawConcurrency}" (expected a positive integer).`
  );
  console.error(USAGE);
  process.exit(1);
}

// ---- helpers ----

/** Resolve after `ms` milliseconds. */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Return a new Date one calendar month plus one day after `d`.
 *
 * The arithmetic is done on the UTC fields because the result is serialized with
 * toISOString(); this keeps the outcome independent of the host time zone.
 *
 * @param {Date} d - Start instant (not mutated).
 * @returns {Date} The shifted date.
 */
function addOneMonthPlusOneDay(d) {
  const nd = new Date(d);
  const m = nd.getUTCMonth();
  nd.setUTCMonth(m + 1);
  // Month overflow (e.g. Jan 31 -> "Feb 31" rolls into March): clamp back to the
  // last day of the intended month.
  if (nd.getUTCMonth() !== (m + 1) % 12) nd.setUTCDate(0);
  nd.setUTCDate(nd.getUTCDate() + 1);
  return nd;
}

/**
 * Interpret a CSV cell as a boolean.
 *
 * @param {unknown} v - Raw cell value.
 * @returns {boolean} true for true/"true"/"1"/"yes"/"y" (case-insensitive, trimmed); false otherwise.
 */
function normBool(v) {
  if (typeof v === "boolean") return v;
  if (v == null) return false;
  const s = String(v).trim().toLowerCase();
  return s === "true" || s === "1" || s === "yes" || s === "y";
}

/** Trim and lower-case `v`; return null when the result is empty. */
function lowerOrNull(v) {
  const s = (v ?? "").toString().trim();
  return s ? s.toLowerCase() : null;
}

/** Trim `v`; return null when the result is empty. */
function strOrNull(v) {
  const s = (v ?? "").toString().trim();
  return s || null;
}

/**
 * Parse a timestamp cell.
 *
 * @param {unknown} v - Raw cell value.
 * @returns {Date|null} A valid Date, or null when the cell is empty or unparseable.
 */
function parseTimestamp(v) {
  const s = strOrNull(v);
  if (!s) return null;
  const parsed = new Date(s);
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

/**
 * Compute the value stored as `external_user_id` for a row:
 * lower-cased email, else discord user id, else from_name, else "unknown".
 *
 * @param {Record<string, unknown>} rec - One parsed CSV row.
 * @returns {string} The identity key.
 */
function identityKey(rec) {
  return (
    lowerOrNull(rec.email) ||
    strOrNull(rec.discord_userid) ||
    strOrNull(rec.from_name) ||
    "unknown"
  );
}

/**
 * Create or update the membership item for one CSV row.
 *
 * In dry mode nothing is sent; the payload that would be written is returned instead.
 *
 * @param {Record<string, unknown>} rec - One parsed CSV row (lower-cased header keys).
 * @returns {Promise<{dry: true, will: object} | {ok: true, method: string, id: unknown}>}
 * @throws {Error} On an unparseable subscription timestamp, or when the Directus
 *   read or write fails.
 */
async function upsertOne(rec) {
  const email = lowerOrNull(rec.email);
  const from_name = strOrNull(rec.from_name);
  const discord_userid = strOrNull(rec.discord_userid);
  const type = strOrNull(rec.type) || "";
  const isSub =
    normBool(rec.is_subscription_payment) || type.toLowerCase().includes("subscription");

  const ts = parseTimestamp(rec.timestamp);
  if (isSub && !ts && strOrNull(rec.timestamp)) {
    // Fail the row with a readable message instead of a RangeError from toISOString().
    throw new Error(`invalid_timestamp: "${rec.timestamp}"`);
  }

  const extId = identityKey(rec);
  const started_at = isSub && ts ? ts.toISOString() : null;
  const renews_at = isSub && ts ? addOneMonthPlusOneDay(ts).toISOString() : null;
  const status = isSub ? "active" : "one_time";

  const payload = {
    provider: "kofi",
    external_user_id: extId,
    email,
    username: from_name,
    status,
    tier: strOrNull(rec.tier_name),
    started_at,
    renews_at,
    last_event_at: new Date().toISOString(),
    // Keep a compact raw for backfill; webhook now stores full JSON in TEXT
    raw: JSON.stringify({
      src: "backfill",
      type,
      email,
      from_name,
      is_subscription_payment: isSub,
      is_first_subscription_payment: normBool(rec.is_first_subscription_payment),
      timestamp: rec.timestamp || null,
      discord_userid: discord_userid || null,
      discord_username: strOrNull(rec.discord_username),
      amount: strOrNull(rec.amount),
      currency: strOrNull(rec.currency),
      kofi_transaction_id: strOrNull(rec.kofi_transaction_id),
      tier_name: strOrNull(rec.tier_name),
    }),
  };

  if (dry) {
    return { dry: true, will: payload };
  }

  // Identity clauses: any one of them matching is enough.
  const identityClauses = [
    ...(email ? [{ email: { _eq: email } }, { external_user_id: { _eq: email } }] : []),
    ...(discord_userid ? [{ external_user_id: { _eq: discord_userid } }] : []),
    ...(from_name ? [{ external_user_id: { _eq: from_name } }] : []),
  ];
  if (identityClauses.length === 0) {
    // Never send an empty `_or`: match on the fallback id so a row without any
    // identifier cannot end up selecting an unrelated kofi item.
    identityClauses.push({ external_user_id: { _eq: extId } });
  }
  // Nest `_or` inside `_and` so the provider restriction applies to every alternative.
  const filter = encodeURIComponent(
    JSON.stringify({
      _and: [{ provider: { _eq: "kofi" } }, { _or: identityClauses }],
    })
  );

  // read existing
  const exRes = await fetch(`${DIRECTUS}/items/${COLLECTION}?filter=${filter}&limit=1`, {
    headers: { Authorization: `Bearer ${BOT_TOKEN}` },
    cache: "no-store",
  });
  if (!exRes.ok) {
    const errText = await exRes.text().catch(() => "");
    throw new Error(`directus_read_failed(${exRes.status}): ${errText}`);
  }
  // An unreadable body must not be treated as "no existing item": that would POST a duplicate.
  let exJson;
  try {
    exJson = await exRes.json();
  } catch (err) {
    throw new Error(`directus_read_invalid_json(${exRes.status})`, { cause: err });
  }
  const existing = exJson?.data?.[0] || null;

  // For updates with no ts/isSub we won't clobber start/renew — mirror webhook behavior.
  const body = { ...payload };
  if (existing && (!isSub || !ts)) {
    delete body.started_at;
    delete body.renews_at;
  }

  const url = existing
    ? `${DIRECTUS}/items/${COLLECTION}/${existing.id}`
    : `${DIRECTUS}/items/${COLLECTION}`;
  const method = existing ? "PATCH" : "POST";

  const wr = await fetch(url, {
    method,
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${BOT_TOKEN}`,
    },
    body: JSON.stringify(body),
  });
  if (!wr.ok) {
    const t = await wr.text().catch(() => "");
    throw new Error(`directus_write_failed(${wr.status}): ${t}`);
  }
  // The write already succeeded; a missing/unparseable body only costs us the id in the log.
  const wj = await wr.json().catch(() => ({}));
  return { ok: true, method, id: wj?.data?.id || existing?.id || null };
}

/**
 * Load the CSV, then upsert every row using up to `concurrency` workers.
 *
 * Rows sharing the same identity key are handled by a single worker, in CSV order,
 * so two payments from one supporter cannot race each other (both seeing "not found"
 * and both creating an item). Sets process.exitCode to 1 when any row failed.
 *
 * @returns {Promise<void>}
 */
async function run() {
  const csvBuf = fs.readFileSync(file);
  const records = parse(csvBuf, {
    // Lower-case the header row so column lookup is case-insensitive, as documented above.
    columns: (header) => header.map((name) => String(name).trim().toLowerCase()),
    skip_empty_lines: true,
    bom: true,
    relax_column_count: true,
    trim: true,
  });

  console.log(`[backfill] Loaded ${records.length} rows from ${file}`);
  console.log(
    `[backfill] Directus=${DIRECTUS} collection=${COLLECTION} dry=${dry} concurrency=${concurrency}`
  );

  // Group rows by identity, remembering each row's original 1-based position for the logs.
  const groupsByKey = new Map();
  records.forEach((rec, i) => {
    const key = identityKey(rec);
    if (!groupsByKey.has(key)) groupsByKey.set(key, []);
    groupsByKey.get(key).push({ rec, rowNumber: i + 1 });
  });
  const groups = [...groupsByKey.values()];

  let ok = 0;
  let fail = 0;
  let nextGroup = 0;

  const worker = async () => {
    // The check and the increment run without an intervening await, so no two workers
    // can claim the same group.
    while (nextGroup < groups.length) {
      const group = groups[nextGroup];
      nextGroup += 1;

      for (const { rec, rowNumber } of group) {
        try {
          const res = await upsertOne(rec);
          if (res.dry) {
            console.log(
              `[dry] row#${rowNumber}: would upsert ext="${res.will.external_user_id}" email="${res.will.email}" status=${res.will.status}`
            );
          } else {
            console.log(`[ok] row#${rowNumber}: ${res.method} id=${res.id}`);
          }
          ok += 1;
        } catch (e) {
          console.error(`[err] row#${rowNumber}: ${(e && e.message) || e}`);
          fail += 1;
          // small backoff to avoid hammering if there's a transient issue
          await sleep(150);
        }
      }
    }
  };

  await Promise.all(Array.from({ length: concurrency }, () => worker()));

  console.log(`[backfill] done. ok=${ok} fail=${fail}`);
  if (fail > 0) process.exitCode = 1;
}

run().catch((e) => {
  console.error("[backfill] fatal:", e);
  process.exit(1);
});