{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Threading example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Updating and displaying a counter:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "counter = 0\n", "\n", "print(\"Starting up\")\n", "for i in range(10):\n", "    counter += 1\n", "    print(f\"The count is {counter}\")\n", "print(\"Finishing up\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start with code that is clear, simple, and top-down. It’s easy to develop and incrementally testable.\n", "\n", "
\n", "\n", "**Note:**\n", " \n", "Test and debug your application before you start threading. Threading never makes debugging easier.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convert to functions\n", "\n", "The next step is to create reusable code as a function:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "counter = 0\n", "\n", "\n", "def worker():\n", "    \"My job is to increment the counter and print the current count\"\n", "    global counter\n", "\n", "    counter += 1\n", "    print(f\"The count is {counter}\")\n", "\n", "\n", "print(\"Starting up\")\n", "for i in range(10):\n", "    worker()\n", "print(\"Finishing up\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Multi-Threading\n", "\n", "Now some worker threads can be started:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "import threading\n", "\n", "\n", "counter = 0\n", "\n", "\n", "def worker():\n", "    \"My job is to increment the counter and print the current count\"\n", "    global counter\n", "\n", "    counter += 1\n", "    print(f\"The count is {counter}\")\n", "\n", "\n", "print(\"Starting up\")\n", "for i in range(10):\n", "    threading.Thread(target=worker).start()\n", "print(\"Finishing up\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test\n", "\n", "A simple test run leads to the same result.\n", "\n", "### Detection of race conditions\n", "\n", "
\n", "\n", "**Note:**\n", " \n", "Tests cannot prove the absence of errors. Many interesting race conditions do not show up in test environments.\n", "
\n", "\n", "### Fuzzing\n", "\n", "Fuzzing improves the detection of race conditions: random sleeps inserted between the individual steps force thread interleavings that rarely occur in a normal test run." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 2\n", "The count is 4\n", "The count is 5\n", "The count is 5\n", "The count is 5\n", "Finishing up\n", "The count is 10\n", "The count is 10\n", "The count is 10\n" ] } ], "source": [ "import random\n", "import threading\n", "import time\n", "\n", "\n", "FUZZ = True\n", "\n", "\n", "def fuzz():\n", "    # Sleep for a random fraction of a second to provoke a thread switch\n", "    if FUZZ:\n", "        time.sleep(random.random())\n", "\n", "\n", "counter = 0\n", "\n", "\n", "def worker():\n", "    \"My job is to increment the counter and print the current count\"\n", "    global counter\n", "\n", "    # Read and write in separate steps so that another thread can interleave\n", "    fuzz()\n", "    oldcnt = counter\n", "    fuzz()\n", "    counter = oldcnt + 1\n", "    fuzz()\n", "    print(f\"The count is {counter}\", end=\"\\n\")\n", "    fuzz()\n", "\n", "\n", "print(\"Starting up\")\n", "fuzz()\n", "for i in range(10):\n", "    threading.Thread(target=worker).start()\n", "    fuzz()\n", "print(\"Finishing up\")\n", "fuzz()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This technique is limited to relatively small blocks of code, and it is imperfect in that it still cannot prove the absence of errors. Nevertheless, fuzzed tests can reveal race conditions that an ordinary test run would miss." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Careful threading with queues\n", "\n", "The following rules must be observed:\n", "\n", "1. All access to a shared resource should happen in exactly one thread. 
All communication with this thread should go through a single atomic message queue, usually the [queue module](https://docs.python.org/3/library/queue.html), email, or a message queue such as [RabbitMQ](https://www.rabbitmq.com/) or [ZeroMQ](https://zeromq.org).\n", "\n", "   Resources that need this treatment include global variables, user input, output devices, files, sockets, etc.\n", "\n", "2. One category of sequencing problems is ensuring that step A is performed before step B. The solution is to run both steps in the same thread, where all actions happen in sequence.\n", "\n", "3. To implement a *barrier* that waits for all parallel threads to complete, call join() on each of the threads.\n", "\n", "4. You cannot wait for daemon threads to complete because they run infinite loops; instead, call join() on the queue itself, which returns only once every task in the queue has been marked as done with task_done().\n", "\n", "5. You can use global variables to communicate between functions, but only in a single-threaded program. In a multi-threaded program, mutable global state is shared by all threads and can be changed by any of them at any time. The better solution is threading.local(), whose attributes are global within one thread but invisible to all others.\n", "\n", "6. Never try to terminate a thread from the outside: you never know whether that thread is holding a lock. For this reason, Python does not provide a direct thread termination mechanism. 
Forcing termination anyway, for example with ctypes, is a recipe for deadlock.\n", "\n", "If we apply these rules, our code looks like this:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "import queue\n", "import threading\n", "\n", "\n", "counter = 0\n", "\n", "counter_queue = queue.Queue()\n", "\n", "\n", "def counter_manager():\n", "    \"I have EXCLUSIVE rights to update the counter variable\"\n", "    global counter\n", "\n", "    while True:\n", "        increment = counter_queue.get()\n", "        counter += increment\n", "        print_queue.put(\n", "            [\n", "                f\"The count is {counter}\",\n", "            ]\n", "        )\n", "        counter_queue.task_done()\n", "\n", "\n", "t = threading.Thread(target=counter_manager)\n", "t.daemon = True\n", "t.start()\n", "del t\n", "\n", "print_queue = queue.Queue()\n", "\n", "\n", "def print_manager():\n", "    while True:\n", "        job = print_queue.get()\n", "        for line in job:\n", "            print(line)\n", "        print_queue.task_done()\n", "\n", "\n", "t = threading.Thread(target=print_manager)\n", "t.daemon = True\n", "t.start()\n", "del t\n", "\n", "\n", "def worker():\n", "    \"My job is to increment the counter and print the current count\"\n", "    counter_queue.put(1)\n", "\n", "\n", "print_queue.put([\"Starting up\"])\n", "worker_threads = []\n", "for i in range(10):\n", "    t = threading.Thread(target=worker)\n", "    worker_threads.append(t)\n", "    t.start()\n", "for t in worker_threads:\n", "    t.join()\n", "\n", "counter_queue.join()\n", "print_queue.put([\"Finishing up\"])\n", "print_queue.join()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Careful threading with locks\n", "\n", "If we thread with locks instead of queues, the code 
looks even tidier:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "import threading\n", "\n", "\n", "counter_lock = threading.Lock()\n", "printer_lock = threading.Lock()\n", "\n", "counter = 0\n", "\n", "\n", "def worker():\n", "    global counter\n", "    with counter_lock:\n", "        counter += 1\n", "        # Print while still holding counter_lock so the value cannot change in between\n", "        with printer_lock:\n", "            print(f\"The count is {counter}\")\n", "\n", "\n", "with printer_lock:\n", "    print(\"Starting up\")\n", "\n", "worker_threads = []\n", "for i in range(10):\n", "    t = threading.Thread(target=worker)\n", "    worker_threads.append(t)\n", "    t.start()\n", "for t in worker_threads:\n", "    t.join()\n", "\n", "with printer_lock:\n", "    print(\"Finishing up\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, a few notes on locks:\n", "\n", "1. Locks are merely advisory *flags*: they do not protect anything by themselves and only work if every thread that touches the shared resource acquires the lock first.\n", "2. In general, locks are a primitive tool whose behaviour is difficult to reason about in non-trivial examples. For more complex applications, atomic message queues are the better choice.\n", "3. The more locks are held at the same time, the smaller the benefit of concurrent processing."
] } ], "metadata": { "kernelspec": { "display_name": "Python 3.13 Kernel", "language": "python", "name": "python313" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.0" }, "latex_envs": { "LaTeX_envs_menu_present": true, "autoclose": false, "autocomplete": true, "bibliofile": "biblio.bib", "cite_by": "apalike", "current_citInitial": 1, "eqLabelWithNumbers": true, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 4 }