Add to your .env file:
REDIS_URL=redis://localhost:6379/0Create tasks in app/tasks/ directory:
from app.taskiq import get_taskiq_broker
tskq_broker = get_taskiq_broker()
@tskq_broker.task(task_name="send_email")
async def send_email(data: dict | None = None) -> dict:
"""Send email task."""
data = data or {}
email = data.get("email", "user@example.com")
# Your task logic here
print(f"[TASK] Sending email to {email}")
return {
"status": "success",
"email": email,
"message": "Email sent successfully"
}from app.tasks.my_task import send_email
from app.lib.tskq.task_invoker import invoke_task
# Execute immediately
await invoke_task(
send_email,
data={"email": "user@example.com"}
)# Execute after 60 seconds delay
await invoke_task(
send_email,
data={"email": "user@example.com"},
delay=60
)Create scheduled tasks in app/tasks/ directory using the schedule parameter:
from app.taskiq import get_taskiq_broker
tskq_broker = get_taskiq_broker()
@tskq_broker.task(
task_name="task-schedule-test",
schedule=[{"cron": "*/1 * * * *"}], # every 1 minute
)
async def task_schedule_test() -> str:
"""My schedule task."""
message = "schedule task is running every 1 minute"
print(f"[TASK EXECUTION] {message}")
return messageNote: These tasks will automatically run on schedule when the scheduler is started.
bash scripts/taskiq-worker.shbash scripts/taskiq-scheduler.shNote: Both worker and scheduler must be running for scheduled tasks to execute.