程序化图像生成:如何构建可扩展的批量管道

构建生产就绪的程序化图像生成管道。涵盖大规模提示词工程、批量处理、重试逻辑、异步生成、S3/CMS 集成及成本分析。包含完整 Python 示例。

by AnyCap

程序化图像生成管道,Generation、Orchestration、Integration 三层由发光数据流线连接

大多数图像生成教程只生成一张图片就结束了。它们展示一个 curl 命令、一张漂亮的输出,然后就说完成了。对于"生成一张猫的图片"来说这没问题。但当你需要为一个实际项目生成 500 张图片时,这毫无用处。

程序化图像生成——通过代码在无需人工干预的情况下大规模生成图像——是一项完全不同的技能。本指南涵盖了完整的管道:大规模提示词工程、批量处理、错误处理、异步处理、输出管理以及集成到生产系统。


生产级图像管道的三个层次

每个生产级图像管道都有三个层次:

层次 功能 工具
Generation(生成) 将提示词转换为图像 AnyCap CLI、REST API
Orchestration(编排) 管理批次、重试、并发 Python 脚本、队列系统
Integration(集成) 连接到你的应用、CMS、存储 Webhook、S3、CMS API

大多数开发者只考虑第一层。但第二层和第三层才是管道成败的关键。


第一层:大规模提示词工程

当你只生成一张图片时,你可以精心打磨完美的提示词。当你要生成 500 张时,你需要一个提示词系统。

模板化方案

# prompts.py — 集中的提示词模板
from dataclasses import dataclass
from typing import Optional

@dataclass
class ImageJob:
    template: str
    params: dict
    output_path: str
    model: str = "nano-banana-2"
    
PROMPT_TEMPLATES = {
    "product_hero": "电商产品图:{product_name},{color},影棚灯光,白色背景,1024x1024,商业摄影",
    "blog_hero": "博客头图:{topic},{style}风格,{mood}氛围,1200x630,编辑风格",
    "social_post": "社交媒体视觉:{subject},{platform}格式,{vibe}美学,{dimensions}",
}

def build_prompt(template_key: str, **params) -> str:
    return PROMPT_TEMPLATES[template_key].format(**params)

规模化模式

# 从 CSV 生成 100 张产品图
import csv, subprocess, json
from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_single(job: ImageJob) -> dict:
    prompt = build_prompt(job.template, **job.params)
    
    result = subprocess.run([
        "anycap", "image", "generate",
        "--prompt", prompt,
        "--model", job.model,
        "--output-format", "json",
        "-o", job.output_path
    ], capture_output=True, text=True)
    
    return {
        "output_path": job.output_path,
        "success": result.returncode == 0,
        "data": json.loads(result.stdout) if result.returncode == 0 else None,
        "error": result.stderr if result.returncode != 0 else None
    }

# 从数据构建任务列表
jobs = []
with open("products.csv") as f:
    for row in csv.DictReader(f):
        jobs.append(ImageJob(
            template="product_hero",
            params={"product_name": row["name"], "color": row["color"]},
            output_path=f"output/{row['sku']}.png"
        ))

# 使用并发控制执行
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(generate_single, job): job for job in jobs}
    for future in as_completed(futures):
        result = future.result()
        status = "✅" if result["success"] else "❌"
        print(f"{status} {result['output_path']}")

第二层:编排——所有人都忘记的部分

生成很容易。让它在规模下保持可靠才是真正的工程。

模式 1:异步批量处理

对于大批量(100+ 张图片),使用异步模式避免阻塞:

# 提交批量任务
anycap image generate \
  --prompt "$(python build-prompts.py --csv products.csv)" \
  --model nano-banana-2 \
  --async \
  --batch-size 20 \
  --webhook "https://your-app.com/webhooks/images" \
  -o output/products/

你的 Webhook 会在完成时接收结果。无需轮询。没有超时问题。

模式 2:指数退避重试

import time, random

def generate_with_retry(job: ImageJob, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        result = generate_single(job)
        if result["success"]:
            return result
        
        if attempt < max_retries - 1:
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"重试 {attempt + 1}/{max_retries}:{job.output_path},等待 {wait:.1f}秒")
            time.sleep(wait)
    
    return result  # 返回最后一次失败

