一、前言
本文记录一次在微服务中整合阿里云百炼大模型的实践:使用Redis记录最近五轮对话,以保证上下文的连贯性;AI与用户之间的对话记录则使用MongoDB进行存储。本文介绍两种请求方式:一种是通过HTTP请求,另一种是通过WebSocket+Netty。如果你还没有Redis,可以先安装对应环境,或者将Redis改为使用List来存储最近的消息。话不多说,开始。
二、引入依赖
(1)相关Maven依赖
<!-- alibaba-ai: Spring Cloud Alibaba AI starter; presumably supplies the DashScope SDK classes
     (Generation, GenerationParam, Message) used by AiServiceImpl - confirm -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-ai</artifactId>
<version>2023.0.1.2</version>
</dependency>
<!-- redis: holds the recent per-user conversation history (key prefix ai:chat:history:) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- netty: WebSocket server (port 6000). No version is given here, so it must be provided by a
     managed BOM such as the Spring Boot parent - confirm in the real pom -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- mongodb: persistent store for the full user/AI chat log (AiChat documents) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
(2)yaml配置
spring:
  data:
    # Redis connection used for the recent conversation history.
    # spring-cloud-starter-alibaba-ai 2023.0.1.x belongs to the Spring Boot 3.x line, and
    # Spring Boot 3 reads Redis settings from spring.data.redis (the old spring.redis.*
    # prefix is no longer bound). If the project is still on Spring Boot 2.x, use spring.redis.
    redis:
      host: 192.168.254.100
      port: 6379
      password: 123456
    # MongoDB connection used for the persistent chat log.
    mongodb:
      host: 192.168.254.100
      port: 27017
      database: chat
      # Quote the password so YAML always treats it as a string; unquoted numeric or
      # special-character passwords may fail to bind/authenticate.
      password: 'mongo'
      username: mongo
      authentication-database: admin
  cloud:
    ai:
      tongyi:
        connection:
          # Obtain the api-key from the Alibaba Cloud Bailian (Model Studio) console.
          api-key: 您的api-key
三、Http请求方式
(1)Controller层
/**
 * HTTP entry point for the Bailian (Tongyi) AI chat feature.
 */
@RestController
@RequestMapping("/ai")
@CrossOrigin
public class AiController {

    @Autowired
    private AiService aiService;

    /**
     * Returns one page of the current user's stored AI chat log.
     *
     * @param timestamp paging cursor; with AiServiceImpl, only records older than it are
     *                  returned and 0 means "newest page"
     * @param size      maximum number of records in the page
     * @return the page of chat records
     */
    @GetMapping("/getAiChatLogs")
    public TableDataInfo getAiChatLogs(@RequestParam(name = "timestamp", defaultValue = "0") Long timestamp,
                                       @RequestParam(name = "size", defaultValue = "20") int size) {
        return aiService.getAiChatLogs(timestamp, size);
    }

    /**
     * Sends a single question to the model and returns its answer.
     *
     * <p>NOTE(review): the user id is fixed to 0, so with AiServiceImpl every HTTP caller
     * shares the same Redis history key (ai:chat:history:0). Consider passing the
     * authenticated user's id instead.
     *
     * @param message the question text (empty when the parameter is absent)
     * @return the model's reply
     */
    @GetMapping("/tongyi")
    public R<String> completion(@RequestParam(name = "message", defaultValue = "") String message) {
        final long sharedHttpUserId = 0L;
        return aiService.handleAI(message, sharedHttpUserId);
    }
}
(2)Service层
/**
 * AI chat operations backed by the Bailian (Tongyi) large model.
 */
public interface AiService {

    /**
     * Sends a message to the model within the given user's conversation context.
     *
     * @param message the user's question
     * @param userId  identifies whose conversation history is used and updated
     * @return the model's reply
     */
    R<String> handleAI(String message, Long userId);

