C++多线程案例

一、引言

前面几篇文章主要在概念层面讲了std::thread、std::mutex、死锁和条件变量,这一篇我们换个方式,用一个可以编译运行的小工程,把这些东西全部串起来实际体验一遍。这个小工程的目标很简单,用C++11写一个小型线程池,主线程把一批任务丢进线程池,多个工作线程从共享任务队列中取任务执行。任务队列通过std::mutex和std::condition_variable来保护和同步,这样在没有任务的时候线程会睡觉,有任务的时候被唤醒,同时再写一个deadlock_demo()函数,故意再写出一个“两把锁+两个线程”的经典死锁场景,方便对比“正确写法”和“错误示范”。(源码在最下面)

二、整体设计

整体设计如下图所示:

中间是一个std::queue<std::function<void()>>类型的任务队列,队列上方挂着一把std::mutex负责互斥访问,下方挂着一个std::condition_variable,用来让工作线程在队列为空时进入等待状态。左侧的主线程负责往队列里提交任务:拿锁、push任务、解锁,然后调用notify_one()提醒某个工作线程可以醒来干活。右侧则是由若干个工作线程,它们在一个循环里,先拿锁,如果队列为空且没有停止信号,就调用wait()在条件变量上睡觉,被唤醒后重新拿锁,从队列里取一个任务,把锁释放,然后再自己的栈上执行这个任务。这样一来,线程数量是固定的,任务数可以很多,而且不会忙等浪费CPU。

同样的在线程外,我们再定义一个stats结构体,用一把单独的mutex保护两个计数器,total_tasks记录提交的任务数,completed_tasks记录已经执行完成的任务数。每当主线程调用submit()提交任务时就调用一次on_task_submitted()加一,每个任务执行完后调用一次on_task_completed()加一,结束的还可以看到类似这样的[Stats] total=20 completed=17的统计信息。

三、源码与示例输出

整体结构已经说明了源码的设计内容,具体实现可以自己分析以下完整的

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <functional>
#include <chrono>
class ThreadPool {
public:
    explicit ThreadPool(std::size_t worker_count)
        : stop_(false) {
        if (worker_count == 0)
            worker_count = 1;
        for (std::size_t i = 0; i < worker_count; ++i) {
            workers_.emplace_back(&ThreadPool::worker_loop, this, i);
        }
    }

    // 禁止拷贝,简单起见
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // 提交一个任务
    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_) {
                throw std::runtime_error("submit on stopped ThreadPool");
            }
            tasks_.push(std::move(task));
        }
        queue_cv_.notify_one();
    }

    // 通知线程池停止,等待所有线程退出
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        queue_cv_.notify_all();
        for (std::thread& t : workers_) {
            if (t.joinable()) {
                t.join();
            }
        }
        workers_.clear();
    }

    ~ThreadPool() {
        // 保证析构时已经停掉;如果用户忘了显式 shutdown,这里自动做一次
        shutdown_no_throw();
    }

private:
    void worker_loop(std::size_t index) {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                // 没任务且没停,就等待;stop_ 为真或者有任务时被唤醒
                queue_cv_.wait(lock, [this] {
                    return stop_ || !tasks_.empty();
                    });

                if (stop_ && tasks_.empty()) {
                    // 没有剩余任务,且收到停止信号,退出线程
                    return;
                }

                task = std::move(tasks_.front());
                tasks_.pop();
            }
            // 在不持锁的情况下执行任务
            task();
        }
    }

    void shutdown_no_throw() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_)
                return;
            stop_ = true;
        }
        queue_cv_.notify_all();
        for (std::thread& t : workers_) {
            if (t.joinable()) {
                try {
                    t.join();
                }
                catch (...) {
                    // 析构阶段不要再抛异常
                }
            }
        }
        workers_.clear();
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable queue_cv_;
    bool stop_;
};

// 用来统计完成的任务数,演示 mutex 保护共享数据
struct Stats {
    std::mutex m;
    std::size_t total_tasks = 0;
    std::size_t completed_tasks = 0;

    void on_task_submitted() {
        std::lock_guard<std::mutex> lock(m);
        ++total_tasks;
    }

    void on_task_completed() {
        std::lock_guard<std::mutex> lock(m);
        ++completed_tasks;
    }

    void print() {
        std::lock_guard<std::mutex> lock(m);
        std::cout << "[Stats] total=" << total_tasks
            << " completed=" << completed_tasks << '\n';
    }
};

// 一个模拟耗时工作的任务:打印、sleep 一下、更新统计信息
std::function<void()> make_task(int id, Stats* stats) {
    return [id, stats] {
        {
            std::cout << "Task " << id
                << " is running on thread "
                << std::this_thread::get_id() << '\n';
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100 + (id % 5) * 50));
        if (stats) {
            stats->on_task_completed();
        }
        };
}

// 一个安全的线程池演示:创建线程池,提交任务,等待结束
void safe_thread_pool_demo() {
    Stats stats;
    const std::size_t worker_count =
        std::max(2u, std::thread::hardware_concurrency());

    std::cout << "Starting ThreadPool with "
        << worker_count << " worker threads\n";

    ThreadPool pool(worker_count);

    const int task_count = 20;
    for (int i = 0; i < task_count; ++i) {
        stats.on_task_submitted();
        pool.submit(make_task(i, &stats));
    }

    // 可以在任务执行过程中隔一会儿打印一下统计信息
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    stats.print();

    pool.shutdown();
    stats.print();
    std::cout << "ThreadPool demo finished.\n";
}

// 一个典型的死锁实验:故意用相反顺序加锁
void deadlock_demo() {
    std::mutex m1, m2;

    auto t1 = std::thread([&] {
        std::lock_guard<std::mutex> lock1(m1);
        std::cout << "t1 locked m1\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "t1 trying to lock m2...\n";
        std::lock_guard<std::mutex> lock2(m2); // 这里可能卡死
        std::cout << "t1 locked m2\n";
        });

    auto t2 = std::thread([&] {
        std::lock_guard<std::mutex> lock2(m2);
        std::cout << "t2 locked m2\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "t2 trying to lock m1...\n";
        std::lock_guard<std::mutex> lock1(m1); // 这里可能卡死
        std::cout << "t2 locked m1\n";
        });

    t1.join();
    t2.join();
}

int main() {
    std::cout << "=== Safe ThreadPool demo ===\n";
    safe_thread_pool_demo();

    std::cout << "\n=== Deadlock demo (optional) ===\n";
    std::cout << "当前默认不运行死锁实验,避免程序卡死。\n";
    std::cout << "如果想体验死锁,请在 main() 里调用 deadlock_demo()。\n";
    // deadlock_demo(); // 小心:取消注释后程序可能会卡住不退出

    return 0;
}

这是正常添加20个任务并成功执行的结果:

这是启动死锁示例的输出:程序会一直卡在这里

--------------

本文标题为:

C++多线程案例

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