UFO ET IT

다중 처리 풀에서 발생한 예외가 감지되지 않음

ufoet 2020. 11. 21. 08:36
반응형

다중 처리 풀에서 발생한 예외가 감지되지 않음


multiprocessing.Pool 프로세스에서 예외가 발생하면 스택 추적이나 실패한 다른 표시가없는 것 같습니다. 예:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

1을 인쇄하고 조용히 멈 춥니 다. 흥미롭게도 BaseException을 발생시키는 것이 대신 작동합니다. 모든 예외에 대한 동작을 BaseException과 동일하게 만드는 방법이 있습니까?


적어도 디버깅 목적으로 문제에 대한 합리적인 해결책이 있습니다. 현재 주요 프로세스에서 예외를 다시 발생시키는 솔루션이 없습니다. 내 첫 번째 생각은 데코레이터를 사용하는 것이었지만 , 모듈의 최상위 수준에서 정의 된 함수 만 피클 할 수 있으므로 바로 그럴 것입니다.

대신 간단한 래핑 클래스와이를 사용하는 Pool 하위 클래스 apply_async(따라서 apply). 나는 map_async독자를위한 연습으로 떠날 것이다 .

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

이것은 나에게 준다 :

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception

어쩌면 내가 뭔가를 놓치고 있지만 getResult 객체 메서드가 반환하는 것이 아닙니까? 프로세스 풀을 참조하십시오 .

class multiprocessing.pool.AsyncResult

Pool.apply_async () 및 Pool.map_async (). get ([timeout])에서
반환 된 결과 의 클래스 결과가 도착하면 반환합니다. timeout이 None이 아니고 결과가 timeout 초 내에 도착하지 않으면 multiprocessing.TimeoutError가 발생합니다. 원격 호출에서 예외가 발생하면 해당 예외는 get ()에 의해 다시 발생합니다.

따라서 예제를 약간 수정하면

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

결과적으로주는

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

이것은 역 추적을 인쇄하지 않기 때문에 완전히 만족 스럽지는 않지만 아무것도없는 것보다 낫습니다.

업데이트 :이 버그는 Richard Oudkerk에 의해 Python 3.4에서 수정되었습니다. multiprocessing.pool.Async가 전체 역 추적을 반환해야하는 문제 get 메서드를 참조하세요 .


작성 당시 가장 많은 표를 얻은 솔루션에는 다음과 같은 문제가 있습니다.

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

@dfrankow가 언급했듯이, x.get()작업을 비동기 적으로 실행하는 지점을 파괴하는 에서 대기합니다 . 따라서 효율성을 높이기 위해 (특히 작업자 기능 go이 오래 걸리는 경우) 다음과 같이 변경합니다.

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

장점 : 작업자 함수는 비동기 적으로 실행되므로 예를 들어 여러 코어에서 많은 작업을 실행하는 경우 원래 솔루션보다 훨씬 효율적입니다.

단점 : 작업자 함수에 예외가있는 경우 풀이 모든 작업을 완료 한 후에 만 발생 합니다. 이것은 바람직한 행동 일 수도 있고 아닐 수도 있습니다. @colinfang의 의견에 따라 수정하여이 문제를 해결했습니다.


이 데코레이터로 예외 로깅 성공했습니다.

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

질문의 코드를 사용하면

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

프로세스 풀에 전달하는 함수를 장식하기 만하면됩니다. 이 작업의 핵심은 @functools.wraps(func)그렇지 않으면 다중 처리가 PicklingError.

위의 코드는

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception

import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()

프로세스에서 예외의 전체 역 추적을 보여주는 RemoteException.py 모듈을 만들었습니다 . Python2. 다운로드 하여 코드에 추가하세요.

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here

pdb를 사용해 보겠습니다.

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler

Since you have used apply_sync, I guess the use case is want to do some synchronize tasks. Use callback for handling is another option. Please note this option is available only for python3.2 and above and not available on python2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()

Since there are already decent answers for multiprocessing.Pool available, I will provide a solution using a different approach for completeness.

For python >= 3.2 the following solution seems to be the simplest:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Advantages:

  • very little code
  • raises an exception in the main process
  • provides a stack trace
  • no external dependencies

For more info about the API please check: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Additionally, if you are submitting a large number of tasks and you would like your main process to fail as soon as one of your tasks fail, you can use the following snippet:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

All of the other answers fail only once all tasks have been executed.

참고URL : https://stackoverflow.com/questions/6728236/exception-thrown-in-multiprocessing-pool-not-detected

반응형