package com.zhangshu.chat.demo.websocket; import cn.hutool.cache.Cache; import cn.hutool.cache.CacheUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import com.zhangshu.chat.demo.config.UserDetailsImpl; import com.zhangshu.chat.demo.entity.Room; import com.zhangshu.chat.demo.service.RoomCache; import com.zhangshu.chat.demo.vo.RoomUserVo; import com.zhangshu.chat.demo.vo.UserMessageVo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.security.Principal; import java.util.*; @Slf4j @Component @ServerEndpoint("/chat/room/{roomId}") public class ChatRoom { private String roomId; private final static Map> ROOM_SESSION_MAP = new HashMap<>(); private final static Cache SESSION_CACHE = CacheUtil.newFIFOCache(16); public static RoomCache getRoomCache() { return SpringUtil.getBean(RoomCache.class); } /** * 建立连接 * * @param session 客户端连接对象 */ @OnOpen public void onOpen(Session session, @PathParam("roomId") String roomId) { // if (getRoomCache().createSuccess(roomId)) { // sendRefuseConnectionMessage(session); // return; // } // UserDetailsImpl userInfo = (UserDetailsImpl) session.getUserPrincipal(); // if (Objects.isNull(userInfo) || !getRoomCache().isExistUser(userInfo.getId())) { // sendRefuseConnectionMessage(session); // return; // } String sessionId = session.getId().toLowerCase(); SESSION_CACHE.put(sessionId, session); if (StringUtils.isBlank(this.roomId)) { this.roomId = roomId; } ROOM_SESSION_MAP.computeIfAbsent(roomId, k -> new ArrayList<>()).add(sessionId); // log.info("客户端连接建立成功,User ID:{},当前在线数:{}", roomId, sessionCache.size()); } /** * 接收客户端消息 * * @param message 客户端发送的消息内容 * @param session 客户端连接对象 */ @OnMessage public void onMessage(String message, Session session) { log.info("服务端接收消息成功,消息内容:{}", message); // 处理消息,并响应给客户端 // Room room = getRoomCache().get(this.roomId); // if (Objects.isNull(room)) { // return; // } // UserDetailsImpl userInfo = (UserDetailsImpl) session.getUserPrincipal(); // if (Objects.isNull(userInfo)) { // return; // } // UserMessageVo messageVo = UserMessageVo.builder() // .id(userInfo.getId()) // .nickname(userInfo.getNickname()) // .username(userInfo.getUsername()) // .message(message) // .build(); UserMessageVo messageVo = UserMessageVo.builder() .message(message) .build(); String text = JSONUtil.toJsonStr(messageVo); ROOM_SESSION_MAP.get(roomId).stream().map(SESSION_CACHE::get).filter(Objects::nonNull) .forEach(v -> this.sendMessage(text, v)); // log.info("服务端响应消息成功,接收的User ID:{},响应内容:{}", userId, text); } /** * 处理消息,并响应给客户端 * * @param message 发送的消息内容 * @param session 发送对象 */ private void sendMessage(String message, Session session) { if (!session.isOpen()) { return; } try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("服务端响应消息异常:{}", e.getMessage()); } } private void sendRefuseConnectionMessage(Session session) { this.sendMessage("拒绝连接", session); } /** * 关闭连接 * * @param session 客户端连接对象 */ @OnClose public void onClose(Session session) { String sessionId = session.getId().toLowerCase(); SESSION_CACHE.remove(sessionId); List sessionList = ROOM_SESSION_MAP.get(roomId); if (sessionList.size() <= 1) { ROOM_SESSION_MAP.remove(roomId); } else { sessionList.remove(sessionId); } log.info("客户端连接关闭成功,sessionId ID:{},当前在线数:{}", sessionId, sessionList.size()); } /** * 连接异常 * * @param session 客户端连接对象 * @param error 异常 */ @OnError public void onError(Session session, Throwable error) { String sessionId = session.getId().toLowerCase(); SESSION_CACHE.remove(sessionId); List sessionList = ROOM_SESSION_MAP.get(roomId); if (sessionList.size() <= 1) { ROOM_SESSION_MAP.remove(roomId); } else { sessionList.remove(sessionId); } log.info("连接异常,sessionId ID:{},error:{}", sessionId, error); } }