おいも貴婦人ブログ

生物系博士課程満期退学をしたAIエンジニアのブログ。

フィボナッチ数列を複数のスレッドを使って実行する。

 Python3で並列計算をする場合、multiprocessingを使うことになると思います。このモジュールは、threadingの発展系でAPIが非常に似ているようです。今回は、multiprocessingを理解する前に、threadingモジュールを使ってフィボナッチ数列をスレッド化したいと思います。参考文献は文末に示しています。
 このプログラムは、フィボナッチ数列の関数に複数の値を同時に与え、その答えを求めます。

#!/usr/bin/env python3
# coding :utf-8

import logging, threading
## --- logging ---
## エラーログをとるためのモジュール。
## URL:http://docs.python.jp/3.4/library/logging.html

from queue import Queue
## --- queue ---
## スレッド間で共有するデータを格納するためのモジュール。
## URL:http://docs.python.jp/3.4/library/queue.html

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
fibo_dict = {}
shared_queue = Queue()
input_list = [3 ,10, 5, 7]

queue_condition = threading.Condition()
## ある条件で、データの同期をするためのオブジェクト。
## この場合は、プロセスやスレッドの生成をするため。

def fibonacci_task(condition):
    ## フィボナッチ数列をスレッドを生成させて、計算するための関数。
    with condition:
        while shared_queue.empty():
            logger.info("[%s] - waiting for elements in queue.."
                        % threading.current_thread().name)
            condition.wait()
            ## shared_queueが埋まるまで待つ。
        else:
            value = shared_queue.get()
            ## 実行されている各スレッドがshared_queueから要素を取得する。
            ## fibo_dictから値を受け取り、フィボナッチ数列の計算が行われる。
            a, b = 0, 1
            for item in range(value):
                a, b = b, a+b
                fibo_dict[value] = a
        shared_queue.task_done()
        ## それぞれのスレッドが完了したか知るための関数。
        logger.debug("[%s] - fibonacci of key [%d] with result [%d]"
                     % (threading.current_thread().name, value, fibo_dict[value]))
        
def queue_task(condition):
    ## shared_queueに要素が受け渡されているかを確認するための関数。
    logging.debug('Stating queue_task...')
    with condition:
        for item in input_list:
            shared_queue.put(item)
        logging.debug("Notifying fibonacci_task threads that the queue is ready to consume..")
        condition.notifyAll()
        ## それぞれのスレッドが準備できているか知るための関数。

threads = [threading.Thread(daemon=True, target=fibonacci_task, args=(queue_condition,)) for i in range(4)]
## 4つのスレッドを生成。

[thread.start() for thread in threads]
## 4つのスレッドを実行。
## 要素が渡されていないので、この場合はwaitしている。

prod = threading.Thread(name='queue_task_thread', daemon=True, target=queue_task, args=(queue_condition,))
prod.start()
## shared_queueにそれぞれの値をセットし、実行する。

[thread.join() for thread in threads]
## すべての子スレッドが終わる前にこのプログラムが終わらないようにする。
    

実行結果

2015-10-03 22:19:42,687 - [Thread-1] - waiting for elements in queue..
2015-10-03 22:19:42,687 - [Thread-2] - waiting for elements in queue..
2015-10-03 22:19:42,688 - [Thread-3] - waiting for elements in queue..
2015-10-03 22:19:42,688 - [Thread-4] - waiting for elements in queue..
2015-10-03 22:19:42,688 - Stating queue_task...
2015-10-03 22:19:42,688 - Notifying fibonacci_task threads that the queue is ready to consume..
2015-10-03 22:19:42,688 - [Thread-1] - fibonacci of key [3] with result [2]
2015-10-03 22:19:42,689 - [Thread-2] - fibonacci of key [10] with result [55]
2015-10-03 22:19:42,689 - [Thread-3] - fibonacci of key [5] with result [5]
2015-10-03 22:19:42,689 - [Thread-4] - fibonacci of key [7] with result [13]


Parallel Programming With Python

Parallel Programming With Python