#!/usr/bin/env python3
"""Direct HuggingFace dataset downloader that avoids full repo enumeration.

Uses the HF tree API per-directory, then downloads files via HTTP.

Usage:
    python scripts/download_assets_direct.py [OPTIONS]

Options:
    --min            Download only required scene assets
    --full           Download all scene assets (default)
    --with-curobo    Also download CuRobo package
    --with-drake     Also download panda_drake package
    --local-dir DIR  Where to save (default: current directory)
    --mirror URL     HF mirror base URL (default: https://hf-mirror.com)
"""

import os
import sys
import argparse
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

REPO_ID = "InternRobotics/InternData-A1"
ASSET_PREFIX = "InternDataAssets"


def get_token():
    """Return an HF access token from HF_TOKEN or the cached CLI login file.

    Returns:
        The token string, or "" when no token is available.
    """
    token = os.environ.get("HF_TOKEN", "")
    if not token:
        # Fall back to the token written by `huggingface-cli login`.
        token_file = Path.home() / ".cache/huggingface/token"
        if token_file.exists():
            token = token_file.read_text().strip()
    return token


def list_dir(api_base, repo_id, path, token, session):
    """List all file paths under *path* via the HF tree API (recursive).

    Follows the cursor-based pagination of the tree endpoint until exhausted.

    Args:
        api_base: Base URL of the HF API host (mirror).
        repo_id: Dataset repo id, e.g. "org/name".
        path: Directory path inside the repo to list.
        token: Bearer token ("" for anonymous access).
        session: requests.Session to reuse connections.

    Returns:
        List of repo-relative file paths (directories excluded).

    Raises:
        requests.HTTPError: on a non-2xx API response.
    """
    url = f"{api_base}/api/datasets/{repo_id}/tree/main/{path}"
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    files = []
    cursor = None
    while True:
        # recursive=true lists the whole subtree in one paginated walk,
        # avoiding one round trip per subdirectory.
        params = {"expand": "false", "recursive": "true", "limit": "1000"}
        if cursor:
            params["cursor"] = cursor
        r = session.get(url, headers=headers, params=params, timeout=30)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, list):
            # Some endpoints/mirrors return a bare list (no pagination info).
            items = data
            cursor = None
        else:
            items = data.get("items", [])
            cursor = data.get("nextCursor")
        for item in items:
            if item.get("type") == "file":
                files.append(item["path"])
        if not cursor or not items:
            break
    return files


def download_file(mirror_base, repo_id, file_path, local_dir, token, session):
    """Download a single repo file to local_dir, preserving its repo path.

    Downloads to a temporary ".part" file and atomically renames it into
    place on success, so an interrupted run never leaves a truncated file
    that a later run would mistake for a complete one.

    Args:
        mirror_base: Base URL used for the /resolve download endpoint.
        repo_id: Dataset repo id.
        file_path: Repo-relative file path to fetch.
        local_dir: Local root directory to mirror the repo layout into.
        token: Bearer token ("" for anonymous).
        session: requests.Session to reuse connections.

    Returns:
        Tuple (file_path, status) where status is "ok", "skipped", or
        "error: <message>" after three failed attempts.
    """
    url = f"{mirror_base}/datasets/{repo_id}/resolve/main/{file_path}"
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    local_path = Path(local_dir) / file_path
    local_path.parent.mkdir(parents=True, exist_ok=True)
    if local_path.exists():
        # Only fully-downloaded files exist at the final path (see .part below).
        return file_path, "skipped"
    tmp_path = local_path.with_name(local_path.name + ".part")
    for attempt in range(3):
        try:
            with session.get(url, headers=headers, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
            # Atomic publish: the final path appears only when complete.
            tmp_path.replace(local_path)
            return file_path, "ok"
        except Exception as e:
            # Drop any partial data before retrying or giving up.
            tmp_path.unlink(missing_ok=True)
            if attempt == 2:
                return file_path, f"error: {e}"
            time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s


def download_dir(api_base, mirror_base, repo_id, path, local_dir, token, label, workers=8):
    """List a repo directory and download all of its files in parallel.

    Progress is printed per file; failures are collected and summarized.

    Args:
        api_base: Base URL for the listing API.
        mirror_base: Base URL for file downloads.
        repo_id: Dataset repo id.
        path: Repo directory to mirror.
        local_dir: Local destination root.
        token: Bearer token ("" for anonymous).
        label: Human-readable name used in log lines.
        workers: Thread pool size for concurrent downloads.
    """
    print(f"[INFO] Listing {label} ...")
    session = requests.Session()
    try:
        files = list_dir(api_base, repo_id, path, token, session)
    except Exception as e:
        print(f"[ERROR] Failed to list {path}: {e}")
        return
    print(f"[INFO] Downloading {label} ({len(files)} files) ...")
    errors = []
    done = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(download_file, mirror_base, repo_id, f,
                               local_dir, token, session): f
                   for f in files}
        for fut in as_completed(futures):
            fp, status = fut.result()
            done += 1
            if status.startswith("error"):
                errors.append((fp, status))
                print(f"  [{done}/{len(files)}] FAIL {fp}: {status}")
            else:
                # \r keeps successful downloads on one updating line.
                print(f"  [{done}/{len(files)}] {status.upper()} {fp}", end="\r")
    print()
    if errors:
        print(f"[WARN] {len(errors)} files failed in {label}")


