UFO ET IT

다중 처리를 사용하는 셀러리 병렬 분산 작업

ufoet 2020. 11. 23. 20:39
반응형

다중 처리를 사용하는 셀러리 병렬 분산 작업


CPU 집약적 인 셀러리 작업이 있습니다. 이 작업을 더 빠르게 수행하기 위해 많은 EC2 인스턴스에서 모든 처리 능력 (코어)을 사용하고 싶습니다 (다중 처리를 사용하는 셀러리 병렬 분산 작업- 제 생각에는 ) .

용어, 스레딩 , 멀티 프로세싱 , 분산 컴퓨팅 , 분산 병렬 처리는 내가 더 잘 이해하기 위해 노력하고있어 모든 용어이다.

예제 작업 :

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

위의 코드 (가능한 경우 예제 포함)를 사용하여 클라우드에서 사용 가능한 모든 컴퓨터에 걸쳐 모든 컴퓨팅 CPU 성능을 활용하여이 작업을 분할 할 수 있도록 허용함으로써 Celery를 사용하여이 작업을 배포하는 방법은 무엇입니까?


귀하의 목표는 다음과 같습니다.

  1. 작업을 여러 머신에 배포 (분산 컴퓨팅 / 분산 병렬 처리)
  2. 모든 CPU (다중 처리 / 스레딩)에 지정된 컴퓨터의 작업을 배포합니다.

셀러리는이 두 가지를 매우 쉽게 할 수 있습니다. 가장 먼저 이해해야 할 점은 각 셀러리 작업자가 기본적 으로 시스템에서 사용 가능한 CPU 코어 수만큼 작업을 실행 하도록 구성 되어 있다는 것입니다.

동시성은 작업을 동시에 처리하는 데 사용되는 프리 포크 작업자 프로세스의 수입니다. 이러한 모든 작업이 바쁘면 새 작업이 처리되기 전에 작업 중 하나가 완료 될 때까지 기다려야합니다.

기본 동시성 수는 해당 시스템 (코어 포함)의 CPU 수이며 -c 옵션을 사용하여 사용자 지정 번호를 지정할 수 있습니다. 최적의 수는 여러 요인에 따라 달라 지므로 권장되는 값은 없지만 작업이 대부분 I / O 바운드 인 경우이를 늘릴 수 있습니다. 실험에 따르면 CPU 수를 두 배 이상 추가하는 것은 거의 발생하지 않습니다. 효과적이고 대신 성능을 저하시킬 가능성이 있습니다.

즉, 각 개별 작업은 다중 CPU / 코어를 사용하기 위해 다중 처리 / 스레딩을 사용하는 것에 대해 걱정할 필요가 없습니다. 대신 셀러리는 사용 가능한 각 CPU를 사용하기에 충분한 작업을 동시에 실행합니다.

그 과정에서 다음 단계는 .NET Framework의 일부 하위 집합 처리를 처리하는 작업을 만드는 것 list_of_millions_of_ids입니다. 여기에는 몇 가지 옵션이 있습니다. 하나는 각 작업이 단일 ID를 처리하도록하는 것이므로 N 개의 작업을 실행합니다 N == len(list_of_millions_of_ids). 이렇게하면 한 명의 작업자가 일찍 끝나고 그냥 대기하는 경우가 없기 때문에 작업이 모든 작업에 균등하게 분배됩니다. 작업이 필요한 경우 대기열에서 ID를 가져올 수 있습니다. 셀러리를 사용하여 (John Doe가 언급했듯이) 이것을 할 수 있습니다 group.

tasks.py :

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

그리고 작업을 실행하려면 :

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

또 다른 옵션은 목록을 더 작은 조각으로 나누고 그 조각을 작업자에게 배포하는 것입니다. 이 접근 방식은 일부 작업자가 작업을 계속하는 동안 대기중인 작업자가있을 수 있기 때문에 일부주기를 낭비 할 위험이 있습니다. 그러나 셀러리 문서 에서는 이러한 우려가 종종 근거가 없다고 말합니다 .

일부는 작업을 청킹하면 병렬 처리가 저하 될 수 있다고 걱정할 수 있지만 바쁜 클러스터에서는 거의 적용되지 않으며 메시징 오버 헤드를 피하고 있기 때문에 성능이 크게 향상 될 수 있습니다.

