Celery로 Flask에서 비동기 처리하기
라온화이트햇 핵심연구팀 최정수
Celery는?
Flask는 프로세스를 동기적(Synchronous)으로 처리하기 때문에 업로드된 데이터 처리 또는 이메일 전송과 같이 다소 오래 걸리는 작업이나 무거운 연산 같은 경우 사용자는 웹 서버의 처리가 모두 마무리될 때까지 기다려야 합니다.
따라서 비동기로 실행하도록 직접 구현할 수 도 있지만 미리 구현된 비동기 작업 큐 라이브러리를 사용하는 것이 능률적인 면에서 뛰어납니다.
Celery는 Python으로 작성된 비동기 작업 큐(Asynchronous task queue/job queue)이기 때문에 Flask와 같은 Python Web Framework에 붙여서 사용하기 수월합니다.
Celery의 구성
[Flask + Celery + redis]의 전반적인 아키텍처
Celery는 크게 3가지 구성 요소가 있습니다.
-
Celery Client
백그라운드 작업을 요청하는데 사용 됩니다. 이 문서에서는 Flask가 Celery Client가 됩니다.
-
Celery Workers
Flask와 동일한 서버에서 백그라운드 작업을 실행하는데 사용 됩니다.
-
Message Broker
클라이언트는 메시지 큐를 통해 작업자와 통신 하며 Celery는 이러한 큐를 구현하는 여러 가지 방법을 지원합니다. 가장 일반적으로 사용되는 브로커는 RabbitMQ 및 Redis 입니다.
Celery 예제
Celery를 간단하게 사용해보도록 하겠습니다.
pip install celery
pip install redis
우선 celery를 사용하기 위해서 Celery Workers
인 celery를 설치하고 Message Broker
로 redis를 설치해줍니다.
# tasks.py
from celery import Celery
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)
@app.task
def add(x, y):
return x + y
BROKER_URL
은 Message Brocker
의 주소로 redis를 사용하기 때문에 redis 주소를 넣어줍니다.
CELERY_RESULT_BACKEND
은 작업을 저장할 backend 주소로 똑같이 redis를 사용하므로 redis 주소를 넣어줍니다.
app
이라는 객체를 Celery로 만들어줍니다. Celery의 첫 번째 인자는 Celery에서 사용할 이름입니다. 또한 Celery의 task로 add 함수를 사용하기 위해서 add 함수를 작성해줍니다.
celery -A tasks worker --loglevel=info
이제 celery를 실행할 차례입니다. app(Celery 객체)을 만들 때 Celery의 이름을 tasks
로 설정했기 때문에 -A 인자로 tasks
를 넘겨주게 됩니다.
celery -A tasks worker --loglevel=info --autoscale=10,3
기본적으로 celery는 1개의 worker로 동작하며 worker를 추가하고 싶으면 --autoscale
인자를 사용하여 worker를 추가할 수 있습니다. 위에는 최대 10개, 최소 3개의 worker를 사용하여 task가 동시에 최소 3개, 최대 10개가 처리 될 수 있다는 뜻입니다.
Celery 실행
Celery를 실행하면 위와 같이 간단한 정보가 출력되며 redis에 접속이 되었고 준비가 되었다는 메세지를 확인할 수 있습니다.
Celery의 Celery Workers
는 Celery, Message Broker
는 Redis, Celery Client
는 아직 정의하지 않았습니다.
간단한 예제 코드임으로 Celery Client
로 Flask가 아닌 python interpreter
를 사용하도록 하겠습니다.
python interpreter에서 add 함수를 celery를 통해 실행
python interpreter
에서 위에서 작성한 tasks.py
파일의 add
함수를 로드해주고 delay
함수를 사용하여 Celery에게 작업 명령을 내립니다.
celery에서 task를 받고 실행된 결과
python interpreter
에서 요청된 add 함수는 Celery
의 로그를 통해 확인할 수 있습니다.
[2020-03-22 23:26:42,448: INFO/MainProcess]
Received task: tasks.add[6abc6f77-071b-4e42-b137-a20ad7f8b290]
python interpreter
에서 요청한 task가 MainProcess
로 부터 도착 한 것을 알 수 있으며 task의 id는 6abc6f77-071b-4e42-b137-a20ad7f8b290
입니다.
[2020-03-22 23:26:42,453: INFO/ForkPoolWorker-1]
Task tasks.add[6abc6f77-071b-4e42-b137-a20ad7f8b290]
succeeded in 0.0032477990025654435s: 31337
ForkPoolWorker-1
가 id가 6abc6f77-071b-4e42-b137-a20ad7f8b290
인 task를 처리 했으며 실행 시간으로 0.0032477990025654435s
가 걸렸고 결과 값이 31337
이라는 것을 알 수 있습니다.
ready 함수와 get 함수의 실행
ready() 함수는 작업이 완료 되었는지 확인하는 함수 입니다. True
일 경우 작업이 완료되었다는 뜻입니다. 반대로 False
인 경우 작업이 진행중이라는 뜻입니다.
get() 함수는 실행 결과를 가져오는 함수입니다.
Celery를 APT Zero에 적용하기
그럼 본격적으로 Flask로 짜여진 APT Zero에 Celery를 적용해보도록 하겠습니다.
APT Zero는 상황에 따라 수 백건의 메일을 보내야합니다. 수 백건의 메일을 발송하는 동안 사용자가 대기하지 않도록 Celery를 적용하여 처리할 수 있습니다.
@app.route("/send", methods=["GET"])
def send():
conn_redis = get_redis()
temp = conn_redis.get('mail_progress')
if temp is None: temp = {}
else: temp = json.loads(temp)
mail_list = get_mail_list()
task = mail_send_process.apply_async([mail_list])
temp[training_id] = task.id
conn_redis.set('mail_progress', json.dumps(temp))
return jsonify({"error": 0})
/send
URL을 통해 메일 전송을 하면 redis
를 사용하여 mail_progress
라는 key에 데이터를 불러옵니다. 해당 데이터는 메일 전송 작업의 진행도를 출력하는 페이지에서 다시 사용합니다. (Celery에서 Message Broker로 사용되는 redis와는 다릅니다. Celery에서 Message Broker로 사용되는 redis는 Celery 내부에서 동작, 사용됩니다. 따라서 사용자는 redis를 import 하지 않아도 됩니다.)
전송 할 mail_list
를 받아온 뒤 @celery.task
로 정의한 함수인 mail_send_process
함수에 mail_list를 인자로 주어 호출합니다.
@celery.task
로 정의한 함수의 사용은 위 예제에서 본 것처럼 delay
함수를 사용해서 호출 할 수 있으며, 추가적으로 apply_async
함수를 사용해서 호출 할 수 있습니다. delay
함수와 apply_async
함수의 차이로는 apply_async
함수는 인자를 list
형태로 전달해야 된다는 차이가 있습니다.
mail_send_process.apply_async
함수가 실행되면 Celery 내부에서 task가 등록이 됩니다. 해당 task의 id를 redis에 넣어줍니다. 이 부분은 위에서 언급했듯이 Celery에서 Message Broker로 사용하기 위해 redis를 사용하는 것이 아닙니다.
@celery.task(bind=True)
def mail_send_process(self, mail_list):
with app.app_context():
total = len(mail_list) - 1
for idx, mail in enumerate(mail_list):
# mail_send #
time.sleep(2.0/random.randint(1,4))
self.update_state(state='PROGRESS', meta={'current': idx, 'total': total})
return {'current': idx, 'total': total}
그럼 이제 @celery.task
로 정의된 mail_send_process
함수에 대해서 살펴봅시다.
Celery는 다른 프로세스로 돌기 때문에 with app.app_context()
안에서 작업을 해줘야 flask에서 하던 작업들을 이어 할 수 있습니다.
mail_send_process
함수에서는 mail_list만큼 반복문을 돌면서 mail을 전송하게 됩니다. 메일을 전송하는 과정은 생략하고 메일 전송시간이 랜덤으로 0.5초 ~ 2초가 걸린다고 가정하고 sleep 함수를 걸어두었습니다. (메일이 발송되는 시간이라고 생각하시면 될 것 같습니다.)
메일이 전송이 되면 update_state
함수로 Celery task의 상태를 update 해줍니다. state는 task의 상태, meta는 전체 메일 발송 갯수 중 몇 개의 메일이 발송 완료 되었는지 확인해주기 위해서 인덱스 값을 넣어 줍니다.
Celery task가 실행됨에 따라서 current
값이 계속 업데이트 됩니다. 이제 우리는 메일 전송 작업의 진행도를 출력해주는 페이지에서 해당 값을 가져와서 progress bar로 표현을 해봅시다.
@app.route("/progress", methods=["POST"])
def get_progress():
task = mail_send_process.AsyncResult(request.form['task_id'])
if task.state == 'PENDING':
response = {'state': task.state, 'current': 0, 'total': 1}
elif task.state != 'FAILURE':
response = {'state': task.state, 'current': task.info.get('current', 0), 'total': task.info.get('total', 1)}
return jsonify(response)
Celery task의 현재 실행 결과를 알려주는 함수는 AsyncResult
함수입니다. mail_send_process.AsyncResult(task_id)
와 같이 사용되며 메일 전송 작업의 진행도를 출력해주는 페이지에서 ajax
를 사용하여 현재 상태 값을 불러올 것이기 때문에 위와 같이 flask
로 함수를 작성해줍니다.
get_progress 결과 1
get_progress 결과 2
ajax를 사용하여 /progress
에 요청하면 get_progress함수가 실행되고 그 결과 입니다. 현재 메일 발송이 진행중이기 때문에 state
는 PROGRESS
로 표시되며 current
와 total
을 가지고 메일 발송이 얼마나 진행되었는지 알 수 있습니다.
Progress Bar를 사용하여 메일 전송 진행률 출력
current
와 total
의 값을 계산하여 위와 같이 Progress Bar로 출력할 수 있습니다.
따라서 사용자는 메일이 발송되는 동안 ‘혹시 중간에 메일 발송이 끊기지 않을까?’ 걱정과 근심, 동글뱅이의 저주로 부터 벗어 날 수 있습니다.
PLUS: Celery Daemonization
Celery는 기본적으로 서비스 형태로 돌아가지 않기 때문에 데몬화로 매번 실행 시켜주지 않도록 합니다. Celery를 데몬화 하는 방법은 아래 문서들을 참고하시면 됩니다.
http://docs.celeryproject.org/en/latest/userguide/daemonizing.html
https://devlog.jwgo.kr/2019/07/05/celery-daemonization/
참고 자료
https://flask.palletsprojects.com/en/0.12.x/patterns/celery/
https://blog.miguelgrinberg.com/post/using-celery-with-flask
https://medium.com/@frassetto.stefano/flask-celery-howto-d106958a15fe
https://dgkim5360.tistory.com/entry/python-celery-asynchronous-system-with-redis