temp backfill script for Ko-fi
parent 42652a02fe
commit 21ee374373
1 changed file with 224 additions and 0 deletions
scripts/backfill-kofi.mjs (normal file, 224 additions)
@@ -0,0 +1,224 @@
// scripts/backfill-kofi.mjs
//
// Usage (inside container):
//   node scripts/backfill-kofi.mjs --file /data/kofi.csv [--dry] [--concurrency=5]
//
// CSV expected headers (case-insensitive; unknown columns ignored):
//   email, from_name, type, timestamp, is_subscription_payment, is_first_subscription_payment,
//   tier_name, discord_userid, discord_username, amount, currency, kofi_transaction_id
//
// Upsert strategy: find an existing row where provider == "kofi" and
//   (email == email || external_user_id == email || external_user_id == discord_userid || external_user_id == from_name)
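//
// Example input row (hypothetical values, for illustration only):
//   email,from_name,type,timestamp,is_subscription_payment,is_first_subscription_payment,tier_name,discord_userid,discord_username,amount,currency,kofi_transaction_id
//   jane@example.com,Jane,Subscription,2025-01-15T10:00:00Z,true,false,Gold,123456789012345678,jane,3.00,USD,00000000-1111-2222-3333-444444444444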

import fs from "node:fs";
import { parse } from "csv-parse/sync";

// --- envs from the running container ---
const DIRECTUS = (process.env.DIRECTUS_URL || "").replace(/\/$/, "");
const BOT_TOKEN = process.env.DIRECTUS_TOKEN_ADMIN_SUPPORTER || "";
const COLLECTION = "user_memberships";

if (!DIRECTUS || !BOT_TOKEN) {
  console.error("[backfill] Missing DIRECTUS_URL or DIRECTUS_TOKEN_ADMIN_SUPPORTER in env.");
  process.exit(2);
}
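
// Example env (hypothetical values; set them however the container is configured):
//   DIRECTUS_URL=http://directus:8055
//   DIRECTUS_TOKEN_ADMIN_SUPPORTER=<static token with write access to user_memberships>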

// ---- args ----
const args = new Map(
  process.argv.slice(2).flatMap((a) => {
    if (!a.startsWith("--")) return [];
    const [k, v = "true"] = a.replace(/^--/, "").split("=");
    return [[k, v]];
  })
);
const file = args.get("file");
// treat --dry and --dry=true as dry runs; an explicit --dry=false disables it
const dry = args.has("dry") && args.get("dry") !== "false";
const concurrency = Number(args.get("concurrency") || 5);

if (!file) {
  console.error("Usage: node scripts/backfill-kofi.mjs --file /path/to/kofi.csv [--dry] [--concurrency=5]");
  process.exit(1);
}

// ---- helpers ----
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

function addOneMonthPlusOneDay(d) {
  const nd = new Date(d);
  const m = nd.getMonth();
  nd.setMonth(m + 1);
  // handle month overflow (e.g., Jan 31 spills into early March); setDate(0) snaps back to the last day of the previous month
  if (nd.getMonth() !== ((m + 1) % 12)) nd.setDate(0);
  nd.setDate(nd.getDate() + 1);
  return nd;
}
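
// Illustration (hypothetical local-time dates):
//   addOneMonthPlusOneDay(new Date(2025, 0, 15)) -> Feb 16 2025
//   addOneMonthPlusOneDay(new Date(2025, 0, 31)) -> overflow clamps to Feb 28, +1 day -> Mar 1 2025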

function normBool(v) {
  if (typeof v === "boolean") return v;
  if (v == null) return false;
  const s = String(v).trim().toLowerCase();
  return s === "true" || s === "1" || s === "yes" || s === "y";
}

function lowerOrNull(v) {
  const s = (v ?? "").toString().trim();
  return s ? s.toLowerCase() : null;
}

function strOrNull(v) {
  const s = (v ?? "").toString().trim();
  return s || null;
}

async function upsertOne(rec) {
  const email = lowerOrNull(rec.email);
  const from_name = strOrNull(rec.from_name);
  const discord_userid = strOrNull(rec.discord_userid);
  const type = strOrNull(rec.type) || "";
  const isSub = normBool(rec.is_subscription_payment) || type.toLowerCase().includes("subscription");
  let ts = rec.timestamp ? new Date(rec.timestamp) : null;
  if (ts && Number.isNaN(ts.getTime())) ts = null; // guard: an unparsable timestamp would make toISOString() throw

  const extId = email || discord_userid || from_name || "unknown";

  const started_at = isSub && ts ? ts.toISOString() : null;
  const renews_at = isSub && ts ? addOneMonthPlusOneDay(ts).toISOString() : null;
  const status = isSub ? "active" : "one_time";

  const payload = {
    provider: "kofi",
    external_user_id: extId,
    email,
    username: from_name,
    status,
    tier: strOrNull(rec.tier_name),
    started_at,
    renews_at,
    last_event_at: new Date().toISOString(),
    // Keep a compact raw for backfill; the webhook now stores the full JSON in a TEXT column
    raw: JSON.stringify({
      src: "backfill",
      type,
      email,
      from_name,
      is_subscription_payment: isSub,
      is_first_subscription_payment: normBool(rec.is_first_subscription_payment),
      timestamp: rec.timestamp || null,
      discord_userid: discord_userid || null,
      discord_username: strOrNull(rec.discord_username),
      amount: strOrNull(rec.amount),
      currency: strOrNull(rec.currency),
      kofi_transaction_id: strOrNull(rec.kofi_transaction_id),
      tier_name: strOrNull(rec.tier_name),
    }),
  };
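
  // For the sample row in the header comment this would produce roughly
  // (illustration; the exact renews_at depends on the server timezone):
  //   { provider: "kofi", external_user_id: "jane@example.com", email: "jane@example.com",
  //     username: "Jane", status: "active", tier: "Gold",
  //     started_at: "2025-01-15T10:00:00.000Z", renews_at: "2025-02-16T10:00:00.000Z", ... }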

  // Find existing (same criteria as the webhook). Sibling keys in a Directus filter
  // are implicitly ANDed, so this reads: provider == "kofi" AND (any identifier matches).
  const filter = encodeURIComponent(
    JSON.stringify({
      _and: [{ provider: { _eq: "kofi" } }],
      _or: [
        ...(email ? [{ email: { _eq: email } }, { external_user_id: { _eq: email } }] : []),
        ...(discord_userid ? [{ external_user_id: { _eq: discord_userid } }] : []),
        ...(from_name ? [{ external_user_id: { _eq: from_name } }] : []),
      ],
    })
  );
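  // For a row that only has an email, the decoded filter reads (illustration):
  //   {"_and":[{"provider":{"_eq":"kofi"}}],
  //    "_or":[{"email":{"_eq":"jane@example.com"}},{"external_user_id":{"_eq":"jane@example.com"}}]}
  // If email, discord_userid, and from_name are all missing, _or ends up empty;
  // how Directus treats an empty _or is left to the server here.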

  if (dry) {
    return { dry: true, will: payload };
  }

  // read existing
  const exRes = await fetch(`${DIRECTUS}/items/${COLLECTION}?filter=${filter}&limit=1`, {
    headers: { Authorization: `Bearer ${BOT_TOKEN}` },
    cache: "no-store",
  });
  if (!exRes.ok) {
    const errText = await exRes.text().catch(() => "");
    throw new Error(`directus_read_failed(${exRes.status}): ${errText}`);
  }
  const exJson = await exRes.json().catch(() => ({}));
  const existing = exJson?.data?.[0] || null;

  // For updates with no ts/isSub we won't clobber started_at/renews_at; mirror the webhook behavior.
  const body = { ...payload };
  if (existing && (!isSub || !ts)) {
    delete body.started_at;
    delete body.renews_at;
  }

  const url = existing
    ? `${DIRECTUS}/items/${COLLECTION}/${existing.id}`
    : `${DIRECTUS}/items/${COLLECTION}`;
  const method = existing ? "PATCH" : "POST";

  const wr = await fetch(url, {
    method,
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${BOT_TOKEN}`,
    },
    body: JSON.stringify(body),
  });

  if (!wr.ok) {
    const t = await wr.text().catch(() => "");
    throw new Error(`directus_write_failed(${wr.status}): ${t}`);
  }
  const wj = await wr.json().catch(() => ({}));
  return { ok: true, method, id: wj?.data?.id || existing?.id || null };
}

async function run() {
  const csvBuf = fs.readFileSync(file);
  const records = parse(csvBuf, {
    columns: true,
    skip_empty_lines: true,
    bom: true,
    relax_column_count: true,
    trim: true,
  });

  console.log(`[backfill] Loaded ${records.length} rows from ${file}`);
  console.log(`[backfill] Directus=${DIRECTUS} collection=${COLLECTION} dry=${dry} concurrency=${concurrency}`);

  let ok = 0, fail = 0;
  let idx = 0;

  // simple shared-cursor worker pool: `concurrency` workers each pull the next
  // row index until the list is exhausted
  const pool = new Array(concurrency).fill(0).map(async () => {
    while (idx < records.length) {
      const i = idx++;
      const rec = records[i];
      try {
        const res = await upsertOne(rec);
        if (res.dry) {
          console.log(`[dry] row#${i + 1}: would upsert ext="${res.will.external_user_id}" email="${res.will.email}" status=${res.will.status}`);
        } else {
          console.log(`[ok] row#${i + 1}: ${res.method} id=${res.id}`);
        }
        ok++;
      } catch (e) {
        console.error(`[err] row#${i + 1}: ${(e && e.message) || e}`);
        fail++;
        // small backoff to avoid hammering if there's a transient issue
        await sleep(150);
      }
    }
  });

  await Promise.all(pool);
  console.log(`[backfill] done. ok=${ok} fail=${fail}`);
  if (fail > 0) process.exitCode = 1;
}

run().catch((e) => {
  console.error("[backfill] fatal:", e);
  process.exit(1);
});
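
// Example dry run (hypothetical compose service name and path):
//   docker compose exec app node scripts/backfill-kofi.mjs --file /data/kofi.csv --dry
//   [backfill] Loaded 2 rows from /data/kofi.csv
//   [dry] row#1: would upsert ext="jane@example.com" email="jane@example.com" status=active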