    /**
     * Reads one page of the current user's AI chat log.
     *
     * @param timestamp paging cursor timestamp
     * @param size      number of records per page
     * @return the page of chat records
     */
    TableDataInfo getAiChatLogs(Long timestamp, int size);
}
(3)ServiceImpl层
@Service
@Slf4j
public class AiServiceImpl implements AiService {
private static final String REDIS_KEY_PREFIX = "ai:chat:history:"; // Redis键前缀
private static final int MAX_HISTORY_ROUNDS = 5; // 保留最近5轮对话
@Autowired
private RedisService redisService;
@Autowired
private MongoTemplate mongoTemplate;
/**
* 调用百炼大模型
* @param message 询问消息
* @return 结果
*/
@Override
public R<String> handleAI(String message, Long userId) {
Generation gen = new Generation();
// 从 Redis 中获取历史消息
List<Message> messages = getHistoryFromRedis(userId);
// 添加用户消息
Message userMsg = Message.builder()
.role(Role.USER.getValue())
.content(message)
.build();
messages.add(userMsg);
// 构建请求参数
GenerationParam param = GenerationParam.builder()
.model("qwen-turbo") //指定使用的 AI 模型名称
.messages(messages) //设置对话上下文 (说明:接收一个 List<Message> 对象,包含用户与 AI 的历史对话记录。模型会根据完整的消息列表理解上下文关系,生成连贯的回复)
.resultFormat(GenerationParam.ResultFormat.MESSAGE) //指定响应格式(MESSAGE 表示返回结构化消息对象(包含角色、内容等元数据))
.topK(50) //控制候选词范围(每个生成步骤仅考虑概率最高的前 50 个候选词。增大该值会提高多样性但可能降低准确性,减小则使输出更集中)
.temperature(0.8f) //调节输出随机性(0.8 属于中等偏高随机性,适合需要创造性但保持一定连贯性的场景)
.topP(0.8) //动态候选词选择(核采样)
.seed(1234) //设置随机种子(固定种子(如 1234)可使生成结果可重复)
.build();
try {
// 调用API并获取回复
GenerationResult result = gen.call(param);
Message aiMessage = result.getOutput().getChoices().get(0).getMessage();
String content = aiMessage.getContent();
// 将AI回复加入对话历史
messages.add(aiMessage);
// 保存对话历史到 Redis
saveHistoryToRedis(userId, messages);
return R.ok(content);
} catch (NoApiKeyException | InputRequiredException e) {
log.error("调用模型出错---->{}",e.getMessage());
throw new RuntimeException(e);
}
}
/**
* 获取与ai的聊天记录
* @param timestamp 时间戳(作为搜索游标)
* @param size 每页显示行数
* @return 结果
*/
@Override
public TableDataInfo getAiChatLogs(Long timestamp, int size) {
// 创建分页请求,按create_time降序
Query query = new Query()
.with(Sort.by(Sort.Direction.DESC, "timestamp"))
.limit(size);
//添加用户作为条件
Long userId = SecurityUtils.getUserId();
query.addCriteria(Criteria.where("userId").is(userId));
if (timestamp != null && timestamp>0) {
// 添加条件:timestamp < 上一页最后一条记录的 timestamp
query.addCriteria(Criteria.where("timestamp").lt(timestamp));
}
List<AiChat> aiChats = mongoTemplate.find(query, AiChat.class);
Collections.reverse(aiChats);
TableDataInfo tableDataInfo = new TableDataInfo();
tableDataInfo.setCode(200);
tableDataInfo.setMsg("成功");
tableDataInfo.setRows(aiChats);
return tableDataInfo;
}
/**
* 从 Redis 获取历史对话记录
*/
private List<Message> getHistoryFromRedis(Long userId) {
String redisKey = REDIS_KEY_PREFIX + userId;
Object obj = redisService.get(redisKey);
if (obj instanceof String) {
return JSON.parseArray((String) obj, Message.class);
}
List<Message> objects = new ArrayList<>();
// 添加系统消息(只在会话首次建立时添加)
Message systemMsg = Message.builder()
.role(Role.SYSTEM.getValue())
.content("你的身份是一名AI教练,你只回答关于健身方面的问题,其他问题可以委婉表明自己只能回答健身有关的问题!")
.build();
objects.add(systemMsg);
return objects;
}
/**
* 保存对话历史到 Redis
*/
private void saveHistoryToRedis(Long userId, List<Message> messages) {
truncateHistory(messages);
String redisKey = REDIS_KEY_PREFIX + userId;
// 转换为JSON字符串存储
String jsonString = JSON.toJSONString(messages);
redisService.set(redisKey, jsonString, 30 * 60);
}
/**
* 截断历史记录,保留最近的对话轮次
*/
private void truncateHistory(List<Message> messages) {
int maxSize = 1 + MAX_HISTORY_ROUNDS * 2;
if (messages.size() > maxSize) {
List<Message> truncated = new ArrayList<>();
// 添加类型校验
if (messages.get(0) != null) {
truncated.add(messages.get(0));
}
int start = Math.max(1, messages.size() - MAX_HISTORY_ROUNDS * 2);
truncated.addAll(messages.subList(start, messages.size()));
messages.clear();
messages.addAll(truncated);
}
}
}
四、WebSocket+Netty方式
(1)创建Session层用于保存连接与用户的关联
- 创建AiSession
/**
 * Registry linking users to their Netty channels for the AI WebSocket feature.
 * Lookups work in both directions: user id to channel and channel to user id.
 */
public interface AiSession {

    /** Records that {@code userId} is connected through {@code channel}. */
    void save(Long userId, Channel channel);

