(更新)一个简单的多线程消费者示例
更新:修改了一些容易卡锁的细节
一条简单的笔记。
需求:我有一批获取的 IP,要一一验证可用性。
一个个验证太慢了,需要使用多线程。
ChatGPT 给出的方法是,先用//
分割文本,然后每个线程处理相等数量的 IP。但是,这个方法预分配了所有条目,总有线程特别慢,迟迟难以收尾。
之前写了一个简单的多消费者模型。先把文本全部读进列表,增加一个行计数器。每个线程一次只分配一条数据,在取任何数据前,先将计数器+1,代表对应行已被分配。处理完了再分配下一条数据。
不过,代码很奇怪,我决定用 queue
再重写一下。
以下是一个修改后的通用实现,附带一个简单的进度条。
from os import _exit from time import sleep from queue import Queue from threading import Thread from signal import signal, SIGINT from dataclasses import dataclass from time import time, strftime, gmtime def consume(queue): while not queue.empty(): item = queue.get() ''' do something here ''' sleep(0.1) ''' do something here ''' queue.task_done() @dataclass class QProgress: ''' a wget-like progress bar ''' queue: Queue qsize: int = 0 # queue.qsize() by default width: int = 50 # characters of progress bar alive = True def interrupt(self): self.alive = False def show(self): data_size = max(self.queue.qsize(), self.qsize) start_time, showing_count= time(), 0 while self.alive and showing_count < data_size: showing_count = data_size-self.queue.qsize() per = showing_count/data_size print(''.join([f"\r{('%.2f'%(per*100)).rjust(6,' ')}%[", f"{'='*int(per*self.width-1)}>".ljust(self.width,' '), f"] {showing_count}/{data_size} ", strftime('%H:%M:%S', gmtime(time()-start_time))]), end='', flush=True) if self.alive: print('', 'Done', sep='\n') if __name__ == "__main__": queue = Queue() # 假定一些已生产的数据 data_size = 6661 [queue.put(i) for i in range(data_size)] # 进度条,启动! qprogress= QProgress(queue) Thread(target=qprogress.show, daemon=True).start() # 消费者们,启动! num_consumers = 661 threads = [Thread(target=consume, args=(queue,), daemon=True) for _ in range(num_consumers)] [t.start() for t in threads] # 配合 daemon 响应 Ctrl+C def signal_handler(signal, frame): qprogress.interrupt() print('', 'Interrupted', sep='\n') _exit(0) signal(SIGINT,signal_handler) while any(t.is_alive() for t in threads): sleep(0)