def withdraw(amount):
    """Try to take ``amount`` out of the module-level ``balance``.

    The balance check and the deduction are separated by a one-second sleep
    and nothing in this function synchronizes them, which is what the
    surrounding demo relies on to expose the race condition.

    Args:
        amount: The sum to withdraw.
    """
    global balance

    # Guard clause: refuse the withdrawal when the funds are insufficient.
    if not balance >= amount:
        print("Error! You don't have enough money!")
        return

    # Simulated processing delay between the check and the deduction.
    time.sleep(1)
    print(f"Getting money..., the amount is {amount}")
    balance = balance - amount
    print(f"Get {amount}, {balance} remaining...")
def test_thread_safe():
    """Run the four withdrawals (80, 70, 50, 20) strictly one after another.

    Each thread is started and joined before the next one is created, so no
    two calls to ``withdraw`` overlap. Without the ``start()``/``join()``
    calls the threads would only be constructed and no withdrawal would ever
    run.
    """
    print("If they take turns to withdraw money at the counter in order.")
    for amount in (80, 70, 50, 20):
        customer = threading.Thread(target=withdraw, args=(amount,))
        customer.start()
        # Wait for this customer to finish before serving the next one.
        customer.join()
def test_thread_unsafe():
    """Run the four withdrawals (80, 70, 50, 20) concurrently, without a lock.

    All threads are started before any of them is joined, so their calls to
    ``withdraw`` overlap. Without the ``start()``/``join()`` calls the threads
    would only be constructed and no withdrawal would ever run.
    """
    print("If the four of them withdraw money at the counter simultaneously...")
    customers = [
        threading.Thread(target=withdraw, args=(amount,))
        for amount in (80, 70, 50, 20)
    ]
    # Launch everyone first so the withdrawals run at the same time ...
    for customer in customers:
        customer.start()
    # ... then wait until all of them are done before returning.
    for customer in customers:
        customer.join()
if __name__ == "__main__":
    # Scenario 1: customers are served one at a time.
    test_thread_safe()

    # Put the shared balance back to 100 so the second scenario starts from
    # the same state.
    balance = 100

    # Scenario 2: all customers withdraw concurrently (race condition).
    test_thread_unsafe()
程序实现了一个简单的柜台模拟器,模拟了四个用户以不同的顺序对相同账户取钱的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
If they take turns to withdraw money at the counter in order.
Getting money..., the amount is 80
Get 80, 20 remaining...
Error! You don't have enough money!
Error! You don't have enough money!
Getting money..., the amount is 20
Get 20, 0 remaining...
If the four of them withdraw money at the counter simultaneously...
Getting money..., the amount is 80Getting money..., the amount is 70
Get 80, 20 remaining...
Get 70, -50 remaining...
Getting money..., the amount is 50
Get 50, -100 remaining...
Getting money..., the amount is 20
Get 20, -120 remaining...
例如上面的取钱逻辑其实就是一个if条件判断:如果四个用户一个个按顺序取钱(第二位用户不会在第一位用户取钱操作未完成的时候开始自己的取钱操作),很显然balance的值会被正确地更新,all is well。但是对于第二种情况,四个线程几乎同时启动(第四个线程启动的时间早于第一个线程结束的时间),此时任何一个线程的balance -= amount核心逻辑都还没有执行,四个线程的if判断全部为True,导致超出限额的扣款。
def withdraw_with_locked(amount):
    """Try to take ``amount`` out of the module-level ``balance`` under ``lock``.

    The balance check, the one-second simulated delay and the deduction all
    happen while the module-level ``lock`` is held.

    Args:
        amount: The sum to withdraw.
    """
    global balance

    with lock:
        # Guard clause: refuse the withdrawal when the funds are insufficient.
        if not balance >= amount:
            print("Error! You don't have enough money!")
            return

        time.sleep(1)
        print(f"Getting money..., the amount is {amount}")
        balance = balance - amount
        print(f"Get {amount}, {balance} remaining...")
