Database Replicas¶
Separação read/write para escalabilidade de banco de dados.
Arquitetura¶
flowchart LR
subgraph App["Aplicação"]
VS[ViewSet]
end
subgraph "Database Layer"
VS --> |SELECT| READ[(Replica<br/>Read-only)]
VS --> |INSERT/UPDATE/DELETE| WRITE[(Primary<br/>Read-Write)]
WRITE --> |Replicação| READ
end
style READ fill:#e3f2fd
style WRITE fill:#fff3e0
Quando Usar¶
flowchart TB
subgraph "Usar Replica (db.read)"
R1[Queries de listagem]
R2[Busca/Filtro]
R3[Relatórios]
R4[Analytics]
end
subgraph "Usar Primary (db.write)"
W1[Criar registros]
W2[Atualizar registros]
W3[Deletar registros]
W4[Transações]
end
style R1 fill:#e3f2fd
style R2 fill:#e3f2fd
style R3 fill:#e3f2fd
style R4 fill:#e3f2fd
style W1 fill:#fff3e0
style W2 fill:#fff3e0
style W3 fill:#fff3e0
style W4 fill:#fff3e0
Configuração via Settings¶
# src/settings.py
class AppSettings(Settings):
# Primary (write)
database_url: str = "postgresql+asyncpg://user:pass@primary:5432/db"
# Replica (read)
database_read_url: str = "postgresql+asyncpg://user:pass@replica:5432/db"
# Pool da replica (opcional, default: 2x do write)
database_read_pool_size: int = 10
database_read_max_overflow: int = 20
Ou via ambiente:
# .env
DATABASE_URL=postgresql+asyncpg://user:pass@primary:5432/db
DATABASE_READ_URL=postgresql+asyncpg://user:pass@replica:5432/db
Settings de Replicas¶
| Setting | Tipo | Default | Descrição |
|---|---|---|---|
database_read_url |
str \| None |
None |
URL do banco de leitura (replica) |
database_read_pool_size |
int \| None |
None |
Pool size para replica (default: 2x write) |
database_read_max_overflow |
int \| None |
None |
Max overflow para replica (default: 2x write) |
Citus e réplica de leitura¶
Em clusters Citus (overview), escritos e muitas leituras passam pelo coordinator. Separar DATABASE_READ_URL para outro host pode ou não fazer sentido conforme o provedor e o plano de roteamento. Veja Citus (Postgres distribuído).
Como Funciona¶
Quando database_read_url está configurado:
- Operações de escrita → Banco Primary
- Operações de leitura → Banco Replica
- Você escolhe qual usar via dependencies
Dependencies¶
get_db_replicas¶
Retorna DatabaseSession com propriedades .read e .write.
from strider.database import get_db_replicas, DatabaseSession
from fastapi import Depends
async def my_view(db: DatabaseSession = Depends(get_db_replicas)):
# Lê da replica
users = await User.objects.using(db.read).all()
# Escreve no primary
user = User(name="John")
await user.save(db.write)
get_write_db¶
Sessão somente escrita.
from strider.database import get_write_db
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(db: AsyncSession = Depends(get_write_db)):
user = User(name="John")
await user.save(db)
get_read_db¶
Sessão somente leitura.
from strider.database import get_read_db
async def list_users(db: AsyncSession = Depends(get_read_db)):
return await User.objects.using(db).all()
Type Aliases¶
from strider.database import DBSession, WriteSession, ReadSession
async def my_view(
db: DBSession, # DatabaseSession com .read/.write
write_db: WriteSession, # AsyncSession para escrita
read_db: ReadSession, # AsyncSession para leitura
):
pass
Uso com QuerySet¶
# Leitura explícita
users = await User.objects.using(db.read).filter(active=True).all()
# Escrita explícita
await User.objects.using(db.write).filter(id=1).update(name="Jane")
Configuração de Pool¶
# src/settings.py
class AppSettings(Settings):
# Pool de escrita
database_pool_size: int = 5
database_max_overflow: int = 10
database_pool_timeout: int = 30
database_pool_recycle: int = 3600
# Pool de leitura (default: 2x escrita)
database_read_pool_size: int | None = None # Default: 10
database_read_max_overflow: int | None = None # Default: 20
Verificar Status da Replica¶
from strider.database import is_replica_configured
if is_replica_configured():
print("Usando replica de leitura separada")
else:
print("Usando banco único")
Métodos de DatabaseSession¶
db: DatabaseSession = Depends(get_db_replicas)
# Propriedades
db.write # Sessão de escrita (primary)
db.read # Sessão de leitura (replica)
db.primary # Alias para write
db.replica # Alias para read
# Verificar se usa replica
db.is_using_replica() # True se read != write
# Controle de transação
await db.commit() # Commit da sessão de escrita
await db.rollback() # Rollback da sessão de escrita
await db.close() # Fecha ambas as sessões
Consistência Read-After-Write¶
Importante: Replicas podem ter lag de replicação.
async def create_and_read(db: DatabaseSession = Depends(get_db_replicas)):
# Cria no primary
user = User(name="John")
await user.save(db.write)
await db.commit()
# Lê do primary (não da replica) para consistência
user = await User.objects.using(db.write).get(id=user.id)
# Ou aguarda replicação
await asyncio.sleep(0.1)
user = await User.objects.using(db.read).get(id=user.id)
Comportamento de Fallback¶
Se database_read_url não está definido ou é igual a database_url:
db.readedb.writeretornam a mesma sessãois_replica_configured()retornaFalse- Nenhuma mudança de comportamento necessária no código
Integração com ViewSet¶
ViewSets usam get_db por padrão (sessão única). Para replicas:
from strider.database import get_db_replicas
class UserViewSet(ModelViewSet):
model = User
# Sobrescreve para usar replicas
async def get_queryset(self, db):
# db é de get_db, mas você pode usar replicas
return User.objects.using(db)
Ou configure globalmente:
# No setup do StrideApp
from strider.dependencies import set_session_factory
from strider.database import get_db_replicas
set_session_factory(get_db_replicas)
Inicialização¶
Replicas são auto-inicializadas pelo StrideApp quando database_read_url está definido.
Inicialização manual:
from strider.database import init_replicas
await init_replicas(
write_url="postgresql+asyncpg://...",
read_url="postgresql+asyncpg://...",
pool_size=5,
max_overflow=10,
)