C ++ 11의 코드 풀링
관련 질문 :
C ++ 11 소개 :
- C ++ 11 : std :: 스레드 풀링?
- C ++ 11의 async (launch :: async) 때문에 많은 생성을 피하기 위해 위해로 풀이 더 이상 사용되지 않는 것이 보입니까?
부스트 소개 :
받을 수 있나요 어떻게 스레드 풀 수 로 작업을 보내 만들고 그들을 계속해서 또 다시 삭제하지 않고를? 이는 영구 단일가 결합하지 않고 재 동기화됨을 의미합니다.
다음과 같은 코드가 있습니다.
namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};
void each_thread_does(int i) {
arr[i] += 2;
}
}
int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}
각 반복마다 많은 것을 작성하고 결합하는 대신에 여러 번만 사용하는 것을 선호합니다.
C ++ 사용할 풀 라이브러리 ( https://github.com/vit-vit/ctpl)를 사용할 수 있습니다 .
그런 다음 방송 코드를 다음으로 바꿀 수 있습니다.
#include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library
int main (int argc, char *argv[]) {
ctpl::thread_pool p(2 /* two threads in the pool */);
int arr[4] = {0};
std::vector<std::future<void>> results(4);
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
}
for (int j = 0; j < 4; ++j) {
results[j].get();
}
arr[4] = std::min_element(arr, arr + 4);
}
}
원하는 수의 반복을 반복해서 작성하고 삭제하지 않습니다.
같은 풀은 모든 노래가 항상 실행 중임을 의미합니다. 다시 말해도 함수는 절대 반환되지 않습니다. 기본에 의미있는 작업을 제공하는 경우에 수행 할 작업이 있음을 알리고 실제 작업 데이터를 많은 목적으로 간 시스템을 설계해야합니다.
일반적으로 여기에는 많은 동시 데이터 구조가 포함되어 있으며, 대부분의 상황은 변수가 있습니다. 하나 이상의 반복되는 알림을 수신하면 동시 데이터 구조에서 작업을 복구 한 후 처리하고 결과를 방식으로 저장합니다.
그런 다음 많은 작업이 있는지 확인하고 다시 잠자 지 다시 확인합니다.
결론은 보편적으로 적용 할 수있는 "작업"이라는 자연스러운 개념이 없기 때문에 모든 것을 직접 설계해야한다는 것입니다. 그것은 약간의 작업이며, 제대로 이해해야 할 미묘한 문제가 있습니다. (뒤에서 일정 관리를 담당하는 시스템이 마음에 드는 경우 Go에서 프로그래밍 할 수 있습니다.)
이것은 내 대답에서 다른 방법으로 복사되었습니다. 도움이 될 수 있기를 바랍니다.
1) 지원할 수있는 최대 시스템 수로 시작하십시오.
int Num_Threads = thread::hardware_concurrency();
2) 예정된 것이 풀 구현을 Num_Threads에 따라 만들거나 만들 예정이 새 결합을 좋습니다. 성능이 버전보다 느려질 수 있습니다.
각 C ++ 11은 무한 루프를 사용하여 기능을 수행하면서 새로운 작업이 계속 실행되기를 기다립니다.
기능을 기능을 갖춘 풀에 연결하는 방법은 다음과 같습니다.
int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{ Pool.push_back(thread(Infinite_loop_function));}
3) 무한 _ 루프 _ 함수
이 작업을 위해 "while (true)"
void The_Pool:: Infinite_loop_function()
{
while(true)
{
{
unique_lock<mutex> lock(Queue_Mutex);
condition.wait(lock, []{return !Queue.empty() || therminate_pool});
Job = Queue.front();
Queue.pop();
}
Job(); // function<void()> type
}
};
4) 대기열에 작업을 추가하는 기능 만들기
void The_Pool:: Add_Job(function<void()> New_Job)
{
{
unique_lock<mutex> lock(Queue_Mutex);
Queue.push(New_Job);
}
condition.notify_one();
}
5) 임의의 기능을 함수에 바인딩
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
이러한 구성 요소를 통합하면 고유 한 동적 스레딩이 있습니다. 이 경우는 항상 실행되어 작업을 기다립니다.
구문 오류가 있으면 죄송합니다. 다만 코드를 입력하면 메모리가 부족합니다. 작업 무결성을 위반하는 완전한 코드를 제공 할 수 없습니다.
편집 : 풀을 종료 후 종료 () 메소드를 호출하십시오.
XXXX::shutdown(){
{ unique_lock<mutex> lock(threadpool_mutex);
terminate_pool = true;} // use this flag in condition.wait
condition.notify_all(); // wake up all threads.
// Join all threads.
for(std::thread &every_thread : thread_vector)
{ every_thread.join();}
thread_vector.empty();
stopped = true; // use this flag in destructor, if not set, call shutdown()
}
기능 풀은 핵심적으로 이벤트 루프로 작동하는 함수에 바인딩 된 프로그램 세트입니다. 이 표준은 작업이 실행되거나 자체 종료 될 때까지 끝없이 기다립니다.
하나의 풀 작업은 작업을 출시하고 다음 작업 실행 정책 (스케줄 규칙, 일련 인스턴스화, 풀 크기)을 정의 (및 수정)하고 명령 및 관련 리소스의 모니터링하는 인터페이스를 제공하는 것입니다.
다재다능한 풀의 경우 작업이 무엇인지, 어떻게 시작되고 중단되고 결과가 무엇인지 (질문에 대한 약속과 미래의 개념 참조) 즉, 응답해야 할 이벤트의 종류를 정의하여 시작해야합니다. , 그들이 어떻게 처리 할 것인지, 어떻게 처리 할 것인지 여부를 결정합니다. 솔루션이 점점 복잡 해짐에 따라 복잡해집니다.
현재 이벤트 처리 도구는 상당히 베어 본 (*)입니다. 뮤텍스, 조건 변수 및 그 위에 몇 가지 추상화 (잠금, 장벽)와 같은 기본 요소가 있습니다. 그러나 어떤 경우에는 폐색이 부적합한 것이 판명 될 수 (이 관련 질문 참조 ), 프리미티브 사용으로 되돌려 야합니다.
다른 문제도 관리해야합니다.
- 신호
- 나는
- 하드웨어 (선호도, 이기종 설정)
이것들은 당신의 환경에서 어떻게 진행하는지?
해당 질문에 대한 이 답변 은 부스트 및 STL을위한 기존 구현을 가리.
많이 많은 문제를 다루지 않는 다른 질문에 매우 복잡한 풀 구현 을 제공했습니다 . 당신은 구축 구축 할 수 있습니다. 영감을 기존에 존재하는 다른 언어로 된 기존 프레임을 사용합니다.
(*) 나는 발견 문제로 볼 수 있습니다. 나는 C에서 상속받은 C ++의 정신이라고 생각합니다.
이와 같은 것이 도움이 될 수 있습니다 (작동하는 앱에서 가져옴).
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
struct thread_pool {
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
auto worker = [this] { return service.run(); };
grp.add_thread(new boost::thread(worker));
}
}
template<class F>
void enqueue(F f) {
service.post(f);
}
~thread_pool() {
service_worker.reset();
grp.join_all();
service.stop();
}
private:
boost::asio::io_service service;
asio_worker service_worker;
boost::thread_group grp;
};
다음과 같이 사용할 수 있습니다.
thread_pool pool(2);
pool.enqueue([] {
std::cout << "Hello from Task 1\n";
});
pool.enqueue([] {
std::cout << "Hello from Task 2\n";
});
쉽게 말하지 않는 것은 쉬운 일 이 아닙니다.
Boost :: asio :: io_service는 매우 효율적인 구현이거나 래핑은 플랫폼 별 래퍼 모음입니다 (예 : Windows의 I / O 완료 포트를).
이것은 매우 간단하고 이해하기 제부 사용하기 처 C ++ 11 표준 라이브러리 만 사용하며 용도에 맞게 보거나 수있는 또 다른 코드 풀 구현입니다. 비용이 많이 드는 비용으로 배송되어야합니다. 수영장 :
https://github.com/progschj/ThreadPool
편집 : 이제 C ++ 17과 개념이 필요합니다. (9/12/16 현재 g ++ 6.0 이상이면 충분합니다.)
템플릿 공은 훨씬 더 많은 노력을 기울일 가치가 있습니다. 명시적인 템플릿 인수가 필요한 함수를 아직 찾지 못했습니다.
또한 적절한 호출 가능한 객체를 취합니다 ( 그리고 여전히 정적으로 타입이 안전합니다 !!! ).
또한 동일한 API를 사용하는 녹색 스레딩 우선 순위 풀이 포함됩니다. 이 클래스는 POSIX 전용입니다. ucontext_t
사용자 공간 작업 전환에 API를 사용합니다 .
이를 위해 간단한 라이브러리를 만들었습니다. 사용 예는 다음과 가변합니다. (내가 직접 작성해야 결정하기 전에 처음으로 작성해야 하나이기 때문에 이것에 대답하고 있습니다.)
bool is_prime(int n){
// Determine if n is prime.
}
int main(){
thread_pool pool(8); // 8 threads
list<future<bool>> results;
for(int n = 2;n < 10000;n++){
// Submit a job to the pool.
results.emplace_back(pool.async(is_prime, n));
}
int n = 2;
for(auto i = results.begin();i != results.end();i++, n++){
// i is an iterator pointing to a future representing the result of is_prime(n)
cout << n << " ";
bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
if(prime)
cout << "is prime";
else
cout << "is not prime";
cout << endl;
}
}
async
모든 (또는 void) 반환 값과 인수 (또는 아예 없음)를 사용하여 모든 함수를 전달할 수 있으며 해당하는 std::future
. 결과를 얻으려면 (또는 작업이 완료 될 때까지 기다리 get()
십시오) 미래에 전화 하십시오.
다음은 github입니다 : https://github.com/Tyler-Hardin/thread_pool .
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:
function_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
main.cpp
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}
STL 외부에 종속성이없는 스레드 풀은 전적으로 가능합니다. 나는 최근에 똑같은 문제를 해결하기 위해 작은 헤더 전용 스레드 풀 라이브러리 를 작성했습니다 . 동적 풀 크기 조정 (런타임에 작업자 수 변경), 대기, 중지, 일시 중지, 재개 등을 지원합니다. 도움이 되셨기를 바랍니다.
부스트 라이브러리에서 thread_pool 을 사용할 수 있습니다 .
void my_task(){...}
int main(){
int threadNumbers = thread::hardware_concurrency();
boost::asio::thread_pool pool(threadNumbers);
// Submit a function to the pool.
boost::asio::post(pool, my_task);
// Submit a lambda object to the pool.
boost::asio::post(pool, []() {
...
});
}
오픈 소스 커뮤니티에서 스레드 풀 을 사용할 수도 있습니다 .
void first_task() {...}
void second_task() {...}
int main(){
int threadNumbers = thread::hardware_concurrency();
pool tp(threadNumbers);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
}
참고 URL : https://stackoverflow.com/questions/15752659/thread-pooling-in-c11
'IT' 카테고리의 다른 글
Wireshark 로컬 호스트 트래픽 배포 (0) | 2020.07.29 |
---|---|
가로 ProgressBar에서 세로 패딩 제거 (0) | 2020.07.29 |
파일 린트를 사용하여 하나의 특정 라인을 무시할 수 있습니까? (0) | 2020.07.29 |
사전을 파일로 저장하는 방법? (0) | 2020.07.28 |
Android에서 서비스 시작 (0) | 2020.07.28 |