Redis 큐 기반 GPU 별 작업 할당 및 처리
*자세한 설명 생략
▷ 시스템 구성도
▷ 프로젝트 구조
< 파일 따라가기 >
*순서대로 나열
▷ const.py
"""
===========================
CONSTANTS
===========================
"""
"""
[GPU]
"""
GLOBAL_GPU_WORK = {} #GPU 작업 상태
GPU_MONITOR_KEY = "node-01"
"""
[Redis]
"""
REDIS_IP = ""
REDIS_PORT = 6379
REDIS_SUBSCRIBE = "gpu_channel"
"""
[TEST]
"""
TEST_IMG_PATH = "./test.png"
▷ redis_config.py
import redis
from app.config import const
from app.utils.logger import log
"""
===========================
Redis Config
===========================
"""
"""
Redis 클라이언트 설정
"""
redis_client = redis.StrictRedis(const.REDIS_IP, const.REDIS_PORT, socket_timeout=1)
async def start_subscribe():
"""
Redis 연결 확인 후 채널 구독
"""
# Redis 연결 확인 (연결 안 되었으면 재시도)
while True:
log.info("Redis 연결 시도")
try:
redis_client.ping()
log.info("Redis 연결 성공")
break # 연결되면 루프 종료
except redis.exceptions.RedisError:
log.error("Redis 연결 실패")
# Redis 채널 구독 시작
pubsub = redis_client.pubsub()
pubsub.subscribe(const.REDIS_SUBSCRIBE)
log.info("Redis 구독 시작")
▷ gpu_batch_service.py
import asyncio
import json
import threading
import redis
import app.gpu.service.gpu_service as gpu_service
from app.config import const
from app.config.redis_config import redis_client, start_subscribe
from app.utils.logger import log
"""
===========================
GPU Batch Service
===========================
"""
async def gpu_queue_batch():
"""
[큐 작업확인 및 작업 처리]
》 큐에서 작업을 1초 주기로 확인 후 처리하는 함수
"""
while True:
try:
# GPU 상태 확인
gpu_info = await gpu_service.get_gpu_status()
# GPU 상태 업데이트
redis_client.set(const.GPU_MONITOR_KEY, json.dumps(gpu_info))
# 사용 가능한 첫번째 GPU
available_gpu = next(
(
gpu
for gpu in gpu_info
if gpu["gpu_usage"] <= 10
and not gpu_service.get_gpu_work(gpu["index"])
),
None
)
if available_gpu:
# GPU 작업 할당
gpu_index = available_gpu["index"]
# 큐에서 작업을 꺼내서 GPU에 할당
job_data = redis_client.lpop("gpu_image")
if job_data:
# 배치 작업은 계속 반복되고, 각 GPU는 쓰레드를 생성하여 작업 처리
threading.Thread(target=gpu_service.gpu_job, args=(gpu_index,)).start()
# 작업을 할당 못 한 경우 1초 후 다시 확인
await asyncio.sleep(1)
except redis.exceptions.RedisError:
log.error("Redis 연결 끊김")
await start_subscribe()
▷ gpu_service.py
import subprocess
import torch
import socket
from app.config import const
from app.utils.logger import log
from app.config.redis_config import redis_client
"""
===========================
GPU Service
===========================
"""
def set_gpu_work(value: bool, gpu_index: int):
"""
[GPU 작업 상태 설정]
》 특정 GPU에 대해 작업 상태를 설정
@param value: 작업 상태 (True: 작업 중, False: 작업 완료)
@param gpu_index: GPU 인덱스
"""
const.GLOBAL_GPU_WORK[gpu_index] = value
def get_gpu_work(gpu_index: int):
"""
[GPU 작업 상태 조회]
》 특정 GPU의 작업 상태 조회
@param gpu_index: GPU 인덱스
@return: 해당 GPU의 작업 상태 (작업 중: True, 작업 완료: False)
"""
return const.GLOBAL_GPU_WORK.get(gpu_index, False)
async def get_gpu_status():
"""
[GPU 상태 조회 및 작업 가능 여부 확인]
》 CUDA 사용 가능 여부 및 GPU 상태 정보, 각 GPU의 사용률 기준으로 작업 가능 여부를 반환
@return
"""
# CUDA 사용 가능 여부
if not torch.cuda.is_available():
return {"message": "CUDA is not available"}
# 서버 IP 주소 가져오기
ip_address = socket.gethostbyname(socket.gethostname())
# nvidia-smi 명령어로 GPU 상태 정보 조회(CSV)
result = subprocess.run(
[
"nvidia-smi",
"--query-gpu=index,gpu_name,uuid,utilization.gpu,memory.total,memory.used,memory.free",
"--format=csv,noheader,nounits"
],
capture_output=True, text=True
)
# nvidia-smi 명령어 실패 시 오류 메시지 반환
if result.returncode != 0:
return {"message": "nvidia-smi Error", "error": result.stderr}
# 결과 파싱
gpu_info = []
for line in result.stdout.strip().splitlines():
index, gpu_name, uuid, usage, total_memory, used_memory, free_memory = line.split(",")
gpu_usage = int(usage.strip())
# GPU 정보 추가
gpu_info.append({
"ip_address": ip_address,
"index": int(index.strip()), # GPU 인덱스 추가
"gpu_uuid": uuid.strip(), # GPU UUID
"gpu_name": gpu_name.strip(), # GPU 이름
"gpu_usage": gpu_usage, # GPU 사용률 (%)
"memory_total": int(total_memory.strip()), # 총 메모리 (MB)
"memory_used": int(used_memory.strip()), # 사용 중인 메모리 (MB)
"memory_free": int(free_memory.strip()) # 남은 메모리 (MB)
})
return gpu_info
def gpu_job(gpu_index: int):
"""
[GPU 작업]
》 특정 GPU 작업 처리
@param gpu_index: GPU 인덱스
@return: 생성된 이미지 파일 경로(FileResponse)
"""
log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 작업 시작 ■■■■■■■■■■■■■■■■")
try:
# GPU 작업 상태 설정
set_gpu_work(True, gpu_index)
# cuda: index 로 GPU 작업 할당
gpu = "cuda:" + str(gpu_index)
# GPU 작업 수행
tensor = torch.randn(10000, 10000).to(gpu)
for _ in range(100):
result = tensor * tensor + tensor - tensor
result = result.cpu()
# 이미지 처리
with open(const.TEST_IMG_PATH, "rb") as file:
image_bytes = file.read()
# Redis에 결과 전송
redis_client.publish(const.REDIS_SUBSCRIBE, image_bytes)
log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 알람 완료 ■■■■■■■■■■■■■■■■")
return image_bytes
finally:
set_gpu_work(False, gpu_index)
log.info("■■■■■■■■■■■■■■■■ GPU " + str(gpu_index) + " 작업 종료 ■■■■■■■■■■■■■■■■")
▷ logger.py
import logging
import colorlog
import os
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
"""
===========================
Logger Setting
===========================
"""
def cleanup_logs(log_directory, days_to_keep=2):
"""
[로그 파일 정리 함수]
》 로그 파일을 정리하여 지정된 일수만큼의 파일만 유지
@param log_directory: 로그 파일이 저장된 디렉터리
@param days_to_keep: 유지할 로그 파일의 일수
"""
now = datetime.now()
cutoff_date = now - timedelta(days=days_to_keep)
for filename in os.listdir(log_directory):
# 로그 파일 이름이 "log_YYYY-MM-DD.log" 형식인지 확인
if filename.startswith("log_") and filename.endswith(".log"):
try:
# 파일 이름에서 날짜를 추출
file_date_str = filename[4:-4] # "log_"와 ".log" 제거
file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
# 보존 기간을 초과한 파일 삭제
if file_date < cutoff_date:
os.remove(os.path.join(log_directory, filename))
except ValueError:
# 날짜 형식이 올바르지 않은 파일은 무시
continue
def setup_logger():
"""
[로거 설정]
》 로거를 설정 및 기존 로그 파일을 정리
"""
# 로그 디렉터리와 파일 이름 설정
log_directory = "."
log_filename = "log_" + datetime.now().strftime('%Y-%m-%d') + ".log"
# 기존 로그 파일 정리
cleanup_logs(log_directory, days_to_keep=2)
# 로그 포맷 설정
log_formatter = colorlog.ColoredFormatter(
fmt='%(log_color)s%(levelname)s: [%(asctime)s||%(thread)d] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 콘솔 핸들러
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
console_handler.setLevel(logging.INFO)
# 파일 핸들러
log_file_handler = TimedRotatingFileHandler(
log_filename, when='midnight', interval=1, backupCount=1, encoding='utf-8'
)
log_file_handler.setFormatter(logging.Formatter(
fmt='[%(asctime)s||%(thread)d] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
log_file_handler.setLevel(logging.INFO)
# 로거 설정
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)
logger.addHandler(log_file_handler)
return logger
log = setup_logger()
▷ main.py
import asyncio
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.gpu.service.gpu_batch_service import gpu_queue_batch
from app.config.redis_config import start_subscribe
from app.utils.logger import log
"""
서버 시작 시 Redis에서 주기적으로 큐를 확인하고 작업을 처리
"""
@asynccontextmanager
async def startApp(app: FastAPI):
log.info("GMS_CORE 1.0.0 START")
# 최초 Redis 연결 및 구독
await start_subscribe()
# GPU 작업 배치 시작
asyncio.create_task(gpu_queue_batch())
yield # 서버 종료될 때까지 유지
"""
[서버 실행]
uvicorn main:app --reload --host 0.0.0.0 --port 9998
localhost:9998/docs
"""
app = FastAPI(lifespan=startApp)
▷ 관련 글
FAST API 설치
명령어 하나로 간단하게 설치*아래 공식 문서 참고 ▷ FAST API 설치// 모든 의존성 및 기능 설치// uvicorn(서버) 포함> pip install "fastapi[all]"// 부분 설치 가능> pip install fastapi> pip install uvicorn *명령
coding-today.tistory.com
FAST API Project 생성 방법
Visual Studio Code에서 FAST API Project 생성 방법과 서버 구동 및 결과 확인*자세한 설명 생략 ▷ Project 생성 방법 ① Project Directory 생성 ② main.py 생성 ③ 실행> uvicorn main:app --reload ▷ 결과 확인*We
coding-today.tistory.com
GPU 작업 관리 예제 - GMS-WAS
Redis 큐 기반 GPU 별 작업 할당 및 처리*자세한 설명 생략 ▷ 시스템 구성도 ▷ 프로젝트 구조*필요한 부분만 필터링 *순서대로 나열 ▷ RedisConfig.java▷ TaskManager.java▷ IMG1000Controller.java▷ I
coding-today.tistory.com
GPU 작업 관리 예제 - 배포 및 결과 확인
Redis 큐 기반 GPU 별 작업 할당 및 처리*자세한 설명 생략 ▷ 시스템 구성도 ▷ Redis 설치 및 구동*Docker에서 간단하게 설치해서 사용# Redis Docker 이미지 다운로드docker pull redis:7.0.11# Redis 컨테이
coding-today.tistory.com
'▶ Back-End > Python' 카테고리의 다른 글
Windows 10 PyTorch 설치 (0) | 2024.12.16 |
---|---|
FAST API Project 생성 방법 (0) | 2024.11.28 |
FAST API 설치 (2) | 2024.11.28 |
Django 기초 예제 - 게시판(수정) (0) | 2024.02.19 |
Django 기초 예제 - 게시판(삭제) (0) | 2024.02.16 |
댓글