Python Topics : Thread Safety
Threading in Python
  • Concurrency - the ability of a system to handle multiple tasks by allowing their execution to overlap in time but not necessarily happen simultaneously.
  • Parallelism - The simultaneous execution of multiple tasks that run at the same time to leverage multiple processing units/CPU cores.
Python's threading is a concurrency framework that lets multiple threads run concurrently, each executing pieces of code
when running multiple threads, the interpreter switches between them,
handing control of execution over to each thread in turn
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)
output is
$ python threading_example.py
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
to move from one thread to another, the interpreter performs a context switch
Python pauses the execution of the current thread and passes control to another thread
when the context switches, Python saves the current execution state so that it can resume later
by switching the control of execution at specific intervals, multiple threads can execute code concurrently
check the context switch interval of the interpreter
>>> import sys
>>> sys.getswitchinterval()
0.005
an interval of five milliseconds doesn't mean the threads switch exactly every five milliseconds
the interpreter considers switching to another thread at these intervals
in the thread pool example there's a call to sleep() inside the threaded_function
the call delays the program execution by 0.1 seconds
doing so increases the chance of a context switch happening in between because the execution will take much longer than the context switch interval
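as a small sketch of the interval discussed above, the snippet below reads the switch interval, changes it with sys.setswitchinterval(), and restores it
the value 0.01 is an arbitrary choice for illustration, not a recommendation

```python
import sys

# Read the interval at which the interpreter considers switching threads
original_interval = sys.getswitchinterval()

# Ask the interpreter to consider switching less often (arbitrary demo value)
sys.setswitchinterval(0.01)
changed_interval = sys.getswitchinterval()

# Restore the previous setting so the rest of the program is unaffected
sys.setswitchinterval(original_interval)

print(f"{original_interval=}, {changed_interval=}")
```

changing the interval only affects how often a switch is considered; it doesn't make code thread-safe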

due to context switching programs can behave unexpectedly when run in a multithreaded environment
thread safety ensures such programs run predictably and reliably

Understanding Thread Safety
thread safety refers to the property of an algorithm or program being able to function correctly during simultaneous execution by multiple threads
code is considered thread-safe if it behaves deterministically and produces the desired output when run in a multithreaded environment

thread safety issues occur because of

  • shared mutable data - threads share the memory of their parent process
    module-level variables and any objects reachable from more than one thread are shared across threads
    this can lead to errors when working with shared, changeable data
  • non-atomic operations - operations that involve multiple steps can be interrupted by context switches in a multithreaded environment
    this can result in unexpected outcomes if threads are switched during the operation
The GIL and Its Implications on Threading
Python's Global Interpreter Lock (GIL) is a mutex
protects access to objects
prevents multiple threads from executing Python bytecodes simultaneously
the GIL allows only one thread to execute at a single point in time
can lead to performance penalties when used for multithreading in CPU-bound programs

when an operation is completed in a single bytecode instruction, it's atomic
because the GIL only allows one thread to run at a time, atomic operations are safe from interference by other threads
ensures that atomic operations are generally thread-safe
no worries about conflicts between threads with atomic operations
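to see why a statement that looks like one step may not be atomic, the sketch below uses the dis module to list the bytecode instructions behind counter += 1
the names counter and increment are made up for this illustration, and the exact opcode names vary between CPython versions

```python
import dis

counter = 0

def increment():
    global counter
    counter += 1

# Collect the opcode names that make up increment()
opnames = [instruction.opname for instruction in dis.get_instructions(increment)]
print(opnames)

# Reading and writing the global are separate instructions,
# so a context switch can happen between them
has_load = any(name.startswith("LOAD_GLOBAL") for name in opnames)
has_store = any(name.startswith("STORE_GLOBAL") for name in opnames)
print(f"{has_load=}, {has_store=}, instruction count={len(opnames)}")
```

because the load and the store are separate instructions, two threads can both read the old value before either writes the new one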

the GIL limits parallelism but still allows concurrency: threads take turns holding the GIL, so their execution overlaps in time
a GIL-free Python interpreter is available starting with Python 3.13 and can be enabled with a build-time flag
the default Python interpreter will still have the GIL in 3.13
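a hedged sketch for checking which kind of interpreter is running
it assumes CPython: the Py_GIL_DISABLED config variable marks a free-threaded build, and sys._is_gil_enabled() is only available from Python 3.13, so the helper (gil_status, a name invented here) falls back to assuming the GIL is on

```python
import sys
import sysconfig

def gil_status():
    """Return (free_threaded_build, gil_enabled) for the running interpreter."""
    # Py_GIL_DISABLED is 1 in free-threaded builds; it's 0 or missing otherwise
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() exists from Python 3.13; assume the GIL is on before that
    check = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = check() if check is not None else True
    return free_threaded_build, gil_enabled

status = gil_status()
print(f"free-threaded build: {status[0]}, GIL enabled: {status[1]}")
```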

Race Conditions
a race condition occurs when the outcome of a program depends on the sequence or timing of uncontrollable events like thread execution order
race conditions can lead to logical errors and non-deterministic results when code is run

if two threads simultaneously read and write to a shared variable without adequate synchronization, they can interfere with each other
can lead to incorrect results and behaviors
below two threads try to modify an attribute simultaneously

import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")
the final balance can be either 300 or 500, although the two withdrawals together (1200) exceed the starting balance of 1000 and the second one should fail as an overdraft
flow of execution could be
  • the calls to executor.submit() result in the creation of two threads
  • the first thread checks the balance (1000) and finds it sufficient for a withdrawal of 500
    it proceeds with the withdrawal.
  • a context switch happens before the first thread saves the updated balance to the .balance attribute
    the second thread starts and attempts to withdraw an amount of 700
    it checks the balance and finds it sufficient for the withdrawal of 700
  • both threads independently calculate new balances based on the original balance of 1000
    the first thread attempts to update the .balance attribute to 500
    the second thread attempts to update the .balance attribute to 300
  • the thread that's last to update 'wins' and sets the balance to either 300 or 500
error is due to simultaneous operations on shared mutable data
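the flow above can be forced to happen on every run, as a sketch
instead of relying on time.sleep(), this variant uses a threading.Barrier (covered later in these notes) so that both threads pass the balance check before either one writes
the module-level balance and the name both_have_checked are simplifications made up for this illustration

```python
import threading
from concurrent.futures import ThreadPoolExecutor

balance = 1000
both_have_checked = threading.Barrier(2)

def withdraw(amount):
    global balance
    if balance >= amount:
        new_balance = balance - amount
        # Hold each thread here until the other has also passed the check
        both_have_checked.wait(timeout=5)
        balance = new_balance  # the last writer overwrites the other update
    else:
        raise ValueError("Insufficient balance")

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(withdraw, 500)
    executor.submit(withdraw, 700)

print(f"Final balance: {balance}")
```

both withdrawals succeed and one update is lost, so the final balance is 300 or 500 depending on which thread writes last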

Synchronization Primitives
Python's threading module provides various synchronization primitives to
  • prevent race conditions
  • allow for coordination across threads
use synchronization primitives to
  • control the simultaneous execution of a block of code by threads
  • make multiple code statements atomic with respect to a thread
  • limit concurrent access by threads
  • coordinate between threads and perform actions based on the state of other threads
Using Python Threading Locks for Mutual Exclusion
a lock is a synchronization primitive that can be used for exclusive access to a resource
once a thread acquires a lock, no other threads can acquire it and proceed until the lock is released
can use a lock to wrap a statement or group of statements that should be executed atomically

threading.Lock for Primitive Locking
can create a Lock object by calling the Lock() constructor from the threading module
a Lock object has two states : locked and unlocked
when it's unlocked, it can be acquired by a thread by calling the .acquire() method on Lock
the lock is then held by the thread and other threads can't acquire it until it's released
the Lock object is released by calling the .release() method so other threads can acquire it

  • .acquire() - when the Lock object state is unlocked, .acquire() changes the Lock object to a locked state and returns immediately
    if the Lock object is in a locked state, .acquire() blocks the calling thread
    the calling thread waits for the Lock object to be released by the thread holding the lock
  • .release() - changes the state of a locked Lock object to unlocked and returns immediately
    one of the threads blocked in .acquire(), if any, can then acquire the lock and proceed
    should only be called in the locked state
    if an attempt is made to release an unlocked lock, a RuntimeError is raised
the Lock object can be used as a context manager when used with the with statement
this automates the acquiring and releasing of locks
when the program enters the with block, the .acquire() method on the Lock is automatically called
when the program exits the with block, the .release() method is called
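a minimal sketch of what the with statement does for a lock, written out by hand with try/finally
it also shows a non-blocking .acquire() call and the RuntimeError raised by releasing an unlocked lock; the variable names are illustrative only

```python
import threading

lock = threading.Lock()

# Manual equivalent of "with lock:"; finally guarantees the release
lock.acquire()
try:
    # A non-blocking attempt returns False at once because the lock is held
    acquired_again = lock.acquire(blocking=False)
finally:
    lock.release()

# Releasing a lock that is already unlocked raises RuntimeError
try:
    lock.release()
    double_release_failed = False
except RuntimeError:
    double_release_failed = True

print(f"{acquired_again=}, {lock.locked()=}, {double_release_failed=}")
```

the with form is usually preferable because the release can't be forgotten when an exception is raised inside the block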

the banking example made thread-safe by adding a lock

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")
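one detail worth noting: an exception raised inside a submitted function is stored on the returned Future and only re-raised when .result() is called
the sketch below is a trimmed-down variant of the locked account (no deposit, no simulated delay) that collects such errors; the errors list is an addition for this illustration

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        # The check and the update happen as one unit under the lock
        with self.account_lock:
            if self.balance < amount:
                raise ValueError("Insufficient balance")
            self.balance -= amount

account = BankAccount(1000)
errors = []

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(account.withdraw, amount) for amount in (500, 700)]

for future in futures:
    try:
        future.result()  # re-raises any exception from the worker thread
    except ValueError as error:
        errors.append(str(error))

print(f"Final account balance: {account.balance}, errors: {errors}")
```

with the lock in place exactly one of the two withdrawals is rejected; which one depends on the order in which the threads acquire the lock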
threading.RLock for Reentrant Locking
if a lock isn't released properly due to an error or oversight in the code, it can lead to a deadlock
threads wait indefinitely for the lock to be released.
reasons for a deadlock include

  • nested lock acquisition - a deadlock can occur if a thread attempts to acquire a lock it already holds
    this leads to the thread blocking itself
    a situation that doesn't resolve without external intervention
  • multiple locks acquisition - a deadlock is likely when multiple locks are used and threads acquire them in inconsistent order
    if two threads each hold one lock and are waiting for the other, neither thread can proceed
    this results in a deadlock
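one common way to avoid the inconsistent-order problem is to always acquire multiple locks in the same global order
the sketch below assumes a hypothetical Account class and transfer() helper and orders the two locks by id(); any stable ordering key would do

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Account:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

def transfer(source, target, amount):
    # Always lock the two accounts in the same global order (here: by id())
    first, second = sorted((source, target), key=id)
    with first.lock:
        with second.lock:
            source.balance -= amount
            target.balance += amount

account_a = Account(1000)
account_b = Account(1000)

with ThreadPoolExecutor(max_workers=4) as executor:
    for _ in range(50):
        executor.submit(transfer, account_a, account_b, 10)  # A -> B
        executor.submit(transfer, account_b, account_a, 5)   # B -> A

print(account_a.balance, account_b.balance)
```

because opposing transfers lock the accounts in the same order, no thread can hold one lock while waiting for a thread that holds the other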
nested lock acquisition can be resolved by using a reentrant lock, RLock
if a Lock was used in the example below, there would be a deadlock
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)
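the difference between the two lock types can be shown without actually deadlocking, as a sketch
the helper can_reacquire() is made up for this illustration and uses a short timeout so the Lock case gives up instead of blocking forever

```python
import threading

def can_reacquire(lock):
    """Return True if the thread holding the lock can acquire it a second time."""
    with lock:
        # With a plain Lock this attempt times out; with an RLock it succeeds
        acquired_again = lock.acquire(timeout=0.1)
        if acquired_again:
            lock.release()  # undo the nested acquisition
        return acquired_again

lock_result = can_reacquire(threading.Lock())
rlock_result = can_reacquire(threading.RLock())
print(f"{lock_result=}, {rlock_result=}")
```

an RLock must be released once for every time it was acquired, which the nested with blocks in the example above do automatically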
Limiting Access With Semaphores
a semaphore is useful when the number of resources is limited and a number of threads try to access these limited resources
it uses a counter to limit access by multiple threads to a critical section
the Semaphore() constructor accepts a value argument
argument denotes the maximum number of concurrent threads acquiring it

Semaphore objects have .acquire() and .release() methods and can be used as a context manager
each .acquire() call reduces a semaphore's counter by one, and each .release() call increases it by one
further .acquire() calls block when the counter reaches zero

when Semaphore is used as a context manager, the context manager block is entered after a successful .acquire() call
the .release() method is automatically called when the control exits the with block

can use this approach in scenarios where resources are limited, and a number of threads are trying to concurrently access the same resources
a banking example is multiple customers waiting to be served by a limited number of tellers

import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")
        # Simulate the time taken for the teller to serve the customer
        time.sleep(random.randint(1, 3))
        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    for customer_name in customers:
        executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")
output
$ python bank_semaphore.py
10:12:28: Customer 1 is waiting for a teller.
10:12:28: Customer 1 is being served by a teller.
10:12:28: Customer 2 is waiting for a teller.
10:12:28: Customer 2 is being served by a teller.
10:12:28: Customer 3 is waiting for a teller.
10:12:28: Customer 4 is waiting for a teller.
10:12:28: Customer 5 is waiting for a teller.
10:12:29: Customer 1 is done being served.
10:12:29: Customer 3 is being served by a teller.
10:12:30: Customer 3 is done being served.
10:12:30: Customer 4 is being served by a teller.
10:12:31: Customer 2 is done being served.
10:12:31: Customer 5 is being served by a teller.
10:12:32: Customer 4 is done being served.
10:12:33: Customer 5 is done being served.
10:12:33: All customers have been served.
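to check that the semaphore really caps concurrency, the sketch below counts how many threads are inside the protected section at once
the counters active and peak and the lock guarding them are additions for this illustration

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_semaphore = threading.Semaphore(2)
counter_lock = threading.Lock()
active = 0  # customers currently being served
peak = 0    # highest value of active seen so far

def serve_customer(_):
    global active, peak
    with teller_semaphore:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate the service time
        with counter_lock:
            active -= 1

with ThreadPoolExecutor(max_workers=5) as executor:
    list(executor.map(serve_customer, range(5)))

print(f"Peak number of customers served at once: {peak}")
```

the peak never exceeds the value passed to Semaphore(), even though five worker threads are available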
Using Synchronization Primitives for Communication and Coordination
Events for Signaling
can use Event objects for signaling, allowing a thread to notify one or more threads about an action
an Event object can be created by instantiating Event from the threading module
Event objects maintain an internal flag that starts as False
can set this flag to True with .set() and reset it to False with .clear()
threads can wait for the flag to become True using .wait(), which blocks the thread until the flag is set
import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()

def serve_customer(customer_data):
    print(f"{customer_data['name']} is waiting for the bank to open.")

    bank_open.wait()
    print(f"{customer_data['name']} entered the bank")
    if customer_data["type"] == "WITHDRAW_MONEY":
        print(f"{customer_data['name']} is waiting for transactions to open.")
        transactions_open.wait()
        print(f"{customer_data['name']} is starting their transaction.")

        # Simulate the time taken for performing the transaction
        time.sleep(2)

        print(
            f"{customer_data['name']} completed transaction and exited bank"
        )
    else:
        # Simulate the time taken for banking
        time.sleep(2)
        print(f"{customer_data['name']} has exited bank")

customers = [
    {"name": "Customer 1", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 2", "type": "CHECK_BALANCE"},
    {"name": "Customer 3", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for customer_data in customers:
        executor.submit(serve_customer, customer_data)

    print("Bank manager is preparing to open the bank.")
    time.sleep(2)
    print("Bank is now open!")
    bank_open.set()  # Signal that the bank is open

    time.sleep(3)
    print("Transactions are now open!")
    transactions_open.set()

print("All customers have completed their transactions.")
output
$ python bank_event.py
Customer 1 is waiting for the bank to open.
Customer 2 is waiting for the bank to open.
Customer 3 is waiting for the bank to open.
Customer 4 is waiting for the bank to open.
Bank manager is preparing to open the bank.
Bank is now open!
Customer 1 entered the bank
Customer 4 entered the bank
Customer 3 entered the bank
Customer 3 is waiting for transactions to open.
Customer 1 is waiting for transactions to open.
Customer 2 entered the bank
Customer 4 is waiting for transactions to open.
Customer 2 has exited bank
Transactions are now open!
Customer 4 is starting their transaction.
Customer 3 is starting their transaction.
Customer 1 is starting their transaction.
Customer 3 completed transaction and exited bank
Customer 1 completed transaction and exited bank
Customer 4 completed transaction and exited bank
All customers have completed their transactions.
what the output shows
  • customers wait for the bank to open before entering
    initially, four customers are waiting and only enter the bank after the manager opens it, which is done by bank_open.set()
  • customers may enter the bank in any order once the bank is open
  • multiple customers can enter the bank simultaneously once it's open
  • customers withdrawing money wait for an additional event, which in this case is for transactions to open
    customers 1, 3, and 4 waited for the transactions_open event to be set
  • withdrawal transactions by Customers 1, 3, and 4 start concurrently once they're allowed
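a smaller sketch of the Event methods on their own
.wait() also accepts a timeout and returns the state of the flag, which lets a thread give up waiting; the customer() helper and results list are made up for this illustration

```python
import threading

bank_open = threading.Event()

# The flag starts as False, so a timed wait gives up and returns False
opened_in_time = bank_open.wait(timeout=0.05)

def customer(results):
    # Blocks until another thread calls bank_open.set() or the timeout expires
    results.append(bank_open.wait(timeout=5))

results = []
worker = threading.Thread(target=customer, args=(results,))
worker.start()
bank_open.set()   # wake up every thread waiting on the event
worker.join()

bank_open.clear()  # reset the flag to False
print(opened_in_time, results, bank_open.is_set())
```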
Conditions for Conditional Waiting
a Condition object is built on top of a Lock or RLock object
supports additional methods which allow threads
  • to wait for certain conditions to be met
  • to signal other threads that those conditions have changed
Condition objects are always associated with a lock
the lock argument used in the Condition() constructor accepts either a Lock or RLock object
if this argument is omitted, a new RLock object is created and used as the underlying lock

method - description
  • .acquire() - acquires the underlying lock associated with the Condition
    must be called before a thread can wait on or signal a condition
  • .release() - releases the underlying lock
  • .wait(timeout=None) - blocks the thread until it's notified or the specified timeout occurs
    releases the lock before blocking and reacquires it upon notification or when the timeout expires
    used when a thread needs to wait for a specific condition to be true before proceeding
  • .notify(n=1) - wakes up at most n of the threads waiting for the condition, if any are waiting
    if more threads are waiting, which ones are woken is not specified, so don't rely on a particular order
  • .notify_all() - wakes up all threads waiting for the condition
    it's the broadest way to handle notifications, ensuring that all waiting threads are notified
    it's useful when a change affects all waiting threads or when all threads need to recheck the condition they're waiting on
Condition methods can be used to coordinate across threads
this makes it possible to manage the flow of execution in a multithreaded environment
using these methods can ensure the threads are able to
  • wait for specific conditions to be met before proceeding
  • notify one or multiple threads when a condition has changed
  • maintain control over the sequence and timing of thread operations
like locks, Condition objects support the context manager protocol
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

# Customers waiting to be served by the Teller
customer_queue = []

def now():
    return time.strftime("%H:%M:%S")

def serve_customers():
    while True:
        with customer_available_condition:
            # Wait for a customer to arrive
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        # Simulate the time taken to serve the customer
        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)

        customer_available_condition.notify()

customer_names = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:
    teller_thread = executor.submit(serve_customers)
    for name in customer_names:
        # Simulate customers arriving at random intervals
        time.sleep(random.randint(1, 3))
        executor.submit(add_customer_to_queue, name)
the Condition object customer_available_condition acts as both a lock and a way to communicate between threads
used to coordinate between the teller and customers

the customer_queue is the shared resource protected by the condition
the Condition object is used with the with statement to ensure that the Condition object's lock is properly acquired and released

serve_customers() runs in an infinite loop, so the script doesn't exit on its own and has to be interrupted
the loop performs the following functions

  • waits for customers using customer_available_condition.wait()
  • .wait() releases the Condition object's lock while the teller is blocked
    so add_customer_to_queue() can acquire the lock when a new customer arrives
  • simulates the action of the teller by serving customers when they arrive with a random delay representing the service time
the output is
$ python bank_condition.py
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.
what the output shows
  • the teller alternates between serving customers and waiting for new ones
  • when a customer arrives while the teller is waiting, the teller immediately starts serving them
    at the timestamp 10:15:08 the teller is waiting for a customer
    as soon as Customer 1 arrives at 10:15:09 the teller starts serving the customer
  • when a customer arrives while the teller is busy, the customer waits in the queue until the teller is free
    Customers 2, 4, and 5 arrive while the teller is serving someone else and wait their turn
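a variant sketch of the teller loop that can shut down cleanly
it uses Condition.wait_for() with a predicate instead of a hand-written while loop, plus a closed flag; the flag, the served list, and close_bank() are additions for this illustration and not part of the example above

```python
import threading
from concurrent.futures import ThreadPoolExecutor

condition = threading.Condition()
customer_queue = []
served = []
closed = False

def serve_customers():
    while True:
        with condition:
            # Sleep until there is a customer or the bank has closed
            condition.wait_for(lambda: customer_queue or closed)
            if not customer_queue:
                return  # closed and nobody left to serve
            customer = customer_queue.pop(0)
        served.append(customer)

def add_customer_to_queue(name):
    with condition:
        customer_queue.append(name)
        condition.notify()

def close_bank():
    global closed
    with condition:
        closed = True
        condition.notify_all()

with ThreadPoolExecutor(max_workers=1) as executor:
    teller = executor.submit(serve_customers)
    for name in ["Customer 1", "Customer 2", "Customer 3"]:
        add_customer_to_queue(name)
    close_bank()
    teller.result()

print(served)
```

customers still in the queue when the bank closes are served before the teller returns, because the loop only exits once the queue is empty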
Barriers for Coordination
a Barrier is a synchronization primitive that allows a group of threads to wait for each other before continuing execution
can use Barrier objects to block program execution until a specified number of threads have reached the barrier point
Barrier constructor syntax
Barrier(parties, action=None, timeout=None)
argument - description
  • parties - the number of threads that must call .wait() on the Barrier object
    the .wait() method waits for this number of threads to reach the barrier point before proceeding
  • action - an optional callable that will be called by one of the threads when they're released
  • timeout - an optional default timeout, in seconds, used by .wait() when the call doesn't pass its own timeout
below, the Barrier object waits until three threads have reached it before releasing them
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)

def now():
    return time.strftime("%H:%M:%S")

def prepare_for_work(name):
    print(f"{now()}: {name} is preparing their counter.")

    # Simulate the delay to prepare the counter
    time.sleep(random.randint(1, 3))
    print(f"{now()}: {name} has finished preparing.")

    # Wait for all tellers to finish preparing
    teller_barrier.wait()
    print(f"{now()}: {name} is now ready to serve customers.")

tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
    for teller_name in tellers:
        executor.submit(prepare_for_work, teller_name)

print(f"{now()}: All tellers are ready to serve customers.")
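a short sketch of the action and timeout arguments described above
the announce() callback and the second, deliberately under-filled barrier are made up for this illustration

```python
import threading
from concurrent.futures import ThreadPoolExecutor

announcements = []

def announce():
    # Called once, by one of the threads, when the barrier is released
    announcements.append("all tellers ready")

teller_barrier = threading.Barrier(3, action=announce)

def prepare(name):
    # .wait() returns a distinct integer from 0 to parties - 1 in each thread
    return teller_barrier.wait(timeout=5)

with ThreadPoolExecutor(max_workers=3) as executor:
    arrival_indexes = sorted(
        executor.map(prepare, ["Teller 1", "Teller 2", "Teller 3"])
    )

# A barrier that never fills up breaks when the timeout expires
lonely_barrier = threading.Barrier(2)
try:
    lonely_barrier.wait(timeout=0.05)
    timed_out = False
except threading.BrokenBarrierError:
    timed_out = True

print(arrival_indexes, announcements, timed_out)
```

passing a timeout is a reasonable safeguard when one of the expected threads might fail before reaching the barrier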
Deciding When to Use Synchronization Primitives
When running code in a multithreaded environment, race conditions are common and result in unexpected and non-deterministic output
to make a program thread-safe, you need to know when to use synchronization primitives to control and coordinate threads

guideline - description
  • check for atomicity requirements - keep in mind that operations from different threads can interleave in unpredictable ways due to context switches
    if a block of statements needs to be executed as an atomic unit, protect it with a mutual exclusion primitive such as a lock
  • check for shared mutable data - when multiple threads operate on shared data, one thread may read the data that's currently being modified by another thread
    either avoid sharing data by using thread-local storage
    or, if sharing is really necessary, use locks or other synchronization mechanisms to prevent race conditions
  • external library code might not be designed for thread safety - third-party packages and methods from the standard-library modules might not be designed with thread safety in mind
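the thread-local storage mentioned in the guidelines can be sketched with threading.local()
attributes set on such an object are visible only to the thread that set them; the names request_context and worker are hypothetical

```python
import threading
from concurrent.futures import ThreadPoolExecutor

request_context = threading.local()
request_context.customer = "main thread value"

def worker():
    # The attribute set by the main thread doesn't exist in this thread
    seen_on_entry = getattr(request_context, "customer", None)
    request_context.customer = "worker value"
    return seen_on_entry, request_context.customer

with ThreadPoolExecutor(max_workers=1) as executor:
    worker_view = executor.submit(worker).result()

# The worker's assignment didn't touch the main thread's value
print(worker_view, request_context.customer)
```

since each thread works on its own copy, no lock is needed for this data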