Geração Programática de Imagens: Como Construir um Pipeline de Lote Escalável

Construa um pipeline de geração programática de imagens pronto para produção. Aborda engenharia de prompts em escala, processamento em lote, lógica de retentativa, geração assíncrona, integração S3/CMS e análise de custos. Inclui exemplo completo em Python.

by AnyCap

Pipeline de geração programática de imagens com camadas de Generation, Orchestration e Integration conectadas por linhas de fluxo de dados brilhantes

A maioria dos tutoriais de geração de imagens para em uma única imagem. Mostram um comando curl, um resultado bonito e dão por encerrado. Serve para "gerar foto de gato". É inútil quando você precisa de 500 imagens para um projeto real.

Geração programática de imagens — gerar imagens em escala, a partir de código, sem intervenção humana — é uma habilidade diferente. Este guia cobre o pipeline completo: engenharia de prompts em escala, processamento em lote, tratamento de erros, processamento assíncrono, gestão de outputs e integração em sistemas de produção.


As Três Camadas de um Pipeline de Imagens em Produção

Todo pipeline de imagens em produção tem três camadas:

Camada O que faz Ferramentas
Generation Transforma prompts em imagens AnyCap CLI, APIs REST
Orchestration Gerencia lotes, retentativas, concorrência Scripts Python, sistemas de filas
Integration Conecta ao seu app, CMS, armazenamento Webhooks, S3, APIs de CMS

A maioria dos desenvolvedores só pensa na Camada 1. Mas são as Camadas 2 e 3 que determinam o sucesso ou fracasso do pipeline.


Camada 1: Engenharia de Prompt em Escala

Quando você gera uma imagem, pode criar o prompt perfeito com todo cuidado. Quando gera 500, precisa de um sistema de prompts.

A Abordagem por Templates

# prompts.py — Templates de prompts centralizados
from dataclasses import dataclass
from typing import Optional

@dataclass
class ImageJob:
    template: str
    params: dict
    output_path: str
    model: str = "nano-banana-2"
    
PROMPT_TEMPLATES = {
    "product_hero": "Foto de produto e-commerce: {product_name}, {color}, iluminação de estúdio, fundo branco, 1024x1024, fotografia comercial",
    "blog_hero": "Ilustração de cabeçalho de blog: {topic}, estilo {style}, clima {mood}, 1200x630, editorial",
    "social_post": "Visual para redes sociais: {subject}, formato {platform}, estética {vibe}, {dimensions}",
}

def build_prompt(template_key: str, **params) -> str:
    return PROMPT_TEMPLATES[template_key].format(**params)

O Padrão de Escalabilidade

# Gerar 100 fotos de produto a partir de um CSV
import csv, subprocess, json
from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_single(job: ImageJob) -> dict:
    prompt = build_prompt(job.template, **job.params)
    
    result = subprocess.run([
        "anycap", "image", "generate",
        "--prompt", prompt,
        "--model", job.model,
        "--output-format", "json",
        "-o", job.output_path
    ], capture_output=True, text=True)
    
    return {
        "output_path": job.output_path,
        "success": result.returncode == 0,
        "data": json.loads(result.stdout) if result.returncode == 0 else None,
        "error": result.stderr if result.returncode != 0 else None
    }

# Construir lista de jobs a partir dos dados
jobs = []
with open("products.csv") as f:
    for row in csv.DictReader(f):
        jobs.append(ImageJob(
            template="product_hero",
            params={"product_name": row["name"], "color": row["color"]},
            output_path=f"output/{row['sku']}.png"
        ))

# Executar com controle de concorrência
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(generate_single, job): job for job in jobs}
    for future in as_completed(futures):
        result = future.result()
        status = "✅" if result["success"] else "❌"
        print(f"{status} {result['output_path']}")

Camada 2: Orchestration — A Parte Que Todo Mundo Esquece

Gerar é fácil. Tornar o processo confiável em escala é a verdadeira engenharia.

Padrão 1: Processamento Assíncrono em Lote

Para lotes grandes (100+ imagens), use o modo assíncrono para evitar bloqueios:

# Enviar job em lote
anycap image generate \
  --prompt "$(python build-prompts.py --csv products.csv)" \
  --model nano-banana-2 \
  --async \
  --batch-size 20 \
  --webhook "https://seu-app.com/webhooks/images" \
  -o output/products/

Seu webhook recebe os resultados conforme ficam prontos. Sem polling. Sem problemas de timeout.

Padrão 2: Retentativa com Backoff Exponencial

import time, random

