Optimizar FastAPI de forma rápida y efectiva

Tiempo de lectura: 3 minutos

Estas 4 optimizaciones mejoran el rendimiento, la concurrencia y la velocidad de respuesta de una API FastAPI en producción con Docker.

Improve -pexels

Gunicorn + UvicornWorker

¿Qué es?

Gunicorn actúa como process manager por encima de Uvicorn. Lanza múltiples workers (procesos independientes) para que la API pueda atender varias peticiones en paralelo, no de forma secuencial.

Impacto principal: Mejor concurrencia bajo carga. Con 4 workers puedes atender 4 requests simultáneos más los async dentro de cada uno.

requirements.txt

gunicorn==23.0.0

docker-compose.yml

command: gunicorn main:app \
         -w 4 \
         -k uvicorn.workers.UvicornWorker \
         --bind 0.0.0.0:8000 \
         --access-logfile - \
         --log-level info

Entornos: desarrollo vs producción

docker-compose.yml — producción

command: gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

docker-compose.override.yml — desarrollo (se aplica automáticamente)

services:
  quierolibros_back_api:
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
# Desarrollo (aplica override automáticamente)
docker compose up

# Producción (ignora el override)
docker compose -f docker-compose.yml up

Fix de logs duplicados

Con múltiples workers los logs se repiten N veces. Para limpiarlos, añade esto al inicio de main.py:

import logging

logging.getLogger('uvicorn.access').propagate = False
logging.getLogger('uvicorn.error').propagate = False

Nota: Los logs multiplicados por worker son normales — cada proceso es independiente. El fix solo afecta al formato visual.

Pool de conexiones a la base de datos

¿Qué es?

En lugar de abrir y cerrar una conexión nueva por cada query, el pool mantiene un conjunto de conexiones abiertas y reutilizables. Esto elimina la latencia de establecer la conexión TCP+autenticación en cada petición.

Impacto principal: Reducción de latencia en queries. Con 4 workers y pool_size=20, la BD puede recibir hasta 240 conexiones simultáneas bajo pico.

Código

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL_ASYNC,
    pool_pre_ping=True,    # detecta conexiones caídas
    pool_recycle=300,      # recicla conexiones viejas (5 min)
    pool_size=20,          # conexiones permanentes en el pool
    max_overflow=40,       # extra bajo pico, luego se cierran
    pool_timeout=30,       # segundos esperando conexión libre
)

Parámetros explicados

ParámetroDefaultRecomendadoPara qué sirve
pool_size520Conexiones siempre abiertas
max_overflow1040Extra bajo pico, luego se cierran
pool_timeout3030Evita espera infinita
pool_recycle300 Recicla conexiones viejas
pool_pre_pingTrue Detecta conexiones caídas

Configurar MariaDB para aguantar las conexiones

Con 4 workers × (20 + 40) = 240 conexiones máximas. Verifica el límite actual:

-- Verificar límite actual
SHOW VARIABLES LIKE 'max_connections';

Si es menor a 240, súbelo en el docker-compose.yml del servicio MariaDB:

mariadb_quierolibros_back:
  command: --max-connections=300

orjson — Serialización JSON rápida

¿Qué es?

orjson es una librería que reemplaza el serializador JSON nativo de Python. Está escrita en Rust, lo que la hace hasta 3-5x más rápida. La diferencia se nota especialmente al devolver listas grandes de libros o artículos.

Impacto principal: 3-5x más rápido serializando JSON. Especialmente notable en endpoints que devuelven listas grandes de registros.

Instalación

Añade a requirements.txt:

orjson==3.10.18

Configuración en main.py

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

app = FastAPI(
title='API V2',
description='API REST',
default_response_class=ORJSONResponse, # ← una sola línea
)

Nota: Un solo cambio en main.py afecta a todos los endpoints automáticamente. No hay que modificar ningún endpoint individual.

Async/await y llamadas paralelas

Llamadas HTTP externas correctas

Usa httpx de forma async para no bloquear el event loop mientras esperas respuesta de APIs externas:

Mal — bloquea mientras espera respuesta

import requests

@app.get('/data')
def get_data():
    r = requests.get('https://api.externa.com')
    return r.json()

Bien — no bloquea

import httpx

@app.get('/data')
async def get_data():
    async with httpx.AsyncClient() as client:
        r = await client.get('https://api.externa.com')
        return r.json()

Llamadas paralelas con asyncio.gather

Si un endpoint hace varias llamadas externas seguidas, ejecútalas en paralelo en lugar de una por una:

Secuencial — 600ms total

r1 = await client.get('https://api1.com')   # 300ms
r2 = await client.get('https://api2.com')   # 300ms
# Total: 600ms

Paralelo — ~300ms total

import asyncio

r1, r2 = await asyncio.gather(
    client.get('https://api1.com'),
    client.get('https://api2.com'),
)
# Total: ~300ms

Deja un comentario