Claude Code × Batch APIで100件のタスクを並列処理する——コスト50%削減の実装パターン

Claude Code × Batch APIで100件のタスクを並列処理する——コスト50%削減の実装パターン

Anthropic Batch APIをClaude Codeから活用する実践ガイド。単一リクエストと比べてコスト50%削減・処理時間1/10になるケースを実装コード付きで解説。エラーハンドリング・進捗監視・結果集約まで含めた本番向けパターン集。

エンジニアのゆとです。

「100件の顧客データを一件ずつAIに処理させたら終わるまでに3時間かかった」——こういう経験をしたことがある人は多いと思う。

Anthropic Batch APIを使うと、この種の大量処理が劇的に変わる。コストが最大50%削減され、非同期で処理できるので待ち時間もなくなる。

自分がClaude Codeのスクリプトで実際に使っているパターンを整理した。


Batch APIとは何か

Anthropic Batch APIは、複数のリクエストをまとめて送信する非同期処理API。

通常のMessages APIとの主な違い:

Messages APIBatch API
処理方式リアルタイム(同期)非同期(後から取得)
料金通常料金50%割引
上限レートリミットに依存1バッチ最大10,000件
用途インタラクティブ処理大量一括処理

キャッチは「24時間以内に処理が完了する」という保証があること。リアルタイム性が不要で大量処理するケースでは、Batch APIが圧倒的に有利だ。


使えるケース・使えないケース

向いているケース

  • コンテンツの一括分類・タグ付け
  • 大量ドキュメントの要約生成
  • SEOキーワードの一括分析
  • テストデータの生成
  • ログの一括解析

向いていないケース

  • ユーザーがリアルタイムで結果を待つ処理
  • 前の結果に依存する連鎖処理
  • 24時間以内に確実に終わる保証が必要なクリティカルパス

基本実装パターン

Claude CodeでBatch APIを使う際の最小構成。

import anthropic
import json
from pathlib import Path

client = anthropic.Anthropic()

def create_batch(items: list[dict]) -> str:
    """バッチリクエストを作成してbatch_idを返す"""
    requests = []
    for item in items:
        requests.append(
            anthropic.types.message_create_params.Request(
                custom_id=item["id"],
                params=anthropic.types.MessageCreateParamsNonStreaming(
                    model="claude-opus-4-5",
                    max_tokens=1024,
                    messages=[
                        {
                            "role": "user",
                            "content": item["prompt"]
                        }
                    ]
                )
            )
        )
    
    batch = client.messages.batches.create(requests=requests)
    print(f"[batch] created: {batch.id}, count: {len(requests)}")
    return batch.id


def wait_for_batch(batch_id: str, poll_interval: int = 60) -> None:
    """バッチ完了まで定期ポーリング"""
    import time
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        print(f"[batch] status: {batch.processing_status}, "
              f"succeeded: {batch.request_counts.succeeded}, "
              f"errored: {batch.request_counts.errored}")
        
        if batch.processing_status == "ended":
            break
        time.sleep(poll_interval)


def collect_results(batch_id: str) -> dict[str, str]:
    """バッチ結果を収集してcustom_id -> テキストのdictを返す"""
    results = {}
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            results[result.custom_id] = result.result.message.content[0].text
        else:
            results[result.custom_id] = f"[ERROR] {result.result.error}"
    return results

実践例1: ブログ記事のSEOメタ一括生成

100記事分のメタディスクリプションを一括生成した実例。

import json
from pathlib import Path

# 記事タイトルと本文抜粋を読み込む
articles = []
for i, mdx in enumerate(Path("src/content/blog").glob("*.mdx")):
    content = mdx.read_text()[:2000]  # 先頭2000文字
    articles.append({
        "id": mdx.stem,
        "prompt": f"""以下の記事の先頭部分を読んでSEOメタディスクリプションを生成してください。

## 記事タイトル
{mdx.stem.replace("-", " ").replace("2026", "")}

## 記事冒頭
{content}

## 要件
- 120〜160文字で書く
- 検索クエリを自然に含める
- 読者がクリックしたくなる具体的なベネフィットを書く
- 「徹底解説」「完全ガイド」は使わない

メタディスクリプションのみ出力(説明文不要):"""
    })

# バッチ送信
batch_id = create_batch(articles[:100])  # 最大100件
print(f"batch_id: {batch_id}")

# batch_idをファイルに保存しておく(後で回収用)
Path("batch_ids.json").write_text(json.dumps({"meta_batch": batch_id}))

このスクリプトを流して待つだけ。100件なら通常1〜4時間で完了する。


実践例2: LaunchAgentで夜間バッチ処理

起動と回収を分離してLaunchAgentで夜間に自動実行する設計。

# submit_batch.sh — 毎晩0:00に実行
#!/bin/bash
VENV="/Users/moru.handa/Project/venv/bin/python3"
SCRIPT="/Users/moru.handa/Project/scripts/submit_nightly_batch.py"

