接口请求合并技巧,用好了效率直接翻倍!
一、什么是请求合并
在WEB项目中,我们一般会使用HTTP协议来处理请求
那么我们与服务器交互方式将会是这样的,一次请求,一次处理
我们都知道,调用批量接口相比调用非批量接口有更大的性能优势(因为减少了IO交互操作),在高并发情况下,如果有非常频繁的接口请求发生的话,我们则可以考虑请求合并了,将多个请求进行一定的等待延迟,当请求累计达到一定量级的时候,进行批量请求处理
二、请求合并的优缺点
所谓请求合并,就是讲多次请求合并为一次批量请求
优点:
将多次请求处理进行一定时间或请求数量的等待,使之合并成为一次请求,减少IO交互
缺点:
由于请求需要等待指定时间或指定请求数量,所以合并的接口存在延时,故对请求合并的接口有所限制,该接口不能对响应及时性有要求,支持一定时间的延迟
三、请求合并技术实现
采用定时线程池ScheduledExecutorService,与内存队列LinkedBlockingDeque进行实现请求合并
❝原理是将用户的请求进行缓存起来,缓存的请求数量达到指定数量或达到定时线程池执行时,将已有多个单请求处理合并为多处理,调用批量接口进行操作
❞
依赖
-
只需要JDK,无需任何第三方依赖
批量请求合并工具类定义如下:
核心原理就是 将请求放入队列,放入时检测内存队列数量是否超过设置阈值,以及时间阈值到期触发定时线程池执行
package com.leilei.support;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author lei
* @desc 请求合并工具类
**/
@Log4j2
public class BatchCollapser<T, R> {
private static final Map<Class, BatchCollapser> BATCH_INSTANCE =new ConcurrentHashMap<>();
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
private final BatchHandler<List<T>, R> handler;
private final int countThreshold;
/**
* constructor
*
* @param handler 处理器
* @param countThreshold 数量阈值,达到此阈值后触发处理器
* @param timeThreshold 时间阈值,达到此时间后触发处理器
*/
private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
this.handler = handler;
this.countThreshold = countThreshold;
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
} catch (Exception e) {
log.error("pop-up container exception", e);
}
}, timeThreshold, timeThreshold, TimeUnit.SECONDS);
}
/**
* 添加请求元素入队
* @param event
*/
public void addRequestParam(T event) {
batchContainer.add(event);
if (batchContainer.size() >= countThreshold) {
popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
}
}
/**
* 从队列获取请求,并进行批量处理
* @param handlerType
*/
private void popUpAndHandler(BatchHandlerType handlerType) {
List<T> tryHandlerList = Collections.synchronizedList(new ArrayList<>(countThreshold));
batchContainer.drainTo(tryHandlerList, countThreshold);
if (tryHandlerList.size() < 1) {
return;
}
try {
R handle = handler.handle(tryHandlerList, handlerType);
log.info("批处理工具执行result:{}", handle);
} catch (Exception e) {
log.error("batch execute error, transferList:{}", tryHandlerList, e);
}
}
/**
* 获取合并器实例
*
* @param batchHandler 处理执行器
* @param countThreshold 阈值数量(队列数量)
* @param timeThreshold 阈值时间 单位秒(目前设置是触发后获取阈值数量请求,可根据需要修改)
* @param <E>
* @param <R>
* @return
*/
public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
Class jobClass = batchHandler.getClass();
if (BATCH_INSTANCE.get(jobClass) == null) {
synchronized (BatchCollapser.class) {
BATCH_INSTANCE.putIfAbsent(jobClass, new BatchCollapser<>(batchHandler, countThreshold, timeThreshold));
}
}
return BATCH_INSTANCE.get(jobClass);
}
/**
* 请求处理接口
*
* @param <T>
* @param <R>
*/
public interface BatchHandler<T, R> {
/**
* 处理用户具体请求
*
* @param input
* @param handlerType
* @return
*/
R handle(T input, BatchHandlerType handlerType);
}
/**
* 合并执行类型枚举
*/
public enum BatchHandlerType {
/**
* 数量类型
*/
BATCH_HANDLER_TYPE_DATA,
/**
* 时间类型
*/
BATCH_HANDLER_TYPE_TIME,
}
}
使用方式如下:
package com.leilei.support;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @author lei
* @desc
**/
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {
private BatchCollapser<Integer, Integer> batchCollapser;
@PostConstruct
private void postConstructorInit() {
// 当请求数量达到20个,或每过5s合并执行一次请求
batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
}
@Override
public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
System.out.println("处理类型:" + handlerType + ",接受到批量请求参数:" + input);
return input.stream().mapToInt(x -> x).sum();
}
/**
* 假设我这里是300ms一次请求
*/
@Scheduled(fixedDelay = 300)
public void aaa() {
Integer requestParam = (int) (Math.random() * 100) + 1;
batchCollapser.addRequestParam(requestParam);
System.out.println("当前请求参数:" + requestParam);
}
}
@Data
public class Product {
private Integer id;
private String notes;
}
当然以上工具类仅仅只是DEMO,各位大佬可自行完善,权衡请求合并利弊,降低服务器在高并发请求时的压力
扫码领红包微信赞赏支付宝扫码领红包
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。侵权投诉:375170667@qq.com