ホーム  >   ブログ  >   Pythonのconcurrent.futuresを試す

2017-10-01

Pythonのconcurrent.futuresを試す

EuroScipy 2017 でPythonの concurrent.futures についての話を聞いたので、改めて調べてみた。

2系まではPythonの並列処理といえば標準の multiprocessing.Pool が定番だったけど、3系からは新たなインタフェースとして concurrent.futures という選択肢もふえた。

Scalaなんかでおなじみの Future とは、並行処理の結果を参照する存在。Pythonの Future f = concurrent.futures.Future() は処理の『実行中 f.running() 』『キャンセル済み f.canceled() 』『完了 f.done() 』といった“状態”を参照するメソッドを提供している。そして f.result() を呼べば完了までブロッキング。

実際には、非同期処理は Executor オブジェクトによってスケジューリングされる。このときマルチスレッドなら ThreadPoolExecutor、マルチプロセスなら ProcessPoolExecutor を使う。

マルチスレッド: ThreadPoolExecutor

スレッドプールを利用した並列化。

重要なのは、たとえ複数スレッドで処理を実行しても、ワーカーたちは1つのインタプリタを共有している点。これによりメモリオーバーヘッドが小さい、spawnが早い、ワーカー間の同期が不要といった意味で、処理の効率的な非同期呼び出しが期待できる。

しかし同時に、Pythonには Global Interpreter Lock (GIL) という mutex があり、1つのインタプリタ上では同時に1スレッドからしかリソースにアクセスできないという制約がある。なので ThreadPoolExecutor による並列化は、CPU-boundedな処理に対しては必ずしも有効とは言えない。

これは過度な制約だという見方もあり、numpy, pandas, sklearn といった有名ライブラリは with nogil を付与したコンパイルによってGILフリーなマルチスレッド処理を(部分的に)行っていたりするらしい。

マルチプロセス: ProcessPoolExecutor

一方で、プロセスレベルの並列化では各ワーカーが自分だけのインタプリタを持つ。

これによりマルチスレッドと比較するとspawnが遅く、メモリオーバーヘッドが大きく、プロセス間で同期をとる必要が生じてしまうが、GILに縛られない並列化が可能となる。

従来 multiprocessing.Pool() でプロセスプールを作って実現していた並列化はこちらに相当する。

試す

では、1秒wait()×2回をシングルスレッド、マルチスレッド、マルチプロセスそれぞれで試してみよう:

import time
from concurrent import futures


def wait():
    time.sleep(1)


if __name__ == '__main__':
    start = time.time()
    wait()
    wait()
    end = time.time()
    print('Single: elapsed time = {}'.format(end - start))

    start = time.time()
    with futures.ThreadPoolExecutor() as executor:
        f1 = executor.submit(wait)
        f2 = executor.submit(wait)
        f1.result()
        f2.result()
    end = time.time()
    print('Multi-thread: elapsed time = {}'.format(end - start))

    start = time.time()
    with futures.ProcessPoolExecutor() as executor:
        f1 = executor.submit(wait)
        f2 = executor.submit(wait)
        f1.result()
        f2.result()
    end = time.time()
    print('Multi-process: elapsed time = {}'.format(end - start))

executor.submit() でタスクのスケジューリングをして、返ってきた Future オブジェクトが完了するまで result() で処理をブロックする。

結果:

$ python wait.py
Single:        elapsed time = 2.0102460384368896
Multi-thread:  elapsed time = 1.0070040225982666
Multi-process: elapsed time = 1.0193650722503662

並列処理のパワーを感じる。

次に、大きい数の素数判定を並列に行う公式ドキュメントの例を試してみる:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


if __name__ == '__main__':
    import time

    start = time.time()
    for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
        print('%d is prime: %s' % (number, prime))
    end = time.time()
    print('Single:        elapsed time = {}'.format(end - start))

    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    end = time.time()
    print('Multi-thread:  elapsed time = {}'.format(end - start))

    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    end = time.time()
    print('Multi-process: elapsed time = {}'.format(end - start))

判定対象の数を is_prime()executor.map して呼ぶ。使い方は通常の mapmultiprocessing.Pool.map と同じ。

$ python prime.py
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
Single:        elapsed time = 2.9173738956451416
...
Multi-thread:  elapsed time = 3.619951009750366
...
Multi-process: elapsed time = 1.7795209884643555

結果は、シングルスレッド vs マルチプロセスでは処理時間が期待通り短縮されているが、マルチスレッドはむしろシングルスレッドよりも遅い。これがGILの罠。CPU-boundedな処理の ThreadPoolExecutor による並列化は期待を裏切る。

Q. 結局どれを使って並列化すべきなのか

(僕の観測範囲では)機械学習系の論文の割と雑なコードがGitHubで公開されていると、それはたいてい multiprocessing.Pool を使っている気がするけど、それで本当に良いのだろうか。

concurrent.futures を紹介してくれた EuroScipy 2017 のトークの本題は、concurrent.futures をロバストにした Executor loky の紹介だった 1。というわけで、この先は loky と concurrent.futures の違いを知った上で、未だに広く使われている multiprocessing.Pool も含めて、「結局どれを使うべきなのか」ということを考える必要がある 2

というわけで、続きます。

1. なんと joblib.Parallel の正体は loky らしい。知らなかった…。
2. まぁなんとなく、「joblibを使おう」みたいな結論になりそうな気がする。

  シェアする

このエントリーをはてなブックマークに追加

  カテゴリ

プログラミング コンピュータシステム

  あわせて読みたい

2017-12-01
『Java本格入門』メモ
2017-10-28
Courseraの"Functional Programming in Scala Specialization"を修了した
2017-09-02
EuroSciPy 2017に参加してしゃべってきた

  もっと見る

最終更新日: 2022-01-18

  書いた人: たくち

Takuya Kitazawaたくち)です。長野県出身、カナダ・バンクーバー在住のソフトウェアエンジニア。これまでB2B/B2Cの各領域で、Web技術・データサイエンス・機械学習のプロダクト化および顧客への導入支援・コンサルティング、そして関連分野の啓蒙活動に携わってきました。現在は主に北米(カナダ)、アジア(日本)、アフリカ(マラウイ)の個人および企業を対象にフリーランスとして活動中。詳しい経歴はレジュメ を参照ください。いろいろなまちを走って、時に自然と戯れながら、その時間その場所の「日常」を生きています。ご意見・ご感想およびお仕事のご相談は [email protected] まで。

  近況   一杯のコーヒーを贈る

  免責事項

  • Amazonのアソシエイトとして、当サイトは amazon.co.jp 上の適格販売により収入を得ています。
  • 当サイトおよび関連するメディア上での発言はすべて私個人の見解であり、所属する(あるいは過去に所属した)組織のいかなる見解を代表するものでもありません。
  • 当サイトのコンテンツ・情報につきまして、可能な限り正確な情報を掲載するよう努めておりますが、個人ブログという性質上、誤情報や客観性を欠いた意見が入り込んでいることもございます。いかなる場合でも、当サイトおよびリンク先に掲載された内容によって生じた損害等の一切の責任を負いかねますのでご了承ください。
  • その他、記事の内容や掲載画像などに問題がございましたら、直接メールでご連絡ください。確認の後、対応させていただきます。