본문 바로가기
▶ Back-End/Python

GPU 작업 관리 예제 - GMS-CORE

by 오늘도 코딩 2025. 1. 21.
728x90
반응형

Redis 큐 기반 GPU 별 작업 할당 및 처리

*자세한 설명 생략

 

 

▷ 시스템 구성도

 

 

▷ 프로젝트 구조

< 파일 따라가기 >

*순서대로 나열

 

▷ const.py

▷ redis_config.py

▷ gpu_batch_service.py

▷ gpu_service.py

▷ logger.py

▷ main.py

 

 

▷ const.py

"""
===========================
CONSTANTS
===========================
"""


"""
[GPU]
"""
GLOBAL_GPU_WORK = {} #GPU 작업 상태
GPU_MONITOR_KEY = "node-01"


"""
[Redis]
"""
REDIS_IP = ""
REDIS_PORT = 6379
REDIS_SUBSCRIBE = "gpu_channel"

"""
[TEST]
"""
TEST_IMG_PATH = "./test.png"

 

 

▷ redis_config.py

import redis
from app.config import const
from app.utils.logger import log


"""
===========================
Redis Config
===========================
"""

"""
Redis 클라이언트 설정
"""
redis_client = redis.StrictRedis(const.REDIS_IP, const.REDIS_PORT, socket_timeout=1)

async def start_subscribe():
    """
    Redis 연결 확인 후 채널 구독
    """
    # Redis 연결 확인 (연결 안 되었으면 재시도)
    while True:
        log.info("Redis 연결 시도")
        try:
            redis_client.ping()
            log.info("Redis 연결 성공")
            break  # 연결되면 루프 종료
        except redis.exceptions.RedisError:
            log.error("Redis 연결 실패")

    # Redis 채널 구독 시작
    pubsub = redis_client.pubsub()
    pubsub.subscribe(const.REDIS_SUBSCRIBE)
    log.info("Redis 구독 시작")

 

 

▷ gpu_batch_service.py

import asyncio
import json
import threading
import redis
import app.gpu.service.gpu_service as gpu_service
from app.config import const
from app.config.redis_config import redis_client, start_subscribe
from app.utils.logger import log

"""
===========================
GPU Batch Service
===========================
"""

async def gpu_queue_batch():
    """
    [큐 작업확인 및 작업 처리]

    》 큐에서 작업을 1초 주기로 확인 후 처리하는 함수
    """
    while True:
        try:

            # GPU 상태 확인
            gpu_info = await gpu_service.get_gpu_status()

            # GPU 상태 업데이트
            redis_client.set(const.GPU_MONITOR_KEY, json.dumps(gpu_info))
            
            # 사용 가능한 첫번째 GPU
            available_gpu = next(
                (
                    gpu
                    for gpu in gpu_info
                        if gpu["gpu_usage"] <= 10 
                        and not gpu_service.get_gpu_work(gpu["index"])
                ),
                None
            )

            if available_gpu:

                # GPU 작업 할당 
                gpu_index = available_gpu["index"]

                # 큐에서 작업을 꺼내서 GPU에 할당
                job_data = redis_client.lpop("gpu_image")

                if job_data:

                    # 배치 작업은 계속 반복되고, 각 GPU는 쓰레드를 생성하여 작업 처리
                    threading.Thread(target=gpu_service.gpu_job, args=(gpu_index,)).start()

            # 작업을 할당 못 한 경우 1초 후 다시 확인
            await asyncio.sleep(1)

        except redis.exceptions.RedisError:
            log.error("Redis 연결 끊김")
            await start_subscribe()

 

 

▷ gpu_service.py

import subprocess
import torch
import socket
from app.config import const
from app.utils.logger import log
from app.config.redis_config import redis_client 

"""
===========================
GPU Service
===========================
"""

def set_gpu_work(value: bool, gpu_index: int):
    """
    [GPU 작업 상태 설정]
    
    》 특정 GPU에 대해 작업 상태를 설정
    
    @param value: 작업 상태 (True: 작업 중, False: 작업 완료)
    @param gpu_index: GPU 인덱스
    """
    const.GLOBAL_GPU_WORK[gpu_index] = value


