背景
最近工作中遇到个推送B端消息的服务提供方不稳定经常出现超时和返回失败的情况,虽然B端消息推送的成功与否不影响正常流程,但影响用户体验。所以消息尽量还是要及时推送出去,这里做了个重试。
解决
后来我用比较粗糙的while死循环方式进行重试操作,大致如下:
// 重试次数
int repeat = 0;
// 重试上限
int max = 5;
while (true) {
try {
// 执行待重试的业务代码
boolean result = doCoreProcess();
if (!result) {
repeat++;
}
} catch (Exception e) {
repeat++;
}
if (repeat < max) {
logger.warn("【xx业务】执行失败,重试{}/{}...", repeat, max);
continue;
}
break;
}
while死循环的重试方式虽然简单暴力但也有明显缺点:
- 同步执行,对一些可以支持异步的重试操作不友好,比如推送消息。
- 对主要的业务代码有侵入,影响代码可读性。
- 不公用,对其他需要重试的地方又要写一遍。
优化
后来索性就写一个基于阻塞队列的重试公用静态方法。主要思路就是维护一个阻塞队列,把需要重试的任务放入队列中待执行。可以支持自定义重试次数、是否异步。
RetryUtil.java
/**
* @Author: wirechen
* @Date: 2020/6/22 10:05
* @Description: 公用的重试任务(基于阻塞队列)
*/
@Slf4j
public class RetryUtil {
/**
* 消费队列的线程池(该线程池仅是起了异步消费队列的作用)
*/
private static final ThreadPoolExecutor EXECUTOR
= new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
/**
* 消费重试任务的线程池(无界队列线程池,可根据业务实际情况调整线程coresize大小,maxsize随便写多少都达不到)
*/
private static final ThreadPoolExecutor TASK_EXECUTOR
= new ThreadPoolExecutor(5, 5, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
/**
* 阻塞队列(存放重试任务的)
*/
private static final BlockingQueue<RetryTaskItem> TASK_QUEUE = new LinkedBlockingQueue(Integer.MAX_VALUE);
/**
* 标识位,标识消费任务队列是否有在执行(长时间没有重试任务则退出队列监听)
*/
private static volatile AtomicBoolean FLAG = new AtomicBoolean(false);
/**
* 重试间隔(可根据实际业务需求调整大小,单位:毫秒)
*/
private static final Long RETRY_INTERVAL = 5000L;
/**
* 执行重试业务(异步)
* @Author wirechen
* @Date 2020/6/22 15:19
* @param supplier 返回的boolean的FI函数(意味着你的这个方法返回必须是true/false,所以try/catch最好在该方法内部捕捉)
* @param retryTimes 需要重试的次数
* @param businessName 业务名称(必传,用于定位问题)
* @return void
**/
public static void retryProcess(Supplier<Boolean> supplier, Integer retryTimes, String businessName) {
RetryTaskItem item = new RetryTaskItem(supplier, retryTimes, businessName);
try {
TASK_QUEUE.put(item);
} catch (InterruptedException e) {
log.error("【重试任务】首次加入队列被中断....", e);
}
if (!FLAG.get()) {
FLAG.set(true);
// 永远只会有一个单独的线程执行消费队列
EXECUTOR.execute(() -> customerQueue());
}
}
/**
* 同上(同步)
*/
public static <T> T retryProcessSync(Supplier<T> supplier, Integer retryTimes, String businessName) {
T t = null;
while (retryTimes > 0) {
try {
t = supplier.get();
if (t == null) {
// 等待5s后再重试
Thread.sleep(RETRY_INTERVAL);
retryTimes--;
} else {
break;
}
} catch (Exception e) {
log.error("【重试任务】重试任务执行业务代码出错,businessName:{}", businessName, e);
retryTimes--;
}
}
return t;
}
/**
* 消费任务队列
* @Author wirechen
* @Date 2020/6/22 15:34
* @return void
**/
private static void customerQueue() {
while (true) {
RetryTaskItem item = null;
try {
// 这里会阻塞线程(Blocked),不会占用CPU资源
item = TASK_QUEUE.poll(10000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("【重试任务】重试任务取出队列被中断....", e);
}
if (item == null) {
// 消费完队列里的所有任务就休息了,别再继续循环浪费CPU资源
FLAG.set(false);
break;
} else {
// 执行任务
Boolean result = runSupplier(item);
if (!result && item.getRetryTimes() > 0) {
item.setRetryTimes(item.getRetryTimes() - 1);
log.warn("【重试任务】重试任务执行业务代码返回失败,businessName:{},剩余: {}次数", item.getBusinessName(), item.getRetryTimes());
RetryTaskItem finalItem = item;
TASK_EXECUTOR.execute(() -> {
try {
// 等待5s后再重新放入队列
Thread.sleep(RETRY_INTERVAL);
TASK_QUEUE.put(finalItem);
} catch (InterruptedException e) {
log.error("【重试任务】重试任务再次加入队列被中断,businessName:{}", finalItem.getBusinessName(), e);
}
});
} else {
log.error("【重试任务】重试任务执行失败,businessName: {}", item.getBusinessName());
}
}
}
}
/**
* 执行supplier方法
* @param item
* @return
*/
private static Boolean runSupplier(RetryTaskItem item) {
if (item == null) {
return false;
}
Boolean result = false;
try {
result = item.getSupplier().get();
} catch (Exception e) {
log.error("【重试任务】重试任务执行业务代码出错,businessName:{}", item.getBusinessName(), e);
}
return result;
}
/**
* 重试任务对象
*/
@AllArgsConstructor
@Data
static class RetryTaskItem {
/**
* 执行的函数(方法),返回Boolean
*/
private Supplier<Boolean> supplier;
/**
* 重试次数
*/
private Integer retryTimes;
/**
* 业务名称
*/
private String businessName;
}
}
使用
方法的入口在retryProcess
,接收一个Supplier<Boolean>
FI函数入参,这样逻辑非常清晰,只需把重试的业务代码直接当作参数来传入。
相比最初的方式,想要重试一个业务方法只需一行代码(重试5次,异步执行,日志定位)
RetryService.retryProcess(() -> doCoreProcess(), 5, "xx业务");
其实这就是JDK8以后比较流行的面向函数式编程的思想。