Python でスレッド間通信をマスターする:同期とデータ共有
スレッド間通信とは、Python マルチスレッド プログラム内のスレッド間の通信と同期を可能にするプロセスを指します。
一般に、Python のスレッドはプロセス内で同じメモリ空間を共有するため、スレッド モジュールが提供する共有変数、オブジェクト、特殊な同期メカニズムを通じてデータを交換し、アクティビティを調整できます。
スレッド間通信を容易にするために、スレッド モジュールは、ロック、イベント、条件、セマフォ オブジェクトなどのさまざまな同期プリミティブを提供します。このチュートリアルでは、マルチスレッド プログラムのスレッド間の通信を提供するために Event オブジェクトと Condition オブジェクトを使用する方法を学習します。
イベント オブジェクト
Event オブジェクトは、スレッドが待機または設定できるように内部フラグの状態を管理します。イベント オブジェクトは、このフラグの状態を制御するメソッドを提供し、スレッドが共有条件に基づいてアクティビティを同期できるようにします。
フラグは最初は false ですが、set() メソッドで true になり、clear() メソッドで false にリセットされます。 wait() メソッドは、フラグが true になるまでブロックします。
以下は、Event オブジェクトの主要なメソッドです -
- is_set():内部フラグが true の場合にのみ True を返します。
- set():内部フラグを true に設定します。それが true になるのを待っているすべてのスレッドが目覚めます。フラグが true になると wait() を呼び出すスレッドはまったくブロックされません。
- clear():内部フラグを false にリセットします。その後、set() が呼び出されて内部フラグが再び true に設定されるまで、wait() を呼び出すスレッドはブロックされます。
- wait(timeout=None):内部フラグが true になるまでブロックします。エントリ時に内部フラグが true の場合は、すぐに戻ります。それ以外の場合は、別のスレッドが set() を呼び出してフラグを true に設定するまで、またはオプションのタイムアウトが発生するまでブロックします。 timeout 引数が存在し、None ではない場合、操作のタイムアウトを秒単位で指定する浮動小数点数にする必要があります。
例
次のコードは、交通信号の状態 (緑または赤) によって制御されている交通の流れをシミュレートしようとします。
プログラムには 2 つのスレッドがあり、2 つの異なる関数を対象としています。 signal_state() 関数は、信号が緑から赤に変化したことを示すイベントを定期的にセットおよびリセットします。
Traffic_flow() 関数は、イベントが設定されるのを待ち、設定されたままになるまでループを実行します。
from threading import Event, Thread
import time
terminate = False
def signal_state():
global terminate
while not terminate:
time.sleep(0.5)
print("Traffic Police Giving GREEN Signal")
event.set()
time.sleep(1)
print("Traffic Police Giving RED Signal")
event.clear()
def traffic_flow():
global terminate
num = 0
while num < 10 and not terminate:
print("Waiting for GREEN Signal")
event.wait()
print("GREEN Signal ... Traffic can move")
while event.is_set() and not terminate:
num += 1
print("Vehicle No:", num," Crossing the Signal")
time.sleep(1)
print("RED Signal ... Traffic has to wait")
event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()
# Terminate the threads after some time
time.sleep(5)
terminate = True
# join all threads to complete
t1.join()
t2.join()
print("Exiting Main Thread")
出力
上記のコードを実行すると、次の出力が得られます-
Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 1 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 2 Crossing the Signal Vehicle No: 3 Crossing the Signal Traffic Police Giving RED Signal Traffic Police Giving GREEN Signal Vehicle No: 4 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Traffic Police Giving GREEN Signal Traffic Police Giving RED Signal Exiting Main Thread
条件オブジェクト
Python のスレッド モジュールの Condition オブジェクトは、より高度な同期メカニズムを提供します。これにより、スレッドは続行する前に別のスレッドからの通知を待つことができます。 Condition オブジェクトは常にロックに関連付けられており、スレッド間の信号伝達のメカニズムを提供します。
以下は、threading.Condition() クラスの構文です-
threading.Condition(lock=None)
以下は、Condition オブジェクトの主要なメソッドです-
- acquire(*args):基礎となるロックを取得します。このメソッドは、基になるロックの対応するメソッドを呼び出します。戻り値はメソッドが返すものです。
- release():基礎となるロックを解放します。このメソッドは、基になるロックの対応するメソッドを呼び出します。戻り値はありません。
- wait(timeout=None):このメソッドは、基礎となるロックを解放し、別のスレッドの同じ条件変数に対する Notice() または Notice_all() 呼び出しによってウェイクアップされるまで、またはオプションのタイムアウトが発生するまでブロックします。目覚めるかタイムアウトになると、ロックを再取得して戻ります。
- wait_for(predicate, timeout=None):このユーティリティ メソッドは、述語が満たされるまで、またはタイムアウトが発生するまで、繰り返し wait() を呼び出すことができます。戻り値は述語の最後の戻り値であり、メソッドがタイムアウトした場合は False と評価されます。
- notify(n=1):このメソッドは、条件変数を待機している最大で n 個のスレッドを起動します。待機中のスレッドがない場合は何も行われません。
- notify_all():この条件を待機しているすべてのスレッドを起動します。このメソッドは、notify() と同様に動作しますが、1 つのスレッドではなく、待機中のすべてのスレッドを起動します。このメソッドの呼び出し時に呼び出しスレッドがロックを取得していない場合は、RuntimeError が発生します。
例
この例では、Python のスレッド モジュールの Condition オブジェクトを使用した単純な形式のスレッド間通信を示します。ここで、thread_a と thread_b は、Condition オブジェクトを使用して通信されます。thread_a は、thread_b から通知を受け取るまで待機します。 thread_b は、thread_a に通知する前に 2 秒間スリープしてから終了します。
from threading import Condition, Thread
import time
c = Condition()
def thread_a():
print("Thread A started")
with c:
print("Thread A waiting for permission...")
c.wait()
print("Thread A got permission!")
print("Thread A finished")
def thread_b():
print("Thread B started")
with c:
time.sleep(2)
print("Notifying Thread A...")
c.notify()
print("Thread B finished")
Thread(target=thread_a).start()
Thread(target=thread_b).start()
出力
上記のコードを実行すると、次の出力が得られます-
Thread A started Thread A waiting for permission... Thread B started Notifying Thread A... Thread B finished Thread A got permission! Thread A finished
例
以下は、スレッド間の通信を提供するために Condition オブジェクトがどのように使用されるかを示す別のコードです。ここでは、スレッド t2 が taskB() 関数を実行し、スレッド t1 が taskA() 関数を実行します。 t1 スレッドは条件を取得して通知します。
その時点で、t2 スレッドは待機状態になります。条件が解放されると、待機中のスレッドは、通知関数によって生成された乱数の消費を開始します。
from threading import Condition, Thread
import time
import random
numbers = []
def taskA(c):
for _ in range(5):
with c:
num = random.randint(1, 10)
print("Generated random number:", num)
numbers.append(num)
print("Notification issued")
c.notify()
time.sleep(0.3)
def taskB(c):
for i in range(5):
with c:
print("waiting for update")
while not numbers:
c.wait()
print("Obtained random number", numbers.pop())
time.sleep(0.3)
c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")
このコードを実行すると、次の出力が生成されます-
waiting for update Generated random number: 2 Notification issued Obtained random number 2 Generated random number: 5 Notification issued waiting for update Obtained random number 5 Generated random number: 1 Notification issued waiting for update Obtained random number 1 Generated random number: 9 Notification issued waiting for update Obtained random number 9 Generated random number: 2 Notification issued waiting for update Obtained random number 2 Done
Python