{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Threading example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Updating and displaying a counter:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting up\n", "The count is 1\n", "The count is 2\n", "The count is 3\n", "The count is 4\n", "The count is 5\n", "The count is 6\n", "The count is 7\n", "The count is 8\n", "The count is 9\n", "The count is 10\n", "Finishing up\n" ] } ], "source": [ "counter = 0\n", "\n", "print(\"Starting up\")\n", "for i in range(10):\n", " counter += 1\n", " print(f\"The count is {counter}\")\n", "print(\"Finishing up\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start with code that is clear, simple, and top-down. It’s easy to develop and incrementally testable.\n", "\n", "
join()
.\n",
"\n",
"4. You cannot wait for daemon threads to complete (they are infinite loops); instead you should execute join()
on the queue itself, so that the tasks are only merged when all tasks in the queue have been completed.\n",
"\n",
"5. You can use global variables to communicate between functions, but only within a single-threaded program. In a multi-thread program, however, you cannot use global variables because they are mutable. Then the better solution is threading.local()
, since it is global in a thread, but not beyond.\n",
"\n",
"6. Never try to terminate a thread from the outside: you never know if that thread is holding a lock. Therefore, Python does not provide a direct thread termination mechanism. However, if you try to do this with ctypes, this is a recipe for deadlock.\n",
"\n",
"Now, if we apply these rules, our code looks like this:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting up\n",
"The count is 1\n",
"The count is 2\n",
"The count is 3\n",
"The count is 4\n",
"The count is 5\n",
"The count is 6\n",
"The count is 7\n",
"The count is 8\n",
"The count is 9\n",
"The count is 10\n",
"Finishing up\n"
]
}
],
"source": [
"import queue\n",
"import threading\n",
"\n",
"\n",
"counter = 0\n",
"\n",
"counter_queue = queue.Queue()\n",
"\n",
"\n",
"def counter_manager():\n",
" \"I have EXCLUSIVE rights to update the counter variable\"\n",
" global counter\n",
"\n",
" while True:\n",
" increment = counter_queue.get()\n",
" counter += increment\n",
" print_queue.put(\n",
" [\n",
" f\"The count is {counter}\",\n",
" ]\n",
" )\n",
" counter_queue.task_done()\n",
"\n",
"\n",
"t = threading.Thread(target=counter_manager)\n",
"t.daemon = True\n",
"t.start()\n",
"del t\n",
"\n",
"print_queue = queue.Queue()\n",
"\n",
"\n",
"def print_manager():\n",
" while True:\n",
" job = print_queue.get()\n",
" for line in job:\n",
" print(line)\n",
" print_queue.task_done()\n",
"\n",
"\n",
"t = threading.Thread(target=print_manager)\n",
"t.daemon = True\n",
"t.start()\n",
"del t\n",
"\n",
"\n",
"def worker():\n",
" \"My job is to increment the counter and print the current count\"\n",
" counter_queue.put(1)\n",
"\n",
"\n",
"print_queue.put([\"Starting up\"])\n",
"worker_threads = []\n",
"for i in range(10):\n",
" t = threading.Thread(target=worker)\n",
" worker_threads.append(t)\n",
" t.start()\n",
"for t in worker_threads:\n",
" t.join()\n",
"\n",
"counter_queue.join()\n",
"print_queue.put([\"Finishing up\"])\n",
"print_queue.join()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Careful threading with locks\n",
"\n",
"If we thread with locks instead of queues, the code looks even tidier:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting up\n",
"The count is 1\n",
"The count is 2\n",
"The count is 3\n",
"The count is 4\n",
"The count is 5\n",
"The count is 6\n",
"The count is 7\n",
"The count is 8\n",
"The count is 9\n",
"The count is 10\n",
"Finishing up\n"
]
}
],
"source": [
"import random\n",
"import threading\n",
"import time\n",
"\n",
"\n",
"counter_lock = threading.Lock()\n",
"printer_lock = threading.Lock()\n",
"\n",
"counter = 0\n",
"\n",
"\n",
"def worker():\n",
" global counter\n",
" with counter_lock:\n",
" counter += 1\n",
" with printer_lock:\n",
" print(f\"The count is {counter}\")\n",
"\n",
"\n",
"with printer_lock:\n",
" print(\"Starting up\")\n",
"\n",
"worker_threads = []\n",
"for i in range(10):\n",
" t = threading.Thread(target=worker)\n",
" worker_threads.append(t)\n",
" t.start()\n",
"for t in worker_threads:\n",
" t.join()\n",
"\n",
"with printer_lock:\n",
" print(\"Finishing up\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, a few notes on locks:\n",
"\n",
"1. Locks are just so-called *flags*, they are not really reliable.\n",
"2. In general, locks should be viewed as a primitive tool that is difficult to understand in non-trivial examples. For more complex applications, it is better to use atomic message queues.\n",
"3. The more locks that are set at the same time, the less the benefits of simultaneous processing."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.13 Kernel",
"language": "python",
"name": "python313"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.0"
},
"latex_envs": {
"LaTeX_envs_menu_present": true,
"autoclose": false,
"autocomplete": true,
"bibliofile": "biblio.bib",
"cite_by": "apalike",
"current_citInitial": 1,
"eqLabelWithNumbers": true,
"eqNumInitial": 1,
"hotkeys": {
"equation": "Ctrl-E",
"itemize": "Ctrl-I"
},
"labels_anchors": false,
"latex_user_defs": false,
"report_style_numbering": false,
"user_envs_cfg": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}