225 lines
6.8 KiB
JavaScript
225 lines
6.8 KiB
JavaScript
|
|
// scripts/backfill-kofi.mjs
|
||
|
|
// Usage (inside container):
|
||
|
|
// node scripts/backfill-kofi.mjs --file /data/kofi.csv [--dry] [--concurrency=5]
|
||
|
|
//
|
||
|
|
// CSV expected headers (case-insensitive; unknown columns ignored):
|
||
|
|
// email, from_name, type, timestamp, is_subscription_payment, is_first_subscription_payment,
|
||
|
|
// tier_name, discord_userid, discord_username, amount, currency, kofi_transaction_id
|
||
|
|
//
|
||
|
|
// Upsert strategy: find existing where provider="kofi" and (email == email || external_user_id == email || external_user_id == discord_userid || external_user_id == from_name)
|
||
|
|
|
||
|
|
import fs from "node:fs";
|
||
|
|
import { parse } from "csv-parse/sync";
|
||
|
|
|
||
|
|
// --- envs from the running container ---
|
||
|
|
const DIRECTUS = (process.env.DIRECTUS_URL || "").replace(/\/$/, "");
|
||
|
|
const BOT_TOKEN = process.env.DIRECTUS_TOKEN_ADMIN_SUPPORTER || "";
|
||
|
|
const COLLECTION = "user_memberships";
|
||
|
|
|
||
|
|
if (!DIRECTUS || !BOT_TOKEN) {
|
||
|
|
console.error("[backfill] Missing DIRECTUS_URL or DIRECTUS_TOKEN_ADMIN_SUPPORTER in env.");
|
||
|
|
process.exit(2);
|
||
|
|
}
|
||
|
|
|
||
|
|
// ---- args ----
|
||
|
|
const args = new Map(
|
||
|
|
process.argv.slice(2).flatMap((a) => {
|
||
|
|
if (!a.startsWith("--")) return [];
|
||
|
|
const [k, v = "true"] = a.replace(/^--/, "").split("=");
|
||
|
|
return [[k, v]];
|
||
|
|
})
|
||
|
|
);
|
||
|
|
const file = args.get("file");
|
||
|
|
const dry = args.get("dry") === "true" || args.has("dry");
|
||
|
|
const concurrency = Number(args.get("concurrency") || 5);
|
||
|
|
|
||
|
|
if (!file) {
|
||
|
|
console.error("Usage: node scripts/backfill-kofi.mjs --file /path/to/kofi.csv [--dry] [--concurrency=5]");
|
||
|
|
process.exit(1);
|
||
|
|
}
|
||
|
|
|
||
|
|
// ---- helpers ----
|
||
|
|
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
|
||
|
|
|
||
|
|
function addOneMonthPlusOneDay(d) {
|
||
|
|
const nd = new Date(d);
|
||
|
|
const m = nd.getMonth();
|
||
|
|
nd.setMonth(m + 1);
|
||
|
|
// handle month overflow (e.g., Jan 31 -> Mar 2, fix by going last day of prev month)
|
||
|
|
if (nd.getMonth() !== ((m + 1) % 12)) nd.setDate(0);
|
||
|
|
nd.setDate(nd.getDate() + 1);
|
||
|
|
return nd;
|
||
|
|
}
|
||
|
|
|
||
|
|
function normBool(v) {
|
||
|
|
if (typeof v === "boolean") return v;
|
||
|
|
if (v == null) return false;
|
||
|
|
const s = String(v).trim().toLowerCase();
|
||
|
|
return s === "true" || s === "1" || s === "yes" || s === "y";
|
||
|
|
}
|
||
|
|
|
||
|
|
function lowerOrNull(v) {
|
||
|
|
const s = (v ?? "").toString().trim();
|
||
|
|
return s ? s.toLowerCase() : null;
|
||
|
|
}
|
||
|
|
|
||
|
|
function strOrNull(v) {
|
||
|
|
const s = (v ?? "").toString().trim();
|
||
|
|
return s || null;
|
||
|
|
}
|
||
|
|
|
||
|
|
async function upsertOne(rec) {
|
||
|
|
const email = lowerOrNull(rec.email);
|
||
|
|
const from_name = strOrNull(rec.from_name);
|
||
|
|
const discord_userid = strOrNull(rec.discord_userid);
|
||
|
|
const type = strOrNull(rec.type) || "";
|
||
|
|
const isSub = normBool(rec.is_subscription_payment) || type.toLowerCase().includes("subscription");
|
||
|
|
const ts = rec.timestamp ? new Date(rec.timestamp) : null;
|
||
|
|
|
||
|
|
const extId =
|
||
|
|
email ||
|
||
|
|
discord_userid ||
|
||
|
|
from_name ||
|
||
|
|
"unknown";
|
||
|
|
|
||
|
|
const started_at =
|
||
|
|
isSub && ts ? ts.toISOString() : null;
|
||
|
|
|
||
|
|
const renews_at =
|
||
|
|
isSub && ts ? addOneMonthPlusOneDay(ts).toISOString() : null;
|
||
|
|
|
||
|
|
const status = isSub ? "active" : "one_time";
|
||
|
|
|
||
|
|
const payload = {
|
||
|
|
provider: "kofi",
|
||
|
|
external_user_id: extId,
|
||
|
|
email,
|
||
|
|
username: from_name,
|
||
|
|
status,
|
||
|
|
tier: strOrNull(rec.tier_name),
|
||
|
|
started_at,
|
||
|
|
renews_at,
|
||
|
|
last_event_at: new Date().toISOString(),
|
||
|
|
// Keep a compact raw for backfill; webhook now stores full JSON in TEXT
|
||
|
|
raw: JSON.stringify({
|
||
|
|
src: "backfill",
|
||
|
|
type,
|
||
|
|
email,
|
||
|
|
from_name,
|
||
|
|
is_subscription_payment: isSub,
|
||
|
|
is_first_subscription_payment: normBool(rec.is_first_subscription_payment),
|
||
|
|
timestamp: rec.timestamp || null,
|
||
|
|
discord_userid: discord_userid || null,
|
||
|
|
discord_username: strOrNull(rec.discord_username),
|
||
|
|
amount: strOrNull(rec.amount),
|
||
|
|
currency: strOrNull(rec.currency),
|
||
|
|
kofi_transaction_id: strOrNull(rec.kofi_transaction_id),
|
||
|
|
tier_name: strOrNull(rec.tier_name),
|
||
|
|
}),
|
||
|
|
};
|
||
|
|
|
||
|
|
// Find existing (same criteria as webhook)
|
||
|
|
const filter = encodeURIComponent(
|
||
|
|
JSON.stringify({
|
||
|
|
_and: [{ provider: { _eq: "kofi" } }],
|
||
|
|
_or: [
|
||
|
|
...(email ? [{ email: { _eq: email } }, { external_user_id: { _eq: email } }] : []),
|
||
|
|
...(discord_userid ? [{ external_user_id: { _eq: discord_userid } }] : []),
|
||
|
|
...(from_name ? [{ external_user_id: { _eq: from_name } }] : []),
|
||
|
|
],
|
||
|
|
})
|
||
|
|
);
|
||
|
|
|
||
|
|
if (dry) {
|
||
|
|
return { dry: true, will: payload };
|
||
|
|
}
|
||
|
|
|
||
|
|
// read existing
|
||
|
|
const exRes = await fetch(`${DIRECTUS}/items/${COLLECTION}?filter=${filter}&limit=1`, {
|
||
|
|
headers: { Authorization: `Bearer ${BOT_TOKEN}` },
|
||
|
|
cache: "no-store",
|
||
|
|
});
|
||
|
|
if (!exRes.ok) {
|
||
|
|
const errText = await exRes.text().catch(() => "");
|
||
|
|
throw new Error(`directus_read_failed(${exRes.status}): ${errText}`);
|
||
|
|
}
|
||
|
|
const exJson = await exRes.json().catch(() => ({}));
|
||
|
|
const existing = exJson?.data?.[0] || null;
|
||
|
|
|
||
|
|
// For updates with no ts/isSub we won't clobber start/renew — mirror webhook behavior.
|
||
|
|
const body = { ...payload };
|
||
|
|
if (existing && (!isSub || !ts)) {
|
||
|
|
delete body.started_at;
|
||
|
|
delete body.renews_at;
|
||
|
|
}
|
||
|
|
|
||
|
|
const url = existing
|
||
|
|
? `${DIRECTUS}/items/${COLLECTION}/${existing.id}`
|
||
|
|
: `${DIRECTUS}/items/${COLLECTION}`;
|
||
|
|
const method = existing ? "PATCH" : "POST";
|
||
|
|
|
||
|
|
const wr = await fetch(url, {
|
||
|
|
method,
|
||
|
|
headers: {
|
||
|
|
"Content-Type": "application/json",
|
||
|
|
Authorization: `Bearer ${BOT_TOKEN}`,
|
||
|
|
},
|
||
|
|
body: JSON.stringify(body),
|
||
|
|
});
|
||
|
|
|
||
|
|
if (!wr.ok) {
|
||
|
|
const t = await wr.text().catch(() => "");
|
||
|
|
throw new Error(`directus_write_failed(${wr.status}): ${t}`);
|
||
|
|
}
|
||
|
|
const wj = await wr.json().catch(() => ({}));
|
||
|
|
return { ok: true, method, id: wj?.data?.id || existing?.id || null };
|
||
|
|
}
|
||
|
|
|
||
|
|
async function run() {
|
||
|
|
const csvBuf = fs.readFileSync(file);
|
||
|
|
const records = parse(csvBuf, {
|
||
|
|
columns: true,
|
||
|
|
skip_empty_lines: true,
|
||
|
|
bom: true,
|
||
|
|
relax_column_count: true,
|
||
|
|
trim: true,
|
||
|
|
});
|
||
|
|
|
||
|
|
console.log(`[backfill] Loaded ${records.length} rows from ${file}`);
|
||
|
|
console.log(`[backfill] Directus=${DIRECTUS} collection=${COLLECTION} dry=${dry} concurrency=${concurrency}`);
|
||
|
|
|
||
|
|
let ok = 0, fail = 0;
|
||
|
|
let idx = 0;
|
||
|
|
|
||
|
|
const pool = new Array(concurrency).fill(0).map(async () => {
|
||
|
|
while (idx < records.length) {
|
||
|
|
const i = idx++;
|
||
|
|
const rec = records[i];
|
||
|
|
try {
|
||
|
|
const res = await upsertOne(rec);
|
||
|
|
if (res.dry) {
|
||
|
|
console.log(`[dry] row#${i + 1}: would upsert ext="${(res.will.external_user_id)}" email="${res.will.email}" status=${res.will.status}`);
|
||
|
|
} else {
|
||
|
|
console.log(`[ok] row#${i + 1}: ${res.method} id=${res.id}`);
|
||
|
|
}
|
||
|
|
ok++;
|
||
|
|
} catch (e) {
|
||
|
|
console.error(`[err] row#${i + 1}: ${(e && e.message) || e}`);
|
||
|
|
fail++;
|
||
|
|
// small backoff to avoid hammering if there's a transient issue
|
||
|
|
await sleep(150);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
});
|
||
|
|
|
||
|
|
await Promise.all(pool);
|
||
|
|
console.log(`[backfill] done. ok=${ok} fail=${fail}`);
|
||
|
|
if (fail > 0) process.exitCode = 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
run().catch((e) => {
|
||
|
|
console.error("[backfill] fatal:", e);
|
||
|
|
process.exit(1);
|
||
|
|
});
|
||
|
|
|