Geração Programática de Imagens: Como Construir um Pipeline de Lote Escalável

Construa um pipeline de geração programática de imagens pronto para produção. Abrange engenharia de prompts em escala, processamento em lote, lógica de retentativa, geração assíncrona, integração S3/CMS e análise de custos. Inclui exemplo completo em Python.

by AnyCap

Pipeline de geração programática de imagens com camadas de Generation, Orchestration e Integration conectadas por linhas de fluxo de dados brilhantes

A maioria dos tutoriais de geração de imagens para numa única imagem. Mostram um comando curl, um resultado bonito e dão o trabalho por concluído. Serve para "gerar a foto de um gato". É inútil quando precisa de 500 imagens para um projeto real.

Geração programática de imagens — gerar imagens em escala, a partir de código, sem intervenção humana — é uma competência diferente. Este guia cobre o pipeline completo: engenharia de prompts em escala, processamento em lote, tratamento de erros, processamento assíncrono, gestão de outputs e integração em sistemas de produção.


As Três Camadas de um Pipeline de Imagens em Produção

Cada pipeline de imagens em produção tem três camadas:

Camada O que faz Ferramentas
Generation Transforma prompts em imagens AnyCap CLI, APIs REST
Orchestration Gere lotes, retentativas, concorrência Scripts Python, sistemas de filas
Integration Liga à sua app, CMS, armazenamento Webhooks, S3, APIs de CMS

A maioria dos programadores só pensa na Camada 1. Mas são as Camadas 2 e 3 que determinam o sucesso ou fracasso do pipeline.


Camada 1: Engenharia de Prompts em Escala

Quando gera uma imagem, pode criar o prompt perfeito com todo o cuidado. Quando gera 500, precisa de um sistema de prompts.

A Abordagem por Templates

# prompts.py — Templates de prompts centralizados
from dataclasses import dataclass
from typing import Optional

@dataclass
class ImageJob:
    template: str
    params: dict
    output_path: str
    model: str = "nano-banana-2"
    
PROMPT_TEMPLATES = {
    "product_hero": "Fotografia de produto e-commerce: {product_name}, {color}, iluminação de estúdio, fundo branco, 1024x1024, fotografia comercial",
    "blog_hero": "Ilustração de cabeçalho de blog: {topic}, estilo {style}, atmosfera {mood}, 1200x630, editorial",
    "social_post": "Visual para redes sociais: {subject}, formato {platform}, estética {vibe}, {dimensions}",
}

def build_prompt(template_key: str, **params) -> str:
    return PROMPT_TEMPLATES[template_key].format(**params)

O Padrão de Escalabilidade

# Gerar 100 fotos de produto a partir de um CSV
import csv, subprocess, json
from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_single(job: ImageJob) -> dict:
    prompt = build_prompt(job.template, **job.params)
    
    result = subprocess.run([
        "anycap", "image", "generate",
        "--prompt", prompt,
        "--model", job.model,
        "--output-format", "json",
        "-o", job.output_path
    ], capture_output=True, text=True)
    
    return {
        "output_path": job.output_path,
        "success": result.returncode == 0,
        "data": json.loads(result.stdout) if result.returncode == 0 else None,
        "error": result.stderr if result.returncode != 0 else None
    }

# Construir lista de tarefas a partir dos dados
jobs = []
with open("products.csv") as f:
    for row in csv.DictReader(f):
        jobs.append(ImageJob(
            template="product_hero",
            params={"product_name": row["name"], "color": row["color"]},
            output_path=f"output/{row['sku']}.png"
        ))

# Executar com controlo de concorrência
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(generate_single, job): job for job in jobs}
    for future in as_completed(futures):
        result = future.result()
        status = "✅" if result["success"] else "❌"
        print(f"{status} {result['output_path']}")

Camada 2: Orchestration — A Parte Que Toda a Gente Esquece

Gerar é fácil. Tornar o processo fiável em escala é a verdadeira engenharia.

Padrão 1: Processamento Assíncrono em Lote

Para lotes grandes (100+ imagens), use o modo assíncrono para evitar bloqueios:

# Submeter tarefa em lote
anycap image generate \
  --prompt "$(python build-prompts.py --csv products.csv)" \
  --model nano-banana-2 \
  --async \
  --batch-size 20 \
  --webhook "https://a-sua-app.com/webhooks/images" \
  -o output/products/

O seu webhook recebe os resultados à medida que ficam prontos. Sem polling. Sem problemas de timeout.

Padrão 2: Retentativa com Backoff Exponencial

import time, random