def get_gpu_work(gpu_index: int):
    """
    [GPU 작업 상태 조회]
    
    》 특정 GPU의 작업 상태 조회
    
    @param gpu_index: GPU 인덱스
    
    @return: 해당 GPU의 작업 상태 (작업 중: True, 작업 완료: False)
    """
    return const.GLOBAL_GPU_WORK.get(gpu_index, False)  
    

async def get_gpu_status():
    """
    [GPU 상태 조회 및 작업 가능 여부 확인]

    》 CUDA 사용 가능 여부 및 GPU 상태 정보, 각 GPU의 사용률 기준으로 작업 가능 여부를 반환

    @return
    """

    # CUDA 사용 가능 여부
    if not torch.cuda.is_available():
        return {"message": "CUDA is not available"}

    # 서버 IP 주소 가져오기
    ip_address = socket.gethostbyname(socket.gethostname())

    # nvidia-smi 명령어로 GPU 상태 정보 조회(CSV)
    result = subprocess.run(
        [
            "nvidia-smi", 
            "--query-gpu=index,gpu_name,uuid,utilization.gpu,memory.total,memory.used,memory.free", 
            "--format=csv,noheader,nounits"
        ],
        capture_output=True, text=True
    )

    # nvidia-smi 명령어 실패 시 오류 메시지 반환
    if result.returncode != 0:
        return {"message": "nvidia-smi Error", "error": result.stderr}

    # 결과 파싱
    gpu_info = []
    for line in result.stdout.strip().splitlines():
        index, gpu_name, uuid, usage, total_memory, used_memory, free_memory = line.split(",")
        gpu_usage = int(usage.strip())

        # GPU 정보 추가
        gpu_info.append({
            "ip_address": ip_address,
            "index": int(index.strip()),  # GPU 인덱스 추가
            "gpu_uuid": uuid.strip(),  # GPU UUID
            "gpu_name": gpu_name.strip(),  # GPU 이름
            "gpu_usage": gpu_usage,  # GPU 사용률 (%)
            "memory_total": int(total_memory.strip()),  # 총 메모리 (MB)
            "memory_used": int(used_memory.strip()),  # 사용 중인 메모리 (MB)
            "memory_free": int(free_memory.strip())  # 남은 메모리 (MB)
        })

    return gpu_info


def gpu_job(gpu_index: int):
    """
    [GPU 작업]
    
    》 특정 GPU 작업 처리
    
    @param gpu_index: GPU 인덱스

    @return: 생성된 이미지 파일 경로(FileResponse)
    """
    log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 작업 시작 ■■■■■■■■■■■■■■■■")

    try:
        # GPU 작업 상태 설정
        set_gpu_work(True, gpu_index)
        
        # cuda: index 로 GPU 작업 할당
        gpu = "cuda:" + str(gpu_index)

        # GPU 작업 수행
        tensor = torch.randn(10000, 10000).to(gpu)
        for _ in range(100):
            result = tensor * tensor + tensor - tensor
            result = result.cpu()

        # 이미지 처리
        with open(const.TEST_IMG_PATH, "rb") as file:
            image_bytes = file.read()

        # Redis에 결과 전송
        redis_client.publish(const.REDIS_SUBSCRIBE, image_bytes)
        log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 알람 완료 ■■■■■■■■■■■■■■■■")

        return image_bytes

    finally:
        set_gpu_work(False, gpu_index)
        log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 작업 종료 ■■■■■■■■■■■■■■■■")

 

 

▷ logger.py

import logging
import colorlog
import os
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta

"""
===========================
Logger Setting
===========================
"""

def cleanup_logs(log_directory, days_to_keep=2):
    """
    [로그 파일 정리 함수]

    》 로그 파일을 정리하여 지정된 일수만큼의 파일만 유지

    @param log_directory: 로그 파일이 저장된 디렉터리
    @param days_to_keep: 유지할 로그 파일의 일수
    """
    now = datetime.now()
    cutoff_date = now - timedelta(days=days_to_keep)

    for filename in os.listdir(log_directory):
        # 로그 파일 이름이 "log_YYYY-MM-DD.log" 형식인지 확인
        if filename.startswith("log_") and filename.endswith(".log"):
            try:
                # 파일 이름에서 날짜를 추출
                file_date_str = filename[4:-4]  # "log_"와 ".log" 제거
                file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
                
                # 보존 기간을 초과한 파일 삭제
                if file_date < cutoff_date:
                    os.remove(os.path.join(log_directory, filename))
            except ValueError:
                # 날짜 형식이 올바르지 않은 파일은 무시
                continue


