Concurrency and Parallelism

Concurrency Models in Python

Chapter 19 from "Fluent Python"

Concurrency is about dealing with multiple things at once.

Parallelism is about doing multiple things at once.

Concurrency is about structure, Parallelism is about execution.

A CPU with 4 cores can run 4 processes in parallel but 100s processes concurrently.

Old notes

# simple DFS
class Solution:
    def crawl(self, start: str, parser: 'HtmlParser') -> List[str]:
        hostname = lambda x: x.split('/')[2]
        visited,stack= set([start]),[start]
        while stack:
            s = stack.pop()
            for u in parser.getUrls(s):
                if u not in visited and hostname(start) == hostname(u):
                    visited.add(u)
                    stack.append(u)
        return visited

# concurrent DFS
from concurrent import futures
class Solution:
    def crawl(self, s: str, parser: 'HtmlParser') -> List[str]:
        hostname = lambda x: x.split('/')[2]
        visited = set([s])
        with futures.ThreadPoolExecutor(max_workers=16) as executor:
            tasks = [executor.submit(parser.getUrls, s)]
            while tasks:
                neigh = tasks.pop().result()
                for u in neigh:
                    if u not in visited and hostname(s) == hostname(u):
                        visited.add(u)
                        tasks.append(executor.submit(parser.getUrls, u))
        return visited

Deadlocks and how to properly acquire release locks

from threading import Lock

Common examples of the cause of threading deadlocks include:

  • A thread that waits on itself (acquires itself twice releases once and waits for itself)
  • Threads that wait on each other (e.g. A waits on B, B waits on A).
  • Thread that fails to release a resource (e.g. mutex lock, semaphore, barrier, condition, event, etc.).
  • Threads that acquire mutex locks in different orders (e.g. fail to perform lock ordering).
from threading import Lock
lock = Lock()
lock.acquire()
lock.acquire()
lock.release() # deadlock

# no deadlock I think
lock.acquire()
lock.release() 
lock.acquire()

malko e mazalo...

from threading import Lock
class FizzBuzz:
    def __init__(self, n: int):
        self.n = n
        self.finish = False
        self.fizz_lock = Lock()
        self.buzz_lock = Lock()
        self.fizzbuzz_lock = Lock()
        self.main = Lock()
        self.fizz_lock.acquire()
        self.buzz_lock.acquire()
        self.fizzbuzz_lock.acquire()

    # printFizz() outputs "fizz"
    def fizz(self, printFizz: 'Callable[[], None]') -> None:
        while True:
            self.fizz_lock.acquire()
            if self.finish: return
            printFizz()
            self.main.release()

    # printBuzz() outputs "buzz"
    def buzz(self, printBuzz: 'Callable[[], None]') -> None:
        while True:
            self.buzz_lock.acquire()
            if self.finish: return
            printBuzz()
            self.main.release()

    # printFizzBuzz() outputs "fizzbuzz"
    def fizzbuzz(self, printFizzBuzz: 'Callable[[], None]') -> None:
        while True:
            self.fizzbuzz_lock.acquire()
            if self.finish: return
            printFizzBuzz()
            self.main.release()
            
    # printNumber(x) outputs "x", where x is an integer.
    def number(self, printNumber: 'Callable[[int], None]') -> None:
        for i in range(1,self.n+1):
            self.main.acquire()
            if i % 3 == 0 and i % 5 != 0: self.fizz_lock.release()
            elif i % 3 != 0 and i % 5 == 0: self.buzz_lock.release()
            elif i % 3 == 0 and i % 5 == 0: self.fizzbuzz_lock.release()
            else:
                printNumber(i)
                self.main.release()
        
        self.main.acquire()
        self.finish = True
        self.buzz_lock.release()
        self.fizz_lock.release()
        self.fizzbuzz_lock.release()


GOAL: Run only one thread at a time!!!!!

Racing conditions are dangerous.

Native Coroutines

Python has three ways to run things concurrently

  • coroutines (classic and native)
  • threads
  • processes

