有一种场景,提供的API接口逻辑流程如下:
- 请求参数校验,通过后将请求落库;
- 根据落库数据调用N(>=1)个下游接口;
- 根据下游接口响应更新数据状态。
其中第2个步骤,会有2种实现:
实现方式 |
优势 |
劣势 |
同步调用 |
简单,可直接响应终态 |
自己提供的接口耗时受下游影响,容易导致调用方超时。 |
异步调用 |
自己提供的接口耗时不受下游影响 |
只能返回“处理中”状态,调用方需进行查询确认。 |
既然有各自的优势和劣势,能否有一种方案,能汇合优势减少劣势呢。
那必须有!
思路:
- 记录调用开始时间;
- 使用异步线程调用;
- 设置一个定时器,
- 异步线程超过自己设置的时长还没有结束,返回自己落库的数据(即调用方获取到的是“处理中”状态);
- 异步线程未超过自己设置的时长已经有结果,用结果更新落库数据,直接返回终态结果;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Slf4j public abstract class RunWrapper {
private static final Object HOLDER = new Object();
public static <T> T safeAsyncRun(Supplier<T> asyncSupplier, Supplier<T> timeoutSupplier, AsyncOptions asyncOptions) { int waitMilliSeconds = asyncOptions.getWaitMilliSeconds(); Preconditions.checkArgument(waitMilliSeconds > 0);
long startTime = System.currentTimeMillis(); BlockingQueue<Object> finishedFlagQueue = new LinkedBlockingQueue<>(1); AtomicReference<T> asyncResultRef = new AtomicReference<>(); try { DefaultExecutors.IO_ASYNC_EXECUTOR.submit(() -> { try { asyncResultRef.set(asyncSupplier.get()); } catch (Exception e) { log.error("async run failed!!!", e); } finally { Preconditions.checkArgument(finishedFlagQueue.add(HOLDER)); } }); for (; ; ) { if (finishedFlagQueue.poll(100, TimeUnit.MILLISECONDS) != null) { break; } if (System.currentTimeMillis() > startTime + waitMilliSeconds) { break; } } } catch (Exception e) { log.error("submit run failed!!!", e); } T asyncResult = asyncResultRef.get(); if (asyncResult != null) { return asyncResult; } else { return timeoutSupplier.get(); } } }
|
使用的代码大概如下:
1 2 3 4 5 6 7 8 9 10
| 请求数据 数据1 = 根据请求生成 数据库保存「数据1」 return RunWrapper.safeAsyncRun(() -> { 接口调用, 状态更新, 数据1, }, () -> 根据「数据1」的ID,重新查询「数据1」并返回), 超时配置 );
|
为什么需要重新查询一次呢,因为「数据1」已经在异步线程中了,可能部分属性已经根据下游接口的响应更新了,但又处理不完整(如:还没有更新数据库,不一定成功),是“脏数据”。