def download_file_single(api_base, mirror_base, repo_id, file_path, local_dir, token, label):
    """Download one specific file (not a directory) with logging.

    Args:
        api_base: Unused here; kept for signature symmetry with download_dir.
        mirror_base: Base URL for file downloads.
        repo_id: Dataset repo id.
        file_path: Repo-relative file path.
        local_dir: Local destination root.
        token: Bearer token ("" for anonymous).
        label: Human-readable name used in log lines.
    """
    print(f"[INFO] Downloading {label} ...")
    session = requests.Session()
    fp, status = download_file(mirror_base, repo_id, file_path, local_dir, token, session)
    if status.startswith("error"):
        print(f"[ERROR] {fp}: {status}")
    else:
        print(f"[INFO] {label} done.")


def main():
    """Parse CLI options and download the selected asset groups."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--min", action="store_true")
    parser.add_argument("--full", action="store_true")
    parser.add_argument("--with-curobo", action="store_true")
    parser.add_argument("--with-drake", action="store_true")
    parser.add_argument("--local-dir", default=".")
    parser.add_argument("--mirror", default="https://hf-mirror.com")
    parser.add_argument("--workers", type=int, default=8)
    args = parser.parse_args()

    mode = "min" if args.min else "full"
    # NOTE: HF_ENDPOINT (if set) intentionally overrides the --mirror flag.
    mirror = os.environ.get("HF_ENDPOINT", args.mirror).rstrip("/")
    api_base = mirror
    token = get_token()
    local_dir = args.local_dir
    if not token:
        print("[WARN] No HF token found. Gated datasets will fail.")
    print(f"[INFO] Mirror: {mirror}, mode={mode}, curobo={args.with_curobo}, drake={args.with_drake}")

    base = f"{ASSET_PREFIX}/assets"

    # Required scene assets (always downloaded).
    print("[INFO] ========== Downloading required scene assets ==========")
    for d in ["background_textures", "envmap_lib", "floor_textures",
              "table_textures", "table0"]:
        download_dir(api_base, mirror, REPO_ID, f"{base}/{d}",
                     local_dir, token, d, args.workers)
    download_file_single(api_base, mirror, REPO_ID, f"{base}/table_info.json",
                         local_dir, token, "table_info.json")

    # Full mode extras: every robot and task directory.
    if mode == "full":
        print("[INFO] ========== Downloading all robots and tasks ==========")
        for robot in ["lift2", "franka", "frankarobotiq",
                      "split_aloha_mid_360", "G1_120s"]:
            download_dir(api_base, mirror, REPO_ID, f"{base}/{robot}",
                         local_dir, token, f"robot:{robot}", args.workers)
        for task in ["basic", "art", "long_horizon", "pick_and_place"]:
            download_dir(api_base, mirror, REPO_ID, f"{base}/{task}",
                         local_dir, token, f"task:{task}", args.workers)

    if args.with_curobo:
        print("[INFO] ========== Downloading CuRobo ==========")
        download_dir(api_base, mirror, REPO_ID, f"{ASSET_PREFIX}/curobo",
                     local_dir, token, "curobo", args.workers)

    if args.with_drake:
        print("[INFO] ========== Downloading panda_drake ==========")
        download_dir(api_base, mirror, REPO_ID, f"{ASSET_PREFIX}/panda_drake",
                     local_dir, token, "panda_drake", args.workers)

    print("[INFO] Done!")


if __name__ == "__main__":
    main()