Programmatic Image Generation: How to Build a Batch Pipeline That Scales

Build a production-ready programmatic image generation pipeline. Covers prompt engineering at scale, batch processing, retry logic, async generation, S3/CMS integration, and cost analysis. Full Python example included.

by AnyCap

[Figure: Programmatic image generation pipeline with Generation, Orchestration, and Integration layers]

Most image generation tutorials stop at one image. They show you a curl command, a pretty output, and call it done. That's fine for "generate a cat picture." It's useless when you need 500 images for a real project.

Programmatic image generation — generating images at scale, from code, without human intervention — is a different skill. This guide covers the full pipeline: prompt engineering at scale, batch generation, error handling, async processing, output management, and integration into production systems.


The Three Layers of a Production Image Pipeline

Every production image pipeline has three layers:

| Layer | What it does | Tools |
| --- | --- | --- |
| Generation | Turns prompts into images | AnyCap CLI, REST APIs |
| Orchestration | Manages batches, retries, concurrency | Python scripts, queue systems |
| Integration | Connects to your app, CMS, storage | Webhooks, S3, CMS APIs |

Most developers only think about Layer 1. But Layers 2 and 3 are where pipelines live or die.


Layer 1: Prompt Engineering at Scale

When you generate one image, you can lovingly craft the perfect prompt. When you generate 500, you need a prompt system.

The Template Approach

# prompts.py — Centralized prompt templates
from dataclasses import dataclass

@dataclass
class ImageJob:
    template: str
    params: dict
    output_path: str
    model: str = "nano-banana-2"
    
PROMPT_TEMPLATES = {
    "product_hero": "E-commerce product photo: {product_name}, {color}, studio lighting, white background, 1024x1024, commercial photography",
    "blog_hero": "Blog header illustration: {topic}, {style} style, {mood} mood, 1200x630, editorial",
    "social_post": "Social media visual: {subject}, {platform} format, {vibe} aesthetic, {dimensions}",
}

def build_prompt(template_key: str, **params) -> str:
    return PROMPT_TEMPLATES[template_key].format(**params)
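
With hundreds of rows, it helps to catch a missing field before any generation call is made. The sketch below is one way to do that with the standard library's string.Formatter, which can list the placeholders a template expects. The helpers required_fields and missing_fields are hypothetical names, not part of the prompts.py above, and the template is copied here only so the block runs on its own.

```python
from string import Formatter

# Copied from PROMPT_TEMPLATES above so this sketch is self-contained
PRODUCT_HERO = (
    "E-commerce product photo: {product_name}, {color}, studio lighting, "
    "white background, 1024x1024, commercial photography"
)

def required_fields(template: str) -> set:
    """Return the placeholder names a template expects (hypothetical helper)."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}

def missing_fields(template: str, params: dict) -> set:
    """Return placeholders that params does not supply (hypothetical helper)."""
    return required_fields(template) - set(params)

row = {"product_name": "Trail Mug"}  # illustrative row with no "color" value
print(missing_fields(PRODUCT_HERO, row))  # {'color'}
```

Running this check over the whole CSV first means a bad row fails in milliseconds instead of surfacing as a KeyError halfway through a batch.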

The Scale-Up Pattern

# Generate 100 product photos from a CSV
import csv, json, os, subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

from prompts import ImageJob, build_prompt  # defined in prompts.py above

def generate_single(job: ImageJob) -> dict:
    """Run one CLI generation and report the outcome as a dict."""
    prompt = build_prompt(job.template, **job.params)

    # Make sure the target directory exists before the CLI writes to it
    os.makedirs(os.path.dirname(job.output_path) or ".", exist_ok=True)

    result = subprocess.run([
        "anycap", "image", "generate",
        "--prompt", prompt,
        "--model", job.model,
        "--output-format", "json",
        "-o", job.output_path
    ], capture_output=True, text=True)

    ok = result.returncode == 0
    return {
        "output_path": job.output_path,
        "success": ok,
        "data": json.loads(result.stdout) if ok else None,
        "error": None if ok else result.stderr
    }

# Build job list from data
jobs = []
with open("products.csv") as f:
    for row in csv.DictReader(f):
        jobs.append(ImageJob(
            template="product_hero",
            params={"product_name": row["name"], "color": row["color"]},
            output_path=f"output/{row['sku']}.png"
        ))

# Execute with concurrency control
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(generate_single, job): job for job in jobs}
    for future in as_completed(futures):
        result = future.result()
        status = "✅" if result["success"] else "❌"
        print(f"{status} {result['output_path']}")
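
Large batches get interrupted, and re-running from scratch pays for images you already have. A minimal sketch of a resume step, assuming a finished image is any non-empty file at the job's output path; pending_paths is a hypothetical helper, and the temporary directory exists only to demonstrate it.

```python
import os
import tempfile

def pending_paths(output_paths: list) -> list:
    """Keep only the paths that do not exist yet or are empty files."""
    return [
        p for p in output_paths
        if not (os.path.exists(p) and os.path.getsize(p) > 0)
    ]

# Demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    done = os.path.join(tmp, "SKU-1.png")
    todo = os.path.join(tmp, "SKU-2.png")
    with open(done, "wb") as f:
        f.write(b"placeholder bytes")  # stands in for a finished image
    remaining = pending_paths([done, todo])
    remaining_names = [os.path.basename(p) for p in remaining]

print(remaining_names)  # ['SKU-2.png']
```

In the script above, the same filter could be applied to the job list (comparing each job's output_path) before the jobs are submitted to the executor.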

Layer 2: Orchestration — The Part Everyone Forgets

Generation is easy. Making it reliable at scale is the real engineering.

Pattern 1: Async Batch Processing

For large batches (100+ images), use async mode to avoid blocking:

# Submit batch job
anycap image generate \
  --prompt "$(python build-prompts.py --csv products.csv)" \
  --model nano-banana-2 \
  --async \
  --batch-size 20 \
  --webhook "https://your-app.com/webhooks/images" \
  -o output/products/

Your webhook receives results as they complete. No polling. No timeout issues.
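
On the receiving side, the webhook endpoint can be very small. The sketch below uses only Python's standard library. The payload keys ("status", "output_path", "error") are assumptions made for illustration, not a documented AnyCap schema, so check what your provider actually sends before relying on them.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def summarize_event(payload: dict) -> str:
    """Turn one webhook payload into a log line.

    The "status", "output_path", and "error" keys are assumed for
    illustration; adjust them to the real payload.
    """
    if payload.get("status") == "succeeded":
        return f"done: {payload.get('output_path')}"
    return f"failed: {payload.get('output_path')} ({payload.get('error')})"

class ImageWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length))
            if not isinstance(payload, dict):
                raise ValueError("expected a JSON object")
        except ValueError:
            self.send_response(400)  # reject bodies that are not a JSON object
            self.end_headers()
            return
        print(summarize_event(payload))
        self.send_response(204)  # acknowledge quickly; do heavy work elsewhere
        self.end_headers()

def serve(port: int = 8000) -> None:
    """Block and handle webhook POSTs until interrupted."""
    HTTPServer(("", port), ImageWebhookHandler).serve_forever()
```

Calling serve() starts the listener. In production you would more likely mount the same logic in your existing web framework and verify that each request really comes from the generation service.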

Pattern 2: Retry with Exponential Backoff

import time, random

def generate_with_retry(job: ImageJob, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        result = generate_single(job)
        if result["success"]:
            return result
        
        if attempt < max_retries - 1:
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1}/{max_retries} for {job.output_path} in {wait:.1f}s")
            time.sleep(wait)
    
    return result  # Return the last failure
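
To see what that wait formula produces: each delay is 2 to the power of the attempt number, plus up to one second of random jitter. With max_retries=3 the function sleeps after the first and second failures only, so the total added wait is between 3 and 5 seconds. The sketch below prints the bounds per attempt and adds a cap, which is an assumption on top of the original code, useful if you raise the retry count.

```python
import random

def backoff_delay(attempt: int, cap: float = 30.0) -> float:
    """Seconds to wait before the next retry: 2**attempt plus up to 1s of jitter, capped."""
    return min(cap, (2 ** attempt) + random.uniform(0, 1))

# Bounds for the first few attempts, ignoring where the jitter lands
for attempt in range(4):
    low = min(30.0, 2 ** attempt)
    high = min(30.0, 2 ** attempt + 1)
    print(f"attempt {attempt}: {low:.0f}s to {high:.0f}s")
# attempt 0: 1s to 2s
# attempt 1: 2s to 3s
# attempt 2: 4s to 5s
# attempt 3: 8s to 9s
```

The jitter matters when several workers fail at once: without it they would all retry at the same instant and hit the same rate limit again.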

Pattern 3: Queue-Based Architecture

For production systems, use a proper queue:

# Simple Redis-based job queue
import redis, json

r = redis.Redis()

def enqueue_job(job: ImageJob):
    r.lpush("image_jobs", json.dumps({
        "template": job.template,
        "params": job.params,
        "output_path": job.output_path,
        "model": job.model,
    }))

def worker_loop():
    while True:
        _, job_data = r.brpop("image_jobs")
        job = json.loads(job_data)
        result = generate_single(ImageJob(**job))
        
        if result["success"]:
            r.lpush("image_results", json.dumps(result))
        else:
            r.lpush("image_failures", json.dumps(result))
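
The queue only works if a job survives the trip through JSON unchanged. A small sketch of that round trip, using dataclasses.asdict instead of listing fields by hand; to_message and from_message are hypothetical helper names, and ImageJob is repeated here so the block runs on its own.

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class ImageJob:  # same fields as the ImageJob defined earlier
    template: str
    params: dict
    output_path: str
    model: str = "nano-banana-2"

def to_message(job: ImageJob) -> str:
    """Serialize a job for the queue (hypothetical helper)."""
    return json.dumps(asdict(job))

def from_message(raw) -> ImageJob:
    """Rebuild a job from a queue message; accepts str or bytes."""
    return ImageJob(**json.loads(raw))

job = ImageJob(
    "product_hero",
    {"product_name": "Trail Mug", "color": "red"},
    "output/SKU-1.png",
)
# Redis clients return bytes by default, so test the bytes path
restored = from_message(to_message(job).encode())
print(restored == job)  # True
```

Using asdict means a field added to ImageJob later is carried through the queue automatically, rather than being silently dropped by a hand-written dict.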

Layer 3: Integration — Getting Images Where They Need to Go

Upload to S3

import boto3

s3 = boto3.client("s3")

def upload_to_s3(local_path: str, bucket: str, key: str) -> str:
    s3.upload_file(local_path, bucket, key, ExtraArgs={
        "ContentType": "image/png",
        "CacheControl": "public, max-age=31536000",
    })
    return f"https://{bucket}.s3.amazonaws.com/{key}"
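
The function above hardcodes image/png and leaves the key to the caller. If your pipeline mixes formats, one option is to derive both from the file name. This is a sketch under assumptions: the "products" prefix and the s3_upload_args helper are hypothetical, and the fallback content type is a conservative default.

```python
import mimetypes
from pathlib import PurePosixPath

def s3_upload_args(local_path: str, prefix: str = "products"):
    """Derive an object key and ContentType from a local file path."""
    name = PurePosixPath(local_path).name
    key = f"{prefix}/{name}"
    content_type, _ = mimetypes.guess_type(name)
    return key, content_type or "application/octet-stream"

print(s3_upload_args("output/SKU-1.png"))  # ('products/SKU-1.png', 'image/png')
print(s3_upload_args("output/SKU-2.jpg"))  # ('products/SKU-2.jpg', 'image/jpeg')
```

The returned pair can be passed straight into upload_to_s3 as the key and as the ContentType entry in ExtraArgs.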

Post to CMS

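import os
import requests

def update_cms_product_image(sku: str, image_url: str):
    # Read the token from the environment; "$CMS_TOKEN" inside a Python
    # string is sent literally and is never expanded
    response = requests.patch(
        f"https://cms.example.com/api/products/{sku}",
        headers={"Authorization": f"Bearer {os.environ['CMS_TOKEN']}"},
        json={"image_url": image_url},
        timeout=30,
    )
    response.raise_for_status()  # surface 4xx/5xx instead of failing silently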

Notify Your Team

def notify_slack(message: str):
    # A timeout keeps a slow Slack endpoint from stalling the pipeline
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
        json={"text": message},
        timeout=10,
    )

The Complete Pipeline Script

#!/usr/bin/env python3
"""production-pipeline.py — Full image generation pipeline"""

import csv, subprocess, json, time, random, sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import boto3, requests

# --- Configuration ---
S3_BUCKET = "my-assets"
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"
MAX_WORKERS = 4
MAX_RETRIES = 3

PROMPTS = {
    "product": "E-commerce photo: {name}, {color}, studio, white bg, 1024x1024",
    "lifestyle": "Lifestyle photo: {name}, {color}, {scene}, natural light, 1024x1024",
}

@dataclass
class Job:
    template: str
    params: dict
    output: str
    model: str = "nano-banana-2"

def generate(job: Job) -> dict:
    prompt = PROMPTS[job.template].format(**job.params)
    for attempt in range(MAX_RETRIES):
        result = subprocess.run([
            "anycap", "image", "generate",
            "--prompt", prompt, "--model", job.model,
            "--output-format", "json", "-o", job.output
        ], capture_output=True, text=True)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            return {"path": job.output, "url": data.get("image_url"), "success": True}
        if attempt < MAX_RETRIES - 1:
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return {"path": job.output, "success": False, "error": result.stderr}

s3 = boto3.client("s3")  # create the client once and reuse it for every upload

def upload(path: str) -> str:
    key = path.removeprefix("output/")  # Python 3.9+; strips only the leading directory
    s3.upload_file(path, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"})
    return f"https://{S3_BUCKET}.s3.amazonaws.com/{key}"

def notify(text: str):
    requests.post(SLACK_WEBHOOK, json={"text": text})

def run_pipeline(csv_path: str):
    jobs = []
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            jobs.append(Job("product", {"name": row["name"], "color": row["color"]}, f"output/{row['sku']}.png"))
    
    notify(f"🚀 Pipeline started: {len(jobs)} images")
    
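    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(generate, job): job for job in jobs}
        for future in as_completed(futures):
            result = future.result()
            if not result["success"]:
                print(f"Generation failed: {result['path']}: {result['error']}", file=sys.stderr)
                continue
            try:
                result["s3_url"] = upload(result["path"])
            except Exception as exc:
                # Count a failed upload as a failed image instead of aborting the run
                print(f"Upload failed: {result['path']}: {exc}", file=sys.stderr)
                continue
            results.append(result)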
    
    success = len(results)
    failed = len(jobs) - success
    notify(f"{'✅' if failed == 0 else '⚠️'} Pipeline complete: {success}/{len(jobs)} images. {failed} failed.")
    return results

if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit("usage: production-pipeline.py <products.csv>")
    run_pipeline(sys.argv[1])

Choosing the Right Model for Your Pipeline

| Pipeline Type | Model | Why |
| --- | --- | --- |
| Hero images, final output | Seedream 5 | Best first-pass quality |
| Bulk generation, variants | Nano Banana 2 | Fastest, cheapest |
| Revisions, refinements | Nano Banana Pro | Best image-to-image editing |
| Prototyping, iteration | Nano Banana 2 | Speed > perfection in early stages |

Cost at Scale

| Volume | Nano Banana 2 | Seedream 5 | Manual Design |
| --- | --- | --- | --- |
| 100 images | ~$0.50 | ~$1.50 | $500-1,000 |
| 1,000 images | ~$5 | ~$15 | $5,000-10,000 |
| 10,000 images | ~$50 | ~$150 | $50,000+ |
| 100,000 images | ~$500 | ~$1,500 | Not practical |
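
The figures above work out to roughly $0.005 per image for Nano Banana 2 and $0.015 for Seedream 5. A quick estimator built on those implied prices is sketched below. The retry_rate parameter is an assumption: it models the share of images you end up generating twice, and whether failed attempts are billed at all depends on your provider.

```python
# Approximate per-image prices implied by the table above ($0.50 / 100, $1.50 / 100)
PRICE_PER_IMAGE = {"Nano Banana 2": 0.005, "Seedream 5": 0.015}

def estimate_cost(images: int, model: str, retry_rate: float = 0.0) -> float:
    """Estimated spend in dollars; retry_rate is the assumed share of images generated twice."""
    billable = images * (1 + retry_rate)
    return round(billable * PRICE_PER_IMAGE[model], 2)

print(estimate_cost(1_000, "Nano Banana 2"))  # 5.0
print(estimate_cost(10_000, "Seedream 5", retry_rate=0.05))
```

Even with a generous retry allowance, the generated cost stays orders of magnitude below the manual design column.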

Last updated: May 2026.