def test_thread_unsafe_with_locked():
    """Run the four withdrawals (80, 70, 50, 20) concurrently via the locked variant.

    All threads are started before any of them is joined; each one calls
    ``withdraw_with_locked``. Without the ``start()``/``join()`` calls the
    threads would only be constructed and no withdrawal would ever run.
    """
    print("If the four of them withdraw money at the counter simultaneously..., but with lock😋!")
    customers = [
        threading.Thread(target=withdraw_with_locked, args=(amount,))
        for amount in (80, 70, 50, 20)
    ]
    # Launch everyone first so the threads compete for the lock ...
    for customer in customers:
        customer.start()
    # ... then wait until all of them are done before returning.
    for customer in customers:
        customer.join()
If the four of them withdraw money at the counter simultaneously..., but with lock😋!
Getting money..., the amount is 80
Get 80, 20 remaining...
Error! You don't have enough money!
Error! You don't have enough money!
Getting money..., the amount is 20
Get 20, 0 remaining...
def cpu_task(n: int):
    """Burn CPU time by counting down from ``n`` to zero.

    Args:
        n (int): Number of decrement steps to perform; a value that is not
            positive skips the loop entirely.
    """
    remaining = n
    while remaining > 0:
        remaining = remaining - 1
    print("CPU task ends")
def io_task():
    """Stand in for an I/O-bound operation by sleeping for one second."""
    wait_seconds = 1
    time.sleep(wait_seconds)
def test_task_cpu(task_function):
    """Time one single-threaded run of a CPU-bound task.

    Args:
        task_function: Callable invoked once with the step count
            ``500_000_000``.
    """
    print("Test for CPU-intensive task:")
    print("Test for single threaded:")

    began = time.time()
    task_function(500_000_000)
    elapsed = time.time() - began

    print(f"Single-threaded execution time: {elapsed:.2f} seconds")
def test_task_io(task_function):
    """Compare sequential and threaded execution of an I/O-bound task.

    The task is first called ten times in a row, then ten more times with one
    thread per call, and the wall-clock time of each phase is printed.

    Args:
        task_function: Zero-argument callable to run. It is the function that
            is actually timed; the sibling ``io_task`` is no longer
            hard-coded here.
    """
    print("\nTest for Io-intensive task:")

    # Phase 1: ten calls, one after another.
    start_time = time.time()
    for _ in range(10):
        task_function()
    end_time = time.time()
    print(f"Single-threaded execution time: {end_time - start_time:.2f} seconds")

    # Phase 2: ten calls, each on its own thread.
    threads = [threading.Thread(target=task_function) for _ in range(10)]
    start_time = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"Multi-threaded execution time: {end_time - start_time:.2f} seconds")
if __name__ == "__main__":
    # CPU-bound benchmark first, then the I/O-bound one.
    test_task_cpu(cpu_task)
    test_task_io(io_task)
1 2 3 4 5 6 7 8 9 10 11 12
Test for CPU-intensive task: Test for single threaded: CPU task ends Single-threaded execution time: 20.48 seconds Test for multi threaded: CPU task ends CPU task ends Multi-threaded execution time: 20.52 seconds
Test for Io-intensive task: Single-threaded execution time: 10.01 seconds Multi-threaded execution time: 1.01 seconds
def worker(num):
    """Print which OS process is executing worker number ``num``.

    Args:
        num: Identifier of the worker, used only in the printed message.
    """
    pid = os.getpid()
    print(f"Worker {num} running in process {pid}")
if __name__ == "__main__":
    # Spawn five child processes, each running `worker` with its own index.
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    # Block until every child has exited before reporting completion.
    for p in processes:
        p.join()
    print("All processes finished")
