Bug report
Bug description:
We have asyncio Python servers that we are trying to run under free-threading; they create event loops. At roughly 800 QPS we noticed performance regressions relative to the GIL build, and, upon tracing, most of the regression is in the cancel_all_tasks call when it invokes the C implementation to get all tasks for the loop in question. My LLM suggested that the performance bottleneck is the _PyEval_StopTheWorld call this requires. It suggested an alternative implementation using per-thread task buckets to avoid needing to stop the world.
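For context, here is a pure-Python sketch of the per-thread bucket idea as I understand it. This is illustrative only: the real change would live in the C task machinery, and the names here (`_thread_buckets`, `register_task`, `all_tasks`) are mine, not CPython's. Each thread keeps its own WeakSet of tasks, so task registration only touches the current thread's bucket, and a scan merges snapshots of the buckets under a short lock instead of pausing every thread.

```python
import threading
import weakref

_buckets_lock = threading.Lock()
_thread_buckets = {}  # thread id -> WeakSet of tasks (hypothetical structure)

def _my_bucket():
    """Get (or lazily create) the calling thread's task bucket."""
    tid = threading.get_ident()
    with _buckets_lock:
        bucket = _thread_buckets.get(tid)
        if bucket is None:
            bucket = weakref.WeakSet()
            _thread_buckets[tid] = bucket
        return bucket

def register_task(task):
    # Called on task creation; only touches the current thread's bucket,
    # so loop/task churn on one thread does not require pausing others.
    _my_bucket().add(task)

def all_tasks(loop):
    # Merge a snapshot of every bucket; no global stop-the-world pause,
    # only a short lock to copy the dict of buckets. (A real version would
    # also need per-bucket synchronization against concurrent mutation.)
    with _buckets_lock:
        buckets = list(_thread_buckets.values())
    return {t for b in buckets for t in list(b) if t.get_loop() is loop}
```

The point of the design is that the hot path (`register_task`) is thread-local, while the rare scan (`all_tasks`) pays the coordination cost.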
It created the following repro script to demonstrate the performance issue. With its suggested changes, the script creates 10x as many loops and the churn rate increases 83x under free-threading (the changes would be ifdef-guarded to avoid a 10x regression in loop churn rate under the standard build).
I still need to validate the changes it proposed, but without doing something we would have to avoid asyncio + free-threading because of this.
```python
import asyncio
import threading
import time
import argparse
import sys

# Configuration
NUM_CHURN_THREADS = 20      # Number of threads creating/destroying loops
RUN_DURATION = 10           # Seconds to run
TARGET_LOOPS_PER_SEC = 800  # Total target loops per second across all threads

stop_event = threading.Event()

def churn_worker(worker_id):
    """Continuously creates and destroys event loops."""
    count = 0
    while not stop_event.is_set():
        # asyncio.run creates a new event loop, runs the coroutine, and closes it.
        asyncio.run(asyncio.sleep(0))
        count += 1
        # Optional: slight delay to throttle if needed, but we want max contention for this demo
        # time.sleep(0.001)
    return count

async def monitor_coro():
    """Calls all_tasks continuously."""
    count = 0
    start_time = time.time()
    while not stop_event.is_set():
        # This triggers the scan of all tasks in all threads
        tasks = asyncio.all_tasks()
        count += 1
        # Yield to allow other things to happen on this loop, though we mostly care about the scan cost
        await asyncio.sleep(0)
    duration = time.time() - start_time
    return count, duration

def monitor_worker(results_list):
    """Runs a persistent loop that calls all_tasks."""
    try:
        count, duration = asyncio.run(monitor_coro())
        results_list.append((count, duration))
    except Exception as e:
        print(f"Monitor failed: {e}")

def main():
    parser = argparse.ArgumentParser(description="Asyncio Performance Demo")
    parser.add_argument("--threads", type=int, default=NUM_CHURN_THREADS, help="Number of churn threads")
    parser.add_argument("--duration", type=int, default=RUN_DURATION, help="Duration in seconds")
    args = parser.parse_args()

    print(f"Starting generic asyncio benchmark on {sys.version}...")
    print(f"Configuration: {args.threads} churn threads, {args.duration}s duration.")

    threads = []
    # Start churn threads.
    # We use a ThreadPoolExecutor or just raw threads. Raw threads are fine.
    # To measure iterations, we can use a mutable list or class.
    churn_counts = [0] * args.threads

    def wrapped_churn(idx):
        churn_counts[idx] = churn_worker(idx)

    for i in range(args.threads):
        t = threading.Thread(target=wrapped_churn, args=(i,))
        t.start()
        threads.append(t)

    # Start monitor thread
    monitor_results = []
    monitor_thread = threading.Thread(target=monitor_worker, args=(monitor_results,))
    monitor_thread.start()
    threads.append(monitor_thread)

    # Run for the specified duration
    try:
        time.sleep(args.duration)
    except KeyboardInterrupt:
        pass
    finally:
        stop_event.set()

    # Join all
    for t in threads:
        t.join()

    # Aggregate results
    total_loops = sum(churn_counts)
    loops_per_sec = total_loops / args.duration
    monitor_calls, mon_duration = 0, 1
    if monitor_results:
        monitor_calls, mon_duration = monitor_results[0]
    all_tasks_per_sec = monitor_calls / mon_duration if mon_duration > 0 else 0

    print("-" * 40)
    print("Results:")
    print(f"  Total Event Loops Created: {total_loops}")
    print(f"  Loop Churn Rate: {loops_per_sec:.2f} loops/sec")
    print(f"  all_tasks() Calls: {monitor_calls}")
    print(f"  all_tasks() Rate: {all_tasks_per_sec:.2f} calls/sec")
    print("-" * 40)
    print("Interpretation:")
    print("  Higher 'Loop Churn Rate' indicates less blocking during thread/task destruction.")
    print("  Higher 'all_tasks() Rate' indicates faster scanning of tasks.")
    print("  On unoptimized Python (with StopTheWorld), these numbers should be significantly lower")
    print("  concurrently due to the global pause.")

if __name__ == "__main__":
    main()
```

CPython versions tested on:
3.14
Operating systems tested on:
Linux