- 並行/並列処理とは?
- 負荷の種類
- マルチスレッド/マルチプロセス
- コルーチン/イベントループ/I/O多重化
- Pythonの非同期ライブラリ
- asyncioの話
concurrent)と並列(parallel)という似た用語がある同義 として使われることも多い実行状態に保てる ことを並行(concurrent)と呼ぶ同時に実行できる ことを並列(parallel)と呼ぶ包含 する概念である雑に言うと
並列処理並行処理
- 数値計算のようにCPUを使い続けるような処理 …
CPUバウンド- ファイルの読み書き、DBへの接続、ネットワーク通信 …
I/Oバウンド
- マルチスレッドを扱う
ThreadPoolExecuter- マルチプロセスを扱う
ProcessPoolExecuter
例えば
というの考える
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from concurrent.futures import ThreadPoolExecutor, as_completed
def busy_io(index):
# 巨大なファイルを読み込む
with open('large{}.txt'.format(index)) as fp:
content = fp.read()
return 'Finish thread{}'.format(index)
if __name__ == '__main__':
futures = []
worker_num = 3 # ワーカースレッド数
task_num = 6 # 実行タスク数
with ThreadPoolExecutor(worker_num) as executer:
for index in range(task_num):
futures.append(executer.submit(busy_io, index))
# 実行が終わったものから結果を表示
for x in as_completed(futures):
print(x.result())
|
実行するとどうなるか?
I/O待ち が発生する処理速度はかなり速くなるCPUバウンドな処理 を実行してみましょう例えば
というのを考える
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from concurrent.futures import ThreadPoolExecutor, as_completed
def busy_cpu(index):
# 大きめの計算処理をする
sum(range(10**8))
return 'Finish thread{}'.format(index)
if __name__ == '__main__':
futures = []
worker_num = 3 # ワーカースレッド数
task_num = 6 # 実行タスク数
with ThreadPoolExecutor(worker_num) as executer:
for index in range(task_num):
futures.append(executer.submit(busy_cpu, index))
# 実行が終わったのものから結果を表示
for x in as_completed(futures):
print(x.result())
|
実行するとどうなるか?
なぜ?
GIL という仕組みにより、実行されるスレッドは 常に一つ に制限されている結果的に
処理時間は変わらないGlobal Interpretor Lock の 略称常に一つのみ という制限なぜ制限あるのか?
ちなみに
GILが解放される らしいです注釈
GIL を解放するのはほとんどがシステムのI/O関数を呼び出す時ですが、メモリバッファに対する圧縮や
暗号化のように、Pythonのオブジェクトにアクセスしない長時間かかる計算処理を呼び出すときもGIL
を解放することは有益です。例えば、 zlib や hashlib モジュールは圧縮やハッシュ計算の前にGILを解放します。
マルチプロセス で 並列 に実行する注意点
1 2 3 | # コア数を確かめる時は?
import os
os.cpu_count()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from concurrent.futures import ProcessPoolExecutor, as_completed
def busy_cpu(index):
# 大きめの計算処理をする
sum(range(10**8))
return 'Finish process{}'.format(index)
if __name__ == '__main__':
futures = []
worker_num = 3 # ワーカープロセス数
task_num = 6 # 実行タスク数
# ProcessPoolExecuterに変えるだけで良い
with ProcesssPoolExecutor(worker_num) as executer:
for index in range(task_num):
futures.append(executer.submit(busy_cpu, index))
# 実行が終わったのから結果を表示
for x in as_completed(futures):
print(x.result())
|
- I/Oバウンドな処理には効果的
- プロセスに比べて起動コストが低い
- GILがあるためCPUバウンドな処理には不向き
- スレッド間でのデータ競合を気をつけなければいけない
- GILの影響を受けずに並列に処理することができる
- メモリ空間を別プロセスと共有しない
- CPUのコア数を超えて並列化はできない
- スレッドよりもメモリを消費する
- プロセス間通信しないとデータを受け渡せない
非同期I/O です同期I/O(ブロッキングI/O) と呼ぶ非同期I/O(ノンブロッキングI/O) と呼ぶ
- より正確には
ノンブロッキングI/Oと非同期I/Oは定義が異なる- ノンブロッキングI/Oと非同期I/Oの違いを理解する
イベントループI/O多重化コルーチン
イベント駆動プログラミングイベントループ であるどんな事をするのか?
イベントハンドラ を イベントループ に登録イベントキュー に入れて管理イベントディスパッチャー が イベントハンドラ を実行するどんなのがイベント?
主にイベントループのライブラリが内部で
ジェネレータベースのコルーチン1 2 3 4 5 6 7 8 9 10 | def coro():
world = yield "Hello"
yield world
c = coro()
print(c) # => <generator object coro at 0x10f2df620>
print(next(c)) # => Hello
print(c.send('World')) # => World
|
| ライブラリ名 | イベントループ | コルーチン |
| gevent | libev | greentlet |
| Twisted | reactor | inlineCallbacks |
| Tornado | tornado.gen.coroutine | tornado.ioloop |
async/await (ネイティブコルーチン) が利用可能になったasyn/await構文 (ネイティブコルーチン) が使えるPython 3.4 以前
@asyncio.coroutine
def hello_world():
print("Hello World!")
yield from asyncio.sleep(1)
Python 3.5 以降
async def hello_world():
print("Hello World!")
await asyncio.sleep(1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
|
普段に使うようなライブラリの置き換え
非同期系だと
1 2 3 | import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
忙しい中、レビューしてくださった。@shimizukawaさん、@crochacoさん、@mahataさん、@kamekoさん ありがとうございました。
また参考にさせていただいた資料、本の著者の皆さま。本当ににありがとうございました。