2.1 thread & jthread

thread 传递参数

  • 如果thread执行的函数参数是一个引用的话,需要std::ref
     void update_data_for_widget(widget_id w,widget_data& data); // 1  
     void oops_again(widget_id w)  {  
      widget_data data;  std::thread t(update_data_for_widget,w,std::ref(data)); // 2  
      display_status();  
      t.join();  
      process_widget_data(data); 
     } 
    
  • 这依赖于 std::thread 实例的可移动且不可复 制性。不可复制性表示在某一时间点,一个 std::thread 实例只能关联一个执行线程。可移动性使得开发者可 以自己决定,哪个实例拥有线程实际执行的所有权
  • std::thread::hardware_concurrency() 会返回并发线程的数量。例如,多核系统中, 返回值可以是CPU核芯的数量。返回值也仅仅是一个标识,当无法获取时,函数返回0。
    // parallel accumulate
    template<typename Iterator,typename T>  
    struct accumulate_block  {  
      void operator()(Iterator first,Iterator last,T& result)  {  result=std::accumulate(first,last,result);  
      }  
    }; 
    template<typename Iterator,typename T>  
    T parallel_accumulate(Iterator first,Iterator last,T init)  
    {  
      unsigned long const length=std::distance(first,last);  
      if(!length) // 1  
          return init;  
      unsigned long const min_per_thread=25;  
      unsigned long const max_threads=  (length+min_per_thread-1)/min_per_thread; // 2  
      unsigned long const hardware_threads=  std::thread::hardware_concurrency();  unsigned long const num_threads= // 3 
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);  unsigned long const block_size=length/num_threads; // 4  
      std::vector<T> results(num_threads);  
      std::vector<std::thread> threads(num_threads-1); // 5  
      Iterator block_start=first;  
      for(unsigned long i=0; i < (num_threads-1); ++i)  
      {  
          Iterator block_end=block_start;  
          std::advance(block_end,block_size); // 6  
          threads[i]=std::thread( // 7  
              accumulate_block<Iterator,T>(),  
              block_start,block_end,std::ref(results[i]));
          block_start=block_end; // 8  
      }  
      accumulate_block<Iterator,T>()(  block_start,last,results[num_threads-1]); // 9 
      for (auto& entry : threads)  entry.join(); // 10 
      return std::accumulate(results.begin(),results.end(),init); // 11 
    }
    

jthread

  • C++11 引入了 std::thread 类型,其与操作系统提供的线程对应,但该类型有一个严重的设计缺 陷: 不是 RAII 类型。

thread 存在的问题

std::thread 要求在其生命周期结束时,若表示正在运行的线程,则调用 join()(等待线程结束) 或 detach()(让线程在后台运行)。若两者都没有调用,析构函数会立即导致异常的程序终止 (在某些系统上导致段错误)。

如果通过确保在离开作用域时调用 join() 来对异常作出反应,而不解决异常。不幸的是,这可能会导致阻塞 (永远)。然而,调用 detach() 也是一个问题,因为线程在程序的后台继续运行,使用 CPU 时间和资源,而这些时间和资源现在可能会销毁。若在更复杂的上下文中使用多个线程,问题会变得更糟,并且会产生非常糟糕的代码。

void foo(){
  std::thread t1{task1, name, val};
  std::thread t2;
  try {
    t2 = std::thread{task2, name, val};
    ...
  }
  catch(...){
    t1.join();
    if(t2.joinable()) {
      t2.join();
    }
    throw;
  }

  t1.join();
  t2.join();
}

std::jthread

  • std::jthread 解决了这些问题,它是 RAII 类型。若线程是可汇入的 (“j”代表“汇入”),析构函数会自动调用 join()。
  • 内置停止机制:std::jthread 与 std::stop_token 集成,支持直接请求停止线程
void foo(){
  std::jthread t1{task1, name, val};
  std::jthread t2{task2, name, val};
  ...

  t1.join();
  t2.join();
}

jthread的停止令牌和停止回调

  • 自动管理停止令牌:当使用 std::jthread 时,不需要手动创建 std::stop_source。std::jthread 自动包含一个内部的 std::stop_source,并在启动线程时将相关的 std::stop_token 传递给线程函数。
  • 接收停止令牌:线程函数可以直接接受一个 std::stop_token 参数,该令牌由 std::jthread 提供,确保与线程的内部停止机制同步。
  • 定期检查停止请求:在线程函数中,应定期调用 std::stop_token::stop_requested() 来检查是否接收到停止请求。这为安全且及时地停止执行提供了机制。
  • 响应停止请求:一旦 std::stop_token 表明停止已被请求,线程函数应采取必要的步骤来安全地终止,这可能包括资源的清理和状态的保存。
#include <iostream>
#include <chrono>
#include <thread>

// 使用 std::jthread 运行的函数
void task(std::stop_token stoken) {
    while (!stoken.stop_requested()) {
        std::cout << "任务正在运行..." << std::endl;
        // 模拟一些工作
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "任务已收到停止请求,现在停止运行。" << std::endl;
}

int main() {
    // 创建 std::jthread,自动处理停止令牌
    std::jthread worker(task);

    // 模拟主线程运行一段时间后需要停止子线程
    std::this_thread::sleep_for(std::chrono::seconds(5));
    std::cout << "主线程请求停止子线程..." << std::endl;
    
    // 触发停止请求
    worker.request_stop();

    // std::jthread 在析构时自动加入
    return 0;
}

std::stop_token 和 std::stop_callback的其他使用案例;

另外, std::stop_token 和 std::stop_callback 并不局限于与线程(如 std::jthread)的使用,它们独立于线程的,用于程序中的任何地方,以提供一种灵活的停止信号处理机制。

#include <iostream>
#include <chrono>
#include <stop_token>

int main() {
    std::stop_source source;
    std::stop_token token = source.get_token();

    // 模拟一些可以被取消的工作
    auto startTime = std::chrono::steady_clock::now();
    auto endTime = startTime + std::chrono::seconds(10);  // 设定10秒后结束任务

    while (std::chrono::steady_clock::now() < endTime) {
        if (token.stop_requested()) {
            std::cout << "Task was canceled!" << std::endl;
            break;
        }
        std::cout << "Working..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));

        // 模拟在某个条件下请求停止
        if (std::chrono::steady_clock::now() > startTime + std::chrono::seconds(5)) {
            source.request_stop();
        }
    }

    if (!token.stop_requested()) {
        std::cout << "Task completed normally." << std::endl;
    }

    return 0;
}
  • stop_token与std::thread结合
#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>

void threadFunction(std::stop_token stoken) {
    std::stop_callback callback(stoken, []() {
        std::cout << "Stop request received.\n";
    });
   // 4. 定期检查停止请求
    while (!stoken.stop_requested()) {
        std::cout << "Running...\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
   // 5. 响应取消请求
    std::cout << "Thread finishing.\n";
}

int main() {
     // 1. 创建并发起取消请求的源
    std::stop_source stopSource;
     // 2. 生成停止令牌
    std::stop_token stoken = stopSource.get_token();
    // 3. 传递停止令牌
    std::thread t(threadFunction, stoken);

    std::this_thread::sleep_for(std::chrono::seconds(5));
     // 触发停止请求
    stopSource.request_stop();

    t.join();
    std::cout << "Thread stopped.\n";
    return 0;
}