makearmy-app/scripts/backfill-kofi.mjs

225 lines
6.8 KiB
JavaScript
Raw Permalink Normal View History

2025-10-20 11:13:30 -04:00
// scripts/backfill-kofi.mjs
// Usage (inside container):
// node scripts/backfill-kofi.mjs --file /data/kofi.csv [--dry] [--concurrency=5]
//
// CSV expected headers (case-insensitive; unknown columns ignored):
// email, from_name, type, timestamp, is_subscription_payment, is_first_subscription_payment,
// tier_name, discord_userid, discord_username, amount, currency, kofi_transaction_id
//
// Upsert strategy: find existing where provider="kofi" and (email == email || external_user_id == email || external_user_id == discord_userid || external_user_id == from_name)
import fs from "node:fs";
import { parse } from "csv-parse/sync";
// --- envs from the running container ---
const DIRECTUS = (process.env.DIRECTUS_URL || "").replace(/\/$/, "");
const BOT_TOKEN = process.env.DIRECTUS_TOKEN_ADMIN_SUPPORTER || "";
const COLLECTION = "user_memberships";
if (!DIRECTUS || !BOT_TOKEN) {
console.error("[backfill] Missing DIRECTUS_URL or DIRECTUS_TOKEN_ADMIN_SUPPORTER in env.");
process.exit(2);
}
// ---- args ----
const args = new Map(
process.argv.slice(2).flatMap((a) => {
if (!a.startsWith("--")) return [];
const [k, v = "true"] = a.replace(/^--/, "").split("=");
return [[k, v]];
})
);
const file = args.get("file");
const dry = args.get("dry") === "true" || args.has("dry");
const concurrency = Number(args.get("concurrency") || 5);
if (!file) {
console.error("Usage: node scripts/backfill-kofi.mjs --file /path/to/kofi.csv [--dry] [--concurrency=5]");
process.exit(1);
}
// ---- helpers ----
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
function addOneMonthPlusOneDay(d) {
const nd = new Date(d);
const m = nd.getMonth();
nd.setMonth(m + 1);
// handle month overflow (e.g., Jan 31 -> Mar 2, fix by going last day of prev month)
if (nd.getMonth() !== ((m + 1) % 12)) nd.setDate(0);
nd.setDate(nd.getDate() + 1);
return nd;
}
function normBool(v) {
if (typeof v === "boolean") return v;
if (v == null) return false;
const s = String(v).trim().toLowerCase();
return s === "true" || s === "1" || s === "yes" || s === "y";
}
function lowerOrNull(v) {
const s = (v ?? "").toString().trim();
return s ? s.toLowerCase() : null;
}
function strOrNull(v) {
const s = (v ?? "").toString().trim();
return s || null;
}
async function upsertOne(rec) {
const email = lowerOrNull(rec.email);
const from_name = strOrNull(rec.from_name);
const discord_userid = strOrNull(rec.discord_userid);
const type = strOrNull(rec.type) || "";
const isSub = normBool(rec.is_subscription_payment) || type.toLowerCase().includes("subscription");
const ts = rec.timestamp ? new Date(rec.timestamp) : null;
const extId =
email ||
discord_userid ||
from_name ||
"unknown";
const started_at =
isSub && ts ? ts.toISOString() : null;
const renews_at =
isSub && ts ? addOneMonthPlusOneDay(ts).toISOString() : null;
const status = isSub ? "active" : "one_time";
const payload = {
provider: "kofi",
external_user_id: extId,
email,
username: from_name,
status,
tier: strOrNull(rec.tier_name),
started_at,
renews_at,
last_event_at: new Date().toISOString(),
// Keep a compact raw for backfill; webhook now stores full JSON in TEXT
raw: JSON.stringify({
src: "backfill",
type,
email,
from_name,
is_subscription_payment: isSub,
is_first_subscription_payment: normBool(rec.is_first_subscription_payment),
timestamp: rec.timestamp || null,
discord_userid: discord_userid || null,
discord_username: strOrNull(rec.discord_username),
amount: strOrNull(rec.amount),
currency: strOrNull(rec.currency),
kofi_transaction_id: strOrNull(rec.kofi_transaction_id),
tier_name: strOrNull(rec.tier_name),
}),
};
// Find existing (same criteria as webhook)
const filter = encodeURIComponent(
JSON.stringify({
_and: [{ provider: { _eq: "kofi" } }],
_or: [
...(email ? [{ email: { _eq: email } }, { external_user_id: { _eq: email } }] : []),
...(discord_userid ? [{ external_user_id: { _eq: discord_userid } }] : []),
...(from_name ? [{ external_user_id: { _eq: from_name } }] : []),
],
})
);
if (dry) {
return { dry: true, will: payload };
}
// read existing
const exRes = await fetch(`${DIRECTUS}/items/${COLLECTION}?filter=${filter}&limit=1`, {
headers: { Authorization: `Bearer ${BOT_TOKEN}` },
cache: "no-store",
});
if (!exRes.ok) {
const errText = await exRes.text().catch(() => "");
throw new Error(`directus_read_failed(${exRes.status}): ${errText}`);
}
const exJson = await exRes.json().catch(() => ({}));
const existing = exJson?.data?.[0] || null;
// For updates with no ts/isSub we won't clobber start/renew — mirror webhook behavior.
const body = { ...payload };
if (existing && (!isSub || !ts)) {
delete body.started_at;
delete body.renews_at;
}
const url = existing
? `${DIRECTUS}/items/${COLLECTION}/${existing.id}`
: `${DIRECTUS}/items/${COLLECTION}`;
const method = existing ? "PATCH" : "POST";
const wr = await fetch(url, {
method,
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${BOT_TOKEN}`,
},
body: JSON.stringify(body),
});
if (!wr.ok) {
const t = await wr.text().catch(() => "");
throw new Error(`directus_write_failed(${wr.status}): ${t}`);
}
const wj = await wr.json().catch(() => ({}));
return { ok: true, method, id: wj?.data?.id || existing?.id || null };
}
async function run() {
const csvBuf = fs.readFileSync(file);
const records = parse(csvBuf, {
columns: true,
skip_empty_lines: true,
bom: true,
relax_column_count: true,
trim: true,
});
console.log(`[backfill] Loaded ${records.length} rows from ${file}`);
console.log(`[backfill] Directus=${DIRECTUS} collection=${COLLECTION} dry=${dry} concurrency=${concurrency}`);
let ok = 0, fail = 0;
let idx = 0;
const pool = new Array(concurrency).fill(0).map(async () => {
while (idx < records.length) {
const i = idx++;
const rec = records[i];
try {
const res = await upsertOne(rec);
if (res.dry) {
console.log(`[dry] row#${i + 1}: would upsert ext="${(res.will.external_user_id)}" email="${res.will.email}" status=${res.will.status}`);
} else {
console.log(`[ok] row#${i + 1}: ${res.method} id=${res.id}`);
}
ok++;
} catch (e) {
console.error(`[err] row#${i + 1}: ${(e && e.message) || e}`);
fail++;
// small backoff to avoid hammering if there's a transient issue
await sleep(150);
}
}
});
await Promise.all(pool);
console.log(`[backfill] done. ok=${ok} fail=${fail}`);
if (fail > 0) process.exitCode = 1;
}
run().catch((e) => {
console.error("[backfill] fatal:", e);
process.exit(1);
});