Dask¶
Dask performs two different tasks:
1. it provides optimised dynamic task scheduling, similar to Airflow, Luigi or Celery.
2. it provides parallel data collections such as arrays, dataframes and lists that run on top of this dynamic task scheduling.
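The task-scheduling side can be illustrated with a minimal sketch based on dask.delayed. The functions inc and add are hypothetical examples and not part of Dask:

```python
from dask import delayed


@delayed
def inc(x):
    """Return x + 1; calling it records a task instead of running it."""
    return x + 1


@delayed
def add(x, y):
    """Return x + y once both inputs have been computed."""
    return x + y


# Calling the decorated functions only builds a task graph.
total = add(inc(1), inc(2))

# compute() hands the graph to a scheduler; the two inc tasks are
# independent of each other and may therefore run in parallel.
result = total.compute()
print(result)
```

Nothing is executed until compute() is called, which is what allows the scheduler to decide when and where each task runs.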
Scales from laptops to clusters¶
Dask can be easily installed on a laptop with pipenv and extends the size of the datasets you can work with from "fits in memory" to "fits on disk". Dask can also scale to a cluster of hundreds of machines. It is resilient, elastic, data-local and has low latency. For more information, see the distributed scheduler documentation. This simple transition between a single machine and a cluster allows users to start easily and to grow as needed.
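As a rough sketch of this transition, the same computation can be handed to different schedulers without changing the code that defines it. The array size and chunk size below are arbitrary assumptions chosen to keep the example small:

```python
import dask.array as da

# A 2000 x 2000 array of ones, split into 16 chunks of 500 x 500.
# Only individual chunks need to be held in memory at a time.
x = da.ones((2000, 2000), chunks=(500, 500))
total = x.sum()

# The same task graph can be executed by different single-machine schedulers.
single_threaded = total.compute(scheduler="synchronous")
multi_threaded = total.compute(scheduler="threads")
print(single_threaded, multi_threaded)
```

When a dask.distributed Client is created, it registers itself as the default scheduler, so the same compute() call is then sent to the cluster the client is connected to.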
Install Dask¶
You can install everything that is required for most common applications of Dask (arrays, dataframes, …). This installs both Dask and dependencies such as NumPy, Pandas, etc. that are required for various workloads:
$ pipenv install "dask[complete]"
Alternatively, you can install only individual subsets:
$ pipenv install "dask[array]"
$ pipenv install "dask[dataframe]"
$ pipenv install "dask[diagnostics]"
$ pipenv install "dask[distributed]"
Testing the installation¶
[1]:
!pytest /Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/tests /Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests
============================= test session starts ==============================
platform darwin -- Python 3.11.4, pytest-7.4.0, pluggy-1.2.0
rootdir: /Users/veit
plugins: hypothesis-6.82.0, cov-4.1.0, anyio-3.7.1, typeguard-2.13.3
collected 5030 items / 18 skipped
../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/tests/test_backends.py . [ 0%]
s [ 0%]
../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/tests/test_base.py . [ 0%]
…
../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_xarray.py . [ 99%]
.... [100%]
=================================== FAILURES ===================================
__________________________ test_solve_assume_a[20-10] __________________________
…
E Failed: DID NOT RAISE <class 'FutureWarning'>
/Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_linalg.py:809: Failed
__________________________ test_solve_assume_a[30-6] ___________________________
…
E Failed: DID NOT RAISE <class 'DeprecationWarning'>
/Users/veit/.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_random.py:202: Failed
=============================== warnings summary ===============================
…
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
=========================== short test summary info ============================
FAILED ../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_linalg.py::test_solve_assume_a[20-10] - Failed: DID NOT RAISE <class 'FutureWarning'>
FAILED ../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_linalg.py::test_solve_assume_a[30-6] - Failed: DID NOT RAISE <class 'FutureWarning'>
FAILED ../../../../../.local/share/virtualenvs/python-311-6zxVKbDJ/lib/python3.11/site-packages/dask/array/tests/test_random.py::test_RandomState_only_funcs - Failed: DID NOT RAISE <class 'DeprecationWarning'>
= 3 failed, 4543 passed, 487 skipped, 15 xfailed, 34 warnings in 60.13s (0:01:00) =
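A much lighter check than running the full test suite is to see which optional dependencies can be found in the current environment. The following is only a sketch: the mapping from each extra to a single module is an assumption made for illustration and does not cover everything an extra installs.

```python
from importlib.util import find_spec

# Assumed mapping from each extra to one module it is expected to provide.
EXTRA_MODULES = {
    "array": "numpy",
    "dataframe": "pandas",
    "diagnostics": "bokeh",
    "distributed": "distributed",
}


def installed_extras():
    """Return a dict mapping each extra to True if its module can be found."""
    return {
        extra: find_spec(module) is not None
        for extra, module in EXTRA_MODULES.items()
    }


for extra, available in installed_extras().items():
    print(f"dask[{extra}]: {'available' if available else 'missing'}")
```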
Familiar operation¶
Dask DataFrame¶
… imitates Pandas
[2]:
import pandas as pd
df = pd.read_csv("2021-09-01.csv")
df.groupby(df.user_id).value.mean()
[3]:
import dask.dataframe as dd
df = dd.read_csv("2021-09-01.csv")
df.groupby(df.user_id).value.mean().compute()
Dask Array¶
… imitates NumPy
[4]:
import h5py
import numpy as np
f = h5py.File("mydata.h5")
x = np.array(f["."])  # "." stands in for the path of the dataset
[5]:
import dask.array as da
import h5py
f = h5py.File("mydata.h5")
x = da.from_array(f["."], chunks="auto")  # "." stands in for the path of the dataset
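The same idea can be tried without an HDF5 file. In this sketch a small NumPy array, invented for illustration, is wrapped in a Dask array and queried with a NumPy-style expression:

```python
import numpy as np
import dask.array as da

# Hypothetical in-memory data standing in for an HDF5 dataset.
data = np.arange(12, dtype="float64").reshape(3, 4)

# Wrap the NumPy array in a Dask array made of two 3 x 2 chunks.
x = da.from_array(data, chunks=(3, 2))

# NumPy-style expression; it is evaluated lazily until compute() is called.
column_means = x.mean(axis=0).compute()
print(x.chunks)
print(column_means)
```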
Dask Bag¶
… imitates iterators, Toolz and PySpark.
[6]:
import json
import dask.bag as db
b = db.read_text("2021-09-01.csv").map(json.loads)
b.pluck("user_id").frequencies().topk(10, lambda pair: pair[1]).compute()
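For a version that runs without any input file, the following sketch feeds a few made-up JSON lines into a bag. The threaded scheduler is chosen here only to keep the example independent of process spawning:

```python
import json
import dask.bag as db

# Hypothetical JSON lines standing in for the contents of a text file.
lines = [
    '{"user_id": "a", "value": 1}',
    '{"user_id": "b", "value": 2}',
    '{"user_id": "a", "value": 3}',
    '{"user_id": "c", "value": 4}',
    '{"user_id": "a", "value": 5}',
    '{"user_id": "b", "value": 6}',
]

# Parse each line into a dict, spread over two partitions.
b = db.from_sequence(lines, npartitions=2).map(json.loads)

# Count how often each user_id occurs and keep the two most frequent.
top = (
    b.pluck("user_id")
    .frequencies()
    .topk(2, lambda pair: pair[1])
    .compute(scheduler="threads")
)
print(top)
```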
Dask Delayed¶
… imitates loops and wraps custom code
[7]:
from glob import glob
from dask import delayed
# load, process and summarize are user-defined functions
L = []
for fn in glob("2021-*-*.csv"):  # Use for loops to build up computation
    data = delayed(load)(fn)  # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables
result = delayed(summarize)(L)
result.compute()
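The loop above relies on user-defined functions. A self-contained sketch with hypothetical stand-ins for load, process and summarize, and an in-memory dict in place of files, might look like this:

```python
from dask import delayed

# Hypothetical in-memory "files" standing in for CSV files on disk.
FILES = {"a.csv": [1, 2, 3], "b.csv": [4, 5], "c.csv": [6]}


def load(fn):
    """Return the raw values stored under the given name."""
    return FILES[fn]


def process(data):
    """Reduce one dataset to its total."""
    return sum(data)


def summarize(totals):
    """Pick the largest per-file total."""
    return max(totals)


tasks = []
for fn in sorted(FILES):
    data = delayed(load)(fn)  # Delay execution of function
    tasks.append(delayed(process)(data))  # Connect load and process

result = delayed(summarize)(tasks)  # Depends on all process tasks
answer = result.compute()
print(answer)
```

The per-file totals are 6, 9 and 6, so summarize returns 9 once all upstream tasks have finished.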
The concurrent.futures interface enables the submission of user-defined tasks.¶
Note
For the following example, Dask must be installed with the distributed
option, e.g.
$ pipenv install "dask[distributed]"
[8]:
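from dask.distributed import Client
client = Client("scheduler:port")  # Address of a running scheduler
# load, summarize and filenames are user-defined
futures = []
for fn in filenames:
    future = client.submit(load, fn)  # Returns immediately with a future
    futures.append(future)
summary = client.submit(summarize, futures)  # Futures can be passed as arguments
summary.result()  # Block until the result is available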
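For comparison, the same submit/result pattern can be sketched with the standard library's concurrent.futures, which runs without a scheduler. This is not Dask code; load, summarize and the in-memory FILES dict are hypothetical stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "files" standing in for real input files.
FILES = {"a.csv": [1, 2, 3], "b.csv": [4, 5], "c.csv": [6]}


def load(fn):
    """Return the values stored under the given name."""
    return FILES[fn]


def summarize(datasets):
    """Count the values across all loaded datasets."""
    return sum(len(data) for data in datasets)


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(load, fn) for fn in sorted(FILES)]
    # The standard library does not resolve futures passed as arguments,
    # so the results are collected explicitly before the next submit.
    loaded = [future.result() for future in futures]
    summary = executor.submit(summarize, loaded)
    total = summary.result()

print(total)
```

One difference worth noting: the Dask client accepts futures directly as arguments to submit and resolves them on the cluster, whereas here the intermediate results have to be gathered first.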