基于阻塞队列的公用重试方法

in 笔记 with 0 comment

背景

最近工作中遇到个推送B端消息的服务提供方不稳定经常出现超时和返回失败的情况,虽然B端消息推送的成功与否不影响正常流程,但影响用户体验。所以消息尽量还是要及时推送出去,这里做了个重试。

解决

后来我用比较粗糙的while死循环方式进行重试操作,大致如下:

// 重试次数
int repeat = 0;
// 重试上限
int max = 5;
while (true) {
    try {
	// 执行待重试的业务代码
	boolean result = doCoreProcess();
	if (!result) {
            repeat++;
        }
    } catch (Exception e) {
        repeat++;
    }
    if (repeat < max) {
        logger.warn("【xx业务】执行失败,重试{}/{}...", repeat, max);
        continue;
    }
    break;
}

while死循环的重试方式虽然简单暴力但也有明显缺点:

  1. 同步执行,对一些可以支持异步的重试操作不友好,比如推送消息。
  2. 对主要的业务代码有侵入,影响代码可读性
  3. 不公用,对其他需要重试的地方又要写一遍。

优化

后来索性就写一个基于阻塞队列的重试公用静态方法。主要思路就是维护一个阻塞队列,把需要重试的任务放入队列中待执行。可以支持自定义重试次数、是否异步。

RetryUtil.java

/**
 * @Author: wirechen
 * @Date: 2020/6/22 10:05
 * @Description: 公用的重试任务(基于阻塞队列)
 */
@Slf4j
public class RetryUtil {

    /**
     * 消费队列的线程池(该线程池仅是起了异步消费队列的作用)
     */
    private static final ThreadPoolExecutor EXECUTOR
            = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

    /**
     * 消费重试任务的线程池(无界队列线程池,可根据业务实际情况调整线程coresize大小,maxsize随便写多少都达不到)
     */
    private static final ThreadPoolExecutor TASK_EXECUTOR
            = new ThreadPoolExecutor(5, 5, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

    /**
     * 阻塞队列(存放重试任务的)
     */
    private static final BlockingQueue<RetryTaskItem> TASK_QUEUE = new LinkedBlockingQueue(Integer.MAX_VALUE);
    /**
     * 标识位,标识消费任务队列是否有在执行(长时间没有重试任务则退出队列监听)
     */
    private static volatile AtomicBoolean FLAG = new AtomicBoolean(false);
    /**
     * 重试间隔(可根据实际业务需求调整大小,单位:毫秒)
     */
    private static final Long RETRY_INTERVAL = 5000L;

    /**
     * 执行重试业务(异步)
     * @Author wirechen
     * @Date 2020/6/22 15:19
     * @param supplier 返回的boolean的FI函数(意味着你的这个方法返回必须是true/false,所以try/catch最好在该方法内部捕捉)
     * @param retryTimes 需要重试的次数
     * @param businessName 业务名称(必传,用于定位问题)
     * @return void
     **/
    public static void retryProcess(Supplier<Boolean> supplier, Integer retryTimes, String businessName) {
        RetryTaskItem item = new RetryTaskItem(supplier, retryTimes, businessName);
        try {
            TASK_QUEUE.put(item);
        } catch (InterruptedException e) {
            log.error("【重试任务】首次加入队列被中断....", e);
        }
        if (!FLAG.get()) {
            FLAG.set(true);
            // 永远只会有一个单独的线程执行消费队列
            EXECUTOR.execute(() -> customerQueue());
        }
    }

    /**
     * 同上(同步)
     */
    public static <T> T retryProcessSync(Supplier<T> supplier, Integer retryTimes, String businessName) {
        T t = null;
        while (retryTimes > 0) {
            try {
                t = supplier.get();
                if (t == null) {
                    // 等待5s后再重试
                    Thread.sleep(RETRY_INTERVAL);
                    retryTimes--;
                } else {
                    break;
                }
            } catch (Exception e) {
                log.error("【重试任务】重试任务执行业务代码出错,businessName:{}", businessName, e);
                retryTimes--;
            }
        }
        return t;
    }

    /**
     * 消费任务队列
     * @Author wirechen
     * @Date 2020/6/22 15:34
     * @return void
     **/
    private static void customerQueue() {
        while (true) {
            RetryTaskItem item = null;
            try {
                // 这里会阻塞线程(Blocked),不会占用CPU资源
                item = TASK_QUEUE.poll(10000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("【重试任务】重试任务取出队列被中断....", e);
            }
            if (item == null) {
                // 消费完队列里的所有任务就休息了,别再继续循环浪费CPU资源
                FLAG.set(false);
                break;
            } else {
                // 执行任务
                Boolean result = runSupplier(item);
                if (!result && item.getRetryTimes() > 0) {
                    item.setRetryTimes(item.getRetryTimes() - 1);
                    log.warn("【重试任务】重试任务执行业务代码返回失败,businessName:{},剩余: {}次数", item.getBusinessName(), item.getRetryTimes());
                    RetryTaskItem finalItem = item;
                    TASK_EXECUTOR.execute(() -> {
                        try {
                            // 等待5s后再重新放入队列
                            Thread.sleep(RETRY_INTERVAL);
                            TASK_QUEUE.put(finalItem);
                        } catch (InterruptedException e) {
                            log.error("【重试任务】重试任务再次加入队列被中断,businessName:{}", finalItem.getBusinessName(), e);
                        }
                    });
                } else {
                    log.error("【重试任务】重试任务执行失败,businessName: {}", item.getBusinessName());
                }
            }
        }
    }

    /**
     * 执行supplier方法
     * @param item
     * @return
     */
    private static Boolean runSupplier(RetryTaskItem item) {
        if (item == null) {
            return false;
        }
        Boolean result = false;
        try {
            result = item.getSupplier().get();
        } catch (Exception e) {
            log.error("【重试任务】重试任务执行业务代码出错,businessName:{}", item.getBusinessName(), e);
        }
        return result;
    }

    /**
     * 重试任务对象
     */
    @AllArgsConstructor
    @Data
    static class RetryTaskItem {

        /**
         * 执行的函数(方法),返回Boolean
         */
        private Supplier<Boolean> supplier;
        /**
         * 重试次数
         */
        private Integer retryTimes;
        /**
         * 业务名称
         */
        private String businessName;

    }
}

使用

方法的入口在retryProcess,接收一个Supplier<Boolean>FI函数入参,这样逻辑非常清晰,只需把重试的业务代码直接当作参数来传入。

相比最初的方式,想要重试一个业务方法只需一行代码(重试5次,异步执行,日志定位)
RetryService.retryProcess(() -> doCoreProcess(), 5, "xx业务");

其实这就是JDK8以后比较流行的面向函数式编程的思想。