模式 3:基于队列的架构

对于生产系统,使用适当的队列:

# 基于 Redis 的简单任务队列
import redis, json

r = redis.Redis()

def enqueue_job(job: ImageJob):
    r.lpush("image_jobs", json.dumps({
        "template": job.template,
        "params": job.params,
        "output_path": job.output_path,
        "model": job.model,
    }))

def worker_loop():
    while True:
        _, job_data = r.brpop("image_jobs")
        job = json.loads(job_data)
        result = generate_single(ImageJob(**job))
        
        if result["success"]:
            r.lpush("image_results", json.dumps(result))
        else:
            r.lpush("image_failures", json.dumps(result))

第三层:集成——将图像送到该去的地方

上传到 S3

import boto3

s3 = boto3.client("s3")

def upload_to_s3(local_path: str, bucket: str, key: str) -> str:
    s3.upload_file(local_path, bucket, key, ExtraArgs={
        "ContentType": "image/png",
        "CacheControl": "public, max-age=31536000",
    })
    return f"https://{bucket}.s3.amazonaws.com/{key}"

发布到 CMS

import requests

def update_cms_product_image(sku: str, image_url: str):
    requests.patch(
        f"https://cms.example.com/api/products/{sku}",
        headers={"Authorization": "Bearer $CMS_TOKEN"},
        json={"image_url": image_url}
    )

通知你的团队

def notify_slack(message: str):
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
        json={"text": message}
    )

完整的管道脚本

#!/usr/bin/env python3
"""production-pipeline.py — 完整的图像生成管道"""

import csv, subprocess, json, time, random, sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import boto3, requests

# --- 配置 ---
S3_BUCKET = "my-assets"
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"
MAX_WORKERS = 4
MAX_RETRIES = 3

PROMPTS = {
    "product": "电商照片:{name},{color},影棚,白色背景,1024x1024",
    "lifestyle": "生活方式照片:{name},{color},{scene},自然光,1024x1024",
}

@dataclass
class Job:
    template: str
    params: dict
    output: str
    model: str = "nano-banana-2"

def generate(job: Job) -> dict:
    prompt = PROMPTS[job.template].format(**job.params)
    for attempt in range(MAX_RETRIES):
        result = subprocess.run([
            "anycap", "image", "generate",
            "--prompt", prompt, "--model", job.model,
            "--output-format", "json", "-o", job.output
        ], capture_output=True, text=True)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            return {"path": job.output, "url": data.get("image_url"), "success": True}
        if attempt < MAX_RETRIES - 1:
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return {"path": job.output, "success": False, "error": result.stderr}

def upload(path: str) -> str:
    key = path.replace("output/", "")
    s3 = boto3.client("s3")
    s3.upload_file(path, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"})
    return f"https://{S3_BUCKET}.s3.amazonaws.com/{key}"

def notify(text: str):
    requests.post(SLACK_WEBHOOK, json={"text": text})

def run_pipeline(csv_path: str):
    jobs = []
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            jobs.append(Job("product", {"name": row["name"], "color": row["color"]}, f"output/{row['sku']}.png"))
    
    notify(f"🚀 管道启动:{len(jobs)} 张图片")
    
    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(generate, job): job for job in jobs}
        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                result["s3_url"] = upload(result["path"])
                results.append(result)
    
    success = len(results)
    failed = len(jobs) - success
    notify(f"{'✅' if failed == 0 else '⚠️'} 管道完成:{success}/{len(jobs)} 张成功。{failed} 张失败。")
    return results

if __name__ == "__main__":
    run_pipeline(sys.argv[1])

为你的管道选择合适的模型

管道类型 模型 原因
主图、最终输出 Seedream 5 最佳首轮生成质量
批量生成、变体 Nano Banana 2 最快、最便宜
修改、优化 Nano Banana Pro 最佳的图生图编辑
原型设计、迭代 Nano Banana 2 早期阶段速度 > 完美度

规模化成本

数量 Nano Banana 2 Seedream 5 人工设计
100 张 ~$0.50 ~$1.50 $500-1,000
1,000 张 ~$5 ~$15 $5,000-10,000
10,000 张 ~$50 ~$150 $50,000+
100,000 张 ~$500 ~$1,500 不切实际

最后更新:2026年5月