Workers e tarefas¶
Guia de decisão: Tarefas (@task) vs stream workers (@worker).
Abaixo: Parte 1 = filas de jobs strider.tasks (strider worker / strider tasks). Parte 2 = consumo de tópicos strider.messaging (strider runworker / strider workers).
Parte 1 — Tarefas (strider.tasks)¶
Jobs com @task, tópicos tasks.<fila> no Kafka, processo strider worker, listagem strider tasks.
Fluxo (tarefas em background)¶
flowchart LR
subgraph "Producer"
API[API Request]
PUB[Publish Task]
end
subgraph "Queue"
TOPIC[Kafka Topic]
end
subgraph "Worker"
POLL[Poll Messages]
PROC[Process]
RETRY{Retry?}
DLQ[Dead Letter Queue]
end
API --> PUB --> TOPIC
TOPIC --> POLL --> PROC
PROC --> |error| RETRY
RETRY --> |yes| TOPIC
RETRY --> |max retries| DLQ
PROC --> |success| COMMIT[Commit Offset]
style TOPIC fill:#fff3e0
style DLQ fill:#ffcdd2
style COMMIT fill:#c8e6c9
Estratégia de Retry¶
flowchart TB
subgraph "Retry com Backoff"
E1[Erro 1] --> |wait 1s| R1[Retry 1]
R1 --> |erro| E2[Erro 2]
E2 --> |wait 2s| R2[Retry 2]
R2 --> |erro| E3[Erro 3]
E3 --> |wait 4s| R3[Retry 3]
R3 --> |erro| DLQ[Dead Letter Queue]
R1 --> |sucesso| OK1[✓]
R2 --> |sucesso| OK2[✓]
R3 --> |sucesso| OK3[✓]
end
style DLQ fill:#ffcdd2
style OK1 fill:#c8e6c9
style OK2 fill:#c8e6c9
style OK3 fill:#c8e6c9
Configuração Plug-and-Play¶
# src/settings.py
class AppSettings(Settings):
# Tasks - auto-configurado quando task_enabled=True
task_enabled: bool = True
task_default_queue: str = "default"
task_default_retry: int = 3
task_default_retry_delay: int = 60
task_retry_backoff: bool = True
task_default_timeout: int = 300
task_worker_concurrency: int = 4
task_result_backend: str = "redis" # none, redis, database
# Kafka (necessário para workers)
kafka_enabled: bool = True
kafka_bootstrap_servers: str = "localhost:9092"
Zero configuração explícita: Você NÃO precisa chamar configure_tasks(). Basta definir task_enabled=True.
Settings de Tasks¶
| Setting | Tipo | Default | Descrição |
|---|---|---|---|
task_enabled |
bool |
False |
Habilita Tasks |
task_default_queue |
str |
"default" |
Fila padrão |
task_default_retry |
int |
3 |
Número de retries |
task_default_retry_delay |
int |
60 |
Delay entre retries (segundos) |
task_retry_backoff |
bool |
True |
Usar backoff exponencial |
task_default_timeout |
int |
300 |
Timeout de task (segundos) |
task_worker_concurrency |
int |
4 |
Tarefas concorrentes por worker |
task_result_backend |
Literal |
"none" |
Backend: none, redis, database |
Definir uma tarefa (@task — não confundir com @worker)¶
# src/tasks.py (ou módulo apontado por tasks_module / auto-discovery)
from strider.tasks import task
@task(queue="default", retry=3)
async def send_digest(user_id: int) -> None:
"""Job enfileirado com await send_digest.delay(user_id=1)."""
...
- Subir o processo que consome filas:
strider worker(opção-q/--queue). - Ver o que foi registrado:
strider tasks. - Agendamento com cron/intervalo:
@periodic_task+strider scheduler.
Registo validado na importação: TaskRegistrationError (nome duplicado entre @task e @periodic_task, fila inválida, timeout < 1, etc.).
Parte 2 — Stream workers (strider.messaging)¶
Consumidores de tópico (pipeline de eventos). Processo strider runworker <Nome>. Listagem strider workers.
Decorator @worker e classe Worker partilham WorkerConfig e a mesma validação na importação (WorkerRegistrationError se algo estiver inconsistente). O prefixo de tópico tasks. é rejeitado aqui (reservado ao sistema de filas strider.tasks em Kafka).
Decorator @worker (função)¶
from strider.messaging import worker
@worker(
topic="billing-events",
group_id="billing-processor",
)
async def process_billing_event(message: dict) -> dict:
"""Processa um evento do stream (não é @task de strider.tasks)."""
result = await do_work(message["data"])
return {"status": "completed", "result": result}
Classe Worker¶
from strider.messaging import Worker
class EmailWorker(Worker):
input_topic = "emails"
group_id = "email-sender"
concurrency = 4
max_retries = 3
async def process(self, message: dict) -> dict:
"""Processa uma mensagem."""
await send_email(
to=message["to"],
subject=message["subject"],
body=message["body"],
)
return {"sent": True}
async def on_error(self, message: dict, error: Exception):
"""Chamado em erro de processamento."""
logger.error(f"Falha ao enviar email: {error}")
async def on_success(self, message: dict, result):
"""Chamado em sucesso."""
logger.info(f"Email enviado: {result}")
Opções da classe Worker¶
class MyWorker(Worker):
input_topic = "orders-stream"
output_topic = "results" # Opcional: publica resultados
group_id = "my-worker"
# Concorrência
concurrency = 4 # Processamento paralelo
# Retry
max_retries = 3
retry_backoff = "exponential" # "linear", "fixed"
# Batching
batch_size = 10 # Processa N mensagens de uma vez
batch_timeout = 5.0 # Tempo máximo para batch (segundos)
# Dead Letter Queue
dlq_topic = "tasks-dlq" # Mensagens com falha vão aqui
Processamento em batch¶
class BatchWorker(Worker):
input_topic = "events"
batch_size = 100
batch_timeout = 10.0
async def process_batch(self, messages: list[dict]) -> list:
"""Processa múltiplas mensagens de uma vez."""
results = []
for msg in messages:
result = await process_single(msg)
results.append(result)
return results
Política de retry (stream)¶
from strider.messaging import worker
@worker(
topic="orders-stream",
max_retries=5,
retry_backoff="exponential",
dlq_topic="orders-stream-dlq",
)
async def process_order_stream(message: dict):
# Se falhar, vai fazer retry com backoff exponencial
# Após max_retries, mensagem vai para dlq_topic
...
Cálculo de backoff:
- "fixed": Sempre initial_delay segundos
- "linear": initial_delay * attempt segundos
- "exponential": initial_delay * (2 ** attempt) segundos
Output topic¶
@worker(
topic="orders",
output_topic="order-results",
group_id="order-processor",
)
async def process_order(message: dict) -> dict:
# Valor de retorno é publicado no output_topic
return {"order_id": message["id"], "status": "processed"}
Executar stream workers (CLI)¶
# Um worker registrado (nome da classe ou do handler)
strider runworker EmailWorker
# Todos os workers de mensageria
strider runworker all
Opções de concorrência ficam na classe Worker (concurrency, etc.), não no comando acima.
Publicar no tópico (messaging)¶
from strider.messaging import get_producer
producer = get_producer("kafka")
await producer.send(
topic="orders-stream",
message={"type": "process", "data": {...}},
key="event-123",
)
Registry (Python)¶
from strider.messaging import (
get_worker,
get_all_workers,
list_workers,
run_worker,
run_all_workers,
)
# Obter config do worker
config = get_worker("EmailWorker")
# Listar todos os workers
names = list_workers()
# Executar programaticamente
await run_worker(EmailWorker)
await run_all_workers()
Tratamento de erros (stream)¶
class MyWorker(Worker):
input_topic = "orders-stream"
dlq_topic = "orders-stream-dlq"
async def process(self, message: dict):
try:
return await do_work(message)
except TemporaryError:
# Re-raise para disparar retry
raise
except PermanentError as e:
# Log e não faz retry
logger.error(f"Erro permanente: {e}")
return {"error": str(e)}
async def on_error(self, message: dict, error: Exception):
# Chamado após todos os retries esgotados
# Mensagem será enviada para dlq_topic
await notify_admin(error)
Graceful shutdown¶
Stream workers tratam SIGTERM/SIGINT:
- Para de aceitar novas mensagens
- Finaliza processamento do batch atual
- Commit dos offsets
- Sai limpo
Monitoramento¶
class MyWorker(Worker):
async def on_success(self, message: dict, result):
metrics.increment("worker.success")
async def on_error(self, message: dict, error: Exception):
metrics.increment("worker.error")
Operations Center¶
O admin panel inclui monitoramento de workers e tarefas:
class AppSettings(Settings):
ops_enabled: bool = True
ops_task_persist: bool = True
ops_task_retention_days: int = 30
ops_worker_heartbeat_interval: int = 30
ops_worker_offline_ttl: int = 24
Exemplo completo (stream)¶
# src/workers/email.py
from strider.messaging import Worker
from src.services.email import EmailService
class EmailWorker(Worker):
input_topic = "emails"
group_id = "email-sender"
concurrency = 4
max_retries = 3
retry_backoff = "exponential"
dlq_topic = "emails-dlq"
def __init__(self):
self.email_service = EmailService()
async def process(self, message: dict) -> dict:
await self.email_service.send(
to=message["to"],
subject=message["subject"],
template=message["template"],
context=message.get("context", {}),
)
return {"sent": True, "to": message["to"]}
async def on_error(self, message: dict, error: Exception):
logger.error(f"Email falhou: {message['to']} - {error}")
async def on_success(self, message: dict, result):
logger.info(f"Email enviado: {result['to']}")
strider runworker EmailWorker
Próximos passos¶
- Tarefas vs stream workers — qual comando usar
- Messaging — integração Kafka/Redis
- Settings — configurações