当前位置:主页 > 软件编程 > Python代码 >

处理python中多线程与多进程中的数据共享问题

时间:2022-06-16 09:48:30 | 栏目:Python代码 | 点击:

之前在写多线程与多进程的时候,因为一般情况下都是各自完成各自的任务,各个子线程或者各个子进程之前并没有太多的联系,如果需要通信的话我会使用队列或者数据库来完成,但是最近我在写一些多线程与多进程的代码时,发现如果它们需要用到共享变量的话,需要有一些注意的地方

多线程之间的共享数据

标准数据类型在线程间共享

看以下代码

#coding:utf-8
import threading
def test(name,data):
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
if __name__ == '__main__':
  d = 5
  name = "杨彦星"
  for i in range(5):
    th = threading.Thread(target=test,args=(name,d))
    th.start()

这里我创建一个全局的int变量d,它的值是5,当我在5个线程中调用test函数时,将d作为参数传进去,那么这5个线程所拥有的是同一个d吗?我在test函数中通过 id(data) 来打印一下它们的ID,得到了如下的结果

in thread <Thread(Thread-1, started 6624)> name is 杨彦星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-2, started 8108)> name is 杨彦星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-3, started 3356)> name is 杨彦星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-4, started 13728)> name is 杨彦星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-5, started 3712)> name is 杨彦星
data is 5 id(data) is 1763791776

从结果中可以看到,在5个子线程中,data的id都是1763791776,说明在主线程中创建了变量d,在子线程中是可以共享的,在子线程中对共享元素的改变是会影响到其它线程的,所以如果要对共享变量进行修改时,也就是线程不安全的,需要加锁。

自定义类型对象在线程间共享

如果我们要自定义一个类呢,将一个对象作为变量在子线程中传递呢?会是什么效果呢?

#coding:utf-8
import threading
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
def test(name,data):
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data.get(),id(data)))
if __name__ == '__main__':
  d = Data(10)
  name = "杨彦星"
  print("in main thread id(data) is {}".format(id(d)))
  for i in range(5):
    th = threading.Thread(target=test,args=(name,d))
    th.start()

这里我定义一个简单的类,在主线程初始化了一个该类型的对象d,然后将它作为参数传给子线程,主线程和子线程分别打印了这个对象的id,我们来看一下结果

in main thread id(data) is 2849240813864
in thread <Thread(Thread-1, started 11648)> name is 杨彦星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-2, started 11016)> name is 杨彦星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-3, started 10416)> name is 杨彦星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-4, started 8668)> name is 杨彦星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-5, started 4420)> name is 杨彦星
data is 10 id(data) is 2849240813864

我们看到,在主线程和子线程中,这个对象的id是一样的,说明它们用的是同一个对象。

无论是标准数据类型还是复杂的自定义数据类型,它们在多线程之间是共享同一个的,但是在多进程中是这样的吗?

多进程之间的共享数据

标准数据类型在进程间共享

还是上面的代码,我们先来看一下int类型的变量的子进程间的共享

#coding:utf-8
import threading
import multiprocessing
def test(name,data):
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
if __name__ == '__main__':
  d = 10
  name = "杨彦星"
  print("in main thread id(data) is {}".format(id(d)))
  for i in range(5):
    pro = multiprocessing.Process(target=test,args=(name,d))
    pro.start()

得到的结果是

in main thread id(data) is 1763791936
in thread <_MainThread(MainThread, started 9364)> name is 杨彦星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 9464)> name is 杨彦星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 3964)> name is 杨彦星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 10480)> name is 杨彦星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 13608)> name is 杨彦星
data is 10 id(data) is 1763791936

可以看到它们的id是一样的,说明用的是同一个变量,但是当我尝试把d由int变为了string时,发现它们又不一样了……

if __name__ == '__main__':
  d = 'yangyanxing'
  name = "杨彦星"
  print("in main thread id(data) is {}".format(id(d)))
  for i in range(5):
    pro = multiprocessing.Process(target=test,args=(name,d))
    pro.start()

此时得到的结果是

in main thread id(data) is 2629633397040
in thread <_MainThread(MainThread, started 9848)> name is 杨彦星
data is yangyanxing id(data) is 1390942032880
in thread <_MainThread(MainThread, started 988)> name is 杨彦星
data is yangyanxing id(data) is 2198251377648
in thread <_MainThread(MainThread, started 3728)> name is 杨彦星
data is yangyanxing id(data) is 2708672287728
in thread <_MainThread(MainThread, started 5288)> name is 杨彦星
data is yangyanxing id(data) is 2376058999792
in thread <_MainThread(MainThread, started 12508)> name is 杨彦星
data is yangyanxing id(data) is 2261044040688

