{ "cells": [ { "cell_type": "markdown", "id": "ccf4ea1c", "metadata": {}, "source": [ "# Dask\n", "\n", "Dask erfüllt zwei verschiedene Aufgaben:\n", "\n", "1. die dynamische Aufgabenplanung wird optimiert, ähnlich wie bei [Airflow](https://airflow.apache.org/), [Luigi](https://github.com/spotify/luigi) oder [Celery](https://docs.celeryq.dev/en/stable/)\n", "2. Arrays, Dataframes und Lists werden parallel mit dynamischem Task Scheduling ausgeführt." ] }, { "cell_type": "markdown", "id": "d6041d52", "metadata": {}, "source": [ "## Skalierung von Laptops bis hin zu Clustern\n", "\n", "\n", "Dask kann mit uv auf einem Laptop installiert werden und erweitert die Größe der Datensätze von *passt in den Arbeitsspeicher* zu *passt auf die Festplatte*. Dask kann jedoch auch auf einen Cluster mit Hunderten von Rechnern skaliert werden. Dask ist robust, flexibel, Data Local und hat eine geringe Latenzzeit. Weitere Informationen findet ihr in der Dokumentation zum [Distributed Scheduler](https://distributed.dask.org/en/latest/). Dieser einfache Übergang zwischen einer einzelnen Maschine und einem Cluster ermöglicht einen einfachen Start und ein Wachstum nach Bedarf." ] }, { "cell_type": "markdown", "id": "4e332597", "metadata": {}, "source": [ "## Dask installieren\n", "\n", "\n", "Ihr könnt alles installieren, was für die meisten gängigen Anwendungen von Dask erforderlich ist (Arrays, Dataframes, …). Dies installiert sowohl Dask als auch Abhängigkeiten wie NumPy, Pandas, usw., die für verschiedene Arbeiten benötigt werden:\n", "\n", "``` bash\n", "$ uv add \"dask[complete]\"\n", "```\n", "\n", "Es können aber auch nur einzelne Subsets installiert werden:\n", "\n", "``` bash\n", "$ uv add \"dask[array]\"\n", "$ uv add \"dask[dataframe]\"\n", "$ uv add \"dask[diagnostics]\"\n", "$ uv add \"dask[distributed]\"\n", "```" ] }, { "cell_type": "markdown", "id": "cb34d835", "metadata": {}, "source": [ "### Testen der Installation" ] }, { "cell_type": "code", "execution_count": 1, "id": "6d1b3a12", "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1m============================= test session starts ==============================\u001b[0m\n", "platform darwin -- Python 3.7.4, pytest-6.2.5, py-1.10.0, pluggy-1.0.0\n", "Users/veit\n", "plugins: anyio-3.3.1\n", "collected 4218 items / 16 skipped / 4202 selected \u001b[0m\u001b[1m\u001b[1m\u001b[1m\u001b[1m\u001b[1m\u001b[1m\u001b[1m\u001b[1m\n", "…\n", "\u001b[33m=============================== warnings summary ===============================\u001b[0m\n", "…\n", "-- Docs: https://docs.pytest.org/en/stable/warnings.html\n", "\u001b[33m==== \u001b[32m4013 passed\u001b[0m, \u001b[33m\u001b[1m207 skipped\u001b[0m, \u001b[33m\u001b[1m14 xfailed\u001b[0m, \u001b[33m\u001b[1m22 warnings\u001b[0m\u001b[33m in 357.82s (0:05:57)\u001b[0m\u001b[33m ====\u001b[0m\n" ] } ], "source": [ "!pytest /Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/tests /Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests" ] }, { "cell_type": "markdown", "id": "7d944110", "metadata": {}, "source": [ "## Vertraute Bedienung" ] }, { "cell_type": "markdown", "id": "5acd42b5", "metadata": {}, "source": [ "### Dask DataFrame\n", "\n", "… imitiert Pandas" ] }, { "cell_type": "code", "execution_count": 2, "id": "6a83265c", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "\n", "df = pd.read_csv(\"2021-09-01.csv\")\n", "df.groupby(df.user_id).value.mean()" ] }, { "cell_type": "code", "execution_count": 3, "id": "ebefec46", "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "\n", "\n", "dd = pd.read_csv(\"2021-09-01.csv\")\n", "dd.groupby(dd.user_id).value.mean()" ] }, { "cell_type": "markdown", "id": "c904e314", "metadata": {}, "source": [ "
\n", "\n", "**Siehe auch**\n", "\n", "* [Dask DataFrame Docs](https://docs.dask.org/en/latest/dataframe.html)\n", "* [Dask DataFrame Best Practices](https://docs.dask.org/en/latest/dataframe-best-practices.html)\n", "
" ] }, { "cell_type": "markdown", "id": "bd4cdd77", "metadata": {}, "source": [ "### Dask Array\n", "\n", "… imitiert NumPy" ] }, { "cell_type": "code", "execution_count": 4, "id": "7c11efbb", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "\n", "f = h5py.File(\"mydata.h5\")\n", "x = np.array(f[\".\"])" ] }, { "cell_type": "code", "execution_count": 5, "id": "c1b99ffd", "metadata": {}, "outputs": [], "source": [ "import dask.array as da\n", "\n", "\n", "f = h5py.File(\"mydata.h5\")\n", "x = da.array(f[\".\"])" ] }, { "cell_type": "markdown", "id": "fd485220", "metadata": {}, "source": [ "
\n", "\n", "**Siehe auch**\n", "\n", "* [Dask Array Docs](https://docs.dask.org/en/latest/array.html)\n", "* [Dask Array Best Practices](https://docs.dask.org/en/latest/array-best-practices.html)\n", "
" ] }, { "cell_type": "markdown", "id": "601abdeb", "metadata": {}, "source": [ "### Dask Bag\n", "\n", "… imitiert [iterators](https://docs.python.org/3/library/itertools.html), [Toolz](https://toolz.readthedocs.io/en/latest/index.html) und [PySpark](https://spark.apache.org/docs/latest/api/python/)." ] }, { "cell_type": "code", "execution_count": 6, "id": "23c242b9", "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "import dask.bag as db\n", "\n", "\n", "b = db.read_text(\"2021-09-01.csv\").map(json.loads)\n", "b.pluck(\"user_id\").frequencies().topk(10, lambda pair: pair[1]).compute()" ] }, { "cell_type": "markdown", "id": "9b9d965f", "metadata": {}, "source": [ "
\n", "\n", "**Siehe auch**\n", "\n", "* [Dask Bag Docs](https://docs.dask.org/en/latest/bag.html)\n", "
" ] }, { "cell_type": "markdown", "id": "b1b1b4d3", "metadata": {}, "source": [ "### Dask Delayed\n", "\n", "… imitiert loops und umschließt benutzerdefinierten Code." ] }, { "cell_type": "code", "execution_count": 7, "id": "0373128a", "metadata": {}, "outputs": [], "source": [ "from dask import delayed\n", "\n", "\n", "L = []\n", "for fn in \"2021-*-*.csv\": # Use for loops to build up computation\n", " data = delayed(load)(fn) # Delay execution of function\n", " L.append(delayed(process)(data)) # Build connections between variables\n", "\n", "result = delayed(summarize)(L)\n", "result.compute()" ] }, { "cell_type": "markdown", "id": "23063451", "metadata": {}, "source": [ "
\n", "\n", "**Siehe auch**\n", "\n", "* [Dask Delayed Docs](https://docs.dask.org/en/latest/delayed.html)\n", "* [Dask Delayed Best Practices](https://docs.dask.org/en/latest/delayed-best-practices.html)\n", "* [Dask-Pipeline-Beispiel: Tracking der Internationalen Raumstation mit Dask](../clean-prep/dask-pipeline.ipynb)\n", "
" ] }, { "cell_type": "markdown", "id": "3f4a6539", "metadata": {}, "source": [ "## Das ``concurrent.futures``-Interface ermöglicht die Übermittlung von selbstdefinierten Aufgaben.\n", "\n", "
\n", "\n", "**Bemerkung**\n", "\n", "Für das folgende Beispiel muss Dask mit der `distributed`-Option installiert werden, z.B.\n", "\n", "``` bash\n", "$ uv add \"dask[distributed]\"\n", "```\n", "
" ] }, { "cell_type": "code", "execution_count": 8, "id": "cfa665a0", "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "\n", "client = Client(\"scheduler:port\")\n", "\n", "futures = []\n", "for fn in filenames:\n", " future = client.submit(load, fn)\n", " futures.append(future)\n", "\n", "summary = client.submit(summarize, futures)\n", "summary.result()" ] }, { "cell_type": "markdown", "id": "acf78713", "metadata": {}, "source": [ "
\n", "\n", "**Siehe auch**\n", "\n", "* [Dask Futures Docs](https://docs.dask.org/en/latest/futures.html)\n", "* [Dask Futures Quickstart](https://distributed.dask.org/en/latest/quickstart.html)\n", "* [Dask Futures Examples](https://examples.dask.org/futures.html)\n", "
" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.11 Kernel", "language": "python", "name": "python311" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" }, "latex_envs": { "LaTeX_envs_menu_present": true, "autoclose": false, "autocomplete": true, "bibliofile": "biblio.bib", "cite_by": "apalike", "current_citInitial": 1, "eqLabelWithNumbers": true, "eqNumInitial": 1, "hotkeys": { "equation": "Ctrl-E", "itemize": "Ctrl-I" }, "labels_anchors": false, "latex_user_defs": false, "report_style_numbering": false, "user_envs_cfg": false }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }