• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

day37

开发技术 开发技术 5小时前 1次浏览

一、进程间的数据是隔离的

from multiprocessing import Process

def task():
    global n
    n = 100
    print("子进程中:", n)


if __name__ == '__main__':

    p = Process(target=task, )
    p.start()
    n = 10
    # task()

    print("主进程中:", n)

# 运行结果    
# 主进程中: 10
# 子进程中: 100

 

二、Queue=》队列

1、进程间的通信

IPC(Inter-Process  Communication)

2、队列

概念介绍

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递

Queue([maxsize]) 创建共享的进程队列

参数:maxsize是队列中心中允许的最大项数。如果省略此处参数,则无大小限制。

底层队列使用管道和锁定实现

方法介绍

Queue([maxsize]):创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait() :同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

q.empty() :判断q是否有位置,如果没有返回False,若还有位置返回True

q.full() :如果q已满,返回True;如果q未满,返回False。

其他方法(了解)

q.close() :关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() :不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() :连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

三、解决进程间的数据隔离问题

from multiprocessing import Queue, Process
import os, time

def task(queue):
    print("这个进程id:%s开始放数据了" % os.getpid())
    time.sleep(2)
    queue.put('ly is dsb')
    print("这个进程id:%s数据放完了" % os.getpid())

if __name__ == '__main__':
    q = Queue(3)
    p = Process(target=task, args=(q,))
    p.start()

    print("主进程")

    res = q.get()
    print("主进程中取值:", res)

 

四、多进程放入数据到Queue

from multiprocessing import Queue, Process
import os, time

def get_task(q):
    print("%s:%s" % (os.getpid(), q.get()))

def put_task(q):
    q.put("%s开始写数据了" % os.getpid())

if __name__ == '__main__':
    q = Queue(3)

    p = Process(target=put_task, args=(q,))
    p.start()

    p1 = Process(target=put_task, args=(q,))
    p1.start()

    p2 = Process(target=get_task, args=(q,))
    p2.start()

    p3 = Process(target=get_task, args=(q,))
    p3.start()

 

五、生产者消费者模型

代码演示:

import os
import time
import random
from multiprocessing import Process,Queue

# 版本1:
# 生产者:
def producer(queue):
    # 把数据全部放在Queue
    for i in range(10):
        data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
        print(data)

        time.sleep(random.randint(1,3))
        # 放入数据
        queue.put("第%s个包子" % i)

def consumer(queue):
    while True:
        res = queue.get()
        data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
        print(data)

if __name__ == '__main__':

    q = Queue(3)
    p = Process(target=producer, args=(q, ))
    p.start()

    p1 = Process(target=consumer, args=(q,))
    p1.start()

# 版本2:
# 生产者:
def producer(queue):
    # 把数据全部放在Queue
    for i in range(10):
        data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入数据
        queue.put("第%s个包子" % i)
    queue.put(None)

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
        print(data)

if __name__ == '__main__':
    q = Queue(3)
    p = Process(target=producer, args=(q,))
    p.start()

    p1 = Process(target=consumer, args=(q,))
    p1.start()

# 版本3:
# 生产者:
def producer(queue):
    # 把数据全部放在Queue
    for i in range(10):
        data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入数据
        queue.put("第%s个包子" % i)

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
        print(data)

if __name__ == '__main__':
    q = Queue(3)
    p = Process(target=producer, args=(q,))
    p.start()

    p1 = Process(target=consumer, args=(q,))
    p1.start()

    # time.sleep(1000)
    # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
    p.join()
    q.put(None)

# 版本4:多生产者 多消费者
# 生产者:
def producer(queue, food):
    # 把数据全部放在Queue
    for i in range(10):
        data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入数据
        queue.put("第%s个%s" % (i, food))

def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
        print(data)

if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer, args=(q, '面包'))
    p2 = Process(target=producer, args=(q, '奶粉'))
    p3 = Process(target=producer, args=(q, '冰淇淋'))
    p1.start()
    p2.start()
    p3.start()

    p4 = Process(target=consumer, args=(q,))
    p5 = Process(target=consumer, args=(q,))
    p4.start()
    p5.start()

    # time.sleep(1000)
    # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
    # p.join()
    # q.put(None)

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)
    q.put(None)


# 版本5:多生产者 多消费者  消费者大于生产者
# 生产者:
def producer(queue, food):
    # 把数据全部放在Queue
    for i in range(10):
        data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入数据
        queue.put("第%s个%s" % (i, food))

def consumer(queue, name):
    while True:
        try:
            res = queue.get(timeout=5)
            if not res:break
            data = "这个消费者:%s, 吃了%s" % (name, res)
            print(data)
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer, args=(q, '面包'))
    p2 = Process(target=producer, args=(q, '奶粉'))
    p3 = Process(target=producer, args=(q, '冰淇淋'))
    p1.start()
    p2.start()
    p3.start()

    p4 = Process(target=consumer, args=(q, '许鹏'))
    p5 = Process(target=consumer, args=(q, '勇哥'))
    p6 = Process(target=consumer, args=(q, '勇哥2'))
    p7 = Process(target=consumer, args=(q, '勇哥3'))
    p4.start()
    p5.start()
    p6.start()
    p7.start()

    # time.sleep(1000)
    # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
    # p.join()
    # q.put(None)

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)
    q.put(None)

"""
Queue:
    httpsqs
    rabbiemq
    kafka

"""

 


程序员灯塔
转载请注明原文链接:day37
喜欢 (0)