(更新)一个简单的多线程消费者示例
更新:修改了一些容易卡锁的细节
一条简单的笔记。
需求:我有一批获取的 IP,要一一验证可用性。
一个个验证太慢了,需要使用多线程。
ChatGPT 给出的方法是,先用//分割文本,然后每个线程处理相等数量的 IP。但是,这个方法预分配了所有条目,总有线程特别慢,迟迟难以收尾。
之前写了一个简单的多消费者模型。先把文本全部读进列表,增加一个行计数器。每个线程一次只分配一条数据,在取任何数据前,先将计数器+1,代表对应行已被分配。处理完了再分配下一条数据。
不过,代码很奇怪,我决定用 queue 再重写一下。
以下是一个修改后的通用实现,附带一个简单的进度条。

from os import _exit
from time import sleep
from queue import Queue
from threading import Thread
from signal import signal, SIGINT
from dataclasses import dataclass
from time import time, strftime, gmtime
def consume(queue):
while not queue.empty():
item = queue.get()
''' do something here '''
sleep(0.1)
''' do something here '''
queue.task_done()
@dataclass
class QProgress:
''' a wget-like progress bar '''
queue: Queue
qsize: int = 0 # queue.qsize() by default
width: int = 50 # characters of progress bar
alive = True
def interrupt(self):
self.alive = False
def show(self):
data_size = max(self.queue.qsize(), self.qsize)
start_time, showing_count= time(), 0
while self.alive and showing_count < data_size:
showing_count = data_size-self.queue.qsize()
per = showing_count/data_size
print(''.join([f"\r{('%.2f'%(per*100)).rjust(6,' ')}%[",
f"{'='*int(per*self.width-1)}>".ljust(self.width,' '),
f"] {showing_count}/{data_size} ",
strftime('%H:%M:%S', gmtime(time()-start_time))]),
end='', flush=True)
if self.alive:
print('', 'Done', sep='\n')
if __name__ == "__main__":
queue = Queue()
# 假定一些已生产的数据
data_size = 6661
[queue.put(i) for i in range(data_size)]
# 进度条,启动!
qprogress= QProgress(queue)
Thread(target=qprogress.show, daemon=True).start()
# 消费者们,启动!
num_consumers = 661
threads = [Thread(target=consume, args=(queue,), daemon=True)
for _ in range(num_consumers)]
[t.start() for t in threads]
# 配合 daemon 响应 Ctrl+C
def signal_handler(signal, frame):
qprogress.interrupt()
print('', 'Interrupted', sep='\n')
_exit(0)
signal(SIGINT,signal_handler)
while any(t.is_alive() for t in threads):
sleep(0)