Files
ipfs-lab/ipfs_cli.py
T

93 lines
2.5 KiB
Python

import argparse
import os
import sys
from pathlib import Path
import requests
API_URL = os.getenv("IPFS_API_URL", "http://127.0.0.1:5001").rstrip("/")
TOKEN = os.getenv("IPFS_API_TOKEN")
def _auth_headers() -> dict[str, str]:
return {"Authorization": f"Bearer {TOKEN}"} if TOKEN else {}
def upload_file(path: Path) -> str:
if not path.is_file():
raise FileNotFoundError(f"Файл не найден: {path}")
url = f"{API_URL}/api/v0/add"
with path.open("rb") as f:
files = {"file": (path.name, f)}
r = requests.post(url, headers=_auth_headers(), files=files, timeout=120)
if r.status_code != 200:
raise RuntimeError(f"Ошибка загрузки (status={r.status_code}): {r.text}")
data = r.json()
cid = data.get("Hash")
if not cid:
raise RuntimeError(f"Не удалось получить CID из ответа: {data}")
return cid
def download_file(cid: str) -> Path:
if not cid:
raise ValueError("CID пустой")
download_dir = Path("download")
download_dir.mkdir(parents=True, exist_ok=True)
out_path = download_dir / f"{cid}.bin"
url = f"{API_URL}/api/v0/cat"
params = {"arg": cid}
with requests.post(
url,
headers=_auth_headers(),
params=params,
stream=True,
timeout=120,
) as r:
if r.status_code != 200:
raise RuntimeError(
f"Ошибка скачивания (status={r.status_code}): {r.text}"
)
with out_path.open("wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return out_path
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(add_help=False)
sub = parser.add_subparsers(dest="command", required=True)
up = sub.add_parser("upload", add_help=False)
up.add_argument("file")
dl = sub.add_parser("download", add_help=False)
dl.add_argument("cid")
args = parser.parse_args(argv)
try:
if args.command == "upload":
cid = upload_file(Path(args.file))
print(cid)
elif args.command == "download":
out = download_file(args.cid)
print(out)
else:
raise ValueError("Неизвестная команда")
return 0
except Exception as exc:
print(f"Ошибка: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())