    /** Looks up the channel registered for {@code userId}. */
    Channel getChannelByUserId(Long userId);

    /** Looks up the user id registered for {@code channel}. */
    Long getUserIdByChannel(Channel channel);

    /** Removes the association for {@code userId} in both directions. */
    void removeSessionByUserId(Long userId);

    /** Removes the association for {@code channel} in both directions. */
    void removeSessionByChannel(Channel channel);

    /** Removes every association. */
    void clearAllSession();
}
- AiSession对应的实现类
/**
 * In-memory, bidirectional userId/Channel registry for the AI WebSocket server.
 *
 * <p>All methods are {@code synchronized}. The bean is called from Netty event-loop threads
 * (save on ping, remove on channelInactive), and the two maps must change together.
 */
@Service
public class AiSessionImpl implements AiSession {
    /** userId -> channel currently registered for that user. */
    private final Map<Long, Channel> userIdLinkChannel = new HashMap<>();
    /** channel -> userId that registered it. */
    private final Map<Channel, Long> channelLinkUserId = new HashMap<>();

    /**
     * Registers {@code channel} as the user's current channel.
     * If the channel was previously bound to a different user, that user's stale forward
     * link to this channel is dropped.
     *
     * @param userId  user id
     * @param channel channel
     */
    @Override
    public synchronized void save(Long userId, Channel channel) {
        Long previousUserId = channelLinkUserId.put(channel, userId);
        if (previousUserId != null && !previousUserId.equals(userId)) {
            userIdLinkChannel.remove(previousUserId, channel);
        }
        userIdLinkChannel.put(userId, channel);
    }

    /**
     * @param userId user id
     * @return the channel registered for the user, or null if none
     */
    @Override
    public synchronized Channel getChannelByUserId(Long userId) {
        return userIdLinkChannel.get(userId);
    }

    /**
     * @param channel channel
     * @return the user id registered for the channel, or null if none
     */
    @Override
    public synchronized Long getUserIdByChannel(Channel channel) {
        return channelLinkUserId.get(channel);
    }

    /**
     * Removes the user's current channel link in both directions.
     *
     * @param userId user id
     */
    @Override
    public synchronized void removeSessionByUserId(Long userId) {
        Channel channel = userIdLinkChannel.remove(userId);
        if (channel != null) {
            channelLinkUserId.remove(channel, userId);
        }
    }

    /**
     * Removes the channel's link in both directions.
     * The user's forward link is only removed if it still points at this channel. After a
     * reconnect, the old channel closing must not unregister the user's new channel.
     *
     * @param channel channel
     */
    @Override
    public synchronized void removeSessionByChannel(Channel channel) {
        Long userId = channelLinkUserId.remove(channel);
        if (userId != null) {
            userIdLinkChannel.remove(userId, channel);
        }
    }

    /**
     * Removes every association.
     */
    @Override
    public synchronized void clearAllSession() {
        userIdLinkChannel.clear();
        channelLinkUserId.clear();
    }
}
(2)Netty配置
- 创建WebSocketNettyServer
/**
 * Netty WebSocket server for the AI chat, bound to 0.0.0.0:6000.
 *
 * <p>The bootstrap is configured when the bean is initialized; {@link #start()} binds the port.
 * NOTE(review): bossGroup/workerGroup are never shut down in this class; consider a
 * shutdown hook (e.g. a @PreDestroy method calling shutdownGracefully()) so their threads
 * stop with the application context.
 */
@Slf4j
@Component
public class WebSocketNettyServer {

    @Autowired
    private AiInitializer aiInitializer;

    private final ServerBootstrap aiServerBootstrap = new ServerBootstrap();
    /** Accepts incoming connections (single thread). */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    /** Performs I/O for accepted channels. */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    /** Set after a successful bind so that repeated start() calls do not bind the port again. */
    private boolean started;

    /**
     * Configures the server bootstrap: thread groups, NIO server channel type, and
     * AiInitializer as the per-connection pipeline builder.
     */
    @PostConstruct
    public void WebSocketNettyServerInit() {
        aiServerBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(aiInitializer);
    }