def generate_with_retry(job: ImageJob, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        result = generate_single(job)
        if result["success"]:
            return result
        
        if attempt < max_retries - 1:
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Retentativa {attempt + 1}/{max_retries} para {job.output_path} em {wait:.1f}s")
            time.sleep(wait)
    
    return result  # Devolver a última falha

Padrão 3: Arquitetura Baseada em Filas

Para sistemas de produção, use uma fila adequada:

# Fila de tarefas simples baseada em Redis
import redis, json

r = redis.Redis()

def enqueue_job(job: ImageJob):
    r.lpush("image_jobs", json.dumps({
        "template": job.template,
        "params": job.params,
        "output_path": job.output_path,
        "model": job.model,
    }))

def worker_loop():
    while True:
        _, job_data = r.brpop("image_jobs")
        job = json.loads(job_data)
        result = generate_single(ImageJob(**job))
        
        if result["success"]:
            r.lpush("image_results", json.dumps(result))
        else:
            r.lpush("image_failures", json.dumps(result))

Camada 3: Integration — Levar as Imagens ao Destino

Upload para S3

import boto3

s3 = boto3.client("s3")

def upload_to_s3(local_path: str, bucket: str, key: str) -> str:
    s3.upload_file(local_path, bucket, key, ExtraArgs={
        "ContentType": "image/png",
        "CacheControl": "public, max-age=31536000",
    })
    return f"https://{bucket}.s3.amazonaws.com/{key}"

Publicar no CMS

import requests

def update_cms_product_image(sku: str, image_url: str):
    requests.patch(
        f"https://cms.example.com/api/products/{sku}",
        headers={"Authorization": "Bearer $CMS_TOKEN"},
        json={"image_url": image_url}
    )

Notificar a Equipa

def notify_slack(message: str):
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
        json={"text": message}
    )

O Script de Pipeline Completo

#!/usr/bin/env python3
"""production-pipeline.py — Pipeline completo de geração de imagens"""

import csv, subprocess, json, time, random, sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import boto3, requests

# --- Configuração ---
S3_BUCKET = "my-assets"
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"
MAX_WORKERS = 4
MAX_RETRIES = 3

PROMPTS = {
    "product": "Foto e-commerce: {name}, {color}, estúdio, fundo branco, 1024x1024",
    "lifestyle": "Foto lifestyle: {name}, {color}, {scene}, luz natural, 1024x1024",
}

@dataclass
class Job:
    template: str
    params: dict
    output: str
    model: str = "nano-banana-2"

def generate(job: Job) -> dict:
    prompt = PROMPTS[job.template].format(**job.params)
    for attempt in range(MAX_RETRIES):
        result = subprocess.run([
            "anycap", "image", "generate",
            "--prompt", prompt, "--model", job.model,
            "--output-format", "json", "-o", job.output
        ], capture_output=True, text=True)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            return {"path": job.output, "url": data.get("image_url"), "success": True}
        if attempt < MAX_RETRIES - 1:
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return {"path": job.output, "success": False, "error": result.stderr}

def upload(path: str) -> str:
    key = path.replace("output/", "")
    s3 = boto3.client("s3")
    s3.upload_file(path, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"})
    return f"https://{S3_BUCKET}.s3.amazonaws.com/{key}"

def notify(text: str):
    requests.post(SLACK_WEBHOOK, json={"text": text})

def run_pipeline(csv_path: str):
    jobs = []
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            jobs.append(Job("product", {"name": row["name"], "color": row["color"]}, f"output/{row['sku']}.png"))
    
    notify(f"🚀 Pipeline iniciado: {len(jobs)} imagens")
    
    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(generate, job): job for job in jobs}
        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                result["s3_url"] = upload(result["path"])
                results.append(result)
    
    success = len(results)
    failed = len(jobs) - success
    notify(f"{'✅' if failed == 0 else '⚠️'} Pipeline concluído: {success}/{len(jobs)} imagens. {failed} falhas.")
    return results

if __name__ == "__main__":
    run_pipeline(sys.argv[1])

Escolher o Modelo Certo para o Seu Pipeline

Tipo de Pipeline Modelo Porquê
Imagens hero, output final Seedream 5 Melhor qualidade à primeira passagem
Geração em massa, variantes Nano Banana 2 Mais rápido e barato
Revisões, aperfeiçoamentos Nano Banana Pro Melhor edição image-to-image
Prototipagem, iteração Nano Banana 2 Velocidade > perfeição nas fases iniciais

Custos em Escala

Volume Nano Banana 2 Seedream 5 Design Manual
100 imagens ~$0.50 ~$1.50 $500-1.000
1.000 imagens ~$5 ~$15 $5.000-10.000
10.000 imagens ~$50 ~$150 $50.000+
100.000 imagens ~$500 ~$1.500 Impraticável

Última atualização: maio de 2026.