Threading in Python
When running multiple threads, the interpreter switches between them, handing control of execution over to each thread:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)
```

The output is:

```
$ python threading_example.py
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
```

Each time the interpreter performs a context switch, Python pauses the execution state of the current thread and passes control to another thread. When the context switches, Python saves the current execution state so that it can resume later. By switching control of execution at specific intervals, multiple threads can execute code concurrently.

You can check the context switch interval of the interpreter:

```python
>>> import sys
>>> sys.getswitchinterval()
0.005
```

An interval of five milliseconds doesn't mean that threads switch exactly every five milliseconds. Rather, the interpreter considers switching to another thread at these intervals. In the thread pool example, there's a call to sleep() inside threaded_function(). The call delays program execution by 0.1 seconds, which increases the chance of a context switch happening in between, because the execution takes much longer than the context switch interval.

Due to context switching, programs can behave unexpectedly when run in a multithreaded environment. Thread safety ensures that such programs run predictably and reliably.
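The switch interval can also be adjusted at runtime with sys.setswitchinterval(). Here's a small sketch that reads, changes, and restores the interval:

```python
import sys

# Read the current thread switch interval in seconds (0.005 by default)
default_interval = sys.getswitchinterval()
print(default_interval)

# A smaller interval makes the interpreter consider switching more often,
# at the cost of more context-switch overhead
sys.setswitchinterval(0.001)
print(sys.getswitchinterval())

# Restore the original interval
sys.setswitchinterval(default_interval)
```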
Understanding Thread Safety
Thread safety refers to the property of an algorithm or program being able to function correctly during simultaneous execution by multiple threads. Code is considered thread-safe if it behaves deterministically and produces the desired output when run in a multithreaded environment. Thread safety issues occur because of two factors: concurrent access to shared mutable state, and the unpredictable interleaving of threads caused by context switching.
The GIL and Its Implications on Threading
Python's Global Interpreter Lock (GIL) is a mutex that protects access to Python objects and prevents multiple threads from executing Python bytecode simultaneously. Because the GIL allows only one thread to execute at a single point in time, it can lead to performance penalties when multithreading is used in CPU-bound programs.

When an operation is completed in a single bytecode instruction, it's atomic. Because the GIL only allows one thread to run at a time, atomic operations are safe from interference by other threads. This ensures that atomic operations are generally thread-safe, so you don't need to worry about conflicts between threads for them. The GIL limits parallelism, but it enables concurrency by allowing multiple threads to run concurrently.

A GIL-free Python interpreter is available starting with Python 3.13 and can be enabled with a build-time flag. The default Python interpreter still has the GIL in 3.13.

Race Conditions
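Race conditions arise precisely because most operations are not atomic. As an illustrative sketch (not part of the original examples), you can disassemble a simple increment with the dis module: the read, the addition, and the write are separate bytecode instructions, so a context switch can land between them.

```python
import dis

counter = {"value": 0}

def increment():
    counter["value"] += 1  # looks like one step, but isn't

# List the bytecode instructions that make up the increment
instructions = [
    instruction.opname for instruction in dis.get_instructions(increment)
]
print(instructions)
```

The exact opcode names vary between Python versions, but the increment always compiles to several instructions rather than one.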
A race condition occurs when the outcome of a program depends on the sequence or timing of uncontrollable events, like thread execution order. Race conditions can lead to logical errors and non-deterministic results. If two threads simultaneously read and write to a shared variable without adequate synchronization, they can interfere with each other, leading to incorrect results and behaviors.

In the example below, two threads try to modify an attribute simultaneously:

```python
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")
```

The result can be either 300 or 500, even though the second withdrawal should result in an overdraft. The flow of execution could be:

1. The first thread reads the balance (1000), passes the check, computes a new balance of 500, and then sleeps.
2. The second thread reads the still-unchanged balance (1000), passes the check, computes a new balance of 300, and then sleeps.
3. Both threads write their computed balances, and whichever write happens last determines the final value.
Synchronization Primitives
Python's threading module provides various synchronization primitives to control and coordinate threads, including Lock, RLock, Semaphore, Event, Condition, and Barrier.
Using Python Threading Locks for Mutual Exclusion
A lock is a synchronization primitive that can be used for exclusive access to a resource. Once a thread acquires a lock, no other thread can acquire it and proceed until the lock is released. You can use a lock to wrap a statement or group of statements that should be executed atomically.

threading.Lock for Primitive Locking
You can create a Lock object by calling the Lock() constructor from the threading module. A Lock object has two states: locked and unlocked. When it's unlocked, a thread can acquire it by calling the .acquire() method. The lock is then held by that thread, and other threads can't access it. The Lock object is released by calling the .release() method so that other threads can acquire it.
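The two states can be observed directly with .locked(), which reports whether the lock is currently held. A minimal, single-threaded sketch:

```python
import threading

lock = threading.Lock()
print(lock.locked())  # False: the lock starts out unlocked

lock.acquire()  # the calling thread now holds the lock
print(lock.locked())  # True

lock.release()  # other threads may now acquire it
print(lock.locked())  # False
```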
Using a Lock object as a context manager in a with statement automates the acquiring and releasing of the lock: when the program enters the with block, the .acquire() method on the Lock is automatically called, and when the program exits the with block, the .release() method is called. Here's the banking example made thread-safe by adding a lock:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")
```

threading.RLock for Reentrant Locking
If a lock isn't released properly due to an error or oversight in the code, it can lead to a deadlock, where threads wait indefinitely for the lock to be released. Common reasons for a deadlock include a thread trying to reacquire a lock it already holds, two threads each waiting for a lock the other holds, and a code path that raises an exception before the lock is released.
A reentrant lock, created with the RLock() constructor, can be acquired multiple times by the same thread. In the example below, .deposit() acquires the lock and then calls ._update_balance(), which acquires the same lock again. If a plain Lock were used here, there would be a deadlock:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def deposit(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for .deposit()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for .deposit()"
            )
            time.sleep(0.1)
            self._update_balance(amount)

    def _update_balance(self, amount):
        print(
            f"Thread {threading.current_thread().name} "
            "waiting to acquire lock for ._update_balance()"
        )
        with self.lock:
            print(
                f"Thread {threading.current_thread().name} "
                "acquired lock for ._update_balance()"
            )
            self.balance += amount

account = BankAccount()

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
    for _ in range(3):
        executor.submit(account.deposit, 100)
```
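The difference between Lock and RLock can be verified directly with non-blocking acquire calls, which avoids deadlocking the demonstration itself:

```python
import threading

rlock = threading.RLock()
print(rlock.acquire(blocking=False))  # True
print(rlock.acquire(blocking=False))  # True: the same thread may reacquire it
rlock.release()
rlock.release()  # must be released once per acquisition

lock = threading.Lock()
print(lock.acquire(blocking=False))  # True
print(lock.acquire(blocking=False))  # False: a plain Lock can't be reacquired
lock.release()
```

With a blocking .acquire(), the second call on the plain Lock would hang forever, which is exactly the deadlock described above.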
Limiting Access With Semaphores
A semaphore is useful when the number of resources is limited and a number of threads try to access these limited resources. It uses a counter to limit access by multiple threads to a critical section. The Semaphore() constructor accepts a value argument, which denotes the maximum number of threads that can acquire it concurrently.

Semaphore objects have .acquire() and .release() methods and can be used as a context manager. Each .acquire() call reduces the semaphore's counter by one, and further .acquire() calls block when the counter reaches zero. When a Semaphore is used as a context manager, the with block is entered after a successful .acquire() call, and the .release() method is automatically called when control exits the with block.

You can use this approach in scenarios where resources are limited and a number of threads are trying to concurrently access the same resources. In a banking context, here's an example where multiple customers are waiting to be served by a limited number of tellers:

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")
        # Simulate the time taken for the teller to serve the customer
        time.sleep(random.randint(1, 3))
        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=5) as executor:
    for customer_name in customers:
        executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")
```

The output is:

```
$ python bank_semaphore.py
10:12:28: Customer 1 is waiting for a teller.
10:12:28: Customer 1 is being served by a teller.
10:12:28: Customer 2 is waiting for a teller.
10:12:28: Customer 2 is being served by a teller.
10:12:28: Customer 3 is waiting for a teller.
10:12:28: Customer 4 is waiting for a teller.
10:12:28: Customer 5 is waiting for a teller.
10:12:29: Customer 1 is done being served.
10:12:29: Customer 3 is being served by a teller.
10:12:30: Customer 3 is done being served.
10:12:30: Customer 4 is being served by a teller.
10:12:31: Customer 2 is done being served.
10:12:31: Customer 5 is being served by a teller.
10:12:32: Customer 4 is done being served.
10:12:33: Customer 5 is done being served.
10:12:33: All customers have been served.
```
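The counter behavior can also be seen in isolation with non-blocking acquire calls, separate from the teller example:

```python
import threading

semaphore = threading.Semaphore(2)  # counter starts at 2

print(semaphore.acquire(blocking=False))  # True: counter 2 -> 1
print(semaphore.acquire(blocking=False))  # True: counter 1 -> 0
print(semaphore.acquire(blocking=False))  # False: counter is 0, acquisition fails

semaphore.release()  # counter 0 -> 1
print(semaphore.acquire(blocking=False))  # True: counter 1 -> 0
semaphore.release()
```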
Using Synchronization Primitives for Communication and Coordination
Events for Signaling
You can use Event objects for signaling, allowing a thread to notify one or more threads about an action. An Event object can be created by instantiating Event from the threading module. Event objects maintain an internal flag that starts as False. You can set this flag to True with .set() and reset it to False with .clear(). Threads can wait for the flag to become True using .wait(), which blocks the thread until the flag is set.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

bank_open = threading.Event()
transactions_open = threading.Event()

def serve_customer(customer_data):
    print(f"{customer_data['name']} is waiting for the bank to open.")
    bank_open.wait()
    print(f"{customer_data['name']} entered the bank")
    if customer_data["type"] == "WITHDRAW_MONEY":
        print(f"{customer_data['name']} is waiting for transactions to open.")
        transactions_open.wait()
        print(f"{customer_data['name']} is starting their transaction.")
        # Simulate the time taken for performing the transaction
        time.sleep(2)
        print(
            f"{customer_data['name']} completed transaction and exited bank"
        )
    else:
        # Simulate the time taken for banking
        time.sleep(2)
        print(f"{customer_data['name']} has exited bank")

customers = [
    {"name": "Customer 1", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 2", "type": "CHECK_BALANCE"},
    {"name": "Customer 3", "type": "WITHDRAW_MONEY"},
    {"name": "Customer 4", "type": "WITHDRAW_MONEY"},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for customer_data in customers:
        executor.submit(serve_customer, customer_data)

    print("Bank manager is preparing to open the bank.")
    time.sleep(2)
    print("Bank is now open!")
    bank_open.set()  # Signal that the bank is open

    time.sleep(3)
    print("Transactions are now open!")
    transactions_open.set()

print("All customers have completed their transactions.")
```

The output is:

```
$ python bank_event.py
Customer 1 is waiting for the bank to open.
Customer 2 is waiting for the bank to open.
Customer 3 is waiting for the bank to open.
Customer 4 is waiting for the bank to open.
Bank manager is preparing to open the bank.
Bank is now open!
Customer 1 entered the bank
Customer 4 entered the bank
Customer 3 entered the bank
Customer 3 is waiting for transactions to open.
Customer 1 is waiting for transactions to open.
Customer 2 entered the bank
Customer 4 is waiting for transactions to open.
Customer 2 has exited bank
Transactions are now open!
Customer 4 is starting their transaction.
Customer 3 is starting their transaction.
Customer 1 is starting their transaction.
Customer 3 completed transaction and exited bank
Customer 1 completed transaction and exited bank
Customer 4 completed transaction and exited bank
All customers have completed their transactions.
```
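The signaling mechanics can be stripped down to a minimal sketch: one worker blocks in .wait() until the main thread calls .set():

```python
import threading
from concurrent.futures import ThreadPoolExecutor

ready = threading.Event()
results = []

def waiter():
    ready.wait()  # blocks until the internal flag becomes True
    results.append("signaled")

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(waiter)
    print(ready.is_set())  # False: the flag starts out unset
    ready.set()  # wake every thread blocked in .wait()
    future.result()  # the worker has now appended to results

print(results)  # ['signaled']
```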
Conditions for Conditional Waiting
A Condition object is built on top of a Lock or RLock object and supports additional methods that allow threads to wait for certain conditions to be met and to notify other waiting threads. The lock argument in the Condition() constructor accepts either a Lock or RLock object. If this argument is omitted, a new RLock object is created and used as the underlying lock.

Conditions allow you to effectively manage the flow of execution in a multithreaded environment. Using these methods, you can ensure that threads wait until a condition holds before proceeding and are woken up when another thread makes the condition true.
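Before the full teller example, here's a minimal sketch of the wait-and-notify handshake: a consumer waits on the condition until a producer appends an item and calls .notify():

```python
import threading
from concurrent.futures import ThreadPoolExecutor

condition = threading.Condition()
items = []

def consume():
    with condition:
        while not items:  # re-check the predicate to guard against spurious wakeups
            condition.wait()  # releases the lock while waiting
        return items.pop(0)

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(consume)
    with condition:
        items.append("deposit slip")
        condition.notify()  # wake one thread waiting on this condition
    print(future.result())  # deposit slip
```

The while loop around .wait() is the standard pattern: the predicate is re-checked after every wakeup rather than trusted blindly.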
```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

customer_available_condition = threading.Condition()

# Customers waiting to be served by the Teller
customer_queue = []

def now():
    return time.strftime("%H:%M:%S")

def serve_customers():
    while True:
        with customer_available_condition:
            # Wait for a customer to arrive
            while not customer_queue:
                print(f"{now()}: Teller is waiting for a customer.")
                customer_available_condition.wait()

            # Serve the customer
            customer = customer_queue.pop(0)
            print(f"{now()}: Teller is serving {customer}.")

        # Simulate the time taken to serve the customer
        time.sleep(random.randint(1, 5))
        print(f"{now()}: Teller has finished serving {customer}.")

def add_customer_to_queue(name):
    with customer_available_condition:
        print(f"{now()}: {name} has arrived at the bank.")
        customer_queue.append(name)
        customer_available_condition.notify()

customer_names = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=6) as executor:
    teller_thread = executor.submit(serve_customers)
    for name in customer_names:
        # Simulate customers arriving at random intervals
        time.sleep(random.randint(1, 3))
        executor.submit(add_customer_to_queue, name)
```

The Condition object customer_available_condition acts as both a lock and a way to communicate between threads, and it's used to coordinate between the teller and the customers. The customer_queue is the shared resource protected by the condition. The Condition object is used with the with statement to ensure that its lock is properly acquired and released.

serve_customers() runs in an infinite loop and performs the following functions:

1. It waits on the condition until a customer appears in customer_queue, using a while loop to guard against spurious wakeups.
2. It removes the next customer from the queue and serves them, simulating the service time with a call to sleep().
The output is:

```
$ python bank_condition.py
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.
```
Barriers for Coordination
A Barrier is a synchronization primitive that allows a group of threads to wait for each other before continuing execution. You can use Barrier objects to block program execution until a specified number of threads have reached the barrier point. The Barrier constructor has the following signature: Barrier(parties, action=None, timeout=None), where parties is the number of threads that must call .wait() before they're all released.
In this example, three tellers prepare their counters and wait for one another at the barrier before any of them starts serving customers:

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

teller_barrier = threading.Barrier(3)

def now():
    return time.strftime("%H:%M:%S")

def prepare_for_work(name):
    print(f"{now()}: {name} is preparing their counter.")
    # Simulate the delay to prepare the counter
    time.sleep(random.randint(1, 3))
    print(f"{now()}: {name} has finished preparing.")

    # Wait for all tellers to finish preparing
    teller_barrier.wait()

    print(f"{now()}: {name} is now ready to serve customers.")

tellers = ["Teller 1", "Teller 2", "Teller 3"]

with ThreadPoolExecutor(max_workers=3) as executor:
    for teller_name in tellers:
        executor.submit(prepare_for_work, teller_name)

print(f"{now()}: All tellers are ready to serve customers.")
```
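The return value of .wait() can be used to pick one thread for a follow-up action: each of the parties receives a distinct index between 0 and parties - 1. A minimal sketch:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

barrier = threading.Barrier(2)

def rendezvous():
    # Blocks until both parties have called .wait(), then returns
    # a distinct integer in range(barrier.parties)
    return barrier.wait()

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(rendezvous) for _ in range(2)]
    indices = sorted(future.result() for future in futures)

print(indices)  # [0, 1]
```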
Deciding When to Use Synchronization Primitives
When running code in a multithreaded environment, race conditions are common and result in unexpected and non-deterministic output. To make a program thread-safe, you need to know when to use synchronization primitives to control and coordinate threads.