다중 데이터 DB Input / Output 로직 성능 향상
최근에 새로운 기능을 추가해야하는 일이 있었는데, 요청사항은 이러하다.
- AS-IS : 현 시점 기준으로 하나의 사이트 오브젝트 목록을 받아온 후 보여준다.
- TO-BE : 사용자가 원하는 시점 기준으로 여러 사이트의 오브젝트 목록을 한 번에 받아와서 보여줘야 함.
Gap 차이로는
1. 사용자가 원하는 시점 기준으로 오브젝트를 볼 수 있음.
2. 하나 이상의 사이트를 모두 한눈에 볼 수 있음.
원하는 시점을 불러오는 기능 부분은 오브젝트 History 관련 내용이오니, 해당 글에서는 여러 사이트 목록을 한 번에 가져오는 로직의 성능 향상 부분에 대해서 다룹니다.
원래 하나의 사이트 오브젝트를 가져오는 로직은 단순하게 이루어진다.
def get_object_list(site: str): -> dict
try:
Q = f'''select * from fn_object_search(site)'''
result = DB.Query(Q, ...args)
return result
except Exception as e:
raise ObjectFindException(str(e))
다중으로 사이트의 오브젝트를 가져와야 하니 당연히 반복문을 사용하여 결과 값들을 합쳐서 한번에 return하면 되겠지 해서 다음과 같이 코드를 작성했었다.
import asyncio
def __get_site_object_list(site: str) -> dict:
try:
Q = f'''select * from fn_object_search($1)'''
result = DB.query(Q, site)
return result
except Exception as e:
raise ObjectFindException(str(e))
async def get_multiple_site_object_list(sites: list[str]) -> list[dict]:
tasks = [__get_site_object_list(site) for site in sites]
results = await asyncio.gather(*tasks, return_exceptions=False)
return results
결론적으로, 위의 코드를 통해 오브젝트를 가져오면 다음과 같은 성능을 나타냈다.
- 1개의 사이트 불러오기 : 평균 2.6초
- 8개의 사이트 불러오기 : 평균 23초
여기서 async를 사용한 비동기처리 함수는 코루틴으로 작동한다.
위의 코드로 빌드하여 개발 서버에 배포 후 이제 실제로 사용할 사이트의 담당자분에게 기능 테스트를 요청하였다.
테스트 결과로는 기능은 잘 동작하나, 로딩이 오래걸린다는 답변을 받게되었다.
분명 코루틴을 써서 가져왔는데 왜 느려지지...? 라는 생각을 처음으로 갖게 되었다.
사실 병렬 처리에 대해 멀티쓰레드가 많이 쓰인다 정도만 알고있지 보통 대부분이 코루틴에 대해서 깊게 생각하고 있진 않다.
일단 결론적으로 느려진 원인에 대해서 말하자면
내가 사용하던 Python DB Connection 라이브러리인 psycopg2는 동기 Blocking DB I/O 방식이기 때문에
비동기 코드에서 호출되더라도 DB 응답을 기다리는 동안 이벤트 루프가 멈추게 되고, 결국 순차적으로 실행되어 성능이 저하되었다.
위의 사유로, psycopg2 라이브러리에서는 코루틴 내부에서 Blocking 호출이 발생하여도
해당 Task가 실행되는 동안 이벤트 루프의 다른 Task가 스케줄링되지 못하고 대기한다.
동시성(concurrency)를 보장받지 못해요.....
아마 요즘 DB Connection 라이브러리들은 이러한 DB Blocking I/O를 병렬 처리하도록 지원하는 것으로 알고 있다.
확인해보니, psycopg3부터는 해당 기능이 개선되었다. 최신 라이브러리 최고...
라이브러리를 업데이트 하는 순간 실사용중인 서비스에 리스크가 너무 크고, 책임지고 바꿔야할 부분들이 많아지기 때문에 일단 나는 라이브러리 업그레이드는 보류한 채 개발 하였다.
그래서 멀티 쓰레드를 활용하여 여러 사이트를 가져오도록 로직을 수정하면 성능이 개선되지 않을까?
라는 방안을 고안하여 바로 로직에 적용해보았다.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
# workers 수는 환경에 따라 맞게 설정해야 해요..!
executor = ThreadPoolExecutor(max_workers=10)
def __get_site_object_list(site: str) -> Dict:
try:
Q = "SELECT * FROM fn_object_search($1)"
result = DB.query(Q, site)
return result
except Exception as e:
raise ObjectFindException(str(e))
async def get_multiple_site_object_list(sites: List[str]) -> List[Dict]:
loop = asyncio.get_running_loop()
tasks = [loop.run_in_executor(executor, __get_site_object_list, site) for site in sites]
results = await asyncio.gather(*tasks, return_exceptions=False)
return results
이렇게 하면 백엔드 서버와 DB간 Data I/O Task에 멀티 쓰레드 로직을 적용할 수 있다.
결과는 ....!
- 1개의 사이트 불러오기 : 평균 2.6초 -> 작업 x
- 8개의 사이트 불러오기 : 평균 23초 -> 코루틴 적용
- 8개의 사이트 불러오기 : 평균 3.3초 -> 멀티 스레드 적용
N번의 Task 진행을 동시 Task 작업으로 변경하여 실제로 서비스 API 속도를 대폭 개선하였고
해당 담당자의 테스트 결과도 OK 되었다.