Native coroutines use asyncio library using async def and await syntax

  • asyncio is a library that allows the USER to manually create an event loop (with ONE THREAD) and schedule tasks to run concurrently
  • Allows YOU to multitask with functions!
  • asyncio is Python’s standard library
  • Run time of concurrently ran functions = Run time of slowest function, if you schedule the tasks to run in the background!

Creating Background Task

say you have two functions one that takes 10 seconds and one that takes 5 seconds, how can you run them concurrently?

import asyncio

async def slow():
    print("Starting slow")
    await asyncio.sleep(10)
    print("Ended slow")

async def calc_that_does_not_wait_slow():
    await asyncio.sleep(5)
    print(1)


async def main():
      await calc_that_does_not_wait_slow()
      await slow()
import time
print(start:= time.time())
await main()
print(end:= time.time())
print(end-start)

# takes 15 seconds
# even if you swap 
#       await calc_that_does_not_wait_slow()
#       await slow()
# it will take 15 seconds

You need to make them one as background task! Currently you are running code sequentially.

import asyncio
async def slow():
    print('starting slow')
    await asyncio.sleep(10)
    print('ended slow')

async def calc_that_does_not_wait_slow():
    await asyncio.sleep(5)
    print(1)

async def main():
    slow_task = asyncio.create_task(slow()) 
    await calc_that_does_not_wait_slow()
# Outside main:
print(start:= time.time())
await main()
print(end:= time.time())
print(end-start)
# Prints 5 seconds only! And 5 seconds after prints ended slow!
#  How it runs: 
#
#      slow() is started in the background (will take 10 seconds).
#      You immediately start and await calc_that_does_not_wait_slow() (waits 5 seconds, then prints 1).
#      When calc_that_does_not_wait_slow() is done (after 5 seconds), main() returns—even if slow() is still running!
#      The program ends; slow() may be cancelled or left unfinished.
     

Lets await the task and the calculation now.

  • await the slow first and then the other -> 15 seconds - sad
import asyncio

async def slow():
    print("Starting slow")
    await asyncio.sleep(10)
    print("Ended slow")

async def calc_that_does_not_wait_slow():
    await asyncio.sleep(5)
    print(1)


async def main():
      slow_task = asyncio.create_task(slow()) # background task
      await slow_task
      await calc_that_does_not_wait_slow()
import time
print(start:= time.time())
await main()
print(end:= time.time())
print(end-start)

# Output
# 1754464873.7352946
# Starting slow
# Ended slow
# 1
# 1754464888.7525644
# 15.017269849777222
  • await the slow after and it takes 10 seconds
import asyncio

async def slow():
    print("Starting slow")
    await asyncio.sleep(10)
    print("Ended slow")

async def calc_that_does_not_wait_slow():
    await asyncio.sleep(5)
    print(1)


async def main():
      slow_task = asyncio.create_task(slow())
      await calc_that_does_not_wait_slow()
      await slow_task
import time
print(start:= time.time())
await main()
print(end:= time.time())
print(end-start)

# Output 
# 1754464849.4491456
# Starting slow
# 1
# Ended slow
# 1754464859.4551563
# 10.00601077079773

Background tasks are confusing - just use gather!

import asyncio

async def slow():
    print("Starting slow")
    await asyncio.sleep(10)
    print("Ended slow")

async def calc_that_does_not_wait_slow():
    print(1)

async def main():
    # Both coroutines start at once; don't wait for one to finish before the other
    await asyncio.gather(slow(), calc_that_does_not_wait_slow())

# will run for 10 seconds as well

Summary

Concurrent async functions:

  • If you want two async functions (coroutines) to run simultaneously, you must schedule them to run at the same time.
  • await-ing one after the other causes them to run sequentially, so the total run time is the sum of both delays.

Background tasks with create_task:

  • Using asyncio.create_task(coro()) starts a coroutine in the background.
  • If you don't later await the task, your program may exit before the background task finishes.
  • If you start the slow task in the background and only await the fast task, the program finishes after the fast task—even if the slow one hadn't finished yet (the slow one is cancelled or left unfinished).