151 lines
5.1 KiB
Java
151 lines
5.1 KiB
Java
package com.zhangshu.chat.demo.websocket;
|
||
|
||
import cn.hutool.cache.Cache;
import cn.hutool.cache.CacheUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.zhangshu.chat.demo.entity.Room;
import com.zhangshu.chat.demo.service.RoomCache;
import com.zhangshu.chat.demo.vo.RoomUserVo;
import com.zhangshu.chat.demo.vo.UserMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
|
||
|
||
@Slf4j
|
||
@Component
|
||
@ServerEndpoint("/chat/room/{userId}")
|
||
public class ChatRoom {
|
||
private final Cache<String, Long> sessionUserCache = CacheUtil.newFIFOCache(16);
|
||
private final Cache<String, Session> sessionCache = CacheUtil.newFIFOCache(16);
|
||
private final Cache<Long, String> userSessionCache = CacheUtil.newFIFOCache(16);
|
||
|
||
public static RoomCache getRoomCache() {
|
||
return SpringUtil.getBean(RoomCache.class);
|
||
}
|
||
|
||
/**
|
||
* 建立连接
|
||
*
|
||
* @param session 客户端连接对象
|
||
*/
|
||
@OnOpen
|
||
public void onOpen(Session session, @PathParam("userId") Long userId) {
|
||
// if (!getRoomCache().isExistUser(userId)) {
|
||
// sendMessage("用户不在房间中,拒绝连接", session);
|
||
// return;
|
||
// }
|
||
String sessionId = session.getId().toLowerCase();
|
||
sessionUserCache.put(sessionId, userId);
|
||
sessionCache.put(sessionId, session);
|
||
userSessionCache.put(userId, sessionId);
|
||
log.info("客户端连接建立成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size());
|
||
}
|
||
|
||
/**
|
||
* 接收客户端消息
|
||
*
|
||
* @param message 客户端发送的消息内容
|
||
* @param session 客户端连接对象
|
||
*/
|
||
@OnMessage
|
||
public void onMessage(String message, Session session) {
|
||
log.info("服务端接收消息成功,消息内容:{}", message);
|
||
// 处理消息,并响应给客户端
|
||
String sessionId = session.getId().toLowerCase();
|
||
Long userId = sessionUserCache.get(sessionId);
|
||
if (Objects.isNull(userId)) {
|
||
return;
|
||
}
|
||
Room room = getRoomCache().getRoomByUser(userId);
|
||
if (Objects.isNull(room)) {
|
||
for (Session sendSession : sessionCache) {
|
||
UserMessageVo messageVo = UserMessageVo.builder()
|
||
.id(userId)
|
||
.message(message)
|
||
.build();
|
||
String text = JSONUtil.toJsonStr(messageVo);
|
||
this.sendMessage(text, sendSession);
|
||
}
|
||
return;
|
||
}
|
||
Optional<RoomUserVo> userVo = room.getUserList().stream().filter(v -> v.getId().equals(userId)).findFirst();
|
||
if (!userVo.isPresent()) {
|
||
return;
|
||
}
|
||
UserMessageVo messageVo = UserMessageVo.builder()
|
||
.id(userId)
|
||
.nickname(userVo.get().getNickname())
|
||
.username(userVo.get().getUsername())
|
||
.message(message)
|
||
.build();
|
||
String text = JSONUtil.toJsonStr(messageVo);
|
||
room.getUserList().stream()
|
||
.map(RoomUserVo::getId)
|
||
.map(userSessionCache::get)
|
||
.map(sessionCache::get)
|
||
.filter(Objects::nonNull)
|
||
.forEach(v -> {
|
||
this.sendMessage(text, v);
|
||
});
|
||
log.info("服务端响应消息成功,接收的User ID:{},响应内容:{}", userId, text);
|
||
}
|
||
|
||
/**
|
||
* 处理消息,并响应给客户端
|
||
*
|
||
* @param message 发送的消息内容
|
||
* @param session 发送对象
|
||
*/
|
||
|
||
private void sendMessage(String message, Session session) {
|
||
if (!session.isOpen()) {
|
||
return;
|
||
}
|
||
try {
|
||
session.getBasicRemote().sendText(message);
|
||
} catch (IOException e) {
|
||
log.error("服务端响应消息异常:{}", e.getMessage());
|
||
}
|
||
|
||
}
|
||
|
||
/**
|
||
* 关闭连接
|
||
*
|
||
* @param session 客户端连接对象
|
||
*/
|
||
@OnClose
|
||
public void onClose(Session session) {
|
||
String sessionId = session.getId().toLowerCase();
|
||
Long userId = sessionUserCache.get(sessionId);
|
||
sessionUserCache.remove(sessionId);
|
||
sessionCache.remove(sessionId);
|
||
userSessionCache.remove(userId);
|
||
log.info("客户端连接关闭成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size());
|
||
}
|
||
|
||
/**
|
||
* 连接异常
|
||
*
|
||
* @param session 客户端连接对象
|
||
* @param error 异常
|
||
*/
|
||
@OnError
|
||
public void onError(Session session, Throwable error) {
|
||
String sessionId = session.getId().toLowerCase();
|
||
Long userId = sessionUserCache.get(sessionId);
|
||
sessionUserCache.remove(sessionId);
|
||
sessionCache.remove(sessionId);
|
||
userSessionCache.remove(userId);
|
||
log.info("连接异常,User ID:{},error:{}", userId, error);
|
||
}
|
||
}
|