Multi-Processing-Beispiel¶
Wir beginnen hier mit Code, der klar und einfach ist und von oben nach unten ausgeführt wird. Er ist einfach zu entwickeln und inkrementell zu testen:
[1]:
from multiprocessing.pool import ThreadPool as Pool
import requests
sites = [
"https://github.com/veit/jupyter-tutorial/",
"https://jupyter-tutorial.readthedocs.io/en/latest/",
"https://github.com/veit/pyviz-tutorial/",
"https://pyviz-tutorial.readthedocs.io/de/latest/",
"https://cusy.io/en",
]
def sitesize(url):
with requests.get(url) as u:
return url, len(u.content)
pool = Pool(4)
for result in pool.imap_unordered(sitesize, sites):
print(result)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/jupyter-tutorial/', 237670)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://cusy.io/en', 33545)
Hinweis
Eine gute Entwicklungsstrategie ist die Verwendung von map, um den Code in einem einzelnen Prozess und einem einzelnen Thread zu testen, bevor zu Multi-Processing gewechselt wird.
Hinweis
Um besser einschätzen zu können, wann ThreadPool
und wann Pool
verwendet werden sollte, hier einige Faustregeln:
Für CPU-lastige Jobs sollte
multiprocessing.pool.Pool
verwendet werden. Üblicherweise beginnen wir hier mit der doppelten Anzahl von CPU-Kernen für die Pool-Größe, mindestens jedoch mit 4.Für I/O-lastige Jobs sollte
multiprocessing.pool.ThreadPool
verwendet werden. Üblicherweise beginnen wir hier mit der fünffachen Anzahl von CPU-Kernen für die Pool-Größe.Verwenden wir Python 3 und benötigen kein mit
Pool
identisches Interface, nutzen wir concurrent.future.Executor stattmultiprocessing.pool.ThreadPool
; er hat ein einfacheres Interface und wurde von Anfang an für Threads konzipiert. Da er Instanzen vonconcurrent.futures.Future
zurückgibt, ist er kompatibel zu vielen anderen Bibliotheken, einschließlichasyncio
.Für CPU- und I/O-lastige Jobs bevorzugen wir
multiprocessing.Pool
, da hierdurch eine bessere Prozess-Isolierung erreicht wird.
[2]:
from multiprocessing.pool import ThreadPool as Pool
import requests
sites = [
"https://github.com/veit/jupyter-tutorial/",
"https://jupyter-tutorial.readthedocs.io/en/latest/",
"https://github.com/veit/pyviz-tutorial/",
"https://pyviz-tutorial.readthedocs.io/de/latest/",
"https://cusy.io/en",
]
def sitesize(url):
with requests.get(url) as u:
return url, len(u.content)
for result in map(sitesize, sites):
print(result)
('https://github.com/veit/jupyter-tutorial/', 237669)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)
('https://cusy.io/en', 33545)
Was ist parallelisierbar?¶
Amdahlsche Gesetz¶
Der Geschwindigkeitszuwachs vor allem durch den sequentiellen Anteil des Problems beschränkt, da sich dessen Ausführungszeit durch Parallelisierung nicht verringern lässt. Zudem entstehen durch Parallelisierung zusätzliche Kosten wie etwa für die Kommunikation und die Synchronisierung der Prozesse.
In unserem Beispiel können die folgenden Aufgaben nur seriell abgearbeitet werden:
UDP DNS request für die URL
UDP DNS response
Socket vom OS
TCP-Connection
Senden des HTTP Request für die Root-Ressource
Warten auf die TCP Response
Zählen der Zeichen auf der Website
[3]:
from multiprocessing.pool import ThreadPool as Pool
import requests
sites = [
"https://github.com/veit/jupyter-tutorial/",
"https://jupyter-tutorial.readthedocs.io/en/latest/",
"https://github.com/veit/pyviz-tutorial/",
"https://pyviz-tutorial.readthedocs.io/de/latest/",
"https://cusy.io/en",
]
def sitesize(url):
"""Determine the size of a website"""
with requests.get(url, stream=True) as u:
return url, len(u.content)
pool = Pool(4)
for result in pool.imap_unordered(sitesize, sites):
print(result)
('https://github.com/veit/pyviz-tutorial/', 213932)
('https://cusy.io/en', 33545)
('https://jupyter-tutorial.readthedocs.io/en/latest/', 40884)
('https://github.com/veit/jupyter-tutorial/', 237670)
('https://pyviz-tutorial.readthedocs.io/de/latest/', 32803)
Hinweis
imap_unordered wird verwendet, um die Reaktionsfähigkeit zu verbessern. Dies ist jedoch nur möglich, da die Funktion das Argument und das Ergebnis als Tuple zurückgibt.
Tipps¶
Macht nicht zu viele Trips hin und her
Erhaltet ihr zu viele iterierbare Ergebnisse, ist dies ein guter Indikator für zu viele Trips, wie z.B. in
>>> def sitesize(url, start): ... req = urllib.request.Request() ... req.add_header('Range:%d-%d' % (start, start+1000)) ... u = urllib.request.urlopen(url, req) ... block = u.read() ... return url, len(block)
Macht auf jedem Trip relevante Fortschritte
Sobald ihr den Prozess erhaltet, solltet ihr deutliche Fortschritte erzielen und euch nicht verzetteln. Das folgende Beispiel verdeutlicht zu kleine Zwischenschritte:
>>> def sitesize(url, results): ... with requests.get(url, stream=True) as u: ... while True: ... line = u.iter_lines() ... results.put((url, len(line)))
Sendet und empfangt nicht zu viele Daten
Das folgende Beispiel erhöht unnötig die Datenmenge:
>>> def sitesize(url): ... with requests.get(url) as u: ... return url, u.content