def setup_logger():
    """
    [로거 설정]

    》 로거를 설정 및 기존 로그 파일을 정리
    """
    # 로그 디렉터리와 파일 이름 설정
    log_directory = "."
    log_filename = "log_" + datetime.now().strftime('%Y-%m-%d') + ".log"

    # 기존 로그 파일 정리
    cleanup_logs(log_directory, days_to_keep=2)

    # 로그 포맷 설정
    log_formatter = colorlog.ColoredFormatter(
        fmt='%(log_color)s%(levelname)s:     [%(asctime)s||%(thread)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 콘솔 핸들러
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_formatter)
    console_handler.setLevel(logging.INFO)

    # 파일 핸들러
    log_file_handler = TimedRotatingFileHandler(
        log_filename, when='midnight', interval=1, backupCount=1, encoding='utf-8'
    )
    log_file_handler.setFormatter(logging.Formatter(
        fmt='[%(asctime)s||%(thread)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    ))
    log_file_handler.setLevel(logging.INFO)

    # 로거 설정
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(console_handler)
    logger.addHandler(log_file_handler)

    return logger


log = setup_logger()

 

 

▷ main.py

import asyncio
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.gpu.service.gpu_batch_service import gpu_queue_batch
from app.config.redis_config import start_subscribe
from app.utils.logger import log

"""
서버 시작 시 Redis에서 주기적으로 큐를 확인하고 작업을 처리
"""
@asynccontextmanager
async def startApp(app: FastAPI):
    log.info("GMS_CORE 1.0.0 START")
    
    # 최초 Redis 연결 및 구독
    await start_subscribe()

    # GPU 작업 배치 시작
    asyncio.create_task(gpu_queue_batch())

    yield  # 서버 종료될 때까지 유지

"""
[서버 실행]

uvicorn main:app --reload --host 0.0.0.0 --port 9998
localhost:9998/docs
"""
app = FastAPI(lifespan=startApp)

 

 

▷ 관련 글

 

FAST API 설치

명령어 하나로 간단하게 설치*아래 공식 문서 참고  ▷ FAST API 설치// 모든 의존성 및 기능 설치// uvicorn(서버) 포함> pip install "fastapi[all]"// 부분 설치 가능> pip install fastapi> pip install uvicorn *명령

coding-today.tistory.com

 

FAST API Project 생성 방법

Visual Studio Code에서 FAST API Project 생성 방법과 서버 구동 및 결과 확인*자세한 설명 생략  ▷ Project 생성 방법 ① Project Directory 생성 ② main.py 생성 ③ 실행> uvicorn main:app --reload  ▷ 결과 확인*We

coding-today.tistory.com

 

GPU 작업 관리 예제 - GMS-WAS

Redis 큐 기반 GPU 별 작업 할당 및 처리*자세한 설명 생략  ▷ 시스템 구성도  ▷ 프로젝트 구조*필요한 부분만 필터링   *순서대로 나열 ▷ RedisConfig.java▷ TaskManager.java▷ IMG1000Controller.java▷ I

coding-today.tistory.com

 

GPU 작업 관리 예제 - 배포 및 결과 확인

Redis 큐 기반 GPU 별 작업 할당 및 처리*자세한 설명 생략  ▷ 시스템 구성도  ▷ Redis 설치 및 구동*Docker에서 간단하게 설치해서 사용# Redis Docker 이미지 다운로드docker pull redis:7.0.11# Redis 컨테이

coding-today.tistory.com

 

 

728x90
728x90

'▶ Back-End > Python' 카테고리의 다른 글

Windows 10 PyTorch 설치  (0) 2024.12.16
FAST API Project 생성 방법  (0) 2024.11.28
FAST API 설치  (2) 2024.11.28
Django 기초 예제 - 게시판(수정)  (0) 2024.02.19
Django 기초 예제 - 게시판(삭제)  (0) 2024.02.16

댓글