Threading example

Updating and displaying a counter:

[1]:
counter = 0

print("Starting up")
for i in range(10):
    counter += 1
    print(f"The count is {counter}")
print("Finishing up")
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Start with code that is clear, simple, and top-down. Such code is easy to develop and can be tested incrementally.

Note:

Test and debug your application before you start threading. Threading never makes debugging easier.

Convert to functions

The next step is to create reusable code as a function:

[2]:
counter = 0


def worker():
    "My job is to increment the counter and print the current count"
    global counter

    counter += 1
    print(f"The count is {counter}")


print("Starting up")
for i in range(10):
    worker()
print("Finishing up")
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Multi-Threading

Now some worker threads can be started:

[3]:
import threading


counter = 0


def worker():
    "My job is to increment the counter and print the current count"
    global counter

    counter += 1
    print(f"The count is {counter}")


print("Starting up")
for i in range(10):
    threading.Thread(target=worker).start()
print("Finishing up")
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Test

A simple test run leads to the same result.

Detection of race conditions

Note:

Tests cannot prove the absence of errors. Many interesting race conditions do not show up in test environments.
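
To see why `counter += 1` is at risk, it helps to replay one unlucky interleaving of two threads by hand. The following is a single-threaded sketch, not real threading; the names `read_by_a` and `read_by_b` are hypothetical and only mark which imagined thread performs each step.

```python
# Single-threaded replay of one unlucky interleaving of two threads.
# "counter += 1" is really three steps: read, add, write back.
counter = 0

read_by_a = counter      # thread A reads 0
read_by_b = counter      # thread B also reads 0, before A has written
counter = read_by_a + 1  # thread A writes 1
counter = read_by_b + 1  # thread B writes 1 and overwrites A's update

print(f"Two increments, but the count is {counter}")
```

Two increments were performed, yet the count ends at 1. In a real program this only happens when the scheduler switches threads at exactly the wrong moment, which is why a simple test run rarely shows it.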

Fuzzing

Fuzzing, here the insertion of random delays between the individual steps, is a technique that makes race conditions more likely to show up:

[4]:
import random
import threading
import time


FUZZ = True


def fuzz():
    if FUZZ:
        time.sleep(random.random())


counter = 0


def worker():
    "My job is to increment the counter and print the current count"
    global counter

    fuzz()
    oldcnt = counter
    fuzz()
    counter = oldcnt + 1
    fuzz()
    print(f"The count is {counter}")
    fuzz()


print("Starting up")
fuzz()
for i in range(10):
    threading.Thread(target=worker).start()
    fuzz()
print("Finishing up")
fuzz()
Starting up
The count is 1
The count is 2
The count is 2
The count is 4
The count is 5
The count is 5
The count is 5
Finishing up
The count is 10
The count is 10
The count is 10

This technique is limited to relatively small blocks of code and is imperfect in that it still cannot prove the absence of errors. Nevertheless, fuzzed tests can reveal race conditions.
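
One way to turn a fuzzed run into a check is to wait for all workers and then compare the final count with the number of increments. This is a minimal sketch that assumes much shorter delays than above to keep the run fast; `EXPECTED` and `lost` are illustrative names that are not part of the original example. The result varies from run to run.

```python
import random
import threading
import time

EXPECTED = 10  # number of worker threads, one increment each
counter = 0


def fuzz():
    # Short random pause (at most 10 ms) to shake up thread scheduling
    time.sleep(random.random() / 100)


def worker():
    global counter
    fuzz()
    oldcnt = counter
    fuzz()  # other threads may read the same old value during this pause
    counter = oldcnt + 1


threads = [threading.Thread(target=worker) for _ in range(EXPECTED)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for all workers before reading the result

lost = EXPECTED - counter
print(f"Expected {EXPECTED}, got {counter}: {lost} update(s) lost")
```

A non-zero number of lost updates proves that a race exists. Zero lost updates in a single run proves nothing.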

Careful threading with queues

The following rules must be observed:

  1. All shared resources should be accessed in exactly one thread. All communication with this thread should go through a single atomic message queue, usually via the queue module, email, or message queues such as RabbitMQ or ZeroMQ.

    Resources that require this technique include global variables, user inputs, output devices, files, sockets, etc.

  2. One category of sequencing problems is to ensure that step A is performed before step B. The solution is to run them both on the same thread, with all the actions happening in sequence.

  3. To implement a barrier that waits for all parallel threads to complete, just join all threads with join().

  4. You cannot wait for daemon threads to complete (they run infinite loops); instead, call join() on the queue itself, which blocks until every task in the queue has been marked as done with task_done().

  5. You can use global variables to communicate between functions, but only within a single-threaded program. In a multi-threaded program, global variables are unsafe because they are mutable state shared by all threads. The better solution is threading.local(), which is global within a thread but not visible beyond it.

  6. Never try to terminate a thread from the outside: you never know whether that thread is holding a lock. For this reason, Python does not provide a direct thread termination mechanism. Forcing termination anyway, for example with ctypes, is a recipe for deadlock.
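
Rule 5 can be sketched as follows. This is a minimal illustration, not part of the original example: the names `local_data`, `results` and `worker` are hypothetical, and the results travel back to the main thread through a queue, in line with rule 1.

```python
import queue
import threading

# Each thread sees its own independent set of attributes on this object
local_data = threading.local()
results = queue.Queue()  # thread-safe channel back to the main thread


def worker(name, start):
    local_data.count = start  # visible only in the current thread
    for _ in range(3):
        local_data.count += 1
    results.put((name, local_data.count))


threads = [
    threading.Thread(target=worker, args=(f"t{i}", i * 100)) for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All workers have finished, so exactly one result per thread is waiting
collected = dict(results.get() for _ in threads)
print(sorted(collected.items()))
print(hasattr(local_data, "count"))  # False: the main thread never set it
```

Every thread counts from its own starting value without disturbing the others, and the attribute does not exist at all in the main thread.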

Now, if we apply these rules, our code looks like this:

[5]:
import queue
import threading


counter = 0

counter_queue = queue.Queue()


def counter_manager():
    "I have EXCLUSIVE rights to update the counter variable"
    global counter

    while True:
        increment = counter_queue.get()
        counter += increment
        print_queue.put(
            [
                f"The count is {counter}",
            ]
        )
        counter_queue.task_done()


t = threading.Thread(target=counter_manager)
t.daemon = True
t.start()
del t

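# counter_manager is already running, but it only touches print_queue after
# receiving an item, and nothing is put on counter_queue before this line
print_queue = queue.Queue()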


def print_manager():
    while True:
        job = print_queue.get()
        for line in job:
            print(line)
        print_queue.task_done()


t = threading.Thread(target=print_manager)
t.daemon = True
t.start()
del t


def worker():
    "My job is to increment the counter and print the current count"
    counter_queue.put(1)


print_queue.put(["Starting up"])
worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
for t in worker_threads:
    t.join()

counter_queue.join()
print_queue.put(["Finishing up"])
print_queue.join()
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up
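
If the manager thread should end cleanly instead of running as a daemon, one common pattern is to send it a sentinel through its own queue, so that the thread stops itself, as rule 6 requires. The following is a sketch under stated assumptions: `STOP` and `log` are hypothetical names, and the messages are collected in a list instead of being passed to a print manager to keep the example short.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel meaning "no more work is coming"
counter = 0
counter_queue = queue.Queue()
log = []  # written only by counter_manager until that thread has been joined


def counter_manager():
    "I have EXCLUSIVE rights to update the counter variable"
    global counter
    while True:
        increment = counter_queue.get()
        if increment is STOP:
            break  # the thread ends itself; nobody kills it from outside
        counter += increment
        log.append(f"The count is {counter}")


manager = threading.Thread(target=counter_manager)
manager.start()

workers = [
    threading.Thread(target=counter_queue.put, args=(1,)) for _ in range(10)
]
for t in workers:
    t.start()
for t in workers:
    t.join()  # every increment is now in the queue

counter_queue.put(STOP)  # queued after all increments, so it is handled last
manager.join()  # returns once the manager has seen STOP

print(log[-1])
```

Because the queue is first-in, first-out and the sentinel is only queued after all workers have been joined, the manager processes all ten increments before it stops.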

Careful threading with locks

If we thread with locks instead of queues, the code looks even tidier:

[6]:
import threading


counter_lock = threading.Lock()
printer_lock = threading.Lock()

counter = 0


def worker():
    global counter
    with counter_lock:
        counter += 1
        with printer_lock:
            print(f"The count is {counter}")


with printer_lock:
    print("Starting up")

worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
for t in worker_threads:
    t.join()

with printer_lock:
    print("Finishing up")
Starting up
The count is 1
The count is 2
The count is 3
The count is 4
The count is 5
The count is 6
The count is 7
The count is 8
The count is 9
The count is 10
Finishing up

Finally, a few notes on locks:

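  1. Locks are merely advisory flags: they do not protect anything by themselves and only work if every thread that touches the shared resource acquires the lock first.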

  2. In general, locks should be viewed as a primitive tool that is difficult to reason about in non-trivial examples. For more complex applications, it is better to use atomic message queues.

  3. The more locks are held at the same time, the smaller the benefit of concurrent processing.
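
The first note can be made concrete with a small sketch. The names `polite`, `rude`, `shared` and `snapshot` are hypothetical and chosen only for this illustration: one thread respects the lock, the other simply ignores it.

```python
import threading

lock = threading.Lock()
shared = []


def polite():
    with lock:  # waits until the lock is free
        shared.append("polite")


def rude():
    shared.append("rude")  # never asks for the lock


with lock:  # the main thread holds the lock for this whole block
    p = threading.Thread(target=polite)
    r = threading.Thread(target=rude)
    p.start()
    r.start()
    r.join()  # finishes although the lock is still held
    snapshot = list(shared)

p.join()  # polite can proceed only after the lock has been released
print(snapshot)  # ['rude']
print(shared)    # ['rude', 'polite']
```

The lock stopped only the thread that asked for it. Protecting `shared` therefore depends on every piece of code that touches it following the same convention.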