Celery로 Flask에서 비동기 처리하기

Programming

라온화이트햇 핵심연구팀 최정수

Celery는?

Flask는 프로세스를 동기적(Synchronous)으로 처리하기 때문에 업로드된 데이터 처리 또는 이메일 전송과 같이 다소 오래 걸리는 작업이나 무거운 연산 같은 경우 사용자는 웹 서버의 처리가 모두 마무리될 때까지 기다려야 합니다.

따라서 비동기로 실행하도록 직접 구현할 수 도 있지만 미리 구현된 비동기 작업 큐 라이브러리를 사용하는 것이 능률적인 면에서 뛰어납니다.

Celery는 Python으로 작성된 비동기 작업 큐(Asynchronous task queue/job queue)이기 때문에 Flask와 같은 Python Web Framework에 붙여서 사용하기 수월합니다.

Celery의 구성

/assets/42d61630-a1de-4d35-81fb-eb412914c5d7/d1012894-eded-4168-8e03-3b0ac88e7edf.png

[Flask + Celery + redis]의 전반적인 아키텍처

Celery는 크게 3가지 구성 요소가 있습니다.

  1. Celery Client

    백그라운드 작업을 요청하는데 사용 됩니다. 이 문서에서는 Flask가 Celery Client가 됩니다.

  2. Celery Workers

    Flask와 동일한 서버에서 백그라운드 작업을 실행하는데 사용 됩니다.

  3. Message Broker

    클라이언트는 메시지 큐를 통해 작업자와 통신 하며 Celery는 이러한 큐를 구현하는 여러 가지 방법을 지원합니다. 가장 일반적으로 사용되는 브로커는 RabbitMQ 및 Redis 입니다.

Celery 예제

Celery를 간단하게 사용해보도록 하겠습니다.

pip install celery
pip install redis

우선 celery를 사용하기 위해서 Celery Workers인 celery를 설치하고 Message Broker로 redis를 설치해줍니다.

# tasks.py
from celery import Celery

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)

@app.task
def add(x, y):
        return x + y

BROKER_URLMessage Brocker의 주소로 redis를 사용하기 때문에 redis 주소를 넣어줍니다.

CELERY_RESULT_BACKEND은 작업을 저장할 backend 주소로 똑같이 redis를 사용하므로 redis 주소를 넣어줍니다.

app이라는 객체를 Celery로 만들어줍니다. Celery의 첫 번째 인자는 Celery에서 사용할 이름입니다. 또한 Celery의 task로 add 함수를 사용하기 위해서 add 함수를 작성해줍니다.

celery -A tasks worker --loglevel=info

이제 celery를 실행할 차례입니다. app(Celery 객체)을 만들 때 Celery의 이름을 tasks로 설정했기 때문에 -A 인자로 tasks를 넘겨주게 됩니다.

celery -A tasks worker --loglevel=info --autoscale=10,3

기본적으로 celery는 1개의 worker로 동작하며 worker를 추가하고 싶으면 --autoscale인자를 사용하여 worker를 추가할 수 있습니다. 위에는 최대 10개, 최소 3개의 worker를 사용하여 task가 동시에 최소 3개, 최대 10개가 처리 될 수 있다는 뜻입니다.

/assets/c8abca99-7b7b-4e19-8c71-01361a5f0cb3/4ef0745d-424a-4636-95f3-d15eb73b9b02.png

Celery 실행

Celery를 실행하면 위와 같이 간단한 정보가 출력되며 redis에 접속이 되었고 준비가 되었다는 메세지를 확인할 수 있습니다.

Celery의 Celery Workers는 Celery, Message Broker는 Redis, Celery Client는 아직 정의하지 않았습니다.

간단한 예제 코드임으로 Celery Client로 Flask가 아닌 python interpreter를 사용하도록 하겠습니다.

/assets/8e3de390-81c8-44a2-b496-3280902b3ef7/d1a26be4-9325-4e9f-ba93-d4b207847325.png

python interpreter에서 add 함수를 celery를 통해 실행

python interpreter에서 위에서 작성한 tasks.py파일의 add함수를 로드해주고 delay 함수를 사용하여 Celery에게 작업 명령을 내립니다.