1 2 3 4 5 6
Worker 0 running in process 8140
Worker 1 running in process 8141
Worker 2 running in process 8142
Worker 3 running in process 8143
Worker 4 running in process 8144
All processes finished
我们可以使用多进程来改写上面的CPU密集型任务,充分发挥多核CPU的优势。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
def multiprocess_cpu_task():
    """Run ``cpu_task`` in five child processes and print the elapsed time.

    Each child is given ``100_000_000`` steps. The timer covers creating,
    starting and joining all five processes.
    """
    print("Using multi-processing")
    began = time.time()

    children = []
    for _ in range(5):
        child = multiprocessing.Process(target=cpu_task, args=(100_000_000,))
        children.append(child)
        child.start()

    # Wait for every child before stopping the clock.
    for child in children:
        child.join()

    finished = time.time()
    print(f"All processes finished, time: {finished - began:.2f}")
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Test for CPU-intensive task: Test for single threaded: CPU task ends Single-threaded execution time: 23.63 seconds Test for multi threaded: CPU task ends CPU task ends Multi-threaded execution time: 21.67 seconds Using multi-processing CPU task ends CPU task ends CPU task ends CPU task ends CPU task ends All processes finished, time: 4.44
# imap方法(惰性求值) for res in pool.imap(worker, range(5)): print(res) # 依次输出: 0, 1, 4, 9, 16
同步执行 和 异步执行 见下文
map方法表示对range(10)的每一个值调用函数,最后返回一个列表
imap是惰性求值,最后会迭代输出每一个答案(时间更慢,但内存效率更高)
进程间通信(IPC)
进程之间的GIL,内存等资源都是互相独立的,但是多进程之间也可以进行通信。
Queue
使用队列的put和get可以实现进程间通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
from multiprocessing import Process, Queue import os
def worker(q):
    """Put a greeting on ``q`` and report this process's PID before and after.

    Args:
        q: Queue-like object; only its ``put`` method is used.
    """
    print(f"PID2: {os.getpid()}")
    greeting = "Hello from child process"
    q.put(greeting)
    print(f"Message sent from subprocess: {os.getpid()}")
if __name__ == "__main__":
    # Create the shared queue and a child process that will write to it.
    q = Queue()
    p = Process(target=worker, args=(q,))
    print(f"PID1: {os.getpid()}")
    p.start()

    # q.get() blocks until the child has put its message on the queue.
    print(f"Message received to {os.getpid()}: {q.get()}")
    p.join()
from multiprocessing import Process, Queue, Pipe import multiprocessing import multiprocessing.connection import os
def worker_queue(q: Queue):
    """Put a greeting on ``q`` and report this process's PID before and after.

    Args:
        q: Queue the message is written to; only its ``put`` method is used.
    """
    print(f"PID2: {os.getpid()}")
    greeting = "Hello from child process"
    q.put(greeting)
    print(f"Message sent from subprocess: {os.getpid()}")
def worker_pipe(conn: multiprocessing.connection.Connection):
    """Send one message over ``conn``, report the PID, then close ``conn``.

    Args:
        conn: Connection end the message is sent through; it is closed
            before the function returns.
    """
    print(f"PID2: {os.getpid()}")
    payload = "Message from child"
    conn.send(payload)
    print(f"Message sent from subprocess: {os.getpid()}")
    conn.close()
def test_pipe():
    """Start a child running ``worker_pipe`` and print the message it sends.

    The parent keeps one end of a ``Pipe`` and hands the other end to the
    child process.
    """
    parent_conn, child_conn = Pipe()
    print(f"PID1: {os.getpid()}")

    child = Process(target=worker_pipe, args=(child_conn,))
    child.start()

    # recv() blocks until the child has sent its message.
    print(f"Message received to {os.getpid()}: {parent_conn.recv()}")
    child.join()
def test_queue():
    """Start a child running ``worker_queue`` and print the message it queues."""
    channel = Queue()
    child = Process(target=worker_queue, args=(channel,))
    print(f"PID1: {os.getpid()}")
    child.start()

    # get() blocks until the child has put its message on the queue.
    print(f"Message received to {os.getpid()}: {channel.get()}")
    child.join()
