코딩은 마라톤

[Java] Thread-Safe 동기화 방식(Lock)과 Thread-Safe Queue 본문

Language/Java

[Java] Thread-Safe 동기화 방식(Lock)과 Thread-Safe Queue

anxi 2025. 3. 21. 11:01

학습하게된 계기

https://www.youtube.com/watch?v=uWcn7omddxs&t=753s

 

 

대용량 트래픽은 어떻게 대처해야하는지 궁금했었고, 마침 해당 영상을 우연히 보게 되었습니다.

영상을 간단히 소개하면, 쿠폰 이벤트를 진행하였고 이때 대규모 트래픽이 발생하여 문제를 해결해나가는 과정을 설명한 영상입니다.

 

AS-IS

대규모 트래픽이 들어오면 컨트롤러에서 Redis로 바로 전달하는 시스템.

부하를 전부 레디스로 처리하는 방식이라 문제 발생.

 

 

TO-BE

트래픽을 컨트롤러와 Redis 사이에 In-Memory Queue와 Scheduler를 사용하여 요청을 쪼개어 전달

TPS를 Queue와 Scheduler를 통해 항상 쪼개어 전달할 수 있으므로 분산할 수 있음.

 

위 영상을 보고 어떻게 구현하는 건지 찾아보았고, 특히 인메모리 큐를 사용하면 멀티쓰레드 환경에서 문제가 발생하지 않을지 궁금했다.

인메모리 큐를 찾아보다가 가끔씩 봤었던 ConcurrentQueue를 사용하면 Thread-Safe하다는 것을 보게 되었고, 과연 Thread-Safe과 그렇지 않을 때는 무슨 차이가 있을지 궁금해서 공부해보았다.

 

https://github.com/backend-bbusigi/BBUSINSA/issues/9

 

[Feat] 인메모리 큐 & 스케줄러 기반 Redis 쿠폰 발급 처리 · Issue #9 · backend-bbusigi/BBUSINSA

✨ 이슈 내용 대규모 트래픽이 발생했을 때, 요청을 인메모리 큐와 스케줄러를 통해 부하를 줄이며 레디스로 처리합니다. 💡 작업 내용 쿠폰 개수를 db에 저장 부하 테스트 진행 인메모리 큐와

github.com

 

 

Thread-Safe

Thread-Safe란?

멀티스레딩 환경에서 여러 스레드가 동시에 같은 코드 영역에 접근하거나 데이터를 공유할 때, 올바른 실행 결과를 보장하는 코드의 속성을 의미해요.

 

즉, 여러 스레드로부터 동시에 코드가 호출되어도 각 스레드가 경쟁하지 않고 안전하게 수행될 수 있는 것입니다.

따라서 Thread-Safe하기 위해 동기화하는 것이 중요합니다.

(동기화는 한 스레드가 작업중인 작업을 다른 스레드가 간섭하지 못하게 막는 것입니다.)

 

자바에서 제공하는 Thread-Safe 동기화 방식

1. synchronized

첫 번째로는 synchronized입니다. synchronized로 선언된 블럭(메서드)은 한 스레드만 접근할 수 있게 돼요.

public synchronized void addItem() {
    // 여러 스레드가 동시에 접근할 수 없도록 동기화
}
---
public void addItem() {
    synchronized(this) {
        // 여러 스레드가 동시에 접근할 수 없도록 동기화
    }
}

 

이런 식으로 synchronized로 선언된 블럭(메서드)은 한 스레드가 수행을 마치거나 예외 발생하여 메서드에서 빠져 나올때까지 Blocking돼요. 그러나 한 스레드만 접근이 가능하기에 매우 간편하지만 성능에 큰 영향을 끼칠 수 있어서 많이 사용하진 않습니다.

또한 synchronized를 사용하는 것을 “암시적 Lock 사용”이라고도 해요.

 

암시적 Lock

  • synchronized는 고유 락을 사용하는데, synchronized 메서드가 호출되면 해당 스레드는 고유 락을 획득하고 메서드가 종료되면 락을 해제한다.
  • 자바의 모든 객체는 lock을 가지고 있다. 이를 고유 락(intrinsic lock) 또는 모니터 락(monitor lock)이라고 부른다.

 

2. 명시적 Lock 사용 (java.util.concurrent.lock)

암시적 Lock과 달리 직접 Lock 객체를 명시적으로 생성하여 제어할 수 있게 됩니다.

Lock 프레임워크에서 java.util.concurrent.lock를 제공하여 직접 동기화를 구현할 수 있습니다.

 

아래에도 엄청 많다.

 

Lock 프레임워크는 Lock Interface를 기반으로 하며, 일반적으로 ReentrantLock(재진입 락)을 사용한다.

synchronized와 다른 점은 명시적으로 스레드를 잠금, 해제할 수 있으며 tryLock()과 같은 논블로킹 특성도 제공합니다.

 

public void displayRecord(Object document) {
    displayLock.lock();
    // if (displayLock.tryLock()) 사용하여 lock 획득 못할 시 대기하지 않게 할 수 있음
  
    try {
          
        Thread.sleep(50);
        System.out.println(Thread.currentThread().getName() + ": Displaying...");
          
    } catch (InterruptedException e) {
        e.printStackTrace();
          
    } finally {
            displayLock.unlock();
    }
}

Lock Interface vs Synchronization

 

따라서 Lock을 사용할 때, Redis Lock을 사용하거나 다른 Lock을 알아봐도 좋지만 상황에 따라서 Java에서 제공하는 Lock을 써도 좋을 거 같아요!


Thread-Safe한 Queue 종류

Blocking Queue

 

Lock(ReentrantLock)을 대체로 사용한다.

 

1. ArrayBlockingQueue

ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);
  • 배열을 기반으로 제한된 크기의 Blocking Queue
  • 고정된 크기를 가지며, 생성 시 지정한 용령 이후 크기 변경 불가
  • 큐가 가득 차면 요소를 추가하는(put) 작업은 블로킹
  • 큐가 비어있으면 요소를 가져오려는(take) 작업은 블로킹

2. LinkedBlockingQueue

LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(3);
  • 내부적으로 Lock을 사용
  • 큐가 가득 차면 요소를 추가하는(put) 작업은 블로킹
  • 큐가 비어있으면 요소를 가져오려는(take) 작업은 블로킹
  • 크기를 고정할 수 있고, 지정하지 않으면 무제한 크기로 동작한다.

3. PriorityBlockingQueue

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
  • 요소의 순서가 아닌 우선순위에 따라 정렬 (PriorityQueue와 동일)
  • 용량 제한 X → put() 삽입 연산은 블로킹되지 않음
  • 큐가 비어있을 경우에만 take()를 호출한 스레드는 블로킹된다.

4. SynchronousQueue

SynchronousQueue<Integer> queue = new SynchronousQueue<>();
  • 용량 자체가 존재하지 않음.
  • put()과 take()가 서로 다른 스레드에서 실행되어야 함.
    • 즉 put()은 다른 스레드가 take()할 때까지 블로킹한다.
      • put()을 호출한 스레드가 take()를 호출하는 스레드가 등장할 때까지 대기한다는 뜻
      • 따라서 만약 put()만 했을 경우, take()를 다른 스레드가 수행하지 않음 영원히 대기함.

Non-Blocking Queue

1. ConcurrentLinkedQueue

ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
  • ReentrantLock 같은 락을 사용하지 않음
  • CAS(Compare-And-Swap) 연산을 사용하여 동기화 없이 안전하게 처리

Not-Thread-Safe 테스트 (Queue_LinkedList)

LinkedList

Note that this implementation is not synchronized.

 

즉 Queue의 구현체인 LinkedList는 동기화되지 않는다고 말합니다.

 

테스트 과정

  • 3000개의 스레드 생성 → ExecutorService를 사용해 FixedThreadPool(3000) 설정
  • Queue로 LinkedList 사용 → (Thread-Safe하지 않은 구조)
  • 각 스레드에서 수행하는 작업
    • queue.add(i); → 현재 값 i를 Queue에 추가
    • Thread.sleep(0); → 현재 스레드가 CPU 실행을 양보 (다른 스레드가 실행될 수 있게 함)
    • queue.add(i); → 다시 한 번 i 값을 Queue에 추가
  • Queue 크기를 출력 및 검증
    • 예상 크기: 3000 * 2 = 6000
    • 하지만, LinkedList는 Thread-Safe하지 않아서 데이터 손실 가능성 있음
      • 여러 스레드에서 동시에 임계영역에 진입하여 추가할 경우, 데이터 손실이 일어날 수 있음
    • 즉, 실제 크기는 6000보다 작을 수 있어 테스트 실패 가능성 존재
    @Test
    void 스레드_세이프하지않은_큐_테스트() throws InterruptedException {
        int numberOfThreads = 3000;
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        CountDownLatch latch = new CountDownLatch(numberOfThreads);

        Queue<Integer> queue = new LinkedList<>();

        for (int i = 0; i < numberOfThreads; i++) {
            int value = i;
            executorService.submit(() -> {
                queue.add(value);
                try {
                    Thread.sleep(0);
                } catch (InterruptedException ignored) {}
                queue.add(value);
                latch.countDown();
            });
        }

        latch.await();
        executorService.shutdown();

        System.out.println("스레드 세이프하지 않은 큐 크기: " + queue.size());
        Assertions.assertThat(queue.size()).isEqualTo(numberOfThreads * 2);
    }

 

테스트를 수행하면 성공할 때도 있지만, 대부분 실패합니다.

 

Thread-Safe 테스트 (Queue_ConcurrentLinkedQueue)

ConcurrentLinkedQueue

An unbounded thread-safe queue based on linked nodes.

 

즉 Queue의 구현체인 ConcurrentLinkedQueue는 미리 정해진 크기 제한이 없는 무제한의 Thread-Safe한 Queue입니다.

 

@Test
void 쓰레드_세이프한_큐_테스트() throws InterruptedException {
    int numberOfThreads = 3000;
    ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
    CountDownLatch latch = new CountDownLatch(numberOfThreads);

    Queue<Integer> queue = new ConcurrentLinkedQueue<>();

    for (int i = 0; i < numberOfThreads; i++) {
        int value = i;
        executorService.submit(() -> {
            queue.add(value);
            try {
                Thread.sleep(0);
            } catch (InterruptedException ignored) {}
            queue.add(value);
            latch.countDown();
        });
    }

    latch.await();

    int actualSize = queue.size();
    System.out.println("쓰레드 세이프한 큐 크기: " + actualSize);

    Assertions.assertThat(actualSize).isEqualTo(numberOfThreads * 2);
}

 

 

역시나 테스트를 성공한다.

 

배운 점

백엔드 개발을 해오면서 synchronized를 사용하거나 자바에서 제공하는 락을 사용하지 않았다. 사실 자바에서 락을 제공하는 지도 몰랐다.. 보통 락이란 DB 단의 낙관적, 비관적 락만 알고 있었는데 애플리케이션 단의 락도 있음을 알게 되었다.

 

또한 인메모리 큐를 학습하기 전에 쿠폰을 제공하는 과정에서도 동시성 문제가 발생하여 이 부분을 Redis 분산락을 이용해 해결하였다.

그래서 락을 사용하려면 Redis 락을 사용해야겠다고 생각했었는데, 레디스를 프로젝트에서 이미 사용해오지 않았다면 굳이 레디스를 사용하는 것 보다 자바에서 제공하는 락을 사용하는 것도 하나의 방법이 될 수 있을 거라 생각했다! 

 

참고자료