IT

C ++ 11의 코드 풀링

lottoking 2020. 7. 29. 07:31
반응형

C ++ 11의 코드 풀링


관련 질문 :

C ++ 11 소개 :

부스트 소개 :


받을 수 있나요 어떻게 스레드 풀로 작업을 보내 만들고 그들을 계속해서 또 다시 삭제하지 않고를? 이는 영구 단일가 결합하지 않고 재 동기화됨을 의미합니다.


다음과 같은 코드가 있습니다.

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

각 반복마다 많은 것을 작성하고 결합하는 대신에 여러 번만 사용하는 것을 선호합니다.


C ++ 사용할 풀 라이브러리 ( https://github.com/vit-vit/ctpl)를 사용할 수 있습니다 .

그런 다음 방송 코드를 다음으로 바꿀 수 있습니다.

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

원하는 수의 반복을 반복해서 작성하고 삭제하지 않습니다.


같은 풀은 모든 노래가 항상 실행 중임을 의미합니다. 다시 말해도 함수는 절대 반환되지 않습니다. 기본에 의미있는 작업을 제공하는 경우에 수행 할 작업이 있음을 알리고 실제 작업 데이터를 많은 목적으로 간 시스템을 설계해야합니다.

일반적으로 여기에는 많은 동시 데이터 구조가 포함되어 있으며, 대부분의 상황은 변수가 있습니다. 하나 이상의 반복되는 알림을 수신하면 동시 데이터 구조에서 작업을 복구 한 후 처리하고 결과를 방식으로 저장합니다.

그런 다음 많은 작업이 있는지 확인하고 다시 잠자 지 다시 확인합니다.

결론은 보편적으로 적용 할 수있는 "작업"이라는 자연스러운 개념이 없기 때문에 모든 것을 직접 설계해야한다는 것입니다. 그것은 약간의 작업이며, 제대로 이해해야 할 미묘한 문제가 있습니다. (뒤에서 일정 관리를 담당하는 시스템이 마음에 드는 경우 Go에서 프로그래밍 할 수 있습니다.)


이것은 내 대답에서 다른 방법으로 복사되었습니다. 도움이 될 수 있기를 바랍니다.

1) 지원할 수있는 최대 시스템 수로 시작하십시오.

int Num_Threads =  thread::hardware_concurrency();

2) 예정된 것이 풀 구현을 Num_Threads에 따라 만들거나 만들 예정이 새 결합을 좋습니다. 성능이 버전보다 느려질 수 있습니다.

각 C ++ 11은 무한 루프를 사용하여 기능을 수행하면서 새로운 작업이 계속 실행되기를 기다립니다.

기능을 기능을 갖춘 풀에 연결하는 방법은 다음과 같습니다.

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}

3) 무한 _ 루프 _ 함수

이 작업을 위해 "while (true)"

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);

            condition.wait(lock, []{return !Queue.empty() || therminate_pool});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};

4) 대기열에 작업을 추가하는 기능 만들기

void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
        unique_lock<mutex> lock(Queue_Mutex);
        Queue.push(New_Job);
    }
    condition.notify_one();
}

5) 임의의 기능을 함수에 바인딩

Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

이러한 구성 요소를 통합하면 고유 한 동적 스레딩이 있습니다. 이 경우는 항상 실행되어 작업을 기다립니다.

구문 오류가 있으면 죄송합니다. 다만 코드를 입력하면 메모리가 부족합니다. 작업 무결성을 위반하는 완전한 코드를 제공 할 수 없습니다.

편집 : 풀을 종료 후 종료 () 메소드를 호출하십시오.

XXXX::shutdown(){
{   unique_lock<mutex> lock(threadpool_mutex);
    terminate_pool = true;} // use this flag in condition.wait

condition.notify_all(); // wake up all threads.

// Join all threads.
for(std::thread &every_thread : thread_vector)
{   every_thread.join();}

thread_vector.empty();  
stopped = true; // use this flag in destructor, if not set, call shutdown() 

}


기능 풀은 핵심적으로 이벤트 루프로 작동하는 함수에 바인딩 된 프로그램 세트입니다. 이 표준은 작업이 실행되거나 자체 종료 될 때까지 끝없이 기다립니다.

하나의 풀 작업은 작업을 출시하고 다음 작업 실행 정책 (스케줄 규칙, 일련 인스턴스화, 풀 크기)을 정의 (및 수정)하고 명령 및 관련 리소스의 모니터링하는 인터페이스를 제공하는 것입니다.

