이번 요구사항 중 특이한 요구사항이 있었다. 사실 특이... 까진 아니긴 한데, 설계 하다보니 이건 특이할 수 밖에 없는 상황이 생겨버림. 대략적인 설명은 다음과 같다.

기본전제 - FTP 업로드/다운로드

1. 한 명의 사용자가 여러개의 데이터를 FTP 업로드/다운로드 신청을 할 수 있다
2. 여러명의 사용자가 1번의 요구사항과 같이 신청할 수 있다
3. 한 명의 사용자는 한 건의 FTP 사용량만 가진다. 즉, 한 명이 10건을 신청해도 1건씩 업로드/다운로드가 되는 형식
4. 5명의 사용자가 각각 5건의 FTP 데이터 전송 요청을 하는 경우, 5건의 FTP 전송만 생성된다
5. 일반적으로 전송되는 데이터의 사이즈는 최소 수십 GB를 기본으로 한다 (FTP 속도가 오래걸림)

 

원래대로라면 그냥 신청을 하는 WEB단에서 API 서버로 FTP 전송 요청을 하면 수행하는 구조로 생각했는데, 기본적인 전송 단위가 몇기가 단위이다보니 자칫하다가는 Web Session Timeout이 발생할 수도 있는 상황. 그리고 3번과 같은 요구사항에 의해 결국 FTP 전송과 관련된 DB Table을 설계하고, API 서버에서는 해당 Table을 주기적(Scheduled)으로 Select 하면서 전송건이 있는지를 체크하는 방식으로 설계를 했다.

그리고 4번의 전송건과 관련해서 결국 Async 방식으로 FTP 호출을 하는 것으로 구현했다. 즉, 5명의 사용자가 각기 5개의 데이터를 전송요청 하는 경우, Async Thread에 의해 5개의 FTP 세션을 생성하고 전송할 것이다.

 

그런데 여기서 문제가 발생한다.

가령 DB 호출 스케쥴러가 10분에 한 번씩 작동한다고 가정하자. 첫 구동시 A 데이터(사이즈 100기가)가 Select 되어 FTP 전송을 하는데, 대략적으로 100분이 걸린다고 할 때, 전송이 완료되는 시점까지 스케쥴러는 이 데이터를 계속 호출할 것이다. 단순하게 DB 쿼리로 걸러내면 되지 않을까? 라고 생각했으나, DB 쿼리로 걸러낸다 하더라도 내부적으로 방지하는 로직을 구현해야 할 것으로 판단했다. 

 

중복실행 방지 Async를 위해  제미니에게 물어보니, Async Queue를 관리하는 로직을 구현해주었고, 사용법이 쉬워 적용해보았더니 아주 잘 동작하게 되었다. 이하는 해당 로직의 간단한 소스코드와 예제이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Component;
 
@Component
public class QueueCache {
    private static final QueueCache instance = new QueueCache();
    private final ConcurrentHashMap<String, Boolean> cache = new ConcurrentHashMap<>();
 
    private QueueCache() {
    }
 
    public static QueueCache getInstance() {
        return instance;
    }
    
    // key가 Cache에 있는지 확인
    public boolean isReq(String key) {
        return cache.containsKey(key);
    }
    
    // 신규 작업시 Key를 Cache에 등록
    public void putReq(String key) {
        cache.put(key, true);
    }
 
    // 작업 완료시 Key를 Cache에서 삭제
    public void removeReq(String key) {
        cache.remove(key);
    }
}
cs

 

싱글톤 패턴으로 모든 클래스에서 단 하나의 인스턴스만 생성하는 구조로, 여러개의 쓰레드가 접근하여도 단 하나만의 키를 관리하는 방식으로 구현되어있다.

주석에 달린 설명과 같이 사용법이 크게 어렵지는 않다. 나의 경우에는 아래와 같은 방식으로 로그가 발생하였다.

1
2
3
4
[scheduling-1- INFO k.a.testAPI.common.testController - === Queue 에 있음 - USERKEY-1
[scheduling-1- INFO k.a.testAPI.common.testController - === Queue 에 있음 - USERKEY-2
[scheduling-1- INFO k.a.testAPI.common.testController - === Queue 에 있음 - USERKEY-3
[scheduling-1- INFO k.a.testAPI.common.testController - === Queue 에 있음 - USERKEY-4
cs

 

USERKEY-1~4 까지 Unique 한 값으로 QueueCache에 등록하고, 모든 로직을 수행하기 직전에 isReq(USERKEY-1)하는 식으로 Queue에 등록되어있는지를 확인한다. 없는 경우, Queue에 등록(putReq)을 한 후, 모든 작업이 종료되거나 또는 exception이 발생하는 경우 removeReq를 통해 Queue 에서 삭제를 한다.

 

이로인해 수시간 동안 동작하는 경우에도 안정적으로 Async FTP 전송을 수행할 수 있었다. 메데타시 메데타시.

블로그 이미지

김생선

세상의 모든것을 어장관리

,