if __name__ == "__main__":
    # Demonstrate both IPC mechanisms: the pipe first, then the queue.
    test_pipe()
    test_queue()
1 2 3 4 5 6 7 8
PID1: 14515
PID2: 14516
Message sent from subprocess: 14516
Message received to 14515: Message from child
PID1: 14515
PID2: 14517
Message sent from subprocess: 14517
Message received to 14515: Hello from child process
def user_io_simulation():
    """Pretend to wait for user input, then return a random value.

    Blocks for two seconds with ``time.sleep``.

    Returns:
        int: A random integer from 1 to 5 inclusive.
    """
    wait_seconds = 2
    time.sleep(wait_seconds)
    entered = random.randint(1, 5)
    return entered
def simulation():
    """Count down from 100, blocking on a simulated user prompt along the way.

    After each decrement, if the counter is a multiple of 20, the function
    calls ``user_io_simulation`` and subtracts the returned value as well.
    The counter is printed at the end of every iteration.
    """
    remaining = 100
    while remaining > 0:
        remaining -= 1
        if remaining % 20 == 0:
            # Blocking call: nothing else happens until the "user" answers.
            entered = user_io_simulation()
            print(f"User enters the input {entered}")
            remaining -= entered
        print(f"Now the total value is {remaining}")
    print("Done!")
async def user_io_simulation_2():
    """Pretend to wait for user input without blocking the event loop.

    Awaits ``asyncio.sleep`` for two seconds.

    Returns:
        int: A random integer from 1 to 5 inclusive.
    """
    wait_seconds = 2
    await asyncio.sleep(wait_seconds)
    entered = random.randint(1, 5)
    return entered
async def simulation_concurrency():
    """Count down from 100 while the simulated user prompts run as tasks.

    After each decrement, if the counter is a multiple of 20, a task for
    ``user_io_simulation_2`` is scheduled instead of being awaited on the
    spot. Once the countdown is over, all tasks are gathered and their
    results are subtracted from the counter.

    Returns:
        int: Always ``0``.
    """
    remaining = 100
    pending = []  # tasks scheduled during the countdown

    while remaining > 0:
        remaining -= 1
        if remaining % 20 == 0:
            # Schedule the prompt and keep counting without waiting for it.
            pending.append(asyncio.create_task(user_io_simulation_2()))
        print(f"Now the total value is: {remaining}")

    # Collect every answer once the countdown has finished.
    for user_input in await asyncio.gather(*pending):
        print(f"User enters the input: {user_input}")
        remaining -= user_input

    print("Done!")
    return 0
async def main():
    """Await ``simulation_concurrency`` and discard its return value."""
    _ = await simulation_concurrency()
if __name__ == "__main__":
    print("Testing simulation for no concurrency")

    # Time the blocking version of the simulation.
    start_time = time.time()
    simulation()
    end_time = time.time()

    elapsed = end_time - start_time
    print(f"Time cost: {elapsed:.2f} seconds")
