diff --git a/scripts/backfill-kofi.mjs b/scripts/backfill-kofi.mjs new file mode 100644 index 00000000..d28bf8e1 --- /dev/null +++ b/scripts/backfill-kofi.mjs @@ -0,0 +1,224 @@ +// scripts/backfill-kofi.mjs +// Usage (inside container): +// node scripts/backfill-kofi.mjs --file /data/kofi.csv [--dry] [--concurrency=5] +// +// CSV expected headers (case-insensitive; unknown columns ignored): +// email, from_name, type, timestamp, is_subscription_payment, is_first_subscription_payment, +// tier_name, discord_userid, discord_username, amount, currency, kofi_transaction_id +// +// Upsert strategy: find existing where provider="kofi" and (email == email || external_user_id == email || external_user_id == discord_userid || external_user_id == from_name) + +import fs from "node:fs"; +import { parse } from "csv-parse/sync"; + +// --- envs from the running container --- +const DIRECTUS = (process.env.DIRECTUS_URL || "").replace(/\/$/, ""); +const BOT_TOKEN = process.env.DIRECTUS_TOKEN_ADMIN_SUPPORTER || ""; +const COLLECTION = "user_memberships"; + +if (!DIRECTUS || !BOT_TOKEN) { + console.error("[backfill] Missing DIRECTUS_URL or DIRECTUS_TOKEN_ADMIN_SUPPORTER in env."); + process.exit(2); +} + +// ---- args ---- +const args = new Map( + process.argv.slice(2).flatMap((a) => { + if (!a.startsWith("--")) return []; + const [k, v = "true"] = a.replace(/^--/, "").split("="); + return [[k, v]]; + }) +); +const file = args.get("file"); +const dry = args.get("dry") === "true" || args.has("dry"); +const concurrency = Number(args.get("concurrency") || 5); + +if (!file) { + console.error("Usage: node scripts/backfill-kofi.mjs --file /path/to/kofi.csv [--dry] [--concurrency=5]"); + process.exit(1); +} + +// ---- helpers ---- +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +function addOneMonthPlusOneDay(d) { + const nd = new Date(d); + const m = nd.getMonth(); + nd.setMonth(m + 1); + // handle month overflow (e.g., Jan 31 -> Mar 2, fix by going last day of prev month) + if (nd.getMonth() !== ((m + 1) % 12)) nd.setDate(0); + nd.setDate(nd.getDate() + 1); + return nd; +} + +function normBool(v) { + if (typeof v === "boolean") return v; + if (v == null) return false; + const s = String(v).trim().toLowerCase(); + return s === "true" || s === "1" || s === "yes" || s === "y"; +} + +function lowerOrNull(v) { + const s = (v ?? "").toString().trim(); + return s ? s.toLowerCase() : null; +} + +function strOrNull(v) { + const s = (v ?? "").toString().trim(); + return s || null; +} + +async function upsertOne(rec) { + const email = lowerOrNull(rec.email); + const from_name = strOrNull(rec.from_name); + const discord_userid = strOrNull(rec.discord_userid); + const type = strOrNull(rec.type) || ""; + const isSub = normBool(rec.is_subscription_payment) || type.toLowerCase().includes("subscription"); + const ts = rec.timestamp ? new Date(rec.timestamp) : null; + + const extId = + email || + discord_userid || + from_name || + "unknown"; + + const started_at = + isSub && ts ? ts.toISOString() : null; + + const renews_at = + isSub && ts ? addOneMonthPlusOneDay(ts).toISOString() : null; + + const status = isSub ? "active" : "one_time"; + + const payload = { + provider: "kofi", + external_user_id: extId, + email, + username: from_name, + status, + tier: strOrNull(rec.tier_name), + started_at, + renews_at, + last_event_at: new Date().toISOString(), + // Keep a compact raw for backfill; webhook now stores full JSON in TEXT + raw: JSON.stringify({ + src: "backfill", + type, + email, + from_name, + is_subscription_payment: isSub, + is_first_subscription_payment: normBool(rec.is_first_subscription_payment), + timestamp: rec.timestamp || null, + discord_userid: discord_userid || null, + discord_username: strOrNull(rec.discord_username), + amount: strOrNull(rec.amount), + currency: strOrNull(rec.currency), + kofi_transaction_id: strOrNull(rec.kofi_transaction_id), + tier_name: strOrNull(rec.tier_name), + }), + }; + + // Find existing (same criteria as webhook) + const filter = encodeURIComponent( + JSON.stringify({ + _and: [{ provider: { _eq: "kofi" } }], + _or: [ + ...(email ? [{ email: { _eq: email } }, { external_user_id: { _eq: email } }] : []), + ...(discord_userid ? [{ external_user_id: { _eq: discord_userid } }] : []), + ...(from_name ? [{ external_user_id: { _eq: from_name } }] : []), + ], + }) + ); + + if (dry) { + return { dry: true, will: payload }; + } + + // read existing + const exRes = await fetch(`${DIRECTUS}/items/${COLLECTION}?filter=${filter}&limit=1`, { + headers: { Authorization: `Bearer ${BOT_TOKEN}` }, + cache: "no-store", + }); + if (!exRes.ok) { + const errText = await exRes.text().catch(() => ""); + throw new Error(`directus_read_failed(${exRes.status}): ${errText}`); + } + const exJson = await exRes.json().catch(() => ({})); + const existing = exJson?.data?.[0] || null; + + // For updates with no ts/isSub we won't clobber start/renew — mirror webhook behavior. + const body = { ...payload }; + if (existing && (!isSub || !ts)) { + delete body.started_at; + delete body.renews_at; + } + + const url = existing + ? `${DIRECTUS}/items/${COLLECTION}/${existing.id}` + : `${DIRECTUS}/items/${COLLECTION}`; + const method = existing ? "PATCH" : "POST"; + + const wr = await fetch(url, { + method, + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${BOT_TOKEN}`, + }, + body: JSON.stringify(body), + }); + + if (!wr.ok) { + const t = await wr.text().catch(() => ""); + throw new Error(`directus_write_failed(${wr.status}): ${t}`); + } + const wj = await wr.json().catch(() => ({})); + return { ok: true, method, id: wj?.data?.id || existing?.id || null }; +} + +async function run() { + const csvBuf = fs.readFileSync(file); + const records = parse(csvBuf, { + columns: true, + skip_empty_lines: true, + bom: true, + relax_column_count: true, + trim: true, + }); + + console.log(`[backfill] Loaded ${records.length} rows from ${file}`); + console.log(`[backfill] Directus=${DIRECTUS} collection=${COLLECTION} dry=${dry} concurrency=${concurrency}`); + + let ok = 0, fail = 0; + let idx = 0; + + const pool = new Array(concurrency).fill(0).map(async () => { + while (idx < records.length) { + const i = idx++; + const rec = records[i]; + try { + const res = await upsertOne(rec); + if (res.dry) { + console.log(`[dry] row#${i + 1}: would upsert ext="${(res.will.external_user_id)}" email="${res.will.email}" status=${res.will.status}`); + } else { + console.log(`[ok] row#${i + 1}: ${res.method} id=${res.id}`); + } + ok++; + } catch (e) { + console.error(`[err] row#${i + 1}: ${(e && e.message) || e}`); + fail++; + // small backoff to avoid hammering if there's a transient issue + await sleep(150); + } + } + }); + + await Promise.all(pool); + console.log(`[backfill] done. ok=${ok} fail=${fail}`); + if (fail > 0) process.exitCode = 1; +} + +run().catch((e) => { + console.error("[backfill] fatal:", e); + process.exit(1); +}); +