Python Topics : Async Features
Understanding Asynchronous Programming
two examples of synchronous programming
batch processing programs are often created as synchronous programs
get some input, process it, and create some output.
steps follow one after the other until the program reaches the desired output
the program only needs to pay attention to the steps and their order
command-line programs are small, quick processes that run in a terminal
scripts are used to
  • create something
  • transform one thing into something else
  • generate a report
  • list out some data
can be expressed as a series of program steps that are executed sequentially until the program is done
an asynchronous program behaves differently
still takes one execution step at a time
the difference is that the system may not wait for an execution step to be completed before moving on to the next one
the program will move on to future execution steps even though a previous step hasn't yet finished and is still running elsewhere
also means that the program knows what to do when a previous step does finish running
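a minimal sketch of this behavior, assuming asyncio.sleep() stands in for a step that is still running elsewhere; slow_step and log are made-up names used only for illustration

```python
import asyncio

async def slow_step(log):
    log.append("slow step started")
    await asyncio.sleep(0.05)  # stands in for work that finishes elsewhere
    log.append("slow step finished")
    return "slow result"

async def main():
    log = []
    # Schedule the slow step without waiting for it to complete
    pending = asyncio.create_task(slow_step(log))
    await asyncio.sleep(0)  # give the scheduled task a chance to start
    log.append("moved on to the next step")
    # The program knows what to do once the earlier step does finish
    result = await pending
    log.append(f"handled {result}")
    return log

if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

the log shows the program moving on while the slow step is still in flight, then handling its result once it completes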

Building a Synchronous Web Server
a web server's basic unit of work is, more or less, the same as batch processing
the server will get some input, process it, and create the output
as a synchronous program this would create a working web server

define 'working'
one unit of work (input, process, output) is not the only purpose
server should be able to handle hundreds or even thousands of units of work as quickly as possible
several work units may arrive all at once
a synchronous web server has little if any value
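a back-of-the-envelope sketch of why this matters, assuming every request arrives at the same instant and takes a fixed service time; the function names and the 200 ms figure are invented for illustration

```python
from itertools import accumulate

def sequential_latencies(service_times_ms):
    """Latency of each request when the server handles one request at a time."""
    # Each request waits for every request ahead of it, then for itself
    return list(accumulate(service_times_ms))

def overlapped_latencies(service_times_ms):
    """Idealised latency when the waiting of all requests can overlap."""
    return list(service_times_ms)

if __name__ == "__main__":
    requests_in = [200] * 5  # five simultaneous requests, 200 ms each (made up)
    print("one at a time:", sequential_latencies(requests_in))
    print("overlapped:   ", overlapped_latencies(requests_in))
```

under these assumptions the last of five simultaneous requests waits five times as long as the first, and the gap grows with every additional request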

Thinking Differently About Programming
the real world is almost entirely asynchronous, and so is how individuals interact with it
an example is a parent balancing the checkbook, doing the laundry, and keeping an eye on the children all at the same time
  • balancing the checkbook is a synchronous task
    one step follows another until it's done
  • can break away from the checkbook to
    • do laundry
    • unload the dryer
    • move clothes from the washer to the dryer
    • start another load in the washer
  • doing laundry is a synchronous task
    once started the task becomes asynchronous
    the washer and dryer can run independently
    some appliances can notify the user when their task completes
  • watching the children is another asynchronous task
    once they are playing, they can do so independently for the most part
    this changes when someone needs attention
    watching the children is a long-running task with high priority
    supersedes any other tasks
Programming Parents : Thought Experiments
Thought Experiment #1: The Synchronous Parent
can only do one task at a time
since watching the children is a high priority task, it will consume all the resources until the children are asleep in bed
nothing else can be accomplished until that point

Thought Experiment #2: The Polling Parent
multiple tasks can make progress because the parent checks on each one in turn
polling can be expensive
let the parent poll the tasks every 15 minutes so every task receives attention
there are issues
  1. the parent may spend a lot of time checking on things that don't need attention
    the washer and dryer haven't yet finished
    the children don't need any attention unless something unexpected happens
  2. the parent may miss completed tasks that do need attention
    if the washer finished its cycle at the beginning of the polling interval, it wouldn't get any attention until the next poll
    the children are the highest-priority task; if something were going drastically wrong, they couldn't tolerate fifteen minutes with no attention
if the polling period was reduced the parent would spend more time switching between tasks and less time on the tasks
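the trade-off can be put into numbers with a small sketch, assuming polls happen at fixed multiples of the interval and times are whole minutes; the function names are hypothetical

```python
def polls_until_noticed(finished_at, interval):
    """1-based number of the first poll that sees the finished task."""
    return (finished_at + interval - 1) // interval  # integer ceiling

def notice_delay(finished_at, interval):
    """Minutes a finished task waits before a poll notices it."""
    return polls_until_noticed(finished_at, interval) * interval - finished_at

def wasted_polls(finished_at, interval):
    """Polls made while the task was still running (nothing to do yet)."""
    return polls_until_noticed(finished_at, interval) - 1

if __name__ == "__main__":
    washer_done = 16  # minute at which the washer finishes (made-up value)
    for interval in (15, 5):
        print(interval, notice_delay(washer_done, interval),
              wasted_polls(washer_done, interval))
```

with these numbers, 15-minute polling notices the washer 14 minutes late after 1 wasted check, while 5-minute polling notices it 4 minutes late after 3 wasted checks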

Thought Experiment #3: The Threading Parent
create virtual parents using threads
can't use different threads to monitor the washer and dryer
each thread needs access to the other's appliance, which can lead to a deadlock (perhaps one thread should handle the laundry)
the checkbook is a shared resource
if a child needs to go to urgent care, the thread will need the checkbook which is controlled by a different thread
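a minimal sketch of the shared-resource point, assuming a threading.Lock guards the checkbook; Checkbook and run_parents are made-up names, and the sketch does not try to reproduce the washer/dryer deadlock

```python
import threading

class Checkbook:
    """A shared resource that several 'virtual parent' threads may update."""

    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.Lock()

    def pay(self, amount):
        # Only one thread at a time may read and update the balance
        with self._lock:
            self.balance -= amount

def run_parents(checkbook, payments_per_parent, amount, parents=2):
    """Let several threads make payments from the same checkbook."""
    def work():
        for _ in range(payments_per_parent):
            checkbook.pay(amount)

    threads = [threading.Thread(target=work) for _ in range(parents)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return checkbook.balance

if __name__ == "__main__":
    print(run_parents(Checkbook(1000), payments_per_parent=100, amount=1))
```

the lock keeps the balance consistent, but every thread that wants the checkbook has to wait for whichever thread currently holds it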
Using Python Async Features in Practice
Synchronous Programming
the code below runs two tasks synchronously, one after the other; no threads are involved
the while loop in the task() function lets the first task empty the queue
the second task finds the queue empty and has nothing to do
import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()
Simple Cooperative Concurrency
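the code below is essentially the same as before, with the exception of the yield statement
the yield statement turns task() into a generator
task() is called just like any other function, but the call returns a generator object; the body runs only when next() is called on it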
when the yield statement is executed, control is returned to the caller of the function
essentially a context switch as control moves from the generator function to the caller
import queue

def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1
            yield  # hand control back to the caller after each increment
        print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()
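a smaller sketch of the same hand-off, with the printing replaced by a returned trace so the interleaving is easy to inspect; worker and round_robin are hypothetical names, not part of the program above

```python
def worker(name, steps):
    """Generator that yields control back to its caller after every step."""
    for step in range(1, steps + 1):
        yield f"{name} step {step}"

def round_robin(generators):
    """Advance each generator one step in turn until all are exhausted."""
    trace = []
    active = list(generators)
    while active:
        for gen in list(active):  # iterate over a copy so removal is safe
            try:
                trace.append(next(gen))  # run the generator up to its next yield
            except StopIteration:
                active.remove(gen)
    return trace

if __name__ == "__main__":
    for entry in round_robin([worker("One", 3), worker("Two", 1)]):
        print(entry)
```

each next() call runs one generator up to its next yield, so the two workers take turns until the shorter one is exhausted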
Cooperative Concurrency With Blocking Calls
below, the time.sleep(delay) call in task() acts as a blocking call; the Timer only measures how long it takes
import time
import queue
from codetiming import Timer

def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = work_queue.get()
        print(f"Task {name} running")
        timer.start()
        time.sleep(delay)  # blocking: nothing else can run during the delay
        timer.stop()
        yield

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in tasks:
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True

if __name__ == "__main__":
    main()
with the addition of the delay there is no performance gain
the delay stops the processing of the entire program while the CPU just waits for the IO delay to be over
Task One running
Task One elapsed time: 15.0
Task Two running
Task Two elapsed time: 10.0
Task One running
Task One elapsed time: 5.0
Task Two running
Task Two elapsed time: 2.0

Total elapsed time: 32.0
Cooperative Concurrency With Non-Blocking Calls
the next version of the example makes use of Python async features: the async/await keywords and the asyncio package

the time and queue modules have been replaced with the asyncio package
gives the app access to asynchronous friendly (non-blocking) sleep and queue functionality
the change to task() defines it as asynchronous with the addition of the async prefix
this indicates to Python that the function will be asynchronous

the time.sleep(delay) and yield statements have been removed
they are replaced with await asyncio.sleep(delay)
this creates a non-blocking delay that performs a context switch back to the event loop, which is running main() and the other task

the while loop inside main() has been replaced
a call to await asyncio.gather(...) replaces the tasks list and the next() loop
the call tells asyncio

  1. create two tasks based on task() and start running them
  2. wait for both of these to be completed before moving forward
the line asyncio.run(main()) runs main()
creates an event loop
the loop will run main() which in turn will run the two instances of task()

the event loop runs all the code, including main()
the CPU is busy doing work when task code is executing
a context switch occurs when the await keyword is reached
control passes back to the event loop
the event loop looks at all the tasks waiting for an event (in this case, an asyncio.sleep(delay) timeout) and passes control to a task with an event that's ready

await asyncio.sleep(delay) is non-blocking with respect to the CPU
instead of waiting for the delay to time out

  • the CPU registers a sleep event on the event loop task queue
  • performs a context switch by passing control to the event loop
the event loop continuously looks for completed events and passes control back to the task waiting for that event
the CPU can stay busy if work is available, while the event loop monitors the events which will happen in the future
import asyncio
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    asyncio.run(main())
the output
Task One running
Task Two running
Task Two elapsed time: 10.0
Task Two running
Task One elapsed time: 15.0
Task One running
Task Two elapsed time: 5.0
Task One elapsed time: 2.0

Total elapsed time: 17.0
the individual delays still add up to 32 seconds (15 + 10 + 5 + 2)
because the delays overlap, the app finishes in 17 seconds of wall-clock time instead of 32
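the scheduling described in this section can be imitated with a toy loop; this is only a sketch on a virtual clock, not how asyncio is implemented, and the names task, run, work and log are assumptions made for illustration

```python
import heapq

def task(name, work, log):
    """Take delays from the shared list and 'sleep' by yielding each one."""
    while work:
        delay = work.pop(0)
        log.append(f"Task {name} running")
        yield delay  # hand control to the loop until the delay has passed
        log.append(f"Task {name} elapsed time: {delay:.1f}")

def run(tasks):
    """Toy event loop on a virtual clock; returns the final clock value."""
    clock = 0.0
    waiting = []  # heap of (wake_time, sequence, generator)
    for seq, gen in enumerate(tasks):
        heapq.heappush(waiting, (clock, seq, gen))
    seq = len(waiting)
    while waiting:
        # Pass control to the task whose event is ready soonest
        clock, _, gen = heapq.heappop(waiting)
        try:
            delay = next(gen)
        except StopIteration:
            continue  # this task has finished
        heapq.heappush(waiting, (clock + delay, seq, gen))
        seq += 1
    return clock

if __name__ == "__main__":
    log, work = [], [15, 10, 5, 2]
    total = run([task("One", work, log), task("Two", work, log)])
    print("\n".join(log))
    print(f"\nTotal elapsed time: {total:.1f}")
```

because the clock is virtual the sketch finishes instantly, yet it reports 17.0 for the same four delays, since each task only occupies the loop between its yields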

Synchronous (Blocking) HTTP Calls
the program has been modified to import the requests module to make the actual HTTP requests
$ python -m pip install requests
the queue now contains a list of URLs
task() no longer simulates work with a counter or a delay
requests gets the contents of a URL retrieved from the queue and prints how long it took to do so
import queue
import requests
from codetiming import Timer

def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    with requests.Session() as session:
        while not work_queue.empty():
            url = work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            session.get(url)
            timer.stop()
            yield

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        work_queue.put(url)

    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in tasks:
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True

if __name__ == "__main__":
    main()
output
Task One getting URL: http://google.com
Task One elapsed time: 0.3
Task Two getting URL: http://yahoo.com
Task Two elapsed time: 0.8
Task One getting URL: http://linkedin.com
Task One elapsed time: 0.4
Task Two getting URL: http://apple.com
Task Two elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task One elapsed time: 0.5
Task Two getting URL: http://facebook.com
Task Two elapsed time: 0.5
Task One getting URL: http://twitter.com
Task One elapsed time: 0.4

Total elapsed time: 3.2
yield turns task() into a generator
performs a context switch that lets the other task instance run
since the app is running synchronously, each session.get() call blocks the CPU until the page is retrieved

Asynchronous (Non-Blocking) HTTP Calls
this version of the program modifies the previous one to use Python async features
it also imports the aiohttp module
aiohttp is a library for making HTTP requests asynchronously using asyncio
$ python -m pip install aiohttp
the tasks here have been modified to remove the yield call since the code to make the HTTP GET call is no longer blocking
the call also performs a context switch back to the event loop
import asyncio
import aiohttp
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    async with aiohttp.ClientSession() as session:
        while not work_queue.empty():
            url = await work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            async with session.get(url) as response:
                await response.text()
            timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
        "http://twitter.com",
    ]:
        await work_queue.put(url)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    asyncio.run(main())
output
Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One elapsed time: 0.3
Task One getting URL: http://linkedin.com
Task One elapsed time: 0.3
Task One getting URL: http://apple.com
Task One elapsed time: 0.3
Task One getting URL: http://microsoft.com
Task Two elapsed time: 0.9
Task Two getting URL: http://facebook.com
Task Two elapsed time: 0.4
Task Two getting URL: http://twitter.com
Task One elapsed time: 0.5
Task Two elapsed time: 0.3

Total elapsed time: 1.7
because the HTTP GET calls are running asynchronously, the time taken is about half the cumulative time of all the HTTP GET calls
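the "about half" effect can be checked without a network connection; in this sketch fake_fetch is a hypothetical stand-in for the HTTP GET, and the example.com URLs and 0.1 second delays are made up

```python
import asyncio
import time

async def fake_fetch(url, delay):
    """Hypothetical stand-in for an HTTP GET: waits without blocking."""
    await asyncio.sleep(delay)
    return f"body of {url}"

async def worker(name, work_queue, results):
    """Pull (url, delay) jobs from the queue until it is empty."""
    while not work_queue.empty():
        url, delay = await work_queue.get()
        results.append((name, await fake_fetch(url, delay)))

async def main(jobs, workers=2):
    work_queue = asyncio.Queue()
    for job in jobs:
        await work_queue.put(job)
    results = []
    start = time.perf_counter()
    # Run all workers concurrently and wait for every one to finish
    await asyncio.gather(
        *(worker(f"W{i}", work_queue, results) for i in range(workers))
    )
    return results, time.perf_counter() - start

if __name__ == "__main__":
    jobs = [(f"http://example.com/{i}", 0.1) for i in range(4)]
    results, elapsed = asyncio.run(main(jobs))
    print(f"{len(results)} responses in {elapsed:.1f}s; delays sum to 0.4s")
```

with two workers the four simulated requests overlap in pairs, so the wall-clock time should come out near half the summed delays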