于是我又尝试了list、Tuple、dict,结果它们都是不一样的,我又回过头来试着在多线程中使用列表元组和字典,结果它们还是一样的。

这里有一个有趣的问题,如果是int类型,当值小于等于256时,它们在多进程间的id是相同的,如果大于256,则它们的id就会不同了,这个我没有查看原因。

自定义类型对象在进程间共享

#coding:utf-8
import threading
import multiprocessing
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
def test(name,data):
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data.get(),id(data)))

if __name__ == '__main__':
  d = Data(10)
  name = "杨彦星"
  print("in main thread id(data) is {}".format(id(d)))
  for i in range(5):
    pro = multiprocessing.Process(target=test,args=(name,d))
    pro.start()

得到的结果是

in main thread id(data) is 1927286591728
in thread <_MainThread(MainThread, started 2408)> name is 杨彦星
data is 10 id(data) is 1561177927752
in thread <_MainThread(MainThread, started 5728)> name is 杨彦星
data is 10 id(data) is 2235260514376
in thread <_MainThread(MainThread, started 1476)> name is 杨彦星
data is 10 id(data) is 2350586073040
in thread <_MainThread(MainThread, started 996)> name is 杨彦星
data is 10 id(data) is 2125002248088
in thread <_MainThread(MainThread, started 10740)> name is 杨彦星
data is 10 id(data) is 1512231669656

可以看到它们的id是不同的,也就是不同的对象。

在多进程间如何共享数据

我们看到,数据在多进程间是不共享的(小于256的int类型除外),但是我们又想在主进程和子进程间共享一个数据对象时该如何操作呢?

在看这个问题之前,我们先将之前的多线程代码做下修改

#coding:utf-8
import threading
import multiprocessing
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
def test(name,data,lock):
  lock.acquire()
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
  data.set(data.get()+1)
  lock.release()
if __name__ == '__main__':
  d = Data(0)
  thlist = []
  name = "yang"
  lock = threading.Lock()
  for i in range(5):
    th = threading.Thread(target=test,args=(name,d,lock))
    th.start()
    thlist.append(th)
  for i in thlist:
    i.join()
  print(d.get())

我们这个代码的目的是这样,使用自定义的Data类型对象,当经过5个子线程操作以后,每个子线程对其data值进行加1操作,最后在主线程打印对象的data值。

该输出结果如下

in thread <Thread(Thread-1, started 3296)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-2, started 9436)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-3, started 760)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-4, started 1952)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-5, started 5988)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272

可以看到在主线程最后打印出来了5,符合我们的预期,但是如果放到多进程中呢?因为多进程下,每个子进程所持有的对象是不同的,所以每个子进程操作的是各自的Data对象,对于主进程的Data对象应该是没有影响的,我们来看下它的结果

#coding:utf-8
import threading
import multiprocessing
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
def test(name,data,lock):
  lock.acquire()
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
  data.set(data.get()+1)
  lock.release()
if __name__ == '__main__':
  d = Data(0)
  thlist = []
  name = "yang"
  lock = multiprocessing.Lock()
  for i in range(5):
    th = multiprocessing.Process(target=test,args=(name,d,lock))
    th.start()
    thlist.append(th)
  for i in thlist:
    i.join()
  print(d.get())

它的输出结果是:

in thread <_MainThread(MainThread, started 7604)> name is yang
data is <__mp_main__.Data object at 0x000001D110130EB8> id(data) is 1997429477048
in thread <_MainThread(MainThread, started 12108)> name is yang
data is <__mp_main__.Data object at 0x000002C4E88E0E80> id(data) is 3044738469504
in thread <_MainThread(MainThread, started 3848)> name is yang
data is <__mp_main__.Data object at 0x0000027827270EF0> id(data) is 2715076202224
in thread <_MainThread(MainThread, started 12368)> name is yang
data is <__mp_main__.Data object at 0x000002420EA80E80> id(data) is 2482736991872
in thread <_MainThread(MainThread, started 4152)> name is yang
data is <__mp_main__.Data object at 0x000001B1577F0E80> id(data) is 1861188783744

