速度¶
最も一般的に使用されるPythonの実装であるCPythonは、CPUバインドされたタスクの速度が遅いです。 PyPy は高速です。
David Beazley のCPUバウンドテストコード(複数のテストで追加されたループ)を若干修正したものを使用すると、CPythonとPyPyの処理の違いを見ることができます。
# PyPy
$ ./pypy -V
Python 2.7.1 (7773f8fc4223, Nov 18 2011, 18:47:10)
[PyPy 1.7.0 with GCC 4.4.3]
$ ./pypy measure2.py
0.0683999061584
0.0483210086823
0.0388588905334
0.0440690517426
0.0695300102234
# CPython
$ ./python -V
Python 2.7.1
$ ./python measure2.py
1.06774401665
1.45412397385
1.51485204697
1.54693889618
1.60109114647
コンテキスト¶
The GIL¶
The GIL (Global Interpreter Lock) は、Pythonが複数のスレッドを同時に動作させる方法です。 Pythonのメモリ管理は完全にスレッドセーフではないため、複数のスレッドが同じPythonコードを同時に実行するのを防ぐためにGILが必要です。
David Beazley氏は、GILがどのように機能しているかについて素晴らしい ガイド を持っています。 彼はまた、Python 3.2で new GIL をカバーしています。 彼の結果は、Pythonアプリケーションのパフォーマンスを最大限にするには、GILの強み、アプリケーション固有のアプリケーションへの影響、コアの数、アプリケーションのボトルネックがどこにあるかを理解する必要があります。
C拡張機能¶
The GIL¶
あなたのスレッドをインタープリタに登録するためにC拡張を書くときには 特別な注意 が必要です。
C拡張機能¶
Cython¶
Cython は、Python用のCとC ++モジュールを書くことができるPython言語のスーパーセットを実装しています。 Cythonでは、コンパイルされたCライブラリから関数を呼び出すこともできます。 Cythonを使うと、Pythonの強力な変数や演算のタイプを利用できます。
Cythonでの厳密な型付けの例を次に示します:
def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""
cdef int n, k, i
cdef int p[1000]
result = []
if kmax > 1000:
kmax = 1000
k = 0
n = 2
while k < kmax:
i = 0
while i < k and n % p[i] != 0:
i = i + 1
if i == k:
p[k] = n
k = k + 1
result.append(n)
n = n + 1
return result
素数を見つけるためのこのアルゴリズムの実装には、純粋なPythonで実装されている次のものと比較していくつかの追加のキーワードがあります。
def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""
p = range(1000)
result = []
if kmax > 1000:
kmax = 1000
k = 0
n = 2
while k < kmax:
i = 0
while i < k and n % p[i] != 0:
i = i + 1
if i == k:
p[k] = n
k = k + 1
result.append(n)
n = n + 1
return result
Cythonのバージョンでは、整数型と整数型の配列をC型にコンパイルしてPythonリストを作成することを宣言していることに注意してください。
def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""
cdef int n, k, i
cdef int p[1000]
result = []
def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""
p = range(1000)
result = []
違いはなんですか? 上のCythonバージョンでは、標準Cと同様の方法で変数型と整数配列の宣言を見ることができます。たとえば、行3の cdef int n、k、i のようになります。この追加の型宣言(つまり整数) Cythonコンパイラは2番目のバージョンからより効率的なCコードを生成することができます。 標準のPythonコードは *.py
ファイルに保存されますが、Cythonコードは *.pyx
ファイルに保存されます。
スピードの違いは何ですか? 試してみよう!
import time
#activate pyx compiler
import pyximport
pyximport.install()
#primes implemented with Cython
import primesCy
#primes implemented with Python
import primes
print "Cython:"
t1= time.time()
print primesCy.primes(500)
t2= time.time()
print "Cython time: %s" %(t2-t1)
print ""
print "Python"
t1= time.time()
print primes.primes(500)
t2= time.time()
print "Python time: %s" %(t2-t1)
これらの行はいずれも発言が必要です:
import pyximport
pyximport.install()
pyximport モジュールは、 primes 関数のCythonでコンパイルされたバージョンで *.pyx
ファイル(例 primesCy.pyx
)をインポートすることを可能にします。 pyximport.install() コマンドはPythonインタプリタがCythonコンパイラを直接起動してCコードを生成できるようにします。これは自動的に *.so
Cライブラリにコンパイルされます。 Cythonでは、このライブラリをPythonコードで簡単に、効率的にインポートすることができます。 time.time() 関数を使うと、これら2つの異なる呼び出しの間の時間を比較して、500個の素数を見つけることができます。 標準のノートブック(デュアルコアAMD E-450 1.6 GHz)では、測定値は次のとおりです。
Cython time: 0.0054 seconds
Python time: 0.0566 seconds
そしてここに埋め込まれた ARM beaglebone マシンの出力があります:
Cython time: 0.0196 seconds
Python time: 0.3302 seconds
Pyrex¶
Shedskin?¶
並行性¶
Concurrent.futures¶
concurrent.futures モジュールは標準ライブラリのモジュールであり、 “非同期的に呼び出し可能なコールを実行するための高水準インターフェース” を提供します。これは、複数のスレッドやプロセスを並行処理に使用することに関するより複雑な詳細を抽象化し、ユーザが手近な作業を達成することに集中することを可能にします。
concurrent.futures モジュールは ThreadPoolExecutor と ProcessPoolExecutor の2つの主要なクラスを公開します。 ThreadPoolExecutorは、ユーザーがジョブを送信できるワーカースレッドのプールを作成します。これらのジョブは、次のワーカー・スレッドが使用可能になると、別のスレッドで実行されます。
ProcessPoolExecutorは同じ方法で動作しますが、ワーカーに複数のスレッドを使用するのではなく、複数のプロセスを使用します。 これによりGILのサイドステップが可能になりますが、ワーカープロセスに渡される方法のために、picklableオブジェクトのみが実行されて返されます。
GILの仕組みのおかげで、実行中のタスクには多くのブロッキング(ネットワーク経由での要求)があり、タスクが計算上高価な場合にはProcessPoolExecutorを使用するときにThreadPoolExecutorを使用するのが良い方法です。
2つのエグゼキュータを使用して並列実行するには、主に2つの方法があります。 1つの方法は map(func, iterables) メソッドです。 これは、すべてが並行して実行されることを除けば、組み込みの map() 関数とほぼ同じように機能します。 :
from concurrent.futures import ThreadPoolExecutor
import requests
def get_webpage(url):
page = requests.get(url)
return page
pool = ThreadPoolExecutor(max_workers=5)
my_urls = ['http://google.com/']*10 # Create a list of urls
for page in pool.map(get_webpage, my_urls):
# Do something with the result
print(page.text)
さらに、 submit(func, *args, **kwargs) メソッドは、実行可能な呼び出しをスケジュールし (func(*args, **kwargs))、 Future オブジェクトを返します。 呼び出し可能オブジェクトの実行を表します。
Futureオブジェクトは、スケジュールされた呼び出し可能ファイルの進行状況をチェックするために使用できるさまざまなメソッドを提供します。 これらには、
- cancel()
- コールをキャンセルしようとしています。
- cancelled()
- 呼び出しが正常にキャンセルされた場合はTrueを返します。
- running()
- 呼び出しが現在実行中でキャンセルできない場合はTrueを返します。
- done()
- 呼び出しが正常に取り消されたか、実行が終了した場合はTrueを返します。
- result()
- 呼び出しによって返された値を返します。 このコールは、スケジュールされたコールバックがデフォルトで戻るまでブロックされます。
- exception()
- 呼び出しによって発生した例外を返します。 例外が発生しなかった場合、これは None を返します。 これは result() のようにブロックされることに注意してください。
- add_done_callback(fn)
- スケジュールされたコールバックが返ってくるときに実行されるコールバック関数を ( fn(future) として)アタッチします。
from concurrent.futures import ProcessPoolExecutor, as_completed
def is_prime(n):
if n % 2 == 0:
return n, False
sqrt_n = int(n**0.5)
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return n, False
return n, True
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
futures = []
with ProcessPoolExecutor(max_workers=4) as pool:
# Schedule the ProcessPoolExecutor to check if a number is prime
# and add the returned Future to our list of futures
for p in PRIMES:
fut = pool.submit(is_prime, p)
futures.append(fut)
# As the jobs are completed, print out the results
for number, result in as_completed(futures):
if result:
print("{} is prime".format(number))
else:
print("{} is not prime".format(number))
concurrent.futures モジュールには、Futuresを扱うための2つのヘルパー関数が含まれています。 as_completed(futures) 関数は、先物リストのイテレータを返し、完了したときの先物を返します。
提供された先物リストのすべての先物が完了するまで、 wait(futures) 機能は単にブロックされます。
concurrent.futures モジュールの使い方については、公式文書を参照してください。
スレッド¶
標準ライブラリには、ユーザが複数のスレッドを手動で扱うことを可能にする threading モジュールが付属しています。
別のスレッドで関数を実行するのは、呼び出し可能オブジェクトを Thread のコンストラクタに渡し、start() を呼び出すという単純な操作です。
from threading import Thread
import requests
def get_webpage(url):
page = requests.get(url)
return page
some_thread = Thread(get_webpage, 'http://google.com/')
some_thread.start()
スレッドが終了するまで待つには join() を呼び出します:
some_thread.join()
join() を呼び出した後は、スレッドがまだ生存しているかどうかを確認することをお勧めします(結合呼び出しがタイムアウトしたため):
if some_thread.is_alive():
print("join() must have timed out.")
else:
print("Our thread has terminated.")
複数のスレッドが同じメモリセクションにアクセスできるため、同時に複数のスレッドが同じリソースに書き込もうとする状況や、出力が特定のイベントのシーケンスやタイミングに依存する状況が発生することがあります。 これは、 data race または競合状態と呼ばれます。 これが発生すると、出力が文字化けしたり、デバッグが困難な問題が発生することがあります。 良い例がこの stackoverflow post です。
これを避ける方法は、各スレッドが共有リソースに書き込む前に取得する必要がある `Lock`_ を使用することです。 ロックは、コンテキストマネージャプロトコル (with 文)、または acquire() と release() を直接使用して取得して解放することができます。 ここに(やや工夫した)例があります:
from threading import Lock, Thread
file_lock = Lock()
def log(msg):
with file_lock:
open('website_changes.log', 'w') as f:
f.write(changes)
def monitor_website(some_website):
"""
Monitor a website and then if there are any changes,
log them to disk.
"""
while True:
changes = check_for_changes(some_website)
if changes:
log(changes)
websites = ['http://google.com/', ... ]
for website in websites:
t = Thread(monitor_website, website)
t.start()
ここでは、サイトリストの変更を確認するためのスレッドがたくさんあります。変更があった場合は、 log(changes) を呼び出すことによってその変更をファイルに書き込もうとします。 log() が呼び出されると、 with file_lock: でロックを取得するのを待ちます。 これにより、一度に1つのスレッドのみがファイルに書き込まれます。