Limiting concurrency with Python Coroutines
In a previous note, I discussed running coroutines in a non-blocking manner using gather
.
This approach works well when you have a known number of coroutines that you want to run in a non-blocking manner.
However, if you have tens, hundreds, or more tasks, especially when network calls are involved, it can be important to limit concurrency.
We can use a semaphore to limit the number of coroutines that are running at once by blocking until other coroutines have finished executing.
import asyncio
async def constrained_execution(funcs, max_runs): jobs = [] semaphore = asyncio.Semaphore(max_runs)
async def job(fn): async with semaphore: return await fn()
for func in funcs: jobs.append(asyncio.ensure_future(job(func))) return await asyncio.gather(*jobs)
async def process(item): print(f"Processing item {item}") await asyncio.sleep(1) return item
async def main(): fns = [] for item in range(100): fns.append(lambda item=item: process(item)) results = await constrained_execution(fns, 10) print(results)
if __name__ == "__main__": asyncio.run(main())
This approach is especially useful when you want to improve performance by making network requests using coroutines, but don’t want to exceed a specific number of requests in parallel, due to issues like rate limiting or API availability.
import aiohttpimport json
async def fetch_post_title(session, id_): url = f"https://jsonplaceholder.typicode.com/posts?id={id_}" async with session.get(url) as response: response = json.loads(await response.text()) return response[0]["title"]
async def main():
ids = range(1, 101) async with aiohttp.ClientSession() as session: fns = [ lambda id_=id_: fetch_post_title(session, id_) for id_ in ids ] results = await constrained_execution(fns, 5) print(len(results)) # 100 print(results) # [ '...', '...', ... ]
if __name__ == "__main__": asyncio.run(main())
A function like constrained_execution
can be useful for scripting tasks you encounter in your day to day.
Recommended
Using open-interpreter to create a DIY audiobook with say
I used open-interpreter to read an epub file and create a DIY audio book.
Installing Python Packages with Nix
I've been meaning to try out Simon's llm package for a while now. From reading the docs and following the development, it's a modular,...
Language Model Streaming With SSE
OpenAI popularized a pattern of streaming results from a backend API in realtime with ChatGPT. This approach is useful because the time a language...