$VENV $SCRIPT 2>&1 >> /tmp/nightly_batch.log
# submit_nightly_batch.py
import json
from pathlib import Path
from datetime import date

def get_todays_tasks() -> list[dict]:
    """その日処理すべきタスクを生成"""
    tasks = []
    
    # 例: 当日公開予定記事のソーシャル投稿案を生成
    schedule = json.loads(Path("RELEASE_SCHEDULE.json").read_text())
    today_str = str(date.today())
    
    for item in schedule:
        if item.get("scheduled_at", "").startswith(today_str):
            tasks.append({
                "id": f"social_{item['id']}",
                "prompt": f"次の記事タイトルでXポスト案を3パターン作成せよ: {item['title']}"
            })
    
    return tasks

tasks = get_todays_tasks()
if tasks:
    batch_id = create_batch(tasks)
    # batch_idをjsonに保存
    state = {"pending_batches": [{"id": batch_id, "created": str(date.today()), "type": "social_posts"}]}
    Path("/tmp/batch_state.json").write_text(json.dumps(state, indent=2))
# collect_batch.py — 毎朝6:00に実行
import json
from pathlib import Path

state_file = Path("/tmp/batch_state.json")
if not state_file.exists():
    exit(0)

state = json.loads(state_file.read_text())
for batch_info in state.get("pending_batches", []):
    results = collect_results(batch_info["id"])
    # 結果を用途に合わせて処理
    output_path = Path(f"/tmp/batch_results_{batch_info['id'][:8]}.json")
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"[collect] saved {len(results)} results to {output_path}")

エラーハンドリングとリトライ

Batch APIはリクエスト単位でエラーが起きるため、バッチ全体が失敗することは少ない。ただし一部のリクエストがエラーになる場合はある。

def process_results_with_retry(batch_id: str, original_items: list[dict]) -> dict:
    results = collect_results(batch_id)
    
    # エラーになったアイテムを抽出
    failed_ids = [k for k, v in results.items() if v.startswith("[ERROR]")]
    
    if failed_ids:
        print(f"[retry] {len(failed_ids)} items failed, retrying...")
        failed_items = [item for item in original_items if item["id"] in failed_ids]
        
        # 失敗分だけ再バッチ
        retry_batch_id = create_batch(failed_items)
        wait_for_batch(retry_batch_id)
        retry_results = collect_results(retry_batch_id)
        
        # マージ
        results.update(retry_results)
    
    return results

コスト計算の実例

自分が実際にやっているコンテンツ処理タスクでの比較。

タスク: 週50記事の要約生成(平均入力2,000トークン / 出力300トークン)

方式入力コスト出力コスト合計(週)
Messages API(claude-opus-4-5)$15.00$7.50$22.50
Batch API(claude-opus-4-5)$7.50$3.75$11.25

週$11.25の節約。月換算で約$45。MaxプランのAPIコストとは別枠なので、従量課金を使う場合はこの差が効いてくる。


Claude Code スクリプトとの統合

Claude Codeのtask実行から直接Batch APIを呼ぶケース。

# CLAUDE.mdのスキルから呼ぶ想定のスクリプト
def run_batch_analysis(keyword_list: list[str]) -> list[dict]:
    """キーワードリストのSEO難易度を一括分析"""
    items = [
        {
            "id": f"kw_{i:04d}",
            "prompt": f"""キーワード「{kw}」について以下を分析してください:
1. 検索意図(情報収集/比較検討/購買)
2. 競合の強さ(強/中/弱)の推定理由
3. このキーワードで記事を書くなら何を差別化軸にすべきか

JSON形式で回答:
{{"intent": "...", "competition": "...", "differentiation": "..."}}"""
        }
        for i, kw in enumerate(keyword_list)
    ]
    
    batch_id = create_batch(items)
    wait_for_batch(batch_id, poll_interval=30)
    raw_results = collect_results(batch_id)
    
    import json
    parsed = []
    for custom_id, text in raw_results.items():
        try:
            parsed.append({"keyword": keyword_list[int(custom_id.split("_")[1])], **json.loads(text)})
        except Exception:
            pass
    
    return parsed

まとめ

Batch APIが有効なのは「大量・非同期・コスト最小化」の3条件が揃うケース。

  • リアルタイム性が不要な処理に積極的に使う
  • 50%割引は無視できないコスト差
  • 送信と回収を分離してLaunchAgentで夜間自動実行する設計が相性いい
  • エラーは件単位で処理してリトライを組み込む

「AIを使うとコストが高い」という話をよく聞くが、処理の種類によってはBatch APIで半額になる。大量処理の設計段階でBatch APIが選択肢に入っているかどうかで、月の運用コストが変わる。


記事が見つかりません:

記事が見つかりません:

記事が見つかりません:

← 記事一覧に戻る