2.2 KiB
2.2 KiB
MCP 协议的实现,可以有2种方式,
- stdio,把 jar 提供出去,别人本地配置引入即可使用,但这样的方式不太适合做统一网关服务。
- sse
项目采用 DDD 架构,首先聚焦 domain 层。
会话管理服务
private final ScheduledExecutorService cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
其中 private 说明这个变量这能在类内部访问,声明为 final 则此变量只能赋值一次。newSingleThreadScheduledExecutor 代表只有一个工作线程。
一个带有定时功能的单线程定时队列。
private final Map<String, SessionConfigVO> activeSessions = new ConcurrentHashMap<>();
考虑点在于这个会话是否会被多个线程操作处理。
public SessionManagementService() {
cleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredSessions, 5, 5, TimeUnit.MINUTES);
log.info("会话管理服务已启动,会话超时时间: {} 分钟", SESSION_TIMEOUT_MINUTES);
}
可以在构造中执行一些方法。
实时通讯
对立面轮训
SSE(Server sent Events)
基于普通的 HTTP 长连接,Content-type: text/event-stream
消息格式,每条消息以 \n\n 结尾,
SSE(协议)
↑
需要一个好的服务器框架来实现它
↓
Spring WebFlux(响应式 Web 框架) ← 最适合实现 SSE 的框架
↑
因为它是响应式(非阻塞),非常适合长连接
↓
在 WebFlux 里,用 Sinks + Flux<ServerSentEvent> 来产生和推送 SSE 事件
客户端发起 SSE 请求
↓
Netty EventLoop 接收请求(线程A)
↓
Controller 返回 sink.asFlux() → 注册“有新事件时推送”的回调
↓
线程A 立刻释放,去处理其他请求
↓
...(连接保持打开,线程A 忙别的)
你的服务层调用 sink.tryEmitNext(新消息)
↓
Reactor 通知 Netty:“这个连接有数据要写”
↓
EventLoop 线程(可能是线程B)被唤醒
↓
执行回调:把消息转成 "data: xxx\n\n" 格式 → 写入 Socket
↓
写完后,线程B 立刻释放,继续干别的