다재다능한 풀의 경우 작업이 무엇인지, 어떻게 시작되고 중단되고 결과가 무엇인지 (질문에 대한 약속과 미래의 개념 참조) 즉, 응답해야 할 이벤트의 종류를 정의하여 시작해야합니다. , 그들이 어떻게 처리 할 것인지, 어떻게 처리 할 것인지 여부를 결정합니다. 솔루션이 점점 복잡 해짐에 따라 복잡해집니다.

현재 이벤트 처리 도구는 상당히 베어 본 (*)입니다. 뮤텍스, 조건 변수 및 그 위에 몇 가지 추상화 (잠금, 장벽)와 같은 기본 요소가 있습니다. 그러나 어떤 경우에는 폐색이 부적합한 것이 판명 될 수 (이 관련 질문 참조 ), 프리미티브 사용으로 되돌려 야합니다.

다른 문제도 관리해야합니다.

  • 신호
  • 나는
  • 하드웨어 (선호도, 이기종 설정)

이것들은 당신의 환경에서 어떻게 진행하는지?

해당 질문에 대한 이 답변 은 부스트 ​​및 STL을위한 기존 구현을 가리.

많이 많은 문제를 다루지 않는 다른 질문에 매우 복잡한 풀 구현제공했습니다 . 당신은 구축 구축 할 수 있습니다. 영감을 기존에 존재하는 다른 언어로 된 기존 프레임을 사용합니다.


(*) 나는 발견 문제로 볼 수 있습니다. 나는 C에서 상속받은 C ++의 정신이라고 생각합니다.


이와 같은 것이 도움이 될 수 있습니다 (작동하는 앱에서 가져옴).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

다음과 같이 사용할 수 있습니다.

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

쉽게 말하지 않는 것은 쉬운 일 이 아닙니다.

Boost :: asio :: io_service는 매우 효율적인 구현이거나 래핑은 플랫폼 별 래퍼 모음입니다 (예 : Windows의 I / O 완료 포트를).


이것은 매우 간단하고 이해하기 제부 사용하기 처 C ++ 11 표준 라이브러리 만 사용하며 용도에 맞게 보거나 수있는 또 다른 코드 풀 구현입니다. 비용이 많이 드는 비용으로 배송되어야합니다. 수영장 :

https://github.com/progschj/ThreadPool


편집 : 이제 C ++ 17과 개념이 필요합니다. (9/12/16 현재 g ++ 6.0 이상이면 충분합니다.)

템플릿 공은 훨씬 더 많은 노력을 기울일 가치가 있습니다. 명시적인 템플릿 인수가 필요한 함수를 아직 찾지 못했습니다.

또한 적절한 호출 가능한 객체를 취합니다 ( 그리고 여전히 정적으로 타입이 안전합니다 !!! ).

또한 동일한 API를 사용하는 녹색 스레딩 우선 순위 풀이 포함됩니다. 이 클래스는 POSIX 전용입니다. ucontext_t사용자 공간 작업 전환에 API를 사용합니다 .


이를 위해 간단한 라이브러리를 만들었습니다. 사용 예는 다음과 가변합니다. (내가 직접 작성해야 결정하기 전에 처음으로 작성해야 하나이기 때문에 이것에 대답하고 있습니다.)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

async모든 (또는 void) 반환 값과 인수 (또는 아예 없음)를 사용하여 모든 함수를 전달할 수 있으며 해당하는 std::future. 결과를 얻으려면 (또는 작업이 완료 될 때까지 기다리 get()십시오) 미래에 전화 하십시오.

다음은 github입니다 : https://github.com/Tyler-Hardin/thread_pool .


Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}

STL 외부에 종속성이없는 스레드 풀은 전적으로 가능합니다. 나는 최근에 똑같은 문제를 해결하기 위해 작은 헤더 전용 스레드 풀 라이브러리작성했습니다 . 동적 풀 크기 조정 (런타임에 작업자 수 변경), 대기, 중지, 일시 중지, 재개 등을 지원합니다. 도움이 되셨기를 바랍니다.


부스트 라이브러리에서 thread_pool사용할 수 있습니다 .

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

오픈 소스 커뮤니티에서 스레드 풀사용할 수도 있습니다 .

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

참고 URL : https://stackoverflow.com/questions/15752659/thread-pooling-in-c11

반응형