/assets/7badeda1-0cb4-4c28-97d3-afb724301411/420a7439-4d2d-4c43-99e5-b8636fd86236.png

celery에서 task를 받고 실행된 결과

python interpreter에서 요청된 add 함수는 Celery의 로그를 통해 확인할 수 있습니다.

[2020-03-22 23:26:42,448: INFO/MainProcess]
Received task: tasks.add[6abc6f77-071b-4e42-b137-a20ad7f8b290]

python interpreter에서 요청한 task가 MainProcess로 부터 도착 한 것을 알 수 있으며 task의 id는 6abc6f77-071b-4e42-b137-a20ad7f8b290 입니다.

[2020-03-22 23:26:42,453: INFO/ForkPoolWorker-1]
Task tasks.add[6abc6f77-071b-4e42-b137-a20ad7f8b290]
succeeded in 0.0032477990025654435s: 31337

ForkPoolWorker-1가 id가 6abc6f77-071b-4e42-b137-a20ad7f8b290인 task를 처리 했으며 실행 시간으로 0.0032477990025654435s가 걸렸고 결과 값이 31337이라는 것을 알 수 있습니다.

/assets/04c29365-c351-454a-a73e-7e5902746ed4/fdff8b51-2f5b-4e32-ac8b-8af49f8a9799.png

ready 함수와 get 함수의 실행

ready() 함수는 작업이 완료 되었는지 확인하는 함수 입니다. True일 경우 작업이 완료되었다는 뜻입니다. 반대로 False인 경우 작업이 진행중이라는 뜻입니다.

get() 함수는 실행 결과를 가져오는 함수입니다.

Celery를 APT Zero에 적용하기

그럼 본격적으로 Flask로 짜여진 APT Zero에 Celery를 적용해보도록 하겠습니다.

APT Zero는 상황에 따라 수 백건의 메일을 보내야합니다. 수 백건의 메일을 발송하는 동안 사용자가 대기하지 않도록 Celery를 적용하여 처리할 수 있습니다.

@app.route("/send", methods=["GET"])
def send():
	conn_redis = get_redis()
	temp = conn_redis.get('mail_progress')
	if temp is None: temp = {}
	else: temp = json.loads(temp)

	mail_list = get_mail_list()

	task = mail_send_process.apply_async([mail_list])
	temp[training_id] = task.id

	conn_redis.set('mail_progress', json.dumps(temp))
	return jsonify({"error": 0})

/send URL을 통해 메일 전송을 하면 redis를 사용하여 mail_progress라는 key에 데이터를 불러옵니다. 해당 데이터는 메일 전송 작업의 진행도를 출력하는 페이지에서 다시 사용합니다. (Celery에서 Message Broker로 사용되는 redis와는 다릅니다. Celery에서 Message Broker로 사용되는 redis는 Celery 내부에서 동작, 사용됩니다. 따라서 사용자는 redis를 import 하지 않아도 됩니다.)

전송 할 mail_list를 받아온 뒤 @celery.task로 정의한 함수인 mail_send_process함수에 mail_list를 인자로 주어 호출합니다.

@celery.task 로 정의한 함수의 사용은 위 예제에서 본 것처럼 delay 함수를 사용해서 호출 할 수 있으며, 추가적으로 apply_async함수를 사용해서 호출 할 수 있습니다. delay 함수와 apply_async함수의 차이로는 apply_async함수는 인자를 list형태로 전달해야 된다는 차이가 있습니다.

mail_send_process.apply_async함수가 실행되면 Celery 내부에서 task가 등록이 됩니다. 해당 task의 id를 redis에 넣어줍니다. 이 부분은 위에서 언급했듯이 Celery에서 Message Broker로 사용하기 위해 redis를 사용하는 것이 아닙니다.

@celery.task(bind=True)
def mail_send_process(self, mail_list):
	with app.app_context():
		total = len(mail_list) - 1
		for idx, mail in enumerate(mail_list):
			# mail_send #
			time.sleep(2.0/random.randint(1,4))
			self.update_state(state='PROGRESS', meta={'current': idx, 'total': total})
		return {'current': idx, 'total': total}

