diff --git a/.github/join_profiles.py b/.github/join_profiles.py new file mode 100644 index 00000000..09e4c13e --- /dev/null +++ b/.github/join_profiles.py @@ -0,0 +1,466 @@ +#!/usr/bin/env python3 +"""Build root-level profile artifacts from the repo's split profile sources. + +Input layout: + /profile_sources/ + / + / + metadata.json + vendor.idx + vendor/ + printer_model/ + printer/ + print/ + filament/ + physical_printer/ + presets/ + obsolete_presets/ + +Output layout: + / + manifest.json + repos/ + / + vendor_indices.zip + / + .ini +""" + +from __future__ import annotations + +import argparse +import io +import json +import sys +import urllib.error +import urllib.request +import zipfile +from collections import defaultdict +from pathlib import Path +from typing import Iterable +from urllib.parse import quote, urljoin + + +REPO_URLS = ( + "https://preset-repo-api.prusa3d.com/v1/repos", + "https://raw.githubusercontent.com/Dark98/SliceBeam/refs/heads/master/.profiledumpsrepo/manifest.json", +) + +USER_AGENT = "SliceBeamProfileDump/1.0" +INVALID_FILE_CHARS = '<>:"/\\|?*' +ASSET_KEYS = {"thumbnail", "bed_model", "bed_texture"} +REPO_ROOT = Path(__file__).resolve().parent.parent +PROFILE_SOURCES_DIR = REPO_ROOT / "profile_sources" +BACKEND_STATIC_DIR = REPO_ROOT + + +def fetch_bytes(url: str) -> bytes: + request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + with urllib.request.urlopen(request) as response: + return response.read() + + +def fetch_json(url: str): + return json.loads(fetch_bytes(url).decode("utf-8")) + + +def parse_vendor_version(index_bytes: bytes) -> str: + for raw_line in index_bytes.decode("utf-8").splitlines(): + line = raw_line.strip() + if not line or " = " in line: + continue + space_index = line.find(" ") + if space_index == -1: + raise ValueError(f"Malformed vendor index line: {line!r}") + return line[:space_index] + raise ValueError("Could not determine vendor version from index file") + + +def sanitize_file_name(name: str) -> str: + sanitized = "".join("_" if ch in INVALID_FILE_CHARS else ch for ch in name).strip() + sanitized = sanitized.rstrip(". ") + return sanitized or "unnamed" + + +def safe_asset_relative_path(path_str: str) -> Path: + normalized = path_str.replace("\\", "/").strip().lstrip("/") + if not normalized: + raise ValueError("Empty asset path") + candidate = Path(*[part for part in normalized.split("/") if part not in ("", ".")]) + if any(part == ".." for part in candidate.parts): + raise ValueError(f"Unsafe asset path: {path_str}") + return candidate + + +def parse_sections(ini_text: str) -> dict[str, list[tuple[str, list[str]]]]: + grouped: dict[str, list[tuple[str, list[str]]]] = defaultdict(list) + for section_type, section_name, lines in parse_sections_ordered(ini_text): + grouped[section_type].append((section_name, lines)) + return dict(grouped) + + +def parse_sections_ordered(ini_text: str) -> list[tuple[str, str, list[str]]]: + sections: dict[str, list[tuple[str, list[str]]]] = defaultdict(list) + current_type: str | None = None + current_name: str | None = None + current_lines: list[str] = [] + ordered: list[tuple[str, str, list[str]]] = [] + + def flush() -> None: + nonlocal current_type, current_name, current_lines + if current_type is None: + return + name = current_name or current_type + sections[current_type].append((name, current_lines.copy())) + ordered.append((current_type, name, current_lines.copy())) + current_type = None + current_name = None + current_lines = [] + + for line in ini_text.splitlines(): + if line.startswith("[") and line.endswith("]"): + flush() + section = line[1:-1] + if ":" in section: + section_type, section_name = section.split(":", 1) + else: + section_type, section_name = section, section + current_type = section_type + current_name = section_name + current_lines = [line] + elif current_type is not None: + current_lines.append(line) + + flush() + return ordered + + +def iter_referenced_assets(ordered_sections: list[tuple[str, str, list[str]]]) -> list[str]: + referenced: list[str] = [] + seen: set[str] = set() + for section_type, _, lines in ordered_sections: + if section_type != "printer_model": + continue + for line in lines[1:]: + stripped = line.strip() + if not stripped or stripped.startswith(";") or stripped.startswith("#") or " = " not in stripped: + continue + key, value = stripped.split(" = ", 1) + if key not in ASSET_KEYS: + continue + value = value.strip() + if not value or value in seen: + continue + safe_asset_relative_path(value) + seen.add(value) + referenced.append(value) + return referenced + + +def fetch_optional_asset(base_url: str, asset_path: str) -> bytes | None: + safe_rel_path = safe_asset_relative_path(asset_path).as_posix() + encoded_path = "/".join(quote(part) for part in safe_rel_path.split("/")) + asset_url = urljoin(base_url.rstrip("/") + "/", encoded_path) + try: + return fetch_bytes(asset_url) + except urllib.error.HTTPError as exc: + if exc.code == 404: + print(f" [warn] missing asset {asset_path} at {asset_url}") + return None + raise + + +def write_assets(assets_root: Path, assets: dict[str, bytes]) -> list[str]: + written: list[str] = [] + for rel_path_str, data in sorted(assets.items()): + rel_path = safe_asset_relative_path(rel_path_str) + target = assets_root / rel_path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_bytes(data) + written.append(rel_path.as_posix()) + return written + + +def collect_split_source_assets(vendor_dir: Path) -> dict[str, bytes]: + assets_dir = vendor_dir / "assets" + if not assets_dir.exists(): + return {} + + assets: dict[str, bytes] = {} + for path in sorted(p for p in assets_dir.rglob("*") if p.is_file()): + rel_path = path.relative_to(assets_dir).as_posix() + safe_asset_relative_path(rel_path) + assets[rel_path] = path.read_bytes() + return assets + + +def write_sections(split_dir: Path, ini_text: str) -> dict[str, int]: + counts: dict[str, int] = {} + for section_type, _, _ in parse_sections_ordered(ini_text): + counts[section_type] = counts.get(section_type, 0) + 1 + _write_split_sections(split_dir, parse_sections_ordered(ini_text)) + return counts + + +def _write_split_sections(split_dir: Path, ordered_sections: list[tuple[str, str, list[str]]]) -> list[dict[str, str]]: + file_usage: dict[tuple[str, str], int] = defaultdict(int) + section_order: list[dict[str, str]] = [] + for section_type, section_name, lines in ordered_sections: + target_dir = split_dir / sanitize_file_name(section_type) + target_dir.mkdir(parents=True, exist_ok=True) + + file_usage[(section_type, section_name)] += 1 + suffix = file_usage[(section_type, section_name)] + file_name = sanitize_file_name(section_name) + if not file_name.lower().endswith(".ini"): + file_name = f"{file_name}.ini" + if suffix > 1: + stem = Path(file_name).stem + file_name = f"{stem}__{suffix}{Path(file_name).suffix}" + + relative_path = Path(sanitize_file_name(section_type)) / file_name + (split_dir / relative_path).write_text("\n".join(lines) + "\n", encoding="utf-8") + section_order.append( + { + "type": section_type, + "name": section_name, + "path": relative_path.as_posix(), + } + ) + return section_order + + +def iter_filtered_repos(repo_filter: set[str] | None) -> list[dict]: + all_repos: list[dict] = [] + for manifest_url in REPO_URLS: + entries = fetch_json(manifest_url) + for entry in entries: + if not entry["id"].endswith("-fff"): + continue + if repo_filter and entry["id"] not in repo_filter: + continue + all_repos.append(entry) + return all_repos + + +def iter_vendor_entries(index_zip_bytes: bytes) -> Iterable[tuple[str, bytes]]: + with zipfile.ZipFile(io.BytesIO(index_zip_bytes)) as zf: + for name in sorted(zf.namelist()): + if name.endswith("/"): + continue + with zf.open(name) as fp: + yield name, fp.read() + + +def dump_vendor(repo: dict, vendor_index_name: str, vendor_index_bytes: bytes, output_root: Path) -> tuple[dict, dict[str, bytes]]: + vendor_name = Path(vendor_index_name).stem + version = parse_vendor_version(vendor_index_bytes) + ini_url = repo["url"].rstrip("/") + f"/{vendor_name}/{version}.ini" + ini_bytes = fetch_bytes(ini_url) + ini_text = ini_bytes.decode("utf-8") + base_url = repo["url"].rstrip("/") + f"/{vendor_name}" + ordered_sections = parse_sections_ordered(ini_text) + referenced_assets = iter_referenced_assets(ordered_sections) + assets: dict[str, bytes] = {} + for asset_path in referenced_assets: + asset_bytes = fetch_optional_asset(base_url, asset_path) + if asset_bytes is not None: + assets[asset_path] = asset_bytes + + vendor_dir = output_root / repo["id"] / sanitize_file_name(vendor_name) + raw_dir = vendor_dir / "raw" + split_dir = vendor_dir / "split" + assets_dir = vendor_dir / "assets" + raw_dir.mkdir(parents=True, exist_ok=True) + split_dir.mkdir(parents=True, exist_ok=True) + if assets: + assets_dir.mkdir(parents=True, exist_ok=True) + + raw_path = raw_dir / f"{sanitize_file_name(version)}.ini" + raw_path.write_bytes(ini_bytes) + section_counts = write_sections(split_dir, ini_text) + asset_paths = write_assets(assets_dir, assets) if assets else [] + + metadata = { + "repo_id": repo["id"], + "repo_name": repo.get("name"), + "vendor": vendor_name, + "version": version, + "ini_url": ini_url, + "index_url": repo["index_url"], + "section_counts": section_counts, + "asset_paths": asset_paths, + } + (vendor_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8") + return metadata, assets + + +def write_split_source_vendor( + split_root: Path, + repo: dict, + vendor_index_name: str, + vendor_index_bytes: bytes, + version: str, + ini_text: str, + assets: dict[str, bytes] | None = None, +) -> None: + vendor_name = Path(vendor_index_name).stem + vendor_dir = split_root / sanitize_file_name(repo["id"]) / sanitize_file_name(vendor_name) + vendor_dir.mkdir(parents=True, exist_ok=True) + + ordered_sections = parse_sections_ordered(ini_text) + section_order = _write_split_sections(vendor_dir, ordered_sections) + asset_paths = write_assets(vendor_dir / "assets", assets or {}) if assets else [] + (vendor_dir / "vendor.idx").write_bytes(vendor_index_bytes) + metadata = { + "repo": { + "id": repo["id"], + "name": repo.get("name", repo["id"]), + "description": repo.get("description", ""), + "visibility": repo.get("visibility", ""), + }, + "vendor": vendor_name, + "version": version, + "index_name": vendor_index_name, + "section_order": section_order, + "asset_paths": asset_paths, + } + (vendor_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8") + + +def write_backend_static_repo(static_root: Path, repos: list[dict], vendor_artifacts: dict[str, list[dict]]) -> None: + static_root.mkdir(parents=True, exist_ok=True) + repos_root = static_root / "repos" + repos_root.mkdir(parents=True, exist_ok=True) + + manifest_entries = [] + for repo in repos: + repo_id = repo["id"] + repo_root = repos_root / sanitize_file_name(repo_id) + repo_root.mkdir(parents=True, exist_ok=True) + + artifacts = vendor_artifacts.get(repo_id, []) + for artifact in artifacts: + vendor_root = repo_root / sanitize_file_name(artifact["vendor_name"]) + vendor_root.mkdir(parents=True, exist_ok=True) + (vendor_root / f"{sanitize_file_name(artifact['version'])}.ini").write_bytes(artifact["ini_bytes"]) + for asset_rel_path, asset_bytes in sorted(artifact.get("assets", {}).items()): + asset_target = vendor_root / safe_asset_relative_path(asset_rel_path) + asset_target.parent.mkdir(parents=True, exist_ok=True) + asset_target.write_bytes(asset_bytes) + + with zipfile.ZipFile(repo_root / "vendor_indices.zip", "w", compression=zipfile.ZIP_DEFLATED) as zf: + for artifact in artifacts: + zf.writestr(artifact["index_name"], artifact["index_bytes"]) + + manifest_entries.append( + { + "name": repo.get("name", repo_id), + "description": repo.get("description", ""), + "visibility": repo.get("visibility", ""), + "id": repo_id, + "url": f"./repos/{repo_id}", + "index_url": f"./repos/{repo_id}/vendor_indices.zip", + } + ) + + (static_root / "manifest.json").write_text(json.dumps(manifest_entries, indent=2), encoding="utf-8") + + +def build_backend_static_from_split_source(split_root: Path, static_root: Path) -> None: + repos_map: dict[str, dict] = {} + vendor_artifacts: dict[str, list[dict]] = defaultdict(list) + + for repo_dir in sorted(path for path in split_root.iterdir() if path.is_dir()): + for vendor_dir in sorted(path for path in repo_dir.iterdir() if path.is_dir()): + metadata_path = vendor_dir / "metadata.json" + if not metadata_path.exists(): + raise FileNotFoundError(f"Missing metadata.json in {vendor_dir}") + + metadata = json.loads(metadata_path.read_text(encoding="utf-8")) + repo = metadata["repo"] + repo_id = repo["id"] + repos_map[repo_id] = { + "id": repo_id, + "name": repo.get("name", repo_id), + "description": repo.get("description", ""), + "visibility": repo.get("visibility", ""), + } + + parts: list[str] = [] + for section in metadata["section_order"]: + section_path = vendor_dir / Path(section["path"]) + content = section_path.read_text(encoding="utf-8").rstrip() + parts.append(content) + ini_text = "\n\n".join(parts) + "\n" + + vendor_artifacts[repo_id].append( + { + "vendor_name": metadata["vendor"], + "index_name": metadata["index_name"], + "index_bytes": (vendor_dir / "vendor.idx").read_bytes(), + "version": metadata["version"], + "ini_bytes": ini_text.encode("utf-8"), + "assets": collect_split_source_assets(vendor_dir), + } + ) + + repos = sorted(repos_map.values(), key=lambda item: item["id"]) + write_backend_static_repo(static_root, repos, vendor_artifacts) + + +def build_backend_static_repo_in_repo() -> None: + build_backend_static_from_split_source(PROFILE_SOURCES_DIR, BACKEND_STATIC_DIR) + + +def split_backend_static_repo(static_root: Path, split_root: Path) -> None: + manifest_path = static_root / "manifest.json" + if not manifest_path.exists(): + raise FileNotFoundError(f"Missing manifest.json in {static_root}") + + repos = json.loads(manifest_path.read_text(encoding="utf-8")) + for repo in repos: + repo_id = repo["id"] + repo_dir = static_root / "repos" / sanitize_file_name(repo_id) + index_zip_path = repo_dir / "vendor_indices.zip" + if not index_zip_path.exists(): + raise FileNotFoundError(f"Missing vendor_indices.zip for repo {repo_id}") + + with zipfile.ZipFile(index_zip_path) as zf: + for index_name in sorted(zf.namelist()): + if index_name.endswith("/"): + continue + vendor_name = Path(index_name).stem + index_bytes = zf.read(index_name) + version = parse_vendor_version(index_bytes) + ini_path = repo_dir / sanitize_file_name(vendor_name) / f"{sanitize_file_name(version)}.ini" + if not ini_path.exists(): + raise FileNotFoundError(f"Missing INI for {repo_id}/{vendor_name}/{version}") + ini_text = ini_path.read_text(encoding="utf-8") + ordered_sections = parse_sections_ordered(ini_text) + assets: dict[str, bytes] = {} + for asset_rel_path in iter_referenced_assets(ordered_sections): + asset_path = repo_dir / sanitize_file_name(vendor_name) / safe_asset_relative_path(asset_rel_path) + if asset_path.exists(): + assets[asset_rel_path] = asset_path.read_bytes() + write_split_source_vendor(split_root, repo, index_name, index_bytes, version, ini_text, assets) + + +def main(argv: list[str]) -> int: + parser = argparse.ArgumentParser(description=__doc__) + args = parser.parse_args(argv) + + try: + build_backend_static_repo_in_repo() + print(f"Backend static repo written to {BACKEND_STATIC_DIR}") + return 0 + except urllib.error.URLError as exc: + print(f"Network error: {exc}", file=sys.stderr) + return 1 + except Exception as exc: # pragma: no cover + print(f"Failed: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/.github/workflows/join-profiles.yml b/.github/workflows/join-profiles.yml new file mode 100644 index 00000000..7b820e98 --- /dev/null +++ b/.github/workflows/join-profiles.yml @@ -0,0 +1,62 @@ +name: Join Profiles + +permissions: + contents: write + +on: + workflow_dispatch: + push: + paths: + - ".github/join_profiles.py" + - ".github/workflows/join-profiles.yml" + - "profile_sources/**" + pull_request: + paths: + - ".github/join_profiles.py" + - ".github/workflows/join-profiles.yml" + - "profile_sources/**" + +jobs: + join-profiles: + runs-on: ubuntu-latest + env: + PYTHON_VERSION: "3.12" + steps: + - name: Check out repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Join Profiles + run: | + echo "Joining Profiles" + echo " Python: $(python --version)" + echo " Script: $GITHUB_WORKSPACE/.github/join_profiles.py" + echo " Output: $GITHUB_WORKSPACE" + + rm -rf repos manifest.json + + python "$GITHUB_WORKSPACE/.github/join_profiles.py" + + echo "Profile joining finished" + + - name: Commit generated profiles + if: github.event_name != 'pull_request' + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + + git add manifest.json repos + + if git diff --cached --quiet; then + echo "No generated profile changes to commit" + exit 0 + fi + + git commit -m "Update Profiles" + git push