基于阻塞队列的公用重试方法

in 笔记 with 0 comment

背景

最近工作中遇到个推送B端消息的服务提供方不稳定经常出现超时和返回失败的情况,虽然B端消息推送的成功与否不影响正常流程,但影响用户体验。所以消息尽量还是要及时推送出去,这里做了个重试。

解决

后来我用比较粗糙的while死循环方式进行重试操作,大致如下:

// 重试次数
int repeat = 0;
// 重试上限
int max = 5;
while (true) {
    try {
	// 执行待重试的业务代码
	boolean result = doCoreProcess();
	if (!result) {
            repeat++;
        }
    } catch (Exception e) {
        repeat++;
    }
    if (repeat < max) {
        logger.warn("【xx业务】执行失败,重试{}/{}...", repeat, max);
        continue;
    }
    break;
}

while死循环的重试方式虽然简单暴力但也有明显缺点:

  1. 同步执行,对一些可以支持异步的重试操作不友好,比如推送消息。
  2. 对主要的业务代码有侵入,影响代码可读性。
  3. 不公用,对其他需要重试的地方又要写一遍。

优化

后来索性就写一个基于阻塞队列的重试公用静态方法。主要思路就是维护一个阻塞队列,把需要重试的任务放入队列中待执行。可以支持自定义重试次数、是否异步。

定义一个重试任务类

/**
 * @Author: wirechen
 * @Date: 2020/6/22 13:57
 * @Description: 重试任务
 */
public class RetryTaskItem {

    /**
     * 执行的函数(方法),返回Boolean
     */
    private Supplier<Boolean> supplier;
    /**
     * 重试次数
     */
    private Integer retryTimes;
    /**
     * 业务名称
     */
    private String businessName;

    public RetryTaskItem(Supplier<Boolean> supplier, Integer retryTimes, String businessName) {
        this.supplier = supplier;
        this.retryTimes = retryTimes;
        this.businessName = businessName;
    }

    public Supplier<Boolean> getSupplier() {
        return supplier;
    }

    public void setSupplier(Supplier<Boolean> supplier) {
        this.supplier = supplier;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getBusinessName() {
        return businessName;
    }

    public void setBusinessName(String businessName) {
        this.businessName = businessName;
    }
}

重试逻辑服务

/**
 * @Author: wirechen
 * @Date: 2020/6/22 10:05
 * @Description: 公用的重试任务(基于阻塞队列)
 */
public class RetryService {
    /**
     * SLF日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RetryService.class);
    /**
     * 消费队列的线程池
     */
    private static final ThreadPoolExecutor EXECUTOR
            = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    /**
     * 阻塞队列
     */
    private static final BlockingQueue<RetryTaskItem> TASK_QUEUE = new LinkedBlockingQueue();
    /**
     * 标识位,标识消费任务队列是否有在执行
     */
    private static volatile AtomicBoolean FLAG = new AtomicBoolean(false);

    /**
     * 执行需要重试的业务
     * @Author wirechen
     * @Date 2020/6/22 15:19
     * @param supplier 返回的boolean的FI函数(意味着你的这个方法返回必须是true/false,所以try/catch最好在该方法内部捕捉)
     * @param retryTimes 需要重试的次数(默认5次)
     * @param businessName 业务名称(必传,用于定位问题)
     * @param async 是否异步执行(true-立即放入任务队列异步执行 / false-首次任务同步执行,失败后加入队列异步执行)
     * @return void
     **/
    public static void retryProcess(Supplier<Boolean> supplier, Integer retryTimes, String businessName, Boolean async) {
        RetryTaskItem item = new RetryTaskItem(supplier, retryTimes, businessName);
        Boolean result = false;
        if (async) {
            // 如果要异步执行则重试次数+1(第一次执行不算重试次数)
            item.setRetryTimes(item.getRetryTimes() + 1);
        } else {
            result = runSupplier(item);
        }
        if (!result) {
            try {
                TASK_QUEUE.put(item);
            } catch (InterruptedException e) {
                LOGGER.error("【重试任务】首次加入队列被中断....", e);
            }
            if (!FLAG.get()) {
                FLAG.set(true);
                // 永远只会有一个单独的线程执行消费队列
                EXECUTOR.execute(() -> customerQueue());
            }
        }
    }
    /**
     * 同上,默认异步执行
     */
    public static void retryProcess(Supplier<Boolean> supplier, Integer retryTimes, String businessName) {
        retryProcess(supplier, retryTimes, businessName, true);
    }
    /**
     * 同上,默认重试5次
     */
    public static void retryProcess(Supplier<Boolean> supplier, String businessName, Boolean async) {
        retryProcess(supplier, 5, businessName, async);
    }
    /**
     * 同上,默认异步执行+重试5次
     */
    public static void retryProcess(Supplier<Boolean> supplier, String businessName) {
        retryProcess(supplier, 5, businessName, true);
    }

    /**
     * 执行supplier方法
     * @param item
     * @return
     */
    private static Boolean runSupplier(RetryTaskItem item) {
        if (item == null) {
            return false;
        }
        Boolean result = false;
        try {
            result = item.getSupplier().get();
        } catch (Exception e) {
            LOGGER.error("【重试任务】重试任务执行业务代码出错,businessName:{}", item.getBusinessName(), e);
        }
        return result;
    }

    /**
     * 消费任务队列
     * @Author wirechen
     * @Date 2020/6/22 15:34
     * @return void
     **/
    private static void customerQueue() {
        while (true) {
            RetryTaskItem item = null;
            try {
                // 这里会阻塞线程(Blocked),不会占用CPU资源
                item = TASK_QUEUE.poll(10000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("【重试任务】重试任务取出队列被中断....", e);
            }
            if (item == null) {
                FLAG.set(false);
                break;
            } else {
                // 执行任务
                Boolean result = runSupplier(item);
                if (!result && item.getRetryTimes() > 0) {
                    item.setRetryTimes(item.getRetryTimes() - 1);
                    LOGGER.warn("【重试任务】重试任务执行业务代码返回失败,businessName:{},剩余: {}次数", item.getBusinessName(), item.getRetryTimes());
                    try {
                        // 等待5s后再重新放入队列
                        Thread.sleep(5000L);
                        TASK_QUEUE.put(item);
                    } catch (InterruptedException e) {
                        LOGGER.error("【重试任务】重试任务再次加入队列被中断,businessName:{}", item.getBusinessName(), e);
                    }
                }
            }
        }
    }
}

使用

方法的入口在retryProcess,接收一个Supplier<Boolean>FI函数入参,这样逻辑非常清晰,只需把重试的业务代码直接当作参数来传入。

相比最初的方式,想要重试一个业务方法只需一行代码(重试5次,异步执行)
RetryService.retryProcess(() -> doCoreProcess(), 5, "xx业务");

其实这就是JDK8以后比较流行的面向函数式编程的思想。