package com.zhangshu.chat.demo.websocket; import cn.hutool.cache.Cache; import cn.hutool.cache.CacheUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.zhangshu.chat.demo.entity.Room; import com.zhangshu.chat.demo.service.RoomCache; import com.zhangshu.chat.demo.vo.RoomUserVo; import com.zhangshu.chat.demo.vo.UserMessageVo; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Objects; import java.util.Optional; @Slf4j @Component @ServerEndpoint("/chat/room/{userId}") public class ChatRoom { private final Cache sessionUserCache = CacheUtil.newFIFOCache(16); private final Cache sessionCache = CacheUtil.newFIFOCache(16); private final Cache userSessionCache = CacheUtil.newFIFOCache(16); @Autowired RoomCache roomCache; @Autowired ObjectMapper objectMapper; /** * 建立连接 * * @param session 客户端连接对象 */ @OnOpen public void onOpen(Session session, @PathParam("userId") Long userId) { if (!roomCache.isExistUser(userId)) { sendMessage("用户不在房间中,拒绝连接", session); return; } String sessionId = session.getId().toLowerCase(); sessionUserCache.put(sessionId, userId); sessionCache.put(sessionId, session); userSessionCache.put(userId, sessionId); log.info("客户端连接建立成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size()); } /** * 接收客户端消息 * * @param message 客户端发送的消息内容 * @param session 客户端连接对象 */ @OnMessage @SneakyThrows public void onMessage(String message, Session session) { log.info("服务端接收消息成功,消息内容:{}", message); // 处理消息,并响应给客户端 String sessionId = session.getId().toLowerCase(); Long userId = sessionUserCache.get(sessionId); if (Objects.isNull(userId)) { return; } Room room = roomCache.getRoomByUser(userId); if (Objects.isNull(room)) { return; } Optional userVo = room.getUserList().stream().filter(v -> v.getId().equals(userId)).findFirst(); if (!userVo.isPresent()) { return; } UserMessageVo messageVo = UserMessageVo.builder() .id(userId) .nickname(userVo.get().getNickname()) .username(userVo.get().getUsername()) .message(message) .build(); String text = objectMapper.writeValueAsString(messageVo); room.getUserList().stream() .map(RoomUserVo::getId) .map(userSessionCache::get) .map(sessionCache::get) .forEach(v -> { this.sendMessage(text, v); }); log.info("服务端响应消息成功,接收的User ID:{},响应内容:{}", userId, text); } /** * 处理消息,并响应给客户端 * * @param message 发送的消息内容 * @param session 发送对象 */ private void sendMessage(String message, Session session) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("服务端响应消息异常:{}", e.getMessage()); } } /** * 关闭连接 * * @param session 客户端连接对象 */ @OnClose public void onClose(Session session) { String sessionId = session.getId().toLowerCase(); Long userId = sessionUserCache.get(sessionId); sessionUserCache.remove(sessionId); sessionCache.remove(sessionId); userSessionCache.remove(userId); log.info("客户端连接关闭成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size()); } /** * 连接异常 * * @param session 客户端连接对象 * @param error 异常 */ @OnError public void onError(Session session, Throwable error) { String sessionId = session.getId().toLowerCase(); Long userId = sessionUserCache.get(sessionId); sessionUserCache.remove(sessionId); sessionCache.remove(sessionId); userSessionCache.remove(userId); log.info("连接异常,User ID:{},error:{}", userId, error); } }