그럼 이제 @celery.task로 정의된 mail_send_process함수에 대해서 살펴봅시다.

Celery는 다른 프로세스로 돌기 때문에 with app.app_context()안에서 작업을 해줘야 flask에서 하던 작업들을 이어 할 수 있습니다.

mail_send_process함수에서는 mail_list만큼 반복문을 돌면서 mail을 전송하게 됩니다. 메일을 전송하는 과정은 생략하고 메일 전송시간이 랜덤으로 0.5초 ~ 2초가 걸린다고 가정하고 sleep 함수를 걸어두었습니다. (메일이 발송되는 시간이라고 생각하시면 될 것 같습니다.)

메일이 전송이 되면 update_state함수로 Celery task의 상태를 update 해줍니다. state는 task의 상태, meta는 전체 메일 발송 갯수 중 몇 개의 메일이 발송 완료 되었는지 확인해주기 위해서 인덱스 값을 넣어 줍니다.

Celery task가 실행됨에 따라서 current 값이 계속 업데이트 됩니다. 이제 우리는 메일 전송 작업의 진행도를 출력해주는 페이지에서 해당 값을 가져와서 progress bar로 표현을 해봅시다.

@app.route("/progress", methods=["POST"])
def get_progress():
	task = mail_send_process.AsyncResult(request.form['task_id'])
	if task.state == 'PENDING':
		response = {'state': task.state, 'current': 0, 'total': 1}
	elif task.state != 'FAILURE':
		response = {'state': task.state, 'current': task.info.get('current', 0), 'total': task.info.get('total', 1)}
	return jsonify(response)

Celery task의 현재 실행 결과를 알려주는 함수는 AsyncResult함수입니다. mail_send_process.AsyncResult(task_id)와 같이 사용되며 메일 전송 작업의 진행도를 출력해주는 페이지에서 ajax를 사용하여 현재 상태 값을 불러올 것이기 때문에 위와 같이 flask로 함수를 작성해줍니다.

/assets/3fe2098f-4fd6-4f6a-8138-bf2b43a4adb6/aba03af6-a4ae-4373-8291-339b32606157.png

get_progress 결과 1

/assets/67f1fa00-ab9f-4ae4-a737-429b00e25d97/68d40757-eed5-4cff-a96e-a93a2a72a414.png

get_progress 결과 2

ajax를 사용하여 /progress에 요청하면 get_progress함수가 실행되고 그 결과 입니다. 현재 메일 발송이 진행중이기 때문에 statePROGRESS로 표시되며 currenttotal을 가지고 메일 발송이 얼마나 진행되었는지 알 수 있습니다.

/assets/b2e1541d-8974-469f-b95b-9378d0afe1b6/4ed9b2b1-d96b-49ba-a495-75babd584ebf.png

Progress Bar를 사용하여 메일 전송 진행률 출력

currenttotal의 값을 계산하여 위와 같이 Progress Bar로 출력할 수 있습니다.

/assets/8170c614-7638-4d50-8b30-a92ef4cb25ed/c41731ac-55a2-4c68-bab2-7afc4e62dba0.gif

따라서 사용자는 메일이 발송되는 동안 ‘혹시 중간에 메일 발송이 끊기지 않을까?’ 걱정과 근심, 동글뱅이의 저주로 부터 벗어 날 수 있습니다.

PLUS: Celery Daemonization

Celery는 기본적으로 서비스 형태로 돌아가지 않기 때문에 데몬화로 매번 실행 시켜주지 않도록 합니다. Celery를 데몬화 하는 방법은 아래 문서들을 참고하시면 됩니다.

http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

https://devlog.jwgo.kr/2019/07/05/celery-daemonization/

참고 자료

https://flask.palletsprojects.com/en/0.12.x/patterns/celery/

https://blog.miguelgrinberg.com/post/using-celery-with-flask

https://medium.com/@frassetto.stefano/flask-celery-howto-d106958a15fe

https://dgkim5360.tistory.com/entry/python-celery-asynchronous-system-with-redis