def generate_with_retry(job: ImageJob, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        result = generate_single(job)
        if result["success"]:
            return result
        
        if attempt < max_retries - 1:
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retentativa {attempt + 1}/{max_retries} para {job.output_path} em {wait:.1f}s")
            time.sleep(wait)
    
    return result  # Retornar a última falha

Padrão 3: Arquitetura Baseada em Filas

Para sistemas de produção, use uma fila adequada:

# Fila de jobs simples baseada em Redis
import redis, json

r = redis.Redis()

def enqueue_job(job: ImageJob):
    r.lpush("image_jobs", json.dumps({
        "template": job.template,
        "params": job.params,
        "output_path": job.output_path,
        "model": job.model,
    }))

def worker_loop():
    while True:
        _, job_data = r.brpop("image_jobs")
        job = json.loads(job_data)
        result = generate_single(ImageJob(**job))
        
        if result["success"]:
            r.lpush("image_results", json.dumps(result))
        else:
            r.lpush("image_failures", json.dumps(result))

Camada 3: Integration — Levando as Imagens ao Destino

Upload para S3

import boto3

s3 = boto3.client("s3")

def upload_to_s3(local_path: str, bucket: str, key: str) -> str:
    s3.upload_file(local_path, bucket, key, ExtraArgs={
        "ContentType": "image/png",
        "CacheControl": "public, max-age=31536000",
    })
    return f"https://{bucket}.s3.amazonaws.com/{key}"

Publicar no CMS

import requests

def update_cms_product_image(sku: str, image_url: str):
    requests.patch(
        f"https://cms.example.com/api/products/{sku}",
        headers={"Authorization": "Bearer $CMS_TOKEN"},
        json={"image_url": image_url}
    )

Notificar a Equipe

def notify_slack(message: str):
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
        json={"text": message}
    )

O Script de Pipeline Completo

#!/usr/bin/env python3
"""production-pipeline.py — Pipeline completo de geração de imagens"""

import csv, subprocess, json, time, random, sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import boto3, requests

# --- Configuração ---
S3_BUCKET = "my-assets"
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"
MAX_WORKERS = 4
MAX_RETRIES = 3

PROMPTS = {
    "product": "Foto e-commerce: {name}, {color}, estúdio, fundo branco, 1024x1024",
    "lifestyle": "Foto lifestyle: {name}, {color}, {scene}, luz natural, 1024x1024",
}

@dataclass
class Job:
    template: str
    params: dict
    output: str
    model: str = "nano-banana-2"

def generate(job: Job) -> dict:
    prompt = PROMPTS[job.template].format(**job.params)
    for attempt in range(MAX_RETRIES):
        result = subprocess.run([
            "anycap", "image", "generate",
            "--prompt", prompt, "--model", job.model,
            "--output-format", "json", "-o", job.output
        ], capture_output=True, text=True)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            return {"path": job.output, "url": data.get("image_url"), "success": True}
        if attempt < MAX_RETRIES - 1:
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return {"path": job.output, "success": False, "error": result.stderr}

def upload(path: str) -> str:
    key = path.replace("output/", "")
    s3 = boto3.client("s3")
    s3.upload_file(path, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"})
    return f"https://{S3_BUCKET}.s3.amazonaws.com/{key}"

def notify(text: str):
    requests.post(SLACK_WEBHOOK, json={"text": text})

def run_pipeline(csv_path: str):
    jobs = []
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            jobs.append(Job("product", {"name": row["name"], "color": row["color"]}, f"output/{row['sku']}.png"))
    
    notify(f"🚀 Pipeline iniciado: {len(jobs)} imagens")
    
    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(generate, job): job for job in jobs}
        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                result["s3_url"] = upload(result["path"])
                results.append(result)
    
    success = len(results)
    failed = len(jobs) - success
    notify(f"{'✅' if failed == 0 else '⚠️'} Pipeline concluído: {success}/{len(jobs)} imagens. {failed} falhas.")
    return results

if __name__ == "__main__":
    run_pipeline(sys.argv[1])

Escolhendo o Modelo Certo para o Seu Pipeline

Tipo de Pipeline Modelo Por quê
Imagens hero, output final Seedream 5 Melhor qualidade na primeira passagem
Geração em massa, variantes Nano Banana 2 Mais rápido e barato
Revisões, refinamentos Nano Banana Pro Melhor edição image-to-image
Prototipagem, iteração Nano Banana 2 Velocidade > perfeição nas fases iniciais

Custos em Escala

Volume Nano Banana 2 Seedream 5 Design Manual
100 imagens ~$0.50 ~$1.50 $500-1.000
1.000 imagens ~$5 ~$15 $5.000-10.000
10.000 imagens ~$50 ~$150 $50.000+
100.000 imagens ~$500 ~$1.500 Impraticável

Última atualização: maio de 2026.