diff --git a/pom.xml b/pom.xml index 8b9a98c..480a71b 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,10 @@ org.springframework.boot spring-boot-starter-aop + + org.springframework.boot + spring-boot-starter-websocket + diff --git a/src/main/java/com/zhangshu/chat/demo/config/WebSocketConfig.java b/src/main/java/com/zhangshu/chat/demo/config/WebSocketConfig.java new file mode 100644 index 0000000..feb4f02 --- /dev/null +++ b/src/main/java/com/zhangshu/chat/demo/config/WebSocketConfig.java @@ -0,0 +1,18 @@ +package com.zhangshu.chat.demo.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } + +} diff --git a/src/main/java/com/zhangshu/chat/demo/constant/ERoomUserType.java b/src/main/java/com/zhangshu/chat/demo/constant/ERoomUserType.java index 7880882..9aada0e 100644 --- a/src/main/java/com/zhangshu/chat/demo/constant/ERoomUserType.java +++ b/src/main/java/com/zhangshu/chat/demo/constant/ERoomUserType.java @@ -1,6 +1,7 @@ package com.zhangshu.chat.demo.constant; public enum ERoomUserType { + lord(), broadcaster(), audience(), ; diff --git a/src/main/java/com/zhangshu/chat/demo/service/EventServiceImpl.java b/src/main/java/com/zhangshu/chat/demo/service/EventServiceImpl.java index 495d710..2e39963 100644 --- a/src/main/java/com/zhangshu/chat/demo/service/EventServiceImpl.java +++ b/src/main/java/com/zhangshu/chat/demo/service/EventServiceImpl.java @@ -56,6 +56,7 @@ public class EventServiceImpl implements EventService { .username(user.getUsername()) .nickname(user.getNickname()) .type(ERoomUserType.broadcaster) + .joinTime(System.currentTimeMillis()) .lastClientSeq(eventDto.getClientSeq()) .build()); } @@ -77,6 +78,7 @@ public class EventServiceImpl implements EventService { .id(eventDto.getUid()) .nickname(user.getNickname()) .type(ERoomUserType.audience) + .joinTime(System.currentTimeMillis()) .lastClientSeq(eventDto.getClientSeq()) .build()); } diff --git a/src/main/java/com/zhangshu/chat/demo/service/RoomCache.java b/src/main/java/com/zhangshu/chat/demo/service/RoomCache.java index b9f739f..bae80c2 100644 --- a/src/main/java/com/zhangshu/chat/demo/service/RoomCache.java +++ b/src/main/java/com/zhangshu/chat/demo/service/RoomCache.java @@ -6,15 +6,18 @@ import cn.hutool.core.lang.UUID; import com.zhangshu.chat.demo.constant.ERoomUserType; import com.zhangshu.chat.demo.entity.Room; import com.zhangshu.chat.demo.vo.RoomUserVo; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Objects; @Component public class RoomCache { private final Cache roomCache = CacheUtil.newFIFOCache(4); + private final Cache userRoomCache = CacheUtil.newFIFOCache(16); /** * 正在创建 */ @@ -26,6 +29,24 @@ public class RoomCache { return resp; } + /** + * + * @param userId + * @return + * NullException + */ + public Room getRoomByUser(Long userId){ + String roomId = userRoomCache.get(userId); + if (StringUtils.isBlank(roomId)) { + return null; + } + return roomCache.get(roomId); + } + + public boolean isExistUser(Long userId){ + return userRoomCache.containsKey(userId); + } + public synchronized Room addCreating(String roomName) { Room room = Room.builder() .id(UUID.fastUUID().toString()) @@ -55,7 +76,11 @@ public class RoomCache { public synchronized void addUser(String roomId, RoomUserVo user) { Room room = roomCache.get(roomId); if (Objects.nonNull(room) && room.getUserList().stream().noneMatch(v -> v.getId().equals(user.getId()))) { + if (room.getUserList().size() == 0) { + user.setType(ERoomUserType.lord); + } room.getUserList().add(user); + userRoomCache.put(user.getId(), room.getId()); } } @@ -66,7 +91,16 @@ public class RoomCache { public synchronized void removeUser(String roomId, Long userId) { Room room = roomCache.get(roomId); if (Objects.nonNull(room)) { - room.getUserList().stream().filter(v -> v.getId().equals(userId)).findFirst().ifPresent(v -> room.getUserList().remove(v)); + room.getUserList().stream().filter(v -> v.getId().equals(userId)).findFirst().ifPresent(user -> { + room.getUserList().remove(user); + userRoomCache.remove(userId); + //设置最早的一个成为房主 + if (ERoomUserType.lord.equals(user.getType())) { + room.getUserList().stream().filter(v->ERoomUserType.broadcaster.equals(v.getType())) + .min(Comparator.comparing(RoomUserVo::getJoinTime)) + .ifPresent(user2 -> user2.setType(ERoomUserType.lord)); + } + }); } } diff --git a/src/main/java/com/zhangshu/chat/demo/service/RoomServiceImpl.java b/src/main/java/com/zhangshu/chat/demo/service/RoomServiceImpl.java index 39d13b9..c4a45a5 100644 --- a/src/main/java/com/zhangshu/chat/demo/service/RoomServiceImpl.java +++ b/src/main/java/com/zhangshu/chat/demo/service/RoomServiceImpl.java @@ -11,6 +11,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.stream.Collectors; + @Slf4j @Service public class RoomServiceImpl implements RoomService { @@ -20,7 +23,14 @@ public class RoomServiceImpl implements RoomService { @Override public CommonPageResult page(PageDto pageDto) { - return CommonPageResult.of(RoomConvert.INSTANCE.convert(roomCache.list())); + List roomVoList = roomCache.list().stream().map(this::buildRoomVo).collect(Collectors.toList()); + return CommonPageResult.of(roomVoList); + } + + private RoomVo buildRoomVo(Room room){ + RoomVo roomVo = RoomConvert.INSTANCE.convert(room); + roomVo.setUserCount(room.getUserList().size()); + return roomVo; } @Override diff --git a/src/main/java/com/zhangshu/chat/demo/vo/RoomUserVo.java b/src/main/java/com/zhangshu/chat/demo/vo/RoomUserVo.java index de7c559..d114d7d 100644 --- a/src/main/java/com/zhangshu/chat/demo/vo/RoomUserVo.java +++ b/src/main/java/com/zhangshu/chat/demo/vo/RoomUserVo.java @@ -16,4 +16,5 @@ public class RoomUserVo { private String nickname; private ERoomUserType type; private Long lastClientSeq; + private Long joinTime; } diff --git a/src/main/java/com/zhangshu/chat/demo/vo/RoomVo.java b/src/main/java/com/zhangshu/chat/demo/vo/RoomVo.java index 3b9f41c..66e5c09 100644 --- a/src/main/java/com/zhangshu/chat/demo/vo/RoomVo.java +++ b/src/main/java/com/zhangshu/chat/demo/vo/RoomVo.java @@ -6,4 +6,5 @@ import lombok.Data; public class RoomVo { private String id; private String name; + private Integer userCount; } diff --git a/src/main/java/com/zhangshu/chat/demo/vo/UserMessageVo.java b/src/main/java/com/zhangshu/chat/demo/vo/UserMessageVo.java new file mode 100644 index 0000000..29fdf6c --- /dev/null +++ b/src/main/java/com/zhangshu/chat/demo/vo/UserMessageVo.java @@ -0,0 +1,31 @@ +package com.zhangshu.chat.demo.vo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel(value = "UserMessageVo", description = "用户发送的聊天消息") +public class UserMessageVo implements Serializable { + private static final long serialVersionUID = 1L; + + @ApiModelProperty(name = "id", value = "user id") + private Long id; + + @ApiModelProperty(name = "username", value = "用户名") + private String username; + + @ApiModelProperty(name = "nickname", value = "昵称") + private String nickname; + + @ApiModelProperty(name = "message", value = "消息内容") + private String message; +} diff --git a/src/main/java/com/zhangshu/chat/demo/websocket/ChatRoom.java b/src/main/java/com/zhangshu/chat/demo/websocket/ChatRoom.java new file mode 100644 index 0000000..4a7cda0 --- /dev/null +++ b/src/main/java/com/zhangshu/chat/demo/websocket/ChatRoom.java @@ -0,0 +1,139 @@ +package com.zhangshu.chat.demo.websocket; + +import cn.hutool.cache.Cache; +import cn.hutool.cache.CacheUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.zhangshu.chat.demo.entity.Room; +import com.zhangshu.chat.demo.service.RoomCache; +import com.zhangshu.chat.demo.vo.RoomUserVo; +import com.zhangshu.chat.demo.vo.UserMessageVo; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +@Slf4j +@Component +@ServerEndpoint("/chat/room/{userId}") +public class ChatRoom { + private final Cache sessionUserCache = CacheUtil.newFIFOCache(16); + private final Cache sessionCache = CacheUtil.newFIFOCache(16); + private final Cache userSessionCache = CacheUtil.newFIFOCache(16); + @Autowired + RoomCache roomCache; + @Autowired + ObjectMapper objectMapper; + + /** + * 建立连接 + * + * @param session 客户端连接对象 + */ + @OnOpen + public void onOpen(Session session, @PathParam("userId") Long userId) { + if (roomCache.isExistUser(userId)) { + sendMessage("用户不在房间中,拒绝连接", session); + return; + } + String sessionId = session.getId().toLowerCase(); + sessionUserCache.put(sessionId, userId); + sessionCache.put(sessionId, session); + userSessionCache.put(userId, sessionId); + log.info("客户端连接建立成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size()); + } + + /** + * 接收客户端消息 + * + * @param message 客户端发送的消息内容 + * @param session 客户端连接对象 + */ + @OnMessage + @SneakyThrows + public void onMessage(String message, Session session) { + log.info("服务端接收消息成功,消息内容:{}", message); + // 处理消息,并响应给客户端 + String sessionId = session.getId().toLowerCase(); + Long userId = sessionUserCache.get(sessionId); + if (Objects.isNull(userId)) { + return; + } + Room room = roomCache.getRoomByUser(userId); + if (Objects.isNull(room)) { + return; + } + Optional userVo = room.getUserList().stream().filter(v -> v.getId().equals(userId)).findFirst(); + if (!userVo.isPresent()) { + return; + } + UserMessageVo messageVo = UserMessageVo.builder() + .id(userId) + .nickname(userVo.get().getNickname()) + .username(userVo.get().getUsername()) + .message(message) + .build(); + String text = objectMapper.writeValueAsString(messageVo); + room.getUserList().stream() + .map(RoomUserVo::getId) + .map(userSessionCache::get) + .map(sessionCache::get) + .forEach(v -> { + this.sendMessage(text, v); + }); + log.info("服务端响应消息成功,接收的User ID:{},响应内容:{}", userId, text); + } + + /** + * 处理消息,并响应给客户端 + * + * @param message 发送的消息内容 + * @param session 发送对象 + */ + + private void sendMessage(String message, Session session) { + try { + session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.error("服务端响应消息异常:{}", e.getMessage()); + } + + } + + /** + * 关闭连接 + * + * @param session 客户端连接对象 + */ + @OnClose + public void onClose(Session session) { + String sessionId = session.getId().toLowerCase(); + Long userId = sessionUserCache.get(sessionId); + sessionUserCache.remove(sessionId); + sessionCache.remove(sessionId); + userSessionCache.remove(userId); + log.info("客户端连接关闭成功,User ID:{},当前在线数:{}", userId, sessionUserCache.size()); + } + + /** + * 连接异常 + * + * @param session 客户端连接对象 + * @param error 异常 + */ + @OnError + public void onError(Session session, Throwable error) { + String sessionId = session.getId().toLowerCase(); + Long userId = sessionUserCache.get(sessionId); + sessionUserCache.remove(sessionId); + sessionCache.remove(sessionId); + userSessionCache.remove(userId); + log.info("连接异常,User ID:{},error:{}", userId, error); + } +}