따라서 목록을 청크하고 각 작업에 청크를 배포하면 메시징 오버 헤드가 줄어들어 성능이 더 우수하다는 것을 알 수 있습니다. 한 번에 하나의 ID를 수행하는 대신 각 ID를 계산하고 목록에 저장 한 다음 DB에 전체 목록을 추가하는 방식으로 데이터베이스에 대한 부하를 약간 줄일 수도 있습니다. . 청크 접근 방식은 다음과 같습니다.

tasks.py :

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

작업을 시작하려면 :

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

어떤 청킹 크기가 최상의 결과를 제공하는지 약간 실험 해 볼 수 있습니다. 메시징 오버 헤드를 줄이면서 작업자가 다른 작업자보다 훨씬 빨리 청크를 완료하고 할 일이없는 상태로 대기하는 일이 없도록 크기를 충분히 작게 유지하는 최적의 지점을 찾고 싶습니다.


유통의 세계에서 무엇보다도 기억해야 할 것은 단 하나뿐입니다.

조기 최적화는 모든 악의 근원입니다. D. Knuth 작성

분명하게 들리지만 이중 확인을 배포하기 전에 최상의 알고리즘을 사용하고 있습니다 (존재하는 경우 ...). 하지만 배포를 최적화하는 것은 다음 세 가지 간의 균형을 맞추는 작업입니다.

  1. 영구 매체에서 데이터 쓰기 / 읽기,
  2. 매체 A에서 매체 B로 데이터 이동,
  3. 데이터 처리,

Computers are made so the closer you get to your processing unit (3) the faster and more efficient (1) and (2) will be. The order in a classic cluster will be : network hard drive, local hard drive, RAM, inside processing unit territory... Nowadays processors are becoming sophisticated enough to be considered as an ensemble of independent hardware processing units commonly called cores, these cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you are using 50% of the computer power, if the core has 2 threads you will then use 100%. Two threads per core is called hyper threading, and your OS will see 2 CPUs per hyper threaded core.

Managing threads in a processor is commonly called multi-threading. Managing CPUs from the OS is commonly called multi-processing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.

So where is your bottleneck ?

  • In (1): Try to persist and stream from the upper level (the one closer to your processing unit, for example if network hard drive is slow first save in local hard drive)
  • In (2): This is the most common one, try to avoid communication packets not needed for the distribution or compress "on the fly" packets (for example if the HD is slow, save only a "batch computed" message and keep the intermediary results in RAM).
  • In (3): You are done! You are using all the processing power at your disposal.

What about Celery ?

Celery is a messaging framework for distributed programming, that will use a broker module for communication (2) and a backend module for persistence (1), this means that you will be able by changing the configuration to avoid most bottlenecks (if possible) on your network and only on your network. First profile your code to achieve the best performance in a single computer. Then use celery in your cluster with the default configuration and set CELERY_RESULT_PERSISTENT=True :

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

During execution open your favorite monitoring tools, I use the default for rabbitMQ and flower for celery and top for cpus, your results will be saved in your backend. An example of network bottleneck is tasks queue growing so much that they delay execution, you can proceed to change modules or celery configuration, if not your bottleneck is somewhere else.


Why not use group celery task for this?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Basically, you should divide ids into chunks (or ranges) and give them to a bunch of tasks in group.

For smth more sophisticated, like aggregating results of particular celery tasks, I have successfully used chord task for similar purpose:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Increase settings.CELERYD_CONCURRENCY to a number that is reasonable and you can afford, then those celery workers will keep executing your tasks in a group or a chord until done.

Note: due to a bug in kombu there were trouble with reusing workers for high number of tasks in the past, I don't know if it's fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.

Example based on simplified and modified code I run:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize gets results of all single_batch_processor tasks. Every task runs on any Celery worker, kombu coordinates that.

Now I get it: single_batch_processor and summarize ALSO have to be celery tasks, not regular functions - otherwise of course it will not be parallelized (I'm not even sure chord constructor will accept it if it's not a celery task).


Adding more celery workers will certainly speed up executing the task. You might have another bottleneck though: the database. Make sure it can handle the simultaneous inserts/updates.

Regarding your question: You are adding celery workers by assigning another process on your EC2 instances as celeryd. Depending on how many workers you need you might want to add even more instances.

참고URL : https://stackoverflow.com/questions/23916413/celery-parallel-distributed-task-with-multiprocessing

반응형