Multi Thread

2017/11/15 python

多线程是提高计算效率一个很好的方法,特别是在强化学习中,多线程得到的数据可以有更小的相关性,可以促进训练的稳定性和收敛性。所以有必要好好了解多线程的创建和控制。下面我们就来讲解python的threading模块,需要注意的是python的多线程采用著名的全局解释锁(GIL),不能利用多核的优势,所以基本只适用于IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率;如果多线程进程是CPU密集型的,那多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降:

基本函数

先给出线程的一些基本函数

import threading
disp=lambda x:print(x)

def threadJob():
	disp('this is thread of %s'%threading.current_thread())

disp(threading.active_count())		#激活的线程数目
disp(threading.enumerate())			#查看所有的线程信息
disp(threading.current_thread())	#目前正在进行的线程

def main():
	thread=threading.Thread(target=threadJob)	#target参数编程线程的工作
	thread.start()

if __name__ == '__main__':
	main()

同步性

如下代码:

def thread_job():
    print("T1 start")
    time.sleep(1)
    print("T1 finish")

thread = threading.Thread(target=thread_job, name='T1')
thread.start()
print("All done")

结果会是All done 在T1 finish之前显示,因为这是一个独立的线程,所以有时候我们需要对线程加以控制,才能达到我们想要的结果。使用join可以控制多个线程的顺序:

def thread_job(name='Thread',SleepT=1):
    print(name+" start")
    time.sleep(SleepT)
    print(name+" finish")

thread_1 = threading.Thread(target=thread_job,kwargs={'name':'T1','SleepT':5})
thread_2 = threading.Thread(target=thread_job,kwargs={'name':'T2','SleepT':1})
thread_1.start() # 开启T1
thread_2.start() # 开启T2
print("All done")
'''result
	T1 start
	T2 start
	All done
	T2 finish
	T1 finish
'''
thread_1.start()	# 开启T1
thread_2.start()	# 开启T2
thread_2.join()		# 等待T2线程结束
thread_1.join() 	# 等待T1线程结束	
print("All done")
'''result
	T1 start
	T2 start
	T2 finish
	T1 finish
	All done
'''

存储线程结果

在多线程函数中定义一个Queue ,用来保存返回值,代替return,以下是一个案例:

import threading
import time
from queue import Queue

disp=lambda x:print(x)

def thread_job(I,q):
	for i in range(len(I)):
		I[i]=I[i]**2
	q.put(I)#多线程函数不能用return返回变量

def multiThread():
	q=Queue()#用于存放返回值,代替return
	threads=[]
	data=[[1,2,3],[2,3,4],[3,4,5]]
	for i in range(3):#定义三个线程
		T=threading.Thread(target=thread_job,args=(data[i],q))
		#参数要单独放,否则相当于没有开线程,还是在主线程计算
		T.start()#开始线程
		threads.append(T)
	for T in threads:
		T.join()#等待所有线程都结束
	result=[]
	for _ in range(3):
		result.append(q.get())#按照顺序取出接结果
	disp(result)
    disp(data)#data也是会改变的,和result一样

if __name__ == '__main__':
	multiThread()
#result [[1, 4, 9], [4, 9, 16], [9, 16, 25]]

当多线程操作同一个参数的时候一定要小心,很可能会造成数据混乱。

线程锁

当一个全局变量在多线程中被同时引用的时候,由于顺序未知,可能会产生问题,比如下面程序的打印顺序就是未知的:

import threading
disp=lambda x:print(x)

def job1():
    global A
    for i in range(10):
        A+=1
        print('job1',A)

def job2():
    global A
    for i in range(10):
        A+=10
        print('job2',A)

if __name__== '__main__':
    A=0
    t1=threading.Thread(target=job1)
    t2=threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。

import threading
disp=lambda x:print(x)

def job1():
    global A,lock
    lock.acquire()#封锁内存
    for i in range(10):
        A+=1
        print('job1',A)
    lock.release()#释放封锁

def job2():
    global A,lock
    lock.acquire()#封锁内存
    for i in range(10):
        A+=10
        print('job2',A)
    lock.release()#释放封锁

if __name__== '__main__':
    lock=threading.Lock()	#定义线程锁
    A=0
    t1=threading.Thread(target=job1)
    t2=threading.Thread(target=job2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

线程通信

多线程之间的通信在任何语言一直是个难点。Python提供了非常简单的通信机制 Threading.Event,通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活。

This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.

下面是一个案例:

import threading
import time

class VehicleThread(threading.Thread): #继承类
    def __init__(self, threadName, event):
        threading.Thread.__init__(self, name=threadName)
        self.threadEvent = event  	#事件

    def run(self):	#重写函数	
        print("Waiting green light")
        self.threadEvent.wait()
        print("Car can pass")


greenLight = threading.Event()
T=VehicleThread("Vehicle",greenLight)

greenLight.clear()	#启动红灯
print("RED LIGHT!") 

T.start()			#开始开车
time.sleep(3)

greenLight.set()	#启动绿灯
print("GREEN LIGHT!")
time.sleep(1)

我们可以创建多个Event控制不同事件在不同线程的发生顺序。

Search

    Table of Contents