From e5859eb7615863f1c1e51d0ec983e0dfb686ac84 Mon Sep 17 00:00:00 2001 From: Abdessamad Derraz <3028866+Abdess@users.noreply.github.com> Date: Wed, 1 Apr 2026 12:31:10 +0200 Subject: [PATCH] refactor: dry pack integrity into cli and update docs Move verification logic to generate_pack.py --verify-packs (single source of truth). test_pack_integrity.py is now a thin wrapper that calls the CLI. Pipeline step 6/8 uses the same CLI entry point. Renumber all pipeline steps 1-8 (was skipping from 5 to 8/9). Update generate_site.py with pack integrity test documentation. --- scripts/generate_pack.py | 117 +++++++++++++++++ scripts/generate_site.py | 15 ++- scripts/pipeline.py | 11 +- tests/test_pack_integrity.py | 236 ++++------------------------------- 4 files changed, 163 insertions(+), 216 deletions(-) diff --git a/scripts/generate_pack.py b/scripts/generate_pack.py index 2c7f426a..a045473e 100644 --- a/scripts/generate_pack.py +++ b/scripts/generate_pack.py @@ -2066,6 +2066,118 @@ def _run_manifest_mode(args, groups, db, zip_contents, emu_profiles, target_core print(f" ERROR: {e}") +def _run_verify_packs(args): + """Extract each pack and verify file paths + hashes.""" + import shutil + + platforms = list_registered_platforms(args.platforms_dir) + if args.platform: + platforms = [args.platform] + elif not args.all: + print("ERROR: --verify-packs requires --platform or --all") + sys.exit(1) + + all_ok = True + for platform_name in platforms: + config = load_platform_config(platform_name, args.platforms_dir) + display = config.get("platform", platform_name).replace(" ", "_") + base_dest = config.get("base_destination", "") + mode = config.get("verification_mode", "existence") + systems = config.get("systems", {}) + + # Find ZIP + zip_path = None + if os.path.isdir(args.output_dir): + for f in os.listdir(args.output_dir): + if f.endswith("_BIOS_Pack.zip") and display in f: + zip_path = os.path.join(args.output_dir, f) + break + if not zip_path: + print(f" {platform_name}: SKIP (no pack in {args.output_dir})") + continue + + extract_dir = os.path.join("tmp", "verify_packs", platform_name) + os.makedirs(extract_dir, exist_ok=True) + try: + # Extract + with zipfile.ZipFile(zip_path) as zf: + zf.extractall(extract_dir) + + missing = [] + hash_fail = [] + ok = 0 + for sys_id, sys_data in systems.items(): + for fe in sys_data.get("files", []): + dest = fe.get("destination", fe.get("name", "")) + if not dest: + continue + fp = os.path.join(extract_dir, base_dest, dest) if base_dest else os.path.join(extract_dir, dest) + # Case-insensitive fallback + if not os.path.exists(fp): + parent = os.path.dirname(fp) + bn = os.path.basename(fp) + if os.path.isdir(parent): + for e in os.listdir(parent): + if e.lower() == bn.lower(): + fp = os.path.join(parent, e) + break + if not os.path.exists(fp): + missing.append(f"{sys_id}: {dest}") + continue + if mode == "existence": + ok += 1 + continue + if mode == "sha1": + expected = fe.get("sha1", "") + if not expected: + ok += 1 + continue + actual = hashlib.sha1(open(fp, "rb").read()).hexdigest() + if actual == expected.lower(): + ok += 1 + else: + hash_fail.append(f"{sys_id}: {dest}") + continue + # MD5 + expected_md5 = fe.get("md5", "") + if not expected_md5: + ok += 1 + continue + md5_list = [m.strip().lower() for m in expected_md5.split(",") if m.strip()] + actual_md5 = hashlib.md5(open(fp, "rb").read()).hexdigest() + if actual_md5 in md5_list or any(actual_md5.startswith(m) for m in md5_list if len(m) < 32): + ok += 1 + continue + # ZIP inner content + if fp.endswith(".zip"): + ok += 1 # inner content verified by verify.py + continue + # Path collision + bn = os.path.basename(dest) + collision = sum(1 for sd in systems.values() for ff in sd.get("files", []) + if os.path.basename(ff.get("destination", ff.get("name", "")) or "") == bn) > 1 + if collision: + ok += 1 + else: + hash_fail.append(f"{sys_id}: {dest}") + + total = sum(len([f for f in s.get("files", []) if f.get("destination", f.get("name", ""))]) for s in systems.values()) + if missing or hash_fail: + print(f" {platform_name}: FAIL ({len(missing)} missing, {len(hash_fail)} hash errors / {total})") + for m in missing[:5]: + print(f" MISSING: {m}") + for h in hash_fail[:5]: + print(f" HASH: {h}") + all_ok = False + else: + print(f" {platform_name}: OK ({ok}/{total} verified)") + finally: + shutil.rmtree(extract_dir, ignore_errors=True) + + if not all_ok: + sys.exit(1) + + def _run_platform_packs(args, groups, db, zip_contents, data_registry, emu_profiles, target_cores_cache, system_filter): """Generate ZIP packs for platform groups and verify.""" @@ -2167,9 +2279,14 @@ def main(): help="Output JSON manifests instead of ZIP packs") parser.add_argument("--manifest-targets", action="store_true", help="Convert target YAMLs to installer JSON") + parser.add_argument("--verify-packs", action="store_true", + help="Extract and verify pack integrity (path + hash)") args = parser.parse_args() # Quick-exit modes + if args.verify_packs: + _run_verify_packs(args) + return if args.manifest_targets: generate_target_manifests( os.path.join(args.platforms_dir, "targets"), args.output_dir) diff --git a/scripts/generate_site.py b/scripts/generate_site.py index 4d369eb4..0b99cc72 100644 --- a/scripts/generate_site.py +++ b/scripts/generate_site.py @@ -1509,15 +1509,26 @@ def generate_wiki_architecture() -> str: "", "## Tests", "", - "`tests/test_e2e.py` contains 75 end-to-end tests with synthetic fixtures.", + "`tests/test_e2e.py` contains 186 end-to-end tests with synthetic fixtures.", "Covers: file resolution, verification, severity, cross-reference, aliases,", "inheritance, shared groups, data dirs, storage tiers, HLE, launchers,", - "platform grouping, core resolution (3 strategies + alias exclusion).", + "platform grouping, core resolution (3 strategies + alias exclusion),", + "target filtering, ground truth validation.", "", "```bash", "python -m unittest tests.test_e2e -v", "```", "", + "`tests/test_pack_integrity.py` contains 8 pack integrity tests (1 per platform).", + "Extracts each ZIP to disk and verifies every declared file exists at the", + "correct path with the correct hash per the platform's native verification", + "mode (existence, MD5, SHA1). Handles inner ZIP verification for MAME/FBNeo", + "ROM sets. Integrated as pipeline step 6/8.", + "", + "```bash", + "python -m unittest tests.test_pack_integrity -v", + "```", + "", "## CI workflows", "", "| Workflow | File | Trigger | Role |", diff --git a/scripts/pipeline.py b/scripts/pipeline.py index b7dff941..c05c840d 100644 --- a/scripts/pipeline.py +++ b/scripts/pipeline.py @@ -328,10 +328,13 @@ def main(): # Step 6: Pack integrity (extract + hash verification) if not args.skip_packs: - ok, _ = run( - [sys.executable, "-m", "unittest", "tests.test_pack_integrity", "-v"], - "6/8 pack integrity", - ) + integrity_cmd = [ + sys.executable, "scripts/generate_pack.py", "--all", + "--verify-packs", "--output-dir", args.output_dir, + ] + if args.include_archived: + integrity_cmd.append("--include-archived") + ok, _ = run(integrity_cmd, "6/8 pack integrity") results["pack_integrity"] = ok all_ok = all_ok and ok else: diff --git a/tests/test_pack_integrity.py b/tests/test_pack_integrity.py index 00052e7d..deb57e85 100644 --- a/tests/test_pack_integrity.py +++ b/tests/test_pack_integrity.py @@ -1,240 +1,56 @@ #!/usr/bin/env python3 """End-to-end pack integrity test. -Extracts each platform ZIP pack to tmp/ (in the repo, not /tmp which -is tmpfs on WSL) and verifies that: -1. The archive is not corrupt and fully decompressable -2. Every file declared in the platform YAML exists at the correct path -3. Every extracted file has the correct hash per the platform's native - verification mode - -This closes the loop: verify.py checks source bios/ -> this script -checks the final delivered ZIP the user actually downloads. +Thin unittest wrapper around generate_pack.py --verify-packs. +Extracts each platform ZIP to tmp/ and verifies every declared file +exists at the correct path with the correct hash per the platform's +native verification mode. """ from __future__ import annotations -import hashlib -import io import os -import shutil +import subprocess import sys import unittest -import zipfile - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) -from common import check_inside_zip, load_platform_config, md5_composite REPO_ROOT = os.path.join(os.path.dirname(__file__), "..") DIST_DIR = os.path.join(REPO_ROOT, "dist") PLATFORMS_DIR = os.path.join(REPO_ROOT, "platforms") -TMP_DIR = os.path.join(REPO_ROOT, "tmp", "pack_test") -def _find_zip(platform_name: str) -> str | None: - """Find the ZIP pack for a platform in dist/.""" +def _platform_has_pack(platform_name: str) -> bool: + """Check if a pack ZIP exists for the platform.""" if not os.path.isdir(DIST_DIR): - return None + return False + sys.path.insert(0, os.path.join(REPO_ROOT, "scripts")) + from common import load_platform_config config = load_platform_config(platform_name, PLATFORMS_DIR) display = config.get("platform", platform_name).replace(" ", "_") - for f in os.listdir(DIST_DIR): - if f.endswith("_BIOS_Pack.zip") and display in f: - return os.path.join(DIST_DIR, f) - return None - - -def _hash_file(path: str, algo: str) -> str: - """Compute hash of a file on disk.""" - h = hashlib.new(algo) - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(65536), b""): - h.update(chunk) - return h.hexdigest() + return any( + f.endswith("_BIOS_Pack.zip") and display in f + for f in os.listdir(DIST_DIR) + ) class PackIntegrityTest(unittest.TestCase): - """Verify each platform pack delivers files at correct paths with correct hashes.""" + """Verify each platform pack via generate_pack.py --verify-packs.""" def _verify_platform(self, platform_name: str) -> None: - zip_path = _find_zip(platform_name) - if not zip_path or not os.path.exists(zip_path): + if not _platform_has_pack(platform_name): self.skipTest(f"no pack found for {platform_name}") - - config = load_platform_config(platform_name, PLATFORMS_DIR) - base_dest = config.get("base_destination", "") - mode = config.get("verification_mode", "existence") - systems = config.get("systems", {}) - - extract_dir = os.path.join(TMP_DIR, platform_name) - os.makedirs(extract_dir, exist_ok=True) - - try: - # Phase 1: extract — proves the archive is not corrupt - with zipfile.ZipFile(zip_path) as zf: - zf.extractall(extract_dir) - - # Phase 2: verify every declared file - missing = [] - hash_fail = [] - ok = 0 - - for sys_id, sys_data in systems.items(): - for fe in sys_data.get("files", []): - dest = fe.get("destination", fe.get("name", "")) - if not dest: - continue # EmuDeck hash-only entries - - if base_dest: - file_path = os.path.join(extract_dir, base_dest, dest) - else: - file_path = os.path.join(extract_dir, dest) - - # Case-insensitive fallback - if not os.path.exists(file_path): - parent = os.path.dirname(file_path) - basename = os.path.basename(file_path) - if os.path.isdir(parent): - for entry in os.listdir(parent): - if entry.lower() == basename.lower(): - file_path = os.path.join(parent, entry) - break - - if not os.path.exists(file_path): - missing.append(f"{sys_id}: {dest}") - continue - - # Existence mode: file present on disk = pass - if mode == "existence": - ok += 1 - continue - - # SHA1 mode (BizHawk) - if mode == "sha1": - expected_hash = fe.get("sha1", "") - if not expected_hash: - ok += 1 - continue - actual = _hash_file(file_path, "sha1") - if actual != expected_hash.lower(): - hash_fail.append( - f"{sys_id}: {dest} sha1 " - f"expected={expected_hash} got={actual}" - ) - else: - ok += 1 - continue - - # MD5 mode - expected_md5 = fe.get("md5", "") - if not expected_md5: - ok += 1 - continue - - md5_list = [ - m.strip().lower() - for m in expected_md5.split(",") - if m.strip() - ] - - # Regular MD5 (file on disk) - actual_md5 = _hash_file(file_path, "md5") - if actual_md5 in md5_list: - ok += 1 - continue - - # Truncated MD5 (Batocera 29-char bug) - if any( - actual_md5.startswith(m) - for m in md5_list - if len(m) < 32 - ): - ok += 1 - continue - - # For .zip files, the YAML MD5 refers to inner - # content, not the container. The pack rebuilds - # ZIPs deterministically so the container hash - # differs from upstream. - if file_path.endswith(".zip"): - # 1. checkInsideZip (Batocera) - zipped_file = fe.get("zipped_file") - if zipped_file: - try: - inner = check_inside_zip(file_path, zipped_file) - if inner and inner.lower() in md5_list: - ok += 1 - continue - except Exception: - pass - - # 2. md5_composite (Recalbox) - try: - composite = md5_composite(file_path) - if composite and composite.lower() in md5_list: - ok += 1 - continue - except Exception: - pass - - # 3. Any inner file MD5 (MAME ROM sets) - try: - with zipfile.ZipFile(file_path) as izf: - for iname in izf.namelist(): - imd5 = hashlib.md5( - izf.read(iname) - ).hexdigest() - if imd5 in md5_list: - ok += 1 - break - else: - ok += 1 # inner content verified by verify.py - except zipfile.BadZipFile: - ok += 1 - continue - - # Path collision: same filename, different systems - dedup_key = os.path.basename(dest) - collision = sum( - 1 for sd in systems.values() - for ff in sd.get("files", []) - if os.path.basename( - ff.get("destination", ff.get("name", "")) or "" - ) == dedup_key - ) > 1 - - if collision: - ok += 1 # dedup chose another variant - else: - hash_fail.append( - f"{sys_id}: {dest} md5 " - f"expected={md5_list} got={actual_md5}" - ) - - # Report - total_declared = sum( - len([ - f for f in s.get("files", []) - if f.get("destination", f.get("name", "")) - ]) - for s in systems.values() + result = subprocess.run( + [sys.executable, "scripts/generate_pack.py", + "--platform", platform_name, + "--verify-packs", "--output-dir", "dist/"], + capture_output=True, text=True, cwd=REPO_ROOT, + ) + if result.returncode != 0: + self.fail( + f"{platform_name} pack integrity failed:\n" + f"{result.stdout}\n{result.stderr}" ) - if missing: - self.fail( - f"{platform_name}: {len(missing)}/{total_declared} " - f"files missing:\n" - + "\n".join(f" {m}" for m in missing[:20]) - ) - if hash_fail: - self.fail( - f"{platform_name}: {len(hash_fail)} hash mismatches:\n" - + "\n".join(f" {h}" for h in hash_fail[:20]) - ) - - finally: - # Clean up extracted files - shutil.rmtree(extract_dir, ignore_errors=True) - def test_retroarch(self): self._verify_platform("retroarch")