Testing simulation for no concurrency Now the total value is 99 Now the total value is 98 Now the total value is 97 Now the total value is 96 Now the total value is 95 Now the total value is 94 Now the total value is 93 Now the total value is 92 Now the total value is 91 Now the total value is 90 Now the total value is 89 Now the total value is 88 Now the total value is 87 Now the total value is 86 Now the total value is 85 Now the total value is 84 Now the total value is 83 Now the total value is 82 Now the total value is 81 User enters the input 1 Now the total value is 79 Now the total value is 78 Now the total value is 77 Now the total value is 76 Now the total value is 75 Now the total value is 74 Now the total value is 73 Now the total value is 72 Now the total value is 71 Now the total value is 70 Now the total value is 69 Now the total value is 68 Now the total value is 67 Now the total value is 66 Now the total value is 65 Now the total value is 64 Now the total value is 63 Now the total value is 62 Now the total value is 61 User enters the input 3 Now the total value is 57 Now the total value is 56 Now the total value is 55 Now the total value is 54 Now the total value is 53 Now the total value is 52 Now the total value is 51 Now the total value is 50 Now the total value is 49 Now the total value is 48 Now the total value is 47 Now the total value is 46 Now the total value is 45 Now the total value is 44 Now the total value is 43 Now the total value is 42 Now the total value is 41 User enters the input 4 Now the total value is 36 Now the total value is 35 Now the total value is 34 Now the total value is 33 Now the total value is 32 Now the total value is 31 Now the total value is 30 Now the total value is 29 Now the total value is 28 Now the total value is 27 Now the total value is 26 Now the total value is 25 Now the total value is 24 Now the total value is 23 Now the total value is 22 Now the total value is 21 User enters the input 4 Now the total value 
is 16 Now the total value is 15 Now the total value is 14 Now the total value is 13 Now the total value is 12 Now the total value is 11 Now the total value is 10 Now the total value is 9 Now the total value is 8 Now the total value is 7 Now the total value is 6 Now the total value is 5 Now the total value is 4 Now the total value is 3 Now the total value is 2 Now the total value is 1 User enters the input 2 Now the total value is -2 Done! Time cost: 10.76 seconds Testing simulation for concurrency Now the total value is: 99 Now the total value is: 98 Now the total value is: 97 Now the total value is: 96 Now the total value is: 95 Now the total value is: 94 Now the total value is: 93 Now the total value is: 92 Now the total value is: 91 Now the total value is: 90 Now the total value is: 89 Now the total value is: 88 Now the total value is: 87 Now the total value is: 86 Now the total value is: 85 Now the total value is: 84 Now the total value is: 83 Now the total value is: 82 Now the total value is: 81 Now the total value is: 80 Now the total value is: 79 Now the total value is: 78 Now the total value is: 77 Now the total value is: 76 Now the total value is: 75 Now the total value is: 74 Now the total value is: 73 Now the total value is: 72 Now the total value is: 71 Now the total value is: 70 Now the total value is: 69 Now the total value is: 68 Now the total value is: 67 Now the total value is: 66 Now the total value is: 65 Now the total value is: 64 Now the total value is: 63 Now the total value is: 62 Now the total value is: 61 Now the total value is: 60 Now the total value is: 59 Now the total value is: 58 Now the total value is: 57 Now the total value is: 56 Now the total value is: 55 Now the total value is: 54 Now the total value is: 53 Now the total value is: 52 Now the total value is: 51 Now the total value is: 50 Now the total value is: 49 Now the total value is: 48 Now the total value is: 47 Now the total value is: 46 Now the total value is: 45 Now the 
total value is: 44 Now the total value is: 43 Now the total value is: 42 Now the total value is: 41 Now the total value is: 40 Now the total value is: 39 Now the total value is: 38 Now the total value is: 37 Now the total value is: 36 Now the total value is: 35 Now the total value is: 34 Now the total value is: 33 Now the total value is: 32 Now the total value is: 31 Now the total value is: 30 Now the total value is: 29 Now the total value is: 28 Now the total value is: 27 Now the total value is: 26 Now the total value is: 25 Now the total value is: 24 Now the total value is: 23 Now the total value is: 22 Now the total value is: 21 Now the total value is: 20 Now the total value is: 19 Now the total value is: 18 Now the total value is: 17 Now the total value is: 16 Now the total value is: 15 Now the total value is: 14 Now the total value is: 13 Now the total value is: 12 Now the total value is: 11 Now the total value is: 10 Now the total value is: 9 Now the total value is: 8 Now the total value is: 7 Now the total value is: 6 Now the total value is: 5 Now the total value is: 4 Now the total value is: 3 Now the total value is: 2 Now the total value is: 1 Now the total value is: 0 User enters the input: 2 User enters the input: 3 User enters the input: 4 User enters the input: 3 User enters the input: 1 Done! Time cost: 2.01 seconds