    /**
     * Binds the server to 0.0.0.0:6000 and waits for the bind to finish.
     * After a successful bind, further calls do nothing. The refresh listener calling this
     * may fire more than once, and a second bind of the same port would fail.
     *
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    public synchronized void start() throws InterruptedException {
        if (started) {
            return;
        }
        // sync() rethrows the bind failure cause (e.g. port already in use)
        ChannelFuture futureRelays = aiServerBootstrap.bind("0.0.0.0", 6000).sync();
        if (futureRelays.isSuccess()) {
            started = true;
            log.info("ai-netty初始化完成,端口6000)");
        }
    }
}
- 创建AiInitializer
/**
 * Builds the pipeline of every connection accepted by the AI WebSocket server.
 *
 * <p>The same {@code aiHandler} instance is added to every connection's pipeline. Netty only
 * allows that for handlers annotated {@code @ChannelHandler.Sharable}, so AiHandler must
 * carry that annotation.
 */
@Component
public class AiInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private AiHandler aiHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        // HTTP codec: the WebSocket handshake arrives as an HTTP request
        p.addLast(new HttpServerCodec());
        // support for writing large/chunked data streams
        p.addLast(new ChunkedWriteHandler());
        // aggregate HTTP parts into full messages, up to 1024 * 64 * 64 bytes (4 MiB)
        p.addLast(new HttpObjectAggregator(1024 * 64 * 64));
        // heartbeat check: READER_IDLE event after 30 seconds without inbound data
        p.addLast(new IdleStateHandler(30, 0, 0));
        // WebSocket endpoint on path /ws, no subprotocols, extensions allowed
        p.addLast(new WebSocketServerProtocolHandler("/ws", null, true));
        // application logic for text frames
        p.addLast(aiHandler);
    }
}
- 创建AiHandler
@Component
@Slf4j
public class AiHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private AiSession aiSession;
@Autowired
private AiService tongYiService;
@Autowired
private MongoTemplate mongoTemplate;
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
JSONObject jsonObject = JSON.parseObject(msg.text());
//获取消息类型
Object method = jsonObject.get("method");
// 处理消息
//ping
if ("ping".equals(method)){
LoginUser loginUser = AuthUtil.getLoginUser(jsonObject.get("Authorization").toString());
if (Objects.isNull(loginUser)){
//表明重新登陆
AiResponse responseData = new AiResponse();
responseData.setCode(10002);
responseData.setValue("relogin");
responseData.setMethod("error");
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
return;
}
//返回ack,表示心跳正常
aiSession.save(loginUser.getUserid(),ctx.channel());
AiResponse responseData = new AiResponse();
responseData.setValue(String.valueOf(System.currentTimeMillis()));
responseData.setMethod("ack");
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
}else if ("send".equals(method)){ //消息发送
Long userId = aiSession.getUserIdByChannel(ctx.channel());
if (Objects.nonNull(userId)){
Object value = jsonObject.get("value");
log.info("发送的内容------->{}",value);
//请求大模型api
R<String> result = tongYiService.handleAI(value.toString().trim(),userId);
//封装回复消息
String aiReponseText = result.getData();
log.info("Ai回复的内容-------->{}",aiReponseText);
AiResponse responseData = new AiResponse();
responseData.setCode(200);
responseData.setValue(aiReponseText);
responseData.setMethod("response");
//返回消息
ChannelFuture channelFuture = ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
channelFuture.addListener(listener->{
if (listener.isSuccess()){
//封装用户发送的消息
AiChat userChat = new AiChat();
userChat.setId(IdUtils.randomUUID());
userChat.setShowText(value.toString());
userChat.setIsUser(true);
userChat.setText(value.toString());
userChat.setTimestamp((Long) jsonObject.get("timestamp"));
userChat.setUserId(userId);
//封装ai回复消息
AiChat aiChat = new AiChat();
aiChat.setId(IdUtils.randomUUID());
aiChat.setShowText(aiReponseText);
aiChat.setText(aiReponseText);
aiChat.setIsUser(false);
aiChat.setTimestamp(System.currentTimeMillis());
aiChat.setUserId(userId);
//保存回复的消息
mongoTemplate.insertAll(Arrays.asList(userChat,aiChat));
}
});
}else{
//重新登陆
AiResponse responseData = new AiResponse();
responseData.setCode(10002);
responseData.setValue("relogin");
responseData.setMethod("error");
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(responseData)));
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 添加连接
System.out.println("新连接: " + ctx.channel().id().asShortText());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 断开连接
System.out.println("断开连接: " + ctx.channel().id().asShortText());
aiSession.removeSessionByChannel(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
log.info("{}---心跳超时--->{}", ctx.channel().id().asShortText(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常处理
cause.printStackTrace();
// ctx.close();
}
}
- 创建StartListener
@Component
public class StartListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private WebSocketNettyServer webSocketNettyServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
try {
//启动netty服务
webSocketNettyServer.start();
} catch (Exception ignored) {
}
}
}
至此,Netty相关配置就搭建完成了。前端通过WebSocket地址ws://主机:6000/ws即可连接到Netty,然后就可以通过WebSocket发送消息并接收服务端推送的回复了。
我使用uniapp搭建了小程序和App端,实测可用;PC端使用标准WebSocket连接,理论上同样适用。