일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 알고리즘
- 동적계획법
- DFS
- django
- codingtest
- DART
- issue
- Flutter
- cos pro 1급
- Algorithm
- 파이썬
- 안드로이드
- 개발
- 안드로이드스튜디오
- C++
- DFS와BFS
- cos
- 코딩테스트
- 코드품앗이
- BAEKJOON
- cos pro
- 백준
- 분할정복
- 동적계획법과최단거리역추적
- vuejs
- Python
- Vue
- 코테
- android
- AndroidStudio
- Today
- Total
Development Artist
[Python, FastAPI] 멀티프로세싱, 직렬화 / 역직렬화 심화 본문
들어가며
파이썬에서 CPU 병렬 처리를 활용하려면 multiprocessing 모듈이 필수. multiprocessing은 데이터를 프로세스 간에 전달하기 위해 직렬화(Serialization)를 사용하며, 이는 파이썬의 pickle 모듈로 이루어진다.
하지만, 클래스 객체나 복잡한 데이터 구조를 다룰 때는 직렬화 과정에서 문제가 발생할 수 있다. 이번 포스팅에서는 직렬화/역직렬화의 기초부터 멀티프로세싱에서 자주 발생하는 문제와 해결 방법까지 알아보자.
직렬화와 역직렬화란?
1.1 직렬화(Serialization)
직렬화란 파이썬 객체를 바이트 스트림으로 변환하는 과정이다. 이를 통해 데이터를 파일에 저장하거나, 프로세스 간 데이터를 교환할 수 있다.
1.2 역직렬화(Deserialization)
역직렬화는 바이트 스트림을 다시 원래의 파이썬 객체로 복원하는 과정이다.
1.3 직렬화의 예제
파이썬에서 기본적으로 pickle 모듈을 사용하여 직렬화와 역직렬화를 처리한다.
import pickle
# 직렬화 대상 객체
data = {"name": "John", "age": 30, "city": "New York"}
# 직렬화
serialized_data = pickle.dumps(data)
# 역직렬화
deserialized_data = pickle.loads(serialized_data)
print("직렬화된 데이터:", serialized_data)
print("역직렬화된 데이터:", deserialized_data)
출력:
직렬화된 데이터: b'\x80\x04\x95...'
역직렬화된 데이터: {'name': 'John', 'age': 30, 'city': 'New York'}
멀티프로세싱에서 직렬화의 중요성
2.1 멀티프로세싱 기본 구조
multiprocessing 모듈은 프로세스 간 데이터를 전달할 때 pickle을 사용한다. 따라서, 전달하는 데이터는 반드시 직렬화 가능해야 한다.
예제:
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(square, [1, 2, 3, 4])
print(results)
2.2 직렬화 오류 발생
다음과 같이 직렬화할 수 없는 데이터를 전달하면 오류가 발생한다.
from multiprocessing import Pool
class MyClass:
def __init__(self, value):
self.value = value
def __call__(self, x):
return x + self.value
my_instance = MyClass(10)
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(my_instance, [1, 2, 3, 4])
오류 메시지:
TypeError: can't pickle MyClass objects
멀티프로세싱에서의 직렬화 문제 해결
3.1 직렬화 테스트
멀티프로세싱에서 사용될 데이터를 직렬화할 수 있는지 확인하려면, pickle.dumps()를 사용한다.
import pickle
data = {"key": "value"}
try:
pickle.dumps(data)
print("직렬화 성공!")
except Exception as e:
print(f"직렬화 실패: {e}")
이 테스트를 통해 직렬화 가능한 데이터인지 쉽게 확인할 수 있다.
3.2 직렬화 문제 해결 방법
3.2.1 클래스 메서드의 문제
클래스의 인스턴스 메서드(self.some_method)는 기본적으로 직렬화할 수 없다. 따라서, 이를 함수로 분리하거나 staticmethod로 선언해야 한다.
해결 방법:
from multiprocessing import Pool
class MyClass:
def process(self, x):
return x * x
def independent_function(x):
return x * x
if __name__ == "__main__":
my_instance = MyClass()
# 1. 클래스 메서드를 사용하면 직렬화 오류 발생
# with Pool(4) as pool:
# results = pool.map(my_instance.process, [1, 2, 3, 4])
# 2. 함수로 대체
with Pool(4) as pool:
results = pool.map(independent_function, [1, 2, 3, 4])
print(results)
3.2.2 Pydantic 객체 직렬화
Pydantic의 BaseModel 기반 객체는 기본적으로 직렬화가 가능하다. 그러나, 객체를 명시적으로 딕셔너리(dict())로 변환하면 직렬화 안정성이 더욱 높아진다.
from pydantic import BaseModel
class Meta(BaseModel):
name: str
version: int
meta = Meta(name="example", version=1)
# 직렬화 가능 확인
import pickle
try:
pickle.dumps(meta.dict()) # dict로 변환 후 직렬화
print("직렬화 성공!")
except Exception as e:
print(f"직렬화 실패: {e}")
3.2.3 병렬 처리에서 Pydantic 객체 사용
멀티프로세싱에 Pydantic 객체를 사용할 경우, 아래와 같이 dict()로 변환 후 전달한다.
from multiprocessing import Pool
from pydantic import BaseModel
class Meta(BaseModel):
name: str
version: int
def process_data(data):
# 데이터 처리
print(data)
return data["version"] * 2
if __name__ == "__main__":
meta_list = [Meta(name=f"example_{i}", version=i) for i in range(5)]
processes = [meta.dict() for meta in meta_list] # dict로 변환
with Pool(4) as pool:
results = pool.map(process_data, processes)
print(results)
최종 예제: 직렬화 문제를 해결한 멀티프로세싱
다음은 위에서 다룬 내용을 모두 반영한 최종 코드.
import pickle
from multiprocessing import Pool, cpu_count
from pydantic import BaseModel
# Pydantic 모델 정의
class Meta(BaseModel):
name: str
version: int
# 프로세스 함수
def process_data(data):
print(f"Processing: {data}")
return data["version"] * 2
# 직렬화 테스트
def test_serialization(data):
try:
pickle.dumps(data)
print("직렬화 성공!")
except Exception as e:
print(f"직렬화 실패: {e}")
if __name__ == "__main__":
# 데이터 준비
meta_list = [Meta(name=f"example_{i}", version=i) for i in range(5)]
processes = [meta.dict() for meta in meta_list] # dict로 변환
# 직렬화 가능 여부 확인
for process in processes:
test_serialization(process)
# 멀티프로세싱 실행
with Pool(cpu_count()) as pool:
results = pool.map(process_data, processes)
print("결과:", results)
출력:
직렬화 성공!
직렬화 성공!
직렬화 성공!
직렬화 성공!
직렬화 성공!
Processing: {'name': 'example_0', 'version': 0}
Processing: {'name': 'example_1', 'version': 1}
Processing: {'name': 'example_2', 'version': 2}
Processing: {'name': 'example_3', 'version': 3}
Processing: {'name': 'example_4', 'version': 4}
결과: [0, 2, 4, 6, 8]
결론
- 멀티프로세싱에서 데이터를 전달하려면 pickle을 통한 직렬화가 필수다.
- 클래스 메서드, Pydantic 객체 등 직렬화가 어려운 데이터를 사용할 경우, dict()로 변환하거나 독립된 함수로 대체하여 문제를 해결할 수 있다.
- pickle.dumps()를 활용해 직렬화 가능 여부를 미리 테스트하면 문제를 조기에 파악할 수 있다.