[Python] 4. 멀티프로세싱 2
본 포스팅은 “파이썬 동시성 프로그래밍” 책 내용을 기반으로 작성되었습니다. 잘못된 내용이 있을 경우 지적해 주시면 감사드리겠습니다.
4-1. 멀티프로세싱 풀
파이썬 애플리케이션이 멀티프로세스로 동작하면, 멀티프로세싱 모듈 내 다양한 기능을 가진 Pool 클래스를 활용할 수 있다. Pool 클래스는 프로그램 내 여러 자식 프로세스를 쉽게 실행하고, 풀에서 작업자를 선택할 수 있다.
1. concurrent.futures.ProcessPoolExecutor와 Pool의 차이점
프로세스 풀의 multiprocessing.Pool 구현은 병렬 처리 능력을 지원하고자 concurrent.futures.ProcessPoolExecutor과 거의 동일한 구현 형태를 지닌다. 그러나 concurrent.futures.ProcessPoolExecutor는 프로세스 풀 생성을 쉽게 해주는 인터페이스만 지원한다. 이런 간단한 인터페이스는 프로그래머들이 스레드와 프로세스 풀 모두 즉각적으로 시작할 수 있게 하지만 이러한 작업이 복잡해 특정 상황에서 세밀한 조정이 필요할 때 오히려 불필요하다.
ThreadPoolExecutor와 ProcessPoolExcutor 모두 동일한 추상 클래스와 하위 클래스이므로, 상속 메소드를 이해하고 작업하기가 좀 더 쉽다. 일반적으로 multiprocessing.Pool 보다 concurrent.futures.ProcessPoolExecutor가 필요조건에 적합하므로 추천하지만, 더 많은 조작을 필요로 하는 한계에 부딪혔을 때의 대안이 필요하다.
2. 컨텍스트 관리자
from multiprocessing import Pool
def task(n):
print(n)
def main():
with Pool(4) as p: # with 문은 자원 획득하고 사용 후 반납하는 경우 사용! Pool 함수 인자에는 사용할 프로세스 개수 입력!
print(p.map(task, [2, 3, 4]))
if __name__ == '__main__':
main()
(결과) 2
3
4
[None, None, None]
3. 프로세스 풀에 작업 전달하기
위의 예제보다 더 복잡한 경우, 다양한 방법으로 풀과 상호작용하여 해결할 수 있다.
먼저 apply는 ThreadPoolExecutor.submit()과 같다. 즉, 개별적인 작업을 풀 객체에 전달하고자 사용된다. 분리된 프로세스로 각 함수를 처리하지만, 현재의 프로세스가 결과를 내기 전까지는 다음 프로세스를 블록한다.
from multiprocessing import Pool
import time
import os
def myTask(n):
print("Task processed by Process {}".format(os.getpid()))
time.sleep(1)
return n*2
def main():
with Pool(4) as p:
start = time.time()
print(p.apply(myTask, (4,)))
print(p.apply(myTask, (3,)))
print(p.apply(myTask, (2,)))
print(p.apply(myTask, (1,)))
delta_t = time.time()-start
print("Time :",delta_t)
if __name__ == '__main__':
main()
(결과) Task processed by Process 12484
8
Task processed by Process 36968
6
Task processed by Process 20852
4
Task processed by Process 30636
2
Time : 4.244585990905762
반면 병렬 실행 작업이 필요할 경우 apply_async 함수를 바탕으로 풀에 작업을 전달할 수 있다. 다음 예제는 4개의 작업을 프로세싱 풀에 전달하고자 함수 내 for문을 사용한다.
from multiprocessing import Pool
import time
import os
def myTask1(n):
print("Task processed by Process {}".format(os.getpid()))
return n*2
def main():
print("apply_async")
with Pool(4) as p:
tasks = []
for i in range(4):
task = p.apply_async(func=myTask1, args=(i,))
tasks.append(task)
for task in tasks:
task.wait()
print("Result: {}".format(task.get()))
if __name__ == '__main__':
main()
(결과) apply_async
Task processed by Process 38520
Task processed by Process 38520
Result: 0
Task processed by Process 38520
Result: 2
Task processed by Process 38520
Result: 4
Result: 6
task 배열에 전달한 순서 그대로 task.wait()를 호출하여 결과 또한 순서대로 콘솔에 출력된다고 하는데, 잘 된것이 맞는지는 모르겠다. 빠르게 실행되는 것은 하나의 프로세스로 처리가 가능한가보다. 아래 코드 처럼 time.sleep(1)을 주었더니 4개의 프로세스로 병렬처리 수행하였다.
from multiprocessing import Pool
import time
import os
def myTask1(n):
print("Task processed by Process {}".format(os.getpid()))
time.sleep(1)
return n*2
def main():
print("apply_async")
with Pool(4) as p:
start = time.time()
tasks = []
for i in range(4):
task = p.apply_async(func=myTask1, args=(i,))
tasks.append(task)
for task in tasks:
task.wait()
print("Result: {}".format(task.get()))
delta_t = time.time()-start
print("Time :",delta_t)
if __name__ == '__main__':
main()
(결과) apply_async
Task processed by Process 22052
Task processed by Process 41884
Task processed by Process 36356
Task processed by Process 26920
Result: 0
Result: 2
Result: 4
Result: 6
Time : 1.2862725257873535
Leave a comment