SSE로 푸시 알림 기능 구현하기

1. SSE 개념

 

참고 : https://velog.io/@wellbeing-dough/Spring-Boot-SSE-알림-개발

 

1.1 SSE(Server-Sent-Events) 방식

SSE는 서버와 한번 연결을 맺고 나면, 일정 시간 동안 서버에서 변경이 발생할 때마다 서버에서 클라이언트로 데이터를 전송하는 방법

  1. 클라이언트는 서버를 구독(SSE Connection을 맺음)
  2. 서버는 변동사항이 생길 때마다 구독한 클라이언트들에게 데이터를 전송

 

SSE는 서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로 하는 HTML5 표준 기술

 

💡 클라이언트가 서버와 크게 통신할 필요가 없는 경우와 단지 업데이트된 데이터만 받아야 하는 실시간 데이터 스트림에 대한 구현의 경우 SSE 사용

 

1.2 폴링 방식

 

이벤트가 발생하면 그 이후에 request로 이벤트에 대한 응답을 받는 방식

 

계속해서 request를 보내는 것이기 때문에 서버의 부담이 생기며, 이벤트가 발생하고 다음 요청까지 이벤트의 반영이 안 되기때문에 실시간을 보장하지 못함

 

1.3 긴 폴링(Long Polling) 방식

 

 

계속적으로 요청하는 게 아니라 connection의 유지 시간을 길게 가짐

 

커넥션동안 이벤트가 발생하면 지속되고 있는 커넥션으로 response를 보내고 다시 커넥션을 유지할 request를 보내는 방식

 

실시간은 보장하지만 이벤트가 자주 발생하는 경우에는 서버의 부담은 폴링 방식과 큰 차이가 없게 됨

 

1.4 웹 소켓 방식

 

 

SSE와의 차이점은 HTTP프로토콜이 아닌 별도의 프로토콜을 사용하며 양방향 통신

 

웹 소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답하면 됨

 

서버만 이벤트를 전송하는 경우에는 비효율적, 채팅과 같은 상황인 양방향 통신에서는 효율적

 

2. SSE로 푸시 알림 기능 구현하기

 

2.1 build.gradle 의존성 추가

 

// sse
implementation 'org.springframework.boot:spring-boot-starter-web-services'

 

2.2 SSE 구독 Controller 작성

 

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/notification")
public class NotificationController {

    private final NotificationService notificationService;
    private final MemberService memberService;

    @GetMapping(path = "/subscribe/{memberId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @Operation(summary = "알림 구독", description = "알림 구독을 위한 SseEmitter 생성")
    public ResponseEntity<SseEmitter> subscribe(@PathVariable Long memberId, @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        Member member = memberService.getMemberById(memberId);
        return ResponseEntity.ok(notificationService.subscribe(member.getEmail(), lastEventId));
    }

}

 

사용자의 요청을 받아 SSE 구독을 하고, 사용자의 정보를 받아 SSE 연결을 맺게 하는 컨트롤러 작성

 

  • Server-Sent Events(SSE)를 사용하여 클라이언트에게 실시간 이벤트 스트림을 전송하기 위해 MediaType.TEXT_EVENT_STREAM_VALUE를 응답 타입으로 설정
  • Last-Event-ID는 헤더에 담겨 오는 값으로 이전에 받지 못한 이벤트가 존재하는 경우(SSE연결에 대한 시간 만료 혹은 종료)에 받은 마지막 이벤트 ID 값을 넘겨받지 못한 데이터부터 받을 수 있게 할 때 필요한 값

 

2.3 실제 연결을 하는 Service 작성

 

@Service
@RequiredArgsConstructor
public class NotificationService {

    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    private final EmitterRepository emitterRepository;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public SseEmitter subscribe(String email, String lastEventId) {

        String emitterId = email + "_" + System.currentTimeMillis();
        SseEmitter sseEmitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));

        // 상황별 emitter 삭제 처리
        sseEmitter.onCompletion(() -> emitterRepository.deleteEmitterById(emitterId)); //완료 시
        sseEmitter.onTimeout(() -> emitterRepository.deleteEmitterById(emitterId)); // 타임 아웃 시
        sseEmitter.onError((e) -> emitterRepository.deleteEmitterById(emitterId)); // 에러 발생 시

        // 503 Service Unavailable 방지용 dummy event 전송
        sendEventToClient(sseEmitter, emitterId, "EventStream Created. [email = " + email + " ]");

        // client가 미수신한 event 목록이 존재하는 경우
        if (!lastEventId.isEmpty()) {
            Map<String, Object> eventCaches = emitterRepository.findAllEventCacheByEmail(email);
            eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
                    .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) // lastEventId 이후의 event만 전송
                    .forEach(entry -> sendEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
        }

        return sseEmitter;
    }
  
}

 

클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야 함

 

SseEmitter 객체를 만들 때 유효 시간을 주는데, 주는 시간만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 됨

 

SseEmitter 작동 방식 -chatGPT-

  1. 클라이언트가 서버에 연결: 클라이언트는 SSE 연결을 설정하기 위해 서버의 특정 엔드포인트에 접속합니다. 이때, 서버는 SseEmitter 객체를 생성하고 클라이언트와의 연결을 유지합니다.
  2. 서버에서 실시간 데이터 전송: 서버는 SseEmitter를 사용해 클라이언트에게 실시간 데이터를 전송할 수 있습니다. 서버는 emitter.send() 메서드를 사용하여 메시지를 보냅니다. 이 메시지는 클라이언트의 이벤트 리스너로 전달됩니다.
  3. 연결 유지: SseEmitter는 HTTP 연결을 유지한 채로 서버가 데이터를 보낼 때까지 대기합니다. 클라이언트는 이 연결을 통해 실시간으로 데이터를 수신하게 됩니다.
  4. 연결 종료: 서버에서 데이터를 더 이상 보낼 필요가 없거나 클라이언트가 연결을 종료하면, SseEmitter를 종료(emitter.complete())합니다.

코드 상세사항

  • SseEmitter 생성 및 저장: 고유한 emitterId를 생성해 새로운 SseEmitter 객체를 생성하고, 이를 emitterRepository에 저장
  • 이벤트 핸들링: SSE 연결이 종료되거나 타임아웃, 에러 발생 시 SseEmitter를 삭제하도록 설정
  • 초기 더미 이벤트 전송: 503 오류를 방지하기 위해 클라이언트에 초기 더미 이벤트를 전송
  • 미수신 이벤트 처리: 클라이언트가 이전에 수신하지 못한 이벤트가 있으면 이를 다시 전송

 

2.4 Repository 작성

@Repository
public class EmitterRepository{

    // SseEmitter와 이벤트 캐시를 저장할 맵
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    public Map<String, SseEmitter> findAllEmitterByEmail(String email) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(email))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public Map<String, Object> findAllEventCacheByEmail(String email) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(email))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public void deleteEmitterById(String id) {
        emitters.remove(id);
    }

    public void deleteAllEmitterById(String id) {
        emitters.keySet().removeIf(key -> key.startsWith(id));
    }

    public void deleteAllEventCacheById(String id) {
        eventCache.keySet().removeIf(key -> key.startsWith(id));
    }
}

 

EmitterRepository 클래스로, Server-Sent Events (SSE)에서 사용하는 SseEmitter 객체와 관련된 데이터를 저장하고 관리하는 역할

 

이 클래스는 클라이언트와의 SSE 연결을 효율적으로 관리하기 위해 ConcurrentHashMap을 사용하여 동시성을 처리

EmitterRepository 클래스는 SSE 연결을 관리하기 위한 저장소 역할을 합니다. SseEmitter 객체와 이벤트 데이터를 저장하고, 특정 클라이언트와 연관된 데이터를 검색 및 삭제하는 기능을 제공합니다. 이 클래스는 다중 스레드 환경에서도 안전하게 동작하도록 ConcurrentHashMap을 사용해 동시성을 처리하고 있습니다.
출처 : -chatGPT-

 

2.5 해당 이벤트가 발생할 때마다 알림 전송하기

public void send(Member receiver, Object data) {
    String emitterId = null;
    try {
       Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterByEmail(receiver.getEmail());

       if (emitters.isEmpty()) {
           return;
       }

       emitterId = emitters.keySet().iterator().next();

       emitters.forEach((key, value) -> {
           emitterRepository.saveEventCache(key, data);
           sendEventToClient(value, key, data);
       });
   } catch (Exception e) {
       emitterRepository.deleteEmitterById(emitterId);
       throw new GlobalException(GlobalErrorCode._INTERNAL_SERVER_ERROR);
   }
}

private void sendEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
    try {
        String jsonData = objectMapper.writeValueAsString(data);
        sseEmitter.send(SseEmitter.event()
                .id(emitterId)
                .name("sse")
                .data(jsonData, MediaType.APPLICATION_JSON));
    } catch (IOException exception) {
        emitterRepository.deleteEmitterById(emitterId);
        sseEmitter.completeWithError(exception);
        throw new GlobalException(GlobalErrorCode._INTERNAL_SERVER_ERROR);
    }
}

 

특정 수신자에게 실시간 알림 데이터를 보내기 위한 기능을 수행

 

이 메서드는 SseEmitter 객체를 이용해 알림 데이터를 클라이언트에게 전송하며, 전송 중 문제가 발생할 경우 해당 SseEmitter를 삭제하고 예외를 발생


if (!movieCodeList.isEmpty()) {
    List<Member> memberList = memberRepository.findAll();
    memberList.forEach(member -> notificationService.send(member, "영화 정보가 업데이트 되었습니다."));
}

 

영화정보가 업데이트되는 메서드에서 send 메서드를 호출하여 데이터를 실시간으로 전송하도록 함

 

브라우저로 테스트하기

 

연결이 이루어진 후, 영화 업데이트가 이루어질 때마다 데이터를 실시간으로 수신받음