跳转至

python中线程、进程、协程

基于进程的并行

协程 asyncio

线程

线程是操作系统中最小的可调度单位,是在进程中执行的实体,可以让程序在单个进程内并发执行多个任务。 Python 的标准库提供了 threading 模块,可以用来创建和管理线程。线程之间共享同一个进程的内存空间,因此需要考虑线程安全问题,例如使用锁或者其他同步机制来保证共享资源的正确性。

进程

进程是操作系统中分配资源和调度的基本单位,每个进程有自己独立的地址空间和数据栈,可以并发地执行多个任务。 Python 的标准库提供了 multiprocessing 模块,可以用来创建和管理进程。由于进程之间拥有独立的内存空间,因此不需要考虑线程安全问题,但需要注意进程间通信的问题。

协程

协程是一种用户态的轻量级线程,可以在单个线程内实现并发,因为协程可以在运行过程中暂停并恢复执行。 Python3.5及以后版本提供了 asyncio 模块,可以用来创建和管理协程。 协程可以避免多线程和多进程的切换和通信开销,因此在 I/O 密集型的场景下表现优异。 不过需要注意,协程的实现需要遵循特定的语法规则,例如使用 async 和 await 关键字来定义和调用协程函数。

进程

# 通过继承 multiprocessing 的 Process ,实现进程类,然后实现 run 方法,即可在此方法中实现进程要运行的内容。

import time
from multiprocessing import Process

class Test(Process):
    def __init__(self):
        super().__init__()

    def run(self):
        while True:
            print("process b is run")
            time.sleep(1)


if __name__ == "__main__":
    # 直接使用 Test 实例化对象,然后调用对象的成员函数 start() 即可。
    t = Test()
    t.start()
    while True:
        print("process a run")
        time.sleep(1)           

线程

## threading.Thread 来实现线程类,然后通过实例化,生成对象,调用对象的 start() 即可启动线程。

import threading
import time

class ThreadTest (threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        while True:
            print(f"i am in thread {self.name}")
            time.sleep(1)


if __name__ == "__main__":
    threads = []
    for i in range(4):
        t = ThreadTest(i)
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

协程

与线程、进程不同的是,协程并不是操作系统物理层面存在的一种程序。

协程是程序级别的,由程序编写者自己操控整个协程的生命周期。这样就实现了类似操作系统操作多线程一样的效果,但是省下了现成的切换中造成的资源消耗。

而通过程序来操纵协程,就造成了cpu 一直在运行,并且是多个协程一直在运行的假象,也就变成了并发。

示例1

import asyncio

async def foo():
    print('Start foo')
    await asyncio.sleep(7)
    print('End foo')

async def bar():
    print('Start bar')

    await asyncio.sleep(6)
    print('End bar')



async def startall():
    # 其中使用 asyncio.gather() 并发执行 foo() 和 bar() 这两个异步函数
    await asyncio.gather(foo(), bar())


 # 事件循环允许程序异步执行任务,而不是像同步代码一样等待一个任务完成后再执行下一个任务。
loop = asyncio.get_event_loop()

# 我们通过 loop.run_until_complete(startall()) 运行整个程序,
# 即将 startall 函数作为参数传递给 run_until_complete(),事件循环将等待 startall 函数执行完成并返回结果,然后关闭事件循环。
loop.run_until_complete(startall())



loop = asyncio.get_event_loop() # 获取事件循环对象
loop.run_until_complete(main()) # 运行协程
loop.close() # 关闭事件循环

示例2

await关键字通常用于等待另一个协程或异步函数执行完成,并返回结果。await能够使当前协程挂起,等待异步操作完成后再继续执行。这个过程是非阻塞的,因为在等待异步操作完成的同时,事件循环可以继续处理其他协程的任务。使用await关键字的主要好处是可以在异步操作完成之前,将CPU时间让给其他协程,提高程序的并发性能。另外,await还可以将异步操作的结果直接返回给协程,使得代码编写更加简洁。

import asyncio

async def do_some_work():
    print("开始工作")
    await asyncio.sleep(1) # 等待1秒钟
    return "工作完成"

async def main():
    print("启动协程")
    result = await do_some_work() # 等待do_some_work()协程执行完毕,并获取其返回结果
    print(f"协程执行完毕,结果为:{result}")

asyncio.run(main()) # 运行协程

线程定时器对象 Timer

1
2
3
4
5
6
from threading import Timer
def hello():
    print("hello, world")

t = Timer(1.0, hello)
t.start()

线程信号量对象 BoundedSemaphore

限制并发使用;相当于lock的一个门换成多个门

import threading
import time

# 创建一个有上限的信号量,上限为2
bounded_semaphore = threading.BoundedSemaphore(2)

def worker(thread_id):
    with bounded_semaphore:
        print(f"线程 {thread_id} 进入关键区域。")
        time.sleep(2)
        print(f"线程 {thread_id} 离开关键区域。")

# 创建多个线程并启动
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程已完成。")

线程条件对象 Condition

满足条件即可继续执行;内置锁

import threading
from threading import Thread
import time
my_lock = threading.Lock()
cv = threading.Condition(my_lock)

def list_menu():
    print("等待订单")
    time.sleep(1)
    return True
def make_pic():
    with cv:
        for i in range(3,0,-1):
            print(f"\r制作面包中。。{i}秒",end="",flush=True)
            time.sleep(1)
        print("\n面包制作完成")
        cv.notify()
    return True
def use_pie():
    cv.acquire()
    cv.wait_for(list_menu)
    print("用户下个订单---等待收货")
    cv.wait()
    print("拿到了")
    cv.release()
    return True
def send():
    time.sleep(3)

g1 = Thread(target=make_pic,args=[])
g2 = Thread(target=use_pie,args=[])

g2.start()
g1.start()

线程栅栏对象 Barrier

栅栏对象在协调多个线程的执行时非常有用,特别是在需要保持同步的场景中

import threading

# 创建一个栅栏,指定等待的线程数
barrier = threading.Barrier(2)  # 3 表示需要等待的线程数

def worker():
    print("Worker started")
    barrier.wait()  # 在这里等待其他线程
    print("Worker after barrier")

# 创建多个线程
threads = [threading.Thread(target=worker) for _ in range(3)]
import time
# 启动所有线程
for t in threads:
    time.sleep(1)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()