更新:修改了一些容易卡锁的细节

一条简单的笔记。

需求:我有一批获取的 IP,要一一验证可用性。

一个个验证太慢了,需要使用多线程。
ChatGPT 给出的方法是,先用//分割文本,然后每个线程处理相等数量的 IP。但是,这个方法预分配了所有条目,总有线程特别慢,迟迟难以收尾。

之前写了一个简单的多消费者模型。先把文本全部读进列表,增加一个行计数器。每个线程一次只分配一条数据,在取任何数据前,先将计数器+1,代表对应行已被分配。处理完了再分配下一条数据。
不过,代码很奇怪,我决定用 queue 再重写一下。

以下是一个修改后的通用实现,附带一个简单的进度条。

from os import _exit
from time import sleep
from queue import Queue
from threading import Thread
from signal import signal, SIGINT
from dataclasses import dataclass
from time import time, strftime, gmtime

def consume(queue):
    while not queue.empty():
        item = queue.get()

        ''' do something here '''
        sleep(0.1)
        ''' do something here '''

        queue.task_done()

@dataclass
class QProgress:
    ''' a wget-like progress bar '''
    queue: Queue
    qsize: int = 0 # queue.qsize() by default
    width: int = 50 # characters of progress bar

    alive = True

    def interrupt(self):
        self.alive = False

    def show(self):
        data_size = max(self.queue.qsize(), self.qsize)
        start_time, showing_count= time(), 0
        while self.alive and showing_count < data_size:
            showing_count = data_size-self.queue.qsize()
            per = showing_count/data_size
            print(''.join([f"\r{('%.2f'%(per*100)).rjust(6,' ')}%[",
                f"{'='*int(per*self.width-1)}>".ljust(self.width,' '),
                f"] {showing_count}/{data_size} ",
                strftime('%H:%M:%S', gmtime(time()-start_time))]),
                end='', flush=True)

        if self.alive:
            print('', 'Done', sep='\n')

if __name__ == "__main__":

    queue = Queue()

    # 假定一些已生产的数据
    data_size = 6661
    [queue.put(i) for i in range(data_size)]

    # 进度条,启动!
    qprogress= QProgress(queue)
    Thread(target=qprogress.show, daemon=True).start()

    # 消费者们,启动!
    num_consumers = 661

    threads = [Thread(target=consume, args=(queue,), daemon=True)
        for _ in range(num_consumers)]
    [t.start() for t in threads]

    # 配合 daemon 响应 Ctrl+C
    def signal_handler(signal, frame):
        qprogress.interrupt()
        print('', 'Interrupted', sep='\n')
        _exit(0)

    signal(SIGINT,signal_handler)

    while any(t.is_alive() for t in threads):
        sleep(0)