Multiprocessing Queue, Pool 및 Locking을 사용하는 단순한 예
http://docs.python.org/dev/library/multiprocessing.html 에서 문서를 계속했지만 여전히 다중 처리 대기열, 풀 및 잠금으로 어려움을두고 있습니다. 그리고 지금은 아래 예제를 만들 수 있습니다.
Queue와 Pool의 경우 개념을 제대로 이해했는지 모르겠습니다. 틀 렸으면 바로 수정 해주세요. 내가 달성하려는 것은 한 번에 2 개의 요청을 처리하는 것입니다 (이 예에서는 데이터 목록에 8 개 있음). 그래서 무엇을 가지고 있습니까? 두 개의 서로 다른 대기열 (최대 2 개)을 처리 할 수있는 2 개의 프로세스를 사용하여 생성 할 풀 또는 2 개의 입력을 처리해야합니까? 잠금은 출력을 인쇄하는 것입니다.
import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
for indata in var1:
p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
p.start()
def mp_worker(inputs, the_time):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
if __name__ == '__main__':
mp_handler(data)
문제에 대한 최선의 해결책은 Pool
. Queue
s를 사용 하고 별도의 "대기열 공급"기능을 아마도 과잉 일 것입니다.
여기에 프로그램의 약간 재 배열 버전,이 시간 만이 프로세스 A의 coralled Pool
. 원본 코드를 최소한으로 변경하는 가장 쉬운 방법이라고 생각합니다.
import multiprocessing
import time
data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_worker((inputs, the_time)):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
def mp_handler():
p = multiprocessing.Pool(2)
p.map(mp_worker, data)
if __name__ == '__main__':
mp_handler()
것으로하는 mp_worker()
기능 현관은 이제 하나의 인자 (앞의 두 인수 튜플) 수용하기 때문에을 map()
하위 목록에 user-의 입력 입력 데이터, 당신의 노동자 함수에 하나의 인수 로 주어진 각각의 하위리스트 업 기능의 덩어리 .
다수 :
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Processs c Waiting 6 seconds
Process b DONE
Processs d Waiting 8 seconds
Process c DONE
Processs e Waiting 1 seconds
Process e DONE
Processs f Waiting 3 seconds
Process d DONE
Processs g Waiting 5 seconds
Process f DONE
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
아래 @Thales 의견에 따라 편집하십시오.
프로세스가 가상 쌍으로 실행 가능한 "각 풀 제한에 대한 잠금"을 원하는 경우 ala :
A 대기 B 대기 | A 완료, B 완료 | C 대기, D 대기 | C 완료, D 완료 | ...
그런 다음 함수를 변경하여 각 데이터 쌍에 대해 풀 (2 개 프로세스)을 시작합니다.
def mp_handler():
subdata = zip(data[0::2], data[1::2])
for task1, task2 in subdata:
p = multiprocessing.Pool(2)
p.map(mp_worker, (task1, task2))
이제 출력은 다음과 가변됩니다.
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Process b DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c DONE
Process d DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e DONE
Process f DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
이 질문과 100 % 관련이 없을 수 있습니다. 당신이 사용하는 예를 검색하면 Google에서 가장 먼저 표시됩니다.
이 항목을 인스턴스화하여 큐에 넣을 수 있고 큐가 완료 될 때까지 기다릴 수있는 기본 예제 클래스입니다. 그게 내가 필요한 전부입니다.
from multiprocessing import JoinableQueue
from multiprocessing.context import Process
class Renderer:
queue = None
def __init__(self, nb_workers=2):
self.queue = JoinableQueue()
self.processes = [Process(target=self.upload) for i in range(nb_workers)]
for p in self.processes:
p.start()
def render(self, item):
self.queue.put(item)
def upload(self):
while True:
item = self.queue.get()
if item is None:
break
# process your item here
self.queue.task_done()
def terminate(self):
""" wait until queue is empty and terminate processes """
self.queue.join()
for p in self.processes:
p.terminate()
r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
이 주제에 대한 나의 개인적인 이동은 다음과 같다.
요점, (풀 요청 환영!) : https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing
import sys
THREADS = 3
# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()
def func_worker(args):
"""This function will be called by each thread.
This function can not be a class method.
"""
# Expand list of args into named args.
str1, str2 = args
del args
# Work
# ...
# Serial-only Portion
GLOBALLOCK.acquire()
print(str1)
print(str2)
GLOBALLOCK.release()
def main(argp=None):
"""Multiprocessing Spawn Example
"""
# Create the number of threads you want
pool = multiprocessing.Pool(THREADS)
# Define two jobs, each with two args.
func_args = [
('Hello', 'World',),
('Goodbye', 'World',),
]
try:
# Spawn up to 9999999 jobs, I think this is the maximum possible.
# I do not know what happens if you exceed this.
pool.map_async(func_worker, func_args).get(9999999)
except KeyboardInterrupt:
# Allow ^C to interrupt from any thread.
sys.stdout.write('\033[0m')
sys.stdout.write('User Interupt\n')
pool.close()
if __name__ == '__main__':
main()
Komodo Edit (win10)와 같은 편집기를 사용하는 모든 사람을 sys.stdout.flush()
위해 다음을 추가하십시오 .
def mp_worker((inputs, the_time)):
print " Process %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
sys.stdout.flush()
또는 첫 번째 줄로 :
if __name__ == '__main__':
sys.stdout.flush()
이것은 펼쳐지는 동안 무슨 일이 일어나는지 보는 데 도움이됩니다. 검은 색 명령 줄 상자를 볼 필요가 없습니다.
다음은 내 코드의 예입니다 (스레드 풀의 경우 클래스 이름 만 변경하면 프로세스 풀이 생김).
def execute_run(rp):
... do something
pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
for en in TESTED_ENERGIES:
for ecut in TESTED_E_CUT:
rp = RunParams(
simulations, DEST_DIR,
PARTICLE, mat, 960, 0.125, ecut, en
)
pool.submit(execute_run, rp)
pool.join()
원래:
pool = ThreadPoolExecutor(6)
6 개의 스레드에 대한 풀을 만듭니다.- 그런 다음 풀에 작업을 추가하는 for가 많이 있습니다.
pool.submit(execute_run, rp)
풀에 작업을 추가하고 첫 번째 arogument는 스레드 / 프로세스에서 호출 된 함수이고 나머지 인수는 호출 된 함수로 전달됩니다.pool.join
모든 작업이 완료 될 때까지 기다립니다.
'IT' 카테고리의 다른 글
Rx Observable에서 어떻게 ʻawait` 할 수 있습니까? (0) | 2020.10.08 |
---|---|
{} 문을 사용하여 내부에서 반환을 호출하는 것이 좋은 방법입니까? (0) | 2020.10.08 |
셀에서 복사 할 때 따옴표를 생략 (0) | 2020.10.08 |
Kotlin : 공개 get private set var (0) | 2020.10.08 |
C ++ 읽기 및 쓰기는 int 원자입니까? (0) | 2020.10.08 |