最后的输出是0,说明了子进程对于主进程传入的Data对象操作其实对于主进程的对象是不起作用的,我们需要怎样的操作才能实现子进程可以操作主进程的对象呢?我们可以使用 multiprocessing.managers 下的 BaseManager 来实现

#coding:utf-8
import threading
import multiprocessing
from multiprocessing.managers import BaseManager
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
BaseManager.register("mydata",Data)
def test(name,data,lock):
  lock.acquire()
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
  data.set(data.get()+1)
  lock.release()
def getManager():
  m = BaseManager()
  m.start()
  return m
if __name__ == '__main__':
  manager = getManager()
  d = manager.mydata(0)
  thlist = []
  name = "yang"
  lock = multiprocessing.Lock()
  for i in range(5):
    th = multiprocessing.Process(target=test,args=(name,d,lock))
    th.start()
    thlist.append(th)
  for i in thlist:
    i.join()
  print(d.get())

使用 from multiprocessing.managers import BaseManager 引入 BaseManager以后,在定义完Data类型之后,使用 BaseManager.register("mydata",Data) 将Data类型注册到BaseManager中,并且给了它一个名字叫 mydata ,之后就可以使用 BaseManager 对象的这个名字来初始化对象,我们来看一下输出

C:\Python35\python.exe F:/python/python3Test/multask.py
in thread <_MainThread(MainThread, started 12244)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2222932504080
in thread <_MainThread(MainThread, started 2860)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 1897574510096
in thread <_MainThread(MainThread, started 2748)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2053415775760
in thread <_MainThread(MainThread, started 7812)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2766155820560
in thread <_MainThread(MainThread, started 2384)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2501159890448

我们看到,虽然在每个子进程中使用的是不同的对象,但是它们的值却是可以“共享”的。

标准的数据类型也可以通过multiprocessing库中的Value对象,举一个简单的例子

#coding:utf-8
import threading
import multiprocessing
from multiprocessing.managers import BaseManager
class Data:
  def __init__(self,data=None):
    self.data = data
  def get(self):
    return self.data
  def set(self,data):
    self.data = data
BaseManager.register("mydata",Data)
def test(name,data,lock):
  lock.acquire()
  print("in thread {} name is {}".format(threading.current_thread(),name))
  print("data is {} id(data) is {}".format(data,id(data)))
  data.value +=1
  lock.release()
if __name__ == '__main__':
  d = multiprocessing.Value("l",10) #
  print(d)
  thlist = []
  name = "yang"
  lock = multiprocessing.Lock()
  for i in range(5):
    th = multiprocessing.Process(target=test,args=(name,d,lock))
    th.start()
    thlist.append(th)
  for i in thlist:
    i.join()
  print(d.value)

这里使用 d = multiprocessing.Value("l",10) 初始化了一个数字类型的对象,这个类型是 Synchronized wrapper for c_long , multiprocessing.Value 在初始化时,第一个参数是类型,第二个参数是值,具体支持的类型如下

还可以使用ctypes库里和类初始化字符串

>>> from ctypes import c_char_p
>>> s = multiprocessing.Value(c_char_p, b'\xd1\xee\xd1\xe5\xd0\xc7')
>>> print(s.value.decode('gbk'))

杨彦星

还可以使用Manager对象初始list,dict等

#coding:utf-8
import multiprocessing
def func(mydict, mylist):
  # 子进程改变dict,主进程跟着改变
  mydict["index1"] = "aaaaaa" 
  # 子进程改变List,主进程跟着改变 
  mydict["index2"] = "bbbbbb"
  mylist.append(11) 
  mylist.append(22)
  mylist.append(33)
if __name__ == "__main__":
  # 主进程与子进程共享这个字典
  mydict = multiprocessing.Manager().dict()
  # 主进程与子进程共享这个List
  mylist = multiprocessing.Manager().list(range(5)) 
  p = multiprocessing.Process(target=func, args=(mydict, mylist))
  p.start()
  p.join()
  print(mylist)
  print(mydict)

其实我们这里所说的共享只是数据值上的共享,因为在多进程中,各自持有的对象都不相同,所以如果想要同步状态需要曲线救国。不过这种在自己写的小项目中可以简单的使用,如果做一些大一点的项目,还是建议不要使用这种共享数据的方式,这种大大的增加了程序间的耦合性,使用逻辑变得复杂难懂,所以建议还是使用队列或者数据为进行间通信的渠道。

总结

您可能感兴趣的文章:

相关文章