Computer

VHTS와 병렬 컴퓨팅: 2. python multiprocessing 을 이용한 분자 처리

Novelism 2022. 2. 27. 17:04

 

 

VHTS와 병렬 컴퓨팅: 1. 기초 개념, file split

에 이어 python multiprocessing을 이용한 분자 특성 계산방법을 설명하겠습니다.

 

 python에서 병렬 컴퓨팅을 지원하는 모듈은 여러 가지가 있습니다.

 저는 multiprocessing을 선호합니다.

 concurrent.futures 도 사용해보긴 했는데, 구현 자체는 쉽지만 core수가 많아지면 병렬화 효율성이 크게 떨어집니다.

작업환경은 EPIC 64 cores*2 cpus입니다.

 

분자 데이터 처리, 혹은 특성 계산은 많음은 수의 독립적인 데이터에 대해서 동일한 함수로 처리할 경우에 해당됩니다.

이런 경우에 적합한 병렬구조는 master/workers로 이루어진 구조입니다.

master가 잡을 관리 하면서 worker에게 잡을 분배하는 형태입니다.

그리고 효율적인 일처리를 위해선 queue라는 자료 구조가 적합합니다.

 queue라는 것은 한 줄 서기와 유사합니다.

 창구가 있고 사람들이 각 창구(프로세서)에 줄을 설 경우 (여러 줄 서기) 각 줄의 처리시간이 달라서 줄을 선 순서가 대로 차례가 오지 않습니다.

 물론 그것 자체는 병렬 컴퓨팅에선 별 문제없습니다. 병렬 컴퓨팅에서의 문제는, 여러 줄 서기를 했을 경우 특정 줄은 대기하는 잡이 없는데, 다른 줄은 대기가 길어서 끝나지 않는다는 것입니다. 왜냐하면 최종적으로 프로그램이 끝나는 것은 모든 서브 잡이 종료되었을 때이기 때문입니다. 누군가 놀고 있다는 것은 효율성에서 좋지 않고, 잡 부하가 균일하게 분산되는 편이 좋습니다.

한 줄 서기를 할 경우 어느 창구로든 갈 수 있기에 효율적입니다.

단, 병렬 컴퓨팅에서의 queue는 먼저 들어갔다고 해서 잡이 먼저 끝나는 것은 아닙니다. 데이터마다 잡이 끝날 때까지 걸리는 시간이 다르니까요.

 

 아래 코드는 multiprocessing을 사용해서 logP를 병렬로 계산하는 코드입니다.

import os
import pandas as pd
import numpy as np
from multiprocessing import Manager
from multiprocessing import Process
from multiprocessing import Queue

from rdkit import Chem
from rdkit.Chem import AllChem as AllChem
from rdkit.Chem import Descriptors


def creator(q, data, num_sub_proc):
    for d in data:
        idx = d[0]
        q.put((idx, d[1]))
    for i in range(0, num_sub_proc):
        q.put('DONE')


def worker(q, return_dict):

    pid = os.getpid()
    while True:
        qqq = q.get()
        if qqq == 'DONE':
            # print('proc =', pid)
            break

        (idx, d) = qqq
        mol_id = d[0]
        smi = d[1]
        m = Chem.MolFromSmiles(smi)
        if m is None:
            return_dict[idx] = np.nan
            continue
        logp = Descriptors.MolLogP(m)
        return_dict[idx] = logp

def main():

    num_sub_proc = 32
    data_file = 'smiles.csv'
    df = pd.read_csv(data_file, sep=',')
    num_data = df.shape[0]
    smiles_list = df[['MOL_ID', 'SMILES']].values.tolist()

    data = list(enumerate(smiles_list))
    num_data = len(data)
    num_sub_proc = min(num_sub_proc, num_data)

    q1 = Queue()
    manager = Manager()
    return_dict = manager.dict()
    proc_master = Process(target=creator,
                          args=(q1, data, num_sub_proc))
    proc_master.start()

    procs = []
    for sub_id in range(0, num_sub_proc):
        proc = Process(target=worker, args=(q1, return_dict))
        procs.append(proc)
        proc.start()
    q1.close()
    q1.join_thread()
    proc_master.join()
    for proc in procs:
        proc.join()
    keys = sorted(return_dict.keys())

    logp_list = list()
    for key in keys:
        logp = return_dict[key]
        logp_list += [logp]
    df2 = df.iloc[keys].copy()
    df2['logP'] = logp_list
    out_csv = 'logp.csv'
    df2.to_csv(out_csv, index=False)

if __name__ == "__main__":
    main()

 

서브 함수로 creator와 worker가 있습니다.

creator 함수는 master 서브 프로세스에 의해 호출되며, queue에 데이터를 넣어주는 함수입니다.

 queue에 들어가는 데이터는 (idx,(mol_id, smiles)) 입니다.

 queue에 데이터가 더 이상 없을 경우 종료되어야 하는데, 이 부분을 처리하기 위해서 'DONE'을 subprocess의 수만큼 더 넣어줬습니다.

 worker 함수는 worker 서브 프로세스들에 의해 호출되는 함수로, 실제 계산을 담당합니다.

queue로부터 데이터를 하나 받고 그 데이터의 smiles로부터 logP를 계산합니다.

 이때, 일반적인 함수와는 다르게 return_dict이라는 형태의 dictionary에 리턴 값을 입력해주게 됩니다.

 왜냐하면 일반적인 파이선 객체는 프로세스 사이에서 공유되지 않고, subprocess의 함수의 return 도 가져올 수 없기 때문입니다. 

 Manager 객체로부터 생성된 return_dict만 공유가 가능합니다.

또한, queue의 데이터는 넣어준 순서 그대로 유지되는 것이 아니라, 계산이 빨리 끝난 순으로 결과가 나오고, 또한 중간에 에러가 있을 경우 에러가 있는 데이터는 결과가 없을 수도 있어서 결과물이 어느 입력에 의한 것인지 명확히 식별할 수 있어야 합니다.

 dictionary를 사용할 경우, 입력 데이터의 index를 queue에 함께 넣어주고, 입력 데이터의 index를 return_dict의 키로 넣어주면 편리합니다.

 

 main 함수에선 pandas로 데이터를 읽고, 이를 queue에 넣어줄 수 있도록 변형합니다.

 num_sub_proc 가 사용할 worker subprocess의 수입니다. (추가로 master가 하나 더 실행됩니다.)

 manager, queue의 인스턴스와 master process, worker process의 인스턴스를 만들고,

각각을 실행해서 서브 프로세스를 생성합니다.

 잡이 다 진행되면 join 메서드를 실행합니다.

 

 return_dict에 모든 결과가 모여있으니 이것을 다시 읽어서 빠진 데이터를 체크하고, 빠진 데이터들은 dataframe에서 제외합니다. 계산된 데이터를 data frame에 추가한 후 csv로 저장했습니다.

 

 

여기서는 logP를 계산했지만, 분자의 sub-structure 검색이나, fingerprint, 유사성 비교 등 다양한 계산을 할 수 있습니다.

다만, 계산량이 적은 계산의 경우 num_sub_proc 수가 많아질 때 병렬화 효율이 급격하게 떨어지는 경우도 있습니다.

 데이터를 전달하는데 드는 부하가, 데이터를 받아서 수행하는데 드는 부하보다 더 커서 그렇습니다.

 이럴 때는 전체 분자 리스트를 별개의 파일로 쪼개고, 이 계산을 num_sub_proc를 줄여서 서로 다른 리스트에 대해 별개로 돌리는 것이 효율적입니다. 간단하게 말해서 병렬화된 코드를 다시 병렬화하는 것입니다.

때에 따라서 num_sub_proc 256으로 1개의 잡을 돌리는 것보다, num_sub_proc 16으로 16개의 잡을 돌리는 것이 훨씬 효율적입니다. 전에 해본 계산에선 모되는 시간이 4~8배 정도 차이가 났습니다. 병렬 컴퓨팅은 단순히 코드만 만들고 끝나는것이 아니라, 어디에 부하가 걸리는지를 제대로 파악하고 거기에 맞게 파라미터를 조절하는 것이 중요합니다.