一次机房停电引发的思考

package com.qiaofang.tortoise.gateway.component;


import com.qiaofang.tortoise.gateway.config.KafkaAsyncProperties; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/** * kafka异步操作工具类 * * @author chenhao * @version 1.0 * @date 2020/7/2 3:47 下午 */ public class KafkaAsyncUtil {
private final KafkaTemplate kafkaTemplate;
private final KafkaAsyncProperties kafkaAsyncProperties;
public KafkaAsyncUtil(KafkaTemplate kafkaTemplate, KafkaAsyncProperties kafkaAsyncProperties) { this.kafkaTemplate = kafkaTemplate; this.kafkaAsyncProperties = kafkaAsyncProperties; init(); }
private ThreadPoolTaskExecutor executor;
private void init() { executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(kafkaAsyncProperties.getThreadPoolCoreThreads()); executor.setMaxPoolSize(kafkaAsyncProperties.getThreadPoolMaxThreads()); executor.setQueueCapacity(kafkaAsyncProperties.getThreadPoolQueueSize()); executor.setThreadNamePrefix("kafka-async-util-pool-"); //高容忍消息丢失场景,工作队列满了之后直接丢弃 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.initialize(); }
/** * 发送消息 * * @param topic * @param data */ public void send(String topic, Object data) { executor.execute(() -> kafkaTemplate.send(topic, data)); }
}
/**
 * Thread-pool settings for asynchronous Kafka operations, bound from configuration
 * entries under the {@code tortoise.kafka.async} prefix.
 *
 * @author chenhao
 * @version 1.0
 * @date 2020/7/2 3:47 PM
 */
@Data
@ConfigurationProperties(prefix = "tortoise.kafka.async")
public class KafkaAsyncProperties {

    /** Core thread count; defaults to 3. */
    private Integer threadPoolCoreThreads = 3;

    /** Maximum thread count; defaults to 3. */
    private Integer threadPoolMaxThreads = 3;

    /** Capacity of the task queue; defaults to 10000. */
    private Integer threadPoolQueueSize = 10000;
}