Development Artist

[Python, FastAPI] 멀티프로세싱, 직렬화 / 역직렬화 심화 본문

TroubleShooting/Etc

[Python, FastAPI] 멀티프로세싱, 직렬화 / 역직렬화 심화

JMcunst 2025. 1. 24. 09:59
728x90
반응형

들어가며

파이썬에서 CPU 병렬 처리를 활용하려면 multiprocessing 모듈이 필수. multiprocessing은 데이터를 프로세스 간에 전달하기 위해 직렬화(Serialization)를 사용하며, 이는 파이썬의 pickle 모듈로 이루어진다.

 

하지만, 클래스 객체나 복잡한 데이터 구조를 다룰 때는 직렬화 과정에서 문제가 발생할 수 있다. 이번 포스팅에서는 직렬화/역직렬화의 기초부터 멀티프로세싱에서 자주 발생하는 문제와 해결 방법까지 알아보자.

 

직렬화와 역직렬화란?

1.1 직렬화(Serialization)

직렬화란 파이썬 객체를 바이트 스트림으로 변환하는 과정이다. 이를 통해 데이터를 파일에 저장하거나, 프로세스 간 데이터를 교환할 수 있다.

1.2 역직렬화(Deserialization)

역직렬화는 바이트 스트림을 다시 원래의 파이썬 객체로 복원하는 과정이다.

1.3 직렬화의 예제

파이썬에서 기본적으로 pickle 모듈을 사용하여 직렬화와 역직렬화를 처리한다.

import pickle

# 직렬화 대상 객체
data = {"name": "John", "age": 30, "city": "New York"}

# 직렬화
serialized_data = pickle.dumps(data)

# 역직렬화
deserialized_data = pickle.loads(serialized_data)

print("직렬화된 데이터:", serialized_data)
print("역직렬화된 데이터:", deserialized_data)

출력:

직렬화된 데이터: b'\x80\x04\x95...'
역직렬화된 데이터: {'name': 'John', 'age': 30, 'city': 'New York'}

멀티프로세싱에서 직렬화의 중요성

2.1 멀티프로세싱 기본 구조

multiprocessing 모듈은 프로세스 간 데이터를 전달할 때 pickle을 사용한다. 따라서, 전달하는 데이터는 반드시 직렬화 가능해야 한다.

예제:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(square, [1, 2, 3, 4])
        print(results)

2.2 직렬화 오류 발생

다음과 같이 직렬화할 수 없는 데이터를 전달하면 오류가 발생한다.

from multiprocessing import Pool

class MyClass:
    def __init__(self, value):
        self.value = value

    def __call__(self, x):
        return x + self.value

my_instance = MyClass(10)

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(my_instance, [1, 2, 3, 4])

오류 메시지:

TypeError: can't pickle MyClass objects

멀티프로세싱에서의 직렬화 문제 해결

3.1 직렬화 테스트

멀티프로세싱에서 사용될 데이터를 직렬화할 수 있는지 확인하려면, pickle.dumps()를 사용한다.

import pickle

data = {"key": "value"}

try:
    pickle.dumps(data)
    print("직렬화 성공!")
except Exception as e:
    print(f"직렬화 실패: {e}")

이 테스트를 통해 직렬화 가능한 데이터인지 쉽게 확인할 수 있다.

3.2 직렬화 문제 해결 방법

3.2.1 클래스 메서드의 문제

클래스의 인스턴스 메서드(self.some_method)는 기본적으로 직렬화할 수 없다. 따라서, 이를 함수로 분리하거나 staticmethod로 선언해야 한다.

 

해결 방법:

from multiprocessing import Pool

class MyClass:
    def process(self, x):
        return x * x

def independent_function(x):
    return x * x

if __name__ == "__main__":
    my_instance = MyClass()

    # 1. 클래스 메서드를 사용하면 직렬화 오류 발생
    # with Pool(4) as pool:
    #     results = pool.map(my_instance.process, [1, 2, 3, 4])

    # 2. 함수로 대체
    with Pool(4) as pool:
        results = pool.map(independent_function, [1, 2, 3, 4])
        print(results)

3.2.2 Pydantic 객체 직렬화

Pydantic의 BaseModel 기반 객체는 기본적으로 직렬화가 가능하다. 그러나, 객체를 명시적으로 딕셔너리(dict())로 변환하면 직렬화 안정성이 더욱 높아진다.

from pydantic import BaseModel

class Meta(BaseModel):
    name: str
    version: int

meta = Meta(name="example", version=1)

# 직렬화 가능 확인
import pickle

try:
    pickle.dumps(meta.dict())  # dict로 변환 후 직렬화
    print("직렬화 성공!")
except Exception as e:
    print(f"직렬화 실패: {e}")

3.2.3 병렬 처리에서 Pydantic 객체 사용

멀티프로세싱에 Pydantic 객체를 사용할 경우, 아래와 같이 dict()로 변환 후 전달한다.

from multiprocessing import Pool
from pydantic import BaseModel

class Meta(BaseModel):
    name: str
    version: int

def process_data(data):
    # 데이터 처리
    print(data)
    return data["version"] * 2

if __name__ == "__main__":
    meta_list = [Meta(name=f"example_{i}", version=i) for i in range(5)]
    processes = [meta.dict() for meta in meta_list]  # dict로 변환

    with Pool(4) as pool:
        results = pool.map(process_data, processes)
        print(results)

최종 예제: 직렬화 문제를 해결한 멀티프로세싱

다음은 위에서 다룬 내용을 모두 반영한 최종 코드.

import pickle
from multiprocessing import Pool, cpu_count
from pydantic import BaseModel

# Pydantic 모델 정의
class Meta(BaseModel):
    name: str
    version: int

# 프로세스 함수
def process_data(data):
    print(f"Processing: {data}")
    return data["version"] * 2

# 직렬화 테스트
def test_serialization(data):
    try:
        pickle.dumps(data)
        print("직렬화 성공!")
    except Exception as e:
        print(f"직렬화 실패: {e}")

if __name__ == "__main__":
    # 데이터 준비
    meta_list = [Meta(name=f"example_{i}", version=i) for i in range(5)]
    processes = [meta.dict() for meta in meta_list]  # dict로 변환

    # 직렬화 가능 여부 확인
    for process in processes:
        test_serialization(process)

    # 멀티프로세싱 실행
    with Pool(cpu_count()) as pool:
        results = pool.map(process_data, processes)
        print("결과:", results)

출력:

직렬화 성공!
직렬화 성공!
직렬화 성공!
직렬화 성공!
직렬화 성공!
Processing: {'name': 'example_0', 'version': 0}
Processing: {'name': 'example_1', 'version': 1}
Processing: {'name': 'example_2', 'version': 2}
Processing: {'name': 'example_3', 'version': 3}
Processing: {'name': 'example_4', 'version': 4}
결과: [0, 2, 4, 6, 8]

결론

  • 멀티프로세싱에서 데이터를 전달하려면 pickle을 통한 직렬화가 필수다.
  • 클래스 메서드, Pydantic 객체 등 직렬화가 어려운 데이터를 사용할 경우, dict()로 변환하거나 독립된 함수로 대체하여 문제를 해결할 수 있다.
  • pickle.dumps()를 활용해 직렬화 가능 여부를 미리 테스트하면 문제를 조기에 파악할 수 있다.
728x90
반응형
Comments