From 427fef5669bc43d6ea0e82c28ced2657e401e380 Mon Sep 17 00:00:00 2001 From: Abdessamad Derraz <3028866+Abdess@users.noreply.github.com> Date: Mon, 30 Mar 2026 21:35:41 +0200 Subject: [PATCH] fix: text-based YAML patching preserves formatting replace yaml.dump with surgical text edits for contents/source_ref. preserves comments, block scalars, quoting, indentation. fix FBNeo new entry detection using parsed keys instead of text search. --- scripts/scraper/_hash_merge.py | 225 ++++++++++++++++++++++++-- scripts/scraper/fbneo_hash_scraper.py | 4 +- 2 files changed, 218 insertions(+), 11 deletions(-) diff --git a/scripts/scraper/_hash_merge.py b/scripts/scraper/_hash_merge.py index 76734e16..c1faade3 100644 --- a/scripts/scraper/_hash_merge.py +++ b/scripts/scraper/_hash_merge.py @@ -155,7 +155,7 @@ def merge_fbneo_profile( profile['files'] = non_archive + merged if write: - _backup_and_write(profile_path, profile) + _backup_and_write_fbneo(profile_path, profile, hashes) return profile @@ -345,14 +345,221 @@ def _contents_differ(old: list[dict], new: list[dict]) -> bool: def _backup_and_write(path: str, data: dict) -> None: + """Write merged profile using text-based patching to preserve formatting. + + Instead of yaml.dump (which destroys comments, quoting, indentation), + this reads the original file as text, patches specific fields + (core_version, contents, source_ref), and appends new entries. + """ p = Path(path) backup = p.with_suffix('.old.yml') shutil.copy2(p, backup) - with open(p, 'w', encoding='utf-8') as f: - yaml.dump( - data, - f, - default_flow_style=False, - allow_unicode=True, - sort_keys=False, - ) + + original = p.read_text(encoding='utf-8') + patched = _patch_core_version(original, data.get('core_version', '')) + patched = _patch_bios_entries(patched, data.get('files', [])) + patched = _append_new_entries(patched, data.get('files', []), original) + + p.write_text(patched, encoding='utf-8') + + +def _patch_core_version(text: str, version: str) -> str: + """Replace core_version value in-place.""" + if not version: + return text + import re + return re.sub( + r'^(core_version:\s*).*$', + rf'\g<1>"{version}"', + text, + count=1, + flags=re.MULTILINE, + ) + + +def _patch_bios_entries(text: str, files: list[dict]) -> str: + """Patch contents and source_ref for existing bios_zip entries in-place. + + Processes entries in reverse order to preserve line offsets. + Each entry's "owned" lines are: the `- name:` line plus all indented + lines that follow (4+ spaces), stopping at blank lines, comments, + or the next `- name:`. + """ + import re + + # Build a lookup of what to patch + patches: dict[str, dict] = {} + for fe in files: + if fe.get('category') != 'bios_zip': + continue + patches[fe['name']] = fe + + if not patches: + return text + + lines = text.split('\n') + # Find all entry start positions (line indices) + entry_starts: list[tuple[int, str]] = [] + for i, line in enumerate(lines): + m = re.match(r'^ - name:\s*(.+?)\s*$', line) + if m: + entry_starts.append((i, m.group(1).strip('"').strip("'"))) + + # Process in reverse so line insertions don't shift indices + for idx in range(len(entry_starts) - 1, -1, -1): + start_line, entry_name = entry_starts[idx] + if entry_name not in patches: + continue + + fe = patches[entry_name] + contents = fe.get('contents', []) + source_ref = fe.get('source_ref', '') + + # Find the last "owned" line of this entry + # Owned = indented with 4+ spaces (field lines of this entry) + last_owned = start_line + for j in range(start_line + 1, len(lines)): + stripped = lines[j].strip() + if not stripped: + break # blank line = end of entry + if stripped.startswith('#'): + break # comment = belongs to next entry + if re.match(r'^ - ', lines[j]): + break # next list item + if re.match(r'^ ', lines[j]) or re.match(r'^ \w', lines[j]): + last_owned = j + else: + break + + # Patch source_ref in-place + if source_ref: + found_sr = False + for j in range(start_line + 1, last_owned + 1): + if re.match(r'^ source_ref:', lines[j]): + lines[j] = f' source_ref: "{source_ref}"' + found_sr = True + break + if not found_sr: + lines.insert(last_owned + 1, f' source_ref: "{source_ref}"') + last_owned += 1 + + # Remove existing contents block if present + contents_start = None + contents_end = None + for j in range(start_line + 1, last_owned + 1): + if re.match(r'^ contents:', lines[j]): + contents_start = j + elif contents_start is not None: + if re.match(r'^ ', lines[j]): + contents_end = j + else: + break + if contents_end is None and contents_start is not None: + contents_end = contents_start + + if contents_start is not None: + del lines[contents_start:contents_end + 1] + last_owned -= (contents_end - contents_start + 1) + + # Insert new contents after last owned line + if contents: + new_lines = _format_contents(contents).split('\n') + for k, cl in enumerate(new_lines): + lines.insert(last_owned + 1 + k, cl) + + return '\n'.join(lines) + + +def _append_new_entries(text: str, files: list[dict], original: str) -> str: + """Append new bios_zip entries (system=None) that aren't in the original.""" + # Parse original to get existing entry names (more reliable than text search) + existing_data = yaml.safe_load(original) or {} + existing_names = {f['name'] for f in existing_data.get('files', [])} + + new_entries = [] + for fe in files: + if fe.get('category') != 'bios_zip' or fe.get('system') is not None: + continue + if fe['name'] in existing_names: + continue + new_entries.append(fe) + + if not new_entries: + return text + + lines = [] + for fe in new_entries: + lines.append(f'\n - name: {fe["name"]}') + lines.append(f' required: {str(fe["required"]).lower()}') + lines.append(f' category: bios_zip') + if fe.get('source_ref'): + lines.append(f' source_ref: "{fe["source_ref"]}"') + if fe.get('contents'): + lines.append(_format_contents(fe['contents'])) + + if lines: + text = text.rstrip('\n') + '\n' + '\n'.join(lines) + '\n' + + return text + + +def _format_contents(contents: list[dict]) -> str: + """Format a contents list as YAML text.""" + lines = [' contents:'] + for rom in contents: + lines.append(f' - name: {rom["name"]}') + if rom.get('description'): + lines.append(f' description: {rom["description"]}') + if rom.get('size'): + lines.append(f' size: {rom["size"]}') + if rom.get('crc32'): + lines.append(f' crc32: "{rom["crc32"]}"') + if rom.get('sha1'): + lines.append(f' sha1: "{rom["sha1"]}"') + if rom.get('bad_dump'): + lines.append(f' bad_dump: true') + return '\n'.join(lines) + + +def _backup_and_write_fbneo(path: str, data: dict, hashes: dict) -> None: + """Write merged FBNeo profile using text-based patching. + + FBNeo profiles have individual ROM entries with archive: field. + Only patches core_version and appends new ROM entries. + Existing entries are left untouched (CRC32 changes are rare). + """ + p = Path(path) + backup = p.with_suffix('.old.yml') + shutil.copy2(p, backup) + + original = p.read_text(encoding='utf-8') + patched = _patch_core_version(original, data.get('core_version', '')) + + # Identify new ROM entries by comparing parsed data keys, not text search + existing_data = yaml.safe_load(original) or {} + existing_keys = { + (f['archive'], f['name']) + for f in existing_data.get('files', []) + if f.get('archive') + } + new_roms = [ + f for f in data.get('files', []) + if f.get('archive') and (f['archive'], f['name']) not in existing_keys + ] + + if new_roms: + lines = [] + for fe in new_roms: + lines.append(f' - name: "{fe["name"]}"') + lines.append(f' archive: {fe["archive"]}') + lines.append(f' required: {str(fe.get("required", True)).lower()}') + if fe.get('size'): + lines.append(f' size: {fe["size"]}') + if fe.get('crc32'): + lines.append(f' crc32: "{fe["crc32"]}"') + if fe.get('source_ref'): + lines.append(f' source_ref: "{fe["source_ref"]}"') + lines.append('') + patched = patched.rstrip('\n') + '\n\n' + '\n'.join(lines) + + p.write_text(patched, encoding='utf-8') diff --git a/scripts/scraper/fbneo_hash_scraper.py b/scripts/scraper/fbneo_hash_scraper.py index 2f1e95df..bd2dd859 100644 --- a/scripts/scraper/fbneo_hash_scraper.py +++ b/scripts/scraper/fbneo_hash_scraper.py @@ -269,8 +269,8 @@ def run( diff = compute_diff(str(path), str(CACHE_PATH), mode='fbneo') print(_format_diff(path.stem, diff, show_added=is_main)) - if not dry_run and (diff['added'] or diff['updated']): - is_main = path.name == 'fbneo.yml' + effective_added = diff['added'] if is_main else [] + if not dry_run and (effective_added or diff['updated']): merge_fbneo_profile(str(path), str(CACHE_PATH), write=True, add_new=is_main) log.info('merged changes into %s', path.name)