背景:有个需求,后端在一个时间点要向前端发送websocket消息,前端点击确认作出应答。为防止用户错误点击或页面刷新消息丢失,需要后端在5分钟内判断应答状态,如果5分钟内没有应答,则每隔10秒向前端推送一次,5分钟内应答了或者超过了5分钟,后端结束推送。
这是一个很简单的需求,起个线程,在5分钟之内循环,判断应答状态,如果没有应答则sleep10秒,重新推送一次websocket消息。这是我首先想到的方案。
伪代码:
public static void main(String[] args) {
new Thread(() -> {
long start = System.currentTimeMillis();
do {
LogResp logResp = logService.findById(logId);
if (Status.ON.getStatus().equals(logResp.getStatus())) {
break;
}else {
System.out.println("重新推送消息");
Thread.sleep(10000);
}
} while ((System.currentTimeMillis() - start) / ( 60 * 1000) < 5); // 小于5分钟
}).start();
}
恰巧那段时间在看Reactor
的东西,就在想用Reactor
流式函数的方式能不能实现。然后就试了试!
需求的关键点在于:5分钟超时结束、5分钟内条件判断结束这两个。
5分钟超时结束可以用Flux.takeWhile
满足,而5分钟条件判断结束可以用Flux.takeUntil
满足。
所以最后的伪代码:
public static void main(String[] args) {
Integer retentionPeriod = 5; // 分钟
AtomicBoolean last = new AtomicBoolean(false);
Flux.interval(Duration.ofSeconds(10))
.takeWhile(index -> index < retentionPeriod * 60 / 5)
.doOnNext(index -> {
Long logId = 11L;
// LogResp logResp = logService.findById(logId);
// if (Status.ON.getStatus().equals(logResp.getStatus())
// || retentionPeriod * 60 - (index * 10) <= 0) {
// last.set(true);
// }else {
// long retention = retentionPeriod * 60 - (index * 10);
// System.out.println("重新推送消息");
// }
})
.takeUntil(index -> last.get())
.doOnError((err) -> {
System.out.println("推送异常");
})
.onErrorComplete()
.subscribeOn(Schedulers.parallel())
.subscribe();
}