Understanding Asynchronous Programming | ||||
two examples of synchronous programming
still takes one execution step at a time the difference is that the system may not wait for an execution step to be completed before moving on to the next one>br/> the program will move on to future execution steps even though a previous step hasn't yet finished and is still running elsewhere also means that the program knows what to do when a previous step does finish running Building a Synchronous Web Server
a web server's basic unit of work is, more or less, the same as batch processingthe server will get some input, process it, and create the output as a synchronous program this would create a working web server define 'working' one unit of work (input, process, output) is not the only purpose server should be able to handle hundreds or even thousands of units of work as quickly as possible several work units may arrive all at once a synchronous web server has little if any value Thinking Differently About Programming
the real world is almost entirely asynchronous, and so is how individuals interact with itan example is a parent balancing the checkbook, doing the laundry, and keeping an eye on the children all at the same time
|
||||
Programming Parents : Thought Experiments | ||||
Thought Experiment #1: The Synchronous Parent
can only do one task at a timesince watching the children is a high priority task, it will consume all the resources until the children are asleep in bed nothing else can be accomplished until that point Thought Experiment #2: The Polling Parent
multiple tasks can be completed simultaneouslypolling can be expensive let the parent poll the tasks every 15 minutes so every task receives attention there are issues
Thought Experiment #3: The Threading Parent
create virtual parents using threadscan't use different threads to monitor the washer and dryer each thread needs access to the other's applicance leading to a deadlock (perhaps one thread should handle the laundry) the checkbook is a shared resource if a child needs to go to urgent care, the thread will need the checkbook which is controlled by a different thread |
||||
Using Python Async Features in Practice | ||||
Synchronous Programming
the code belows is supposedly multithreadedthe while loop in the task() method lets the first thread empty the queue the second thread has nothing to import queue def task(name, work_queue): if work_queue.empty(): print(f"Task {name} nothing to do") else: while not work_queue.empty(): count = work_queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 print(f"Task {name} total: {total}") def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: work_queue.put(work) # Create some synchronous tasks tasks = [(task, "One", work_queue), (task, "Two", work_queue)] # Run the tasks for t, n, q in tasks: t(n, q) if __name__ == "__main__": main() Simple Cooperative Concurrency
below the code is essentially the same as before with the exception
of the yield statementthe yield statement turns task() into a generator called just like any other function when the yield statement is executed, control is returned to the caller of the function essentially a context switch as control moves from the generator function to the caller import queue def task(name, queue): while not queue.empty(): count = queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 yield print(f"Task {name} total: {total}") def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: work_queue.put(work) # Create some tasks tasks = [task("One", work_queue), task("Two", work_queue)] # Run the tasks done = False while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main() Cooperative Concurrency With Blocking Calls
below the timer code in task() acts as a blocking callimport time import queue from codetiming import Timer def task(name, queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") while not queue.empty(): delay = queue.get() print(f"Task {name} running") timer.start() time.sleep(delay) timer.stop() yield def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: work_queue.put(work) tasks = [task("One", work_queue), task("Two", work_queue)] # Run the tasks done = False with Timer(text="\nTotal elapsed time: {:.1f}"): while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main()with the addition of the delay there is no performance gain the delay stops the processing of the entire program while the CPU just waits for the IO delay to be over Task One running Task One elapsed time: 15.0 Task Two running Task Two elapsed time: 10.0 Task One running Task One elapsed time: 5.0 Task Two running Task Two elapsed time: 2.0 Total elapsed time: 32.0 Cooperative Concurrency With Non-Blocking Calls
the next version of the example makes use of Python async features using asyncio/await
the time and queue modules have been replaced with the asyncio packagegives the app access to asynchronous friendly (non-blocking) sleep and queue functionality the change to task() defines it as asynchronous with the addition of the async prefix this indicates to Python that the function will be asynchronous the time.sleep(delay) and yield statements have been removed they are replaced with await asyncio.sleep(delay) this creates a non-blocking delay that will perform a context switch back to the caller main() the while loop inside main() has been replaced a call to await asyncio.gather(...) replaces task_array the call tells asyncio
creates an event loop the loop will run main() which in turn will run the two instances of task() the event loop runs all the code, including main() the CPU is busy doing work when task code is executing a context switch occurs when the await keyword is reached control passes back to the event loop the event loop looks at all the tasks waiting for an event (in this case, an asyncio.sleep(delay) timeout) and passes control to a task with an event that's ready await asyncio.sleep(delay) is non-blocking WRT the CPU instead of waiting for the delay to timeout
the CPU can stay busy if work is available, while the event loop monitors the events which will happen in the future import asyncio from codetiming import Timer async def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") while not work_queue.empty(): delay = await work_queue.get() print(f"Task {name} running") timer.start() await asyncio.sleep(delay) timer.stop() async def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = asyncio.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: await work_queue.put(work) # Run the tasks with Timer(text="\nTotal elapsed time: {:.1f}"): await asyncio.gather( asyncio.create_task(task("One", work_queue)), asyncio.create_task(task("Two", work_queue)), ) if __name__ == "__main__": asyncio.run(main())the output Task One running Task Two running Task Two total elapsed time: 10.0 Task Two running Task One total elapsed time: 15.0 Task One running Task Two total elapsed time: 5.0 Task One total elapsed time: 2.0 Total elapsed time: 17.0the processing time still takes 32 seconds by using asyncio the app runs in 17 seconds instead of 32 seconds Synchronous (Blocking) HTTP Calls
the program has been modified to import the requests module to make the actual HTTP requests
$ python -m pip install requeststhe queue now contains a list of URLs task() no longer increments a counter requests gets the contents of a URL retrieved from the queue and prints how long it took to do so import queue import requests from codetiming import Timer def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") with requests.Session() as session: while not work_queue.empty(): url = work_queue.get() print(f"Task {name} getting URL: {url}") timer.start() session.get(url) timer.stop() yield def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for url in [ "http://google.com", "http://yahoo.com", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: work_queue.put(url) tasks = [task("One", work_queue), task("Two", work_queue)] # Run the tasks done = False with Timer(text="\nTotal elapsed time: {:.1f}"): while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main()output Task One getting URL: http://google.com Task One total elapsed time: 0.3 Task Two getting URL: http://yahoo.com Task Two total elapsed time: 0.8 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.4 Task Two getting URL: http://apple.com Task Two total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task One total elapsed time: 0.5 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.5 Task One getting URL: http://twitter.com Task One total elapsed time: 0.4 Total elapsed time: 3.2yield turns task() into a generator performs a context switch that lets the other task instance run since the app is running syncronously each session.get() call blocks the CPU until the page is retrieved Asynchronous (Non-Blocking) HTTP Calls
this version of the program modifies the previous one to use Python async featuresit also imports the aiohttp module aiohttp module is a library to make HTTP requests in an asynchronous fashion using asyncio $ python -m pip install requeststhe tasks here have been modified to remove the yield call since the code to make the HTTP GET call is no longer blocking the call also performs a context switch back to the event loop import asyncio import aiohttp from codetiming import Timer async def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") async with aiohttp.ClientSession() as session: while not work_queue.empty(): url = await work_queue.get() print(f"Task {name} getting URL: {url}") timer.start() async with session.get(url) as response: await response.text() timer.stop() async def main(): """ This is the main entry point for the program """ # Create the queue of work work_queue = asyncio.Queue() # Put some work in the queue for url in [ "http://google.com", "http://yahoo.com", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: await work_queue.put(url) # Run the tasks with Timer(text="\nTotal elapsed time: {:.1f}"): await asyncio.gather( asyncio.create_task(task("One", work_queue)), asyncio.create_task(task("Two", work_queue)), ) if __name__ == "__main__": asyncio.run(main())output Task One getting URL: http://google.com Task Two getting URL: http://yahoo.com Task One total elapsed time: 0.3 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.3 Task One getting URL: http://apple.com Task One total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task Two total elapsed time: 0.9 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.4 Task Two getting URL: http://twitter.com Task One total elapsed time: 0.5 Task Two total elapsed time: 0.3 Total elapsed time: 1.7because the HTTP GET calls are running asynchronously the time taken is about half the cumulative time of all the HTTP GET |