1. SSE 개념
참고 : https://velog.io/@wellbeing-dough/Spring-Boot-SSE-알림-개발
1.1 SSE(Server-Sent-Events) 방식
SSE는 서버와 한번 연결을 맺고 나면, 일정 시간 동안 서버에서 변경이 발생할 때마다 서버에서 클라이언트로 데이터를 전송하는 방법
- 클라이언트는 서버를 구독(SSE Connection을 맺음)
- 서버는 변동사항이 생길 때마다 구독한 클라이언트들에게 데이터를 전송
SSE는 서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로 하는 HTML5 표준 기술
💡 클라이언트가 서버와 크게 통신할 필요가 없는 경우와 단지 업데이트된 데이터만 받아야 하는 실시간 데이터 스트림에 대한 구현의 경우 SSE 사용
1.2 폴링 방식
이벤트가 발생하면 그 이후에 request로 이벤트에 대한 응답을 받는 방식
계속해서 request를 보내는 것이기 때문에 서버의 부담이 생기며, 이벤트가 발생하고 다음 요청까지 이벤트의 반영이 안 되기때문에 실시간을 보장하지 못함
1.3 긴 폴링(Long Polling) 방식
계속적으로 요청하는 게 아니라 connection의 유지 시간을 길게 가짐
커넥션동안 이벤트가 발생하면 지속되고 있는 커넥션으로 response를 보내고 다시 커넥션을 유지할 request를 보내는 방식
실시간은 보장하지만 이벤트가 자주 발생하는 경우에는 서버의 부담은 폴링 방식과 큰 차이가 없게 됨
1.4 웹 소켓 방식
SSE와의 차이점은 HTTP프로토콜이 아닌 별도의 프로토콜을 사용하며 양방향 통신
웹 소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답하면 됨
서버만 이벤트를 전송하는 경우에는 비효율적, 채팅과 같은 상황인 양방향 통신에서는 효율적
2. SSE로 푸시 알림 기능 구현하기
2.1 build.gradle 의존성 추가
// sse
implementation 'org.springframework.boot:spring-boot-starter-web-services'
2.2 SSE 구독 Controller 작성
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/notification")
public class NotificationController {
private final NotificationService notificationService;
private final MemberService memberService;
@GetMapping(path = "/subscribe/{memberId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "알림 구독", description = "알림 구독을 위한 SseEmitter 생성")
public ResponseEntity<SseEmitter> subscribe(@PathVariable Long memberId, @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
Member member = memberService.getMemberById(memberId);
return ResponseEntity.ok(notificationService.subscribe(member.getEmail(), lastEventId));
}
}
사용자의 요청을 받아 SSE 구독을 하고, 사용자의 정보를 받아 SSE 연결을 맺게 하는 컨트롤러 작성
- Server-Sent Events(SSE)를 사용하여 클라이언트에게 실시간 이벤트 스트림을 전송하기 위해 MediaType.TEXT_EVENT_STREAM_VALUE를 응답 타입으로 설정
- Last-Event-ID는 헤더에 담겨 오는 값으로 이전에 받지 못한 이벤트가 존재하는 경우(SSE연결에 대한 시간 만료 혹은 종료)에 받은 마지막 이벤트 ID 값을 넘겨받지 못한 데이터부터 받을 수 있게 할 때 필요한 값
2.3 실제 연결을 하는 Service 작성
@Service
@RequiredArgsConstructor
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final ObjectMapper objectMapper = new ObjectMapper();
public SseEmitter subscribe(String email, String lastEventId) {
String emitterId = email + "_" + System.currentTimeMillis();
SseEmitter sseEmitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
// 상황별 emitter 삭제 처리
sseEmitter.onCompletion(() -> emitterRepository.deleteEmitterById(emitterId)); //완료 시
sseEmitter.onTimeout(() -> emitterRepository.deleteEmitterById(emitterId)); // 타임 아웃 시
sseEmitter.onError((e) -> emitterRepository.deleteEmitterById(emitterId)); // 에러 발생 시
// 503 Service Unavailable 방지용 dummy event 전송
sendEventToClient(sseEmitter, emitterId, "EventStream Created. [email = " + email + " ]");
// client가 미수신한 event 목록이 존재하는 경우
if (!lastEventId.isEmpty()) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheByEmail(email);
eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) // lastEventId 이후의 event만 전송
.forEach(entry -> sendEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
}
return sseEmitter;
}
}
클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야 함
SseEmitter 객체를 만들 때 유효 시간을 주는데, 주는 시간만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 됨
SseEmitter 작동 방식 -chatGPT-
- 클라이언트가 서버에 연결: 클라이언트는 SSE 연결을 설정하기 위해 서버의 특정 엔드포인트에 접속합니다. 이때, 서버는 SseEmitter 객체를 생성하고 클라이언트와의 연결을 유지합니다.
- 서버에서 실시간 데이터 전송: 서버는 SseEmitter를 사용해 클라이언트에게 실시간 데이터를 전송할 수 있습니다. 서버는 emitter.send() 메서드를 사용하여 메시지를 보냅니다. 이 메시지는 클라이언트의 이벤트 리스너로 전달됩니다.
- 연결 유지: SseEmitter는 HTTP 연결을 유지한 채로 서버가 데이터를 보낼 때까지 대기합니다. 클라이언트는 이 연결을 통해 실시간으로 데이터를 수신하게 됩니다.
- 연결 종료: 서버에서 데이터를 더 이상 보낼 필요가 없거나 클라이언트가 연결을 종료하면, SseEmitter를 종료(emitter.complete())합니다.
코드 상세사항
- SseEmitter 생성 및 저장: 고유한 emitterId를 생성해 새로운 SseEmitter 객체를 생성하고, 이를 emitterRepository에 저장
- 이벤트 핸들링: SSE 연결이 종료되거나 타임아웃, 에러 발생 시 SseEmitter를 삭제하도록 설정
- 초기 더미 이벤트 전송: 503 오류를 방지하기 위해 클라이언트에 초기 더미 이벤트를 전송
- 미수신 이벤트 처리: 클라이언트가 이전에 수신하지 못한 이벤트가 있으면 이를 다시 전송
2.4 Repository 작성
@Repository
public class EmitterRepository{
// SseEmitter와 이벤트 캐시를 저장할 맵
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
public Map<String, SseEmitter> findAllEmitterByEmail(String email) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(email))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public Map<String, Object> findAllEventCacheByEmail(String email) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(email))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public void deleteEmitterById(String id) {
emitters.remove(id);
}
public void deleteAllEmitterById(String id) {
emitters.keySet().removeIf(key -> key.startsWith(id));
}
public void deleteAllEventCacheById(String id) {
eventCache.keySet().removeIf(key -> key.startsWith(id));
}
}
EmitterRepository 클래스로, Server-Sent Events (SSE)에서 사용하는 SseEmitter 객체와 관련된 데이터를 저장하고 관리하는 역할
이 클래스는 클라이언트와의 SSE 연결을 효율적으로 관리하기 위해 ConcurrentHashMap을 사용하여 동시성을 처리
EmitterRepository 클래스는 SSE 연결을 관리하기 위한 저장소 역할을 합니다. SseEmitter 객체와 이벤트 데이터를 저장하고, 특정 클라이언트와 연관된 데이터를 검색 및 삭제하는 기능을 제공합니다. 이 클래스는 다중 스레드 환경에서도 안전하게 동작하도록 ConcurrentHashMap을 사용해 동시성을 처리하고 있습니다.
출처 : -chatGPT-
2.5 해당 이벤트가 발생할 때마다 알림 전송하기
public void send(Member receiver, Object data) {
String emitterId = null;
try {
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterByEmail(receiver.getEmail());
if (emitters.isEmpty()) {
return;
}
emitterId = emitters.keySet().iterator().next();
emitters.forEach((key, value) -> {
emitterRepository.saveEventCache(key, data);
sendEventToClient(value, key, data);
});
} catch (Exception e) {
emitterRepository.deleteEmitterById(emitterId);
throw new GlobalException(GlobalErrorCode._INTERNAL_SERVER_ERROR);
}
}
private void sendEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
try {
String jsonData = objectMapper.writeValueAsString(data);
sseEmitter.send(SseEmitter.event()
.id(emitterId)
.name("sse")
.data(jsonData, MediaType.APPLICATION_JSON));
} catch (IOException exception) {
emitterRepository.deleteEmitterById(emitterId);
sseEmitter.completeWithError(exception);
throw new GlobalException(GlobalErrorCode._INTERNAL_SERVER_ERROR);
}
}
특정 수신자에게 실시간 알림 데이터를 보내기 위한 기능을 수행
이 메서드는 SseEmitter 객체를 이용해 알림 데이터를 클라이언트에게 전송하며, 전송 중 문제가 발생할 경우 해당 SseEmitter를 삭제하고 예외를 발생
if (!movieCodeList.isEmpty()) {
List<Member> memberList = memberRepository.findAll();
memberList.forEach(member -> notificationService.send(member, "영화 정보가 업데이트 되었습니다."));
}
영화정보가 업데이트되는 메서드에서 send 메서드를 호출하여 데이터를 실시간으로 전송하도록 함
브라우저로 테스트하기
연결이 이루어진 후, 영화 업데이트가 이루어질 때마다 데이터를 실시간으로 수신받음
'Dev > SpringBoot' 카테고리의 다른 글
인증된 사용자 정보 추출 커스텀 어노테이션 생성 (1) | 2025.02.10 |
---|---|
카카오페이 단건 결제 구현하기 (0) | 2025.02.10 |
카카오 로그인 구현하기(4) - 커스텀 어노테이션을 통해 사용자 정보, Token 정보 가져오기 (0) | 2025.02.10 |
카카오 로그인 구현하기(3) - Redis + 액세스 토큰 재발급하기 (0) | 2025.02.10 |
카카오 로그인 구현하기(2) - JWT + Spring Security (0) | 2025.02.10 |