新聞中心
大家好,我是Jensen。一個(gè)想和大家一起打怪升級(jí)的程序員朋友。

創(chuàng)新互聯(lián)建站長期為上千余家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為海西企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作,海西網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。
咱們在寫Kafka消費(fèi)者的時(shí)候,有沒有發(fā)現(xiàn)一個(gè)很麻煩的事:消費(fèi)消息前每次都要手動(dòng)解析Kafka消息,轉(zhuǎn)換成自己想要的類型,再進(jìn)行業(yè)務(wù)操作,比如:
/**
* 訂單支付成功通知
* @Author 公眾號(hào):架構(gòu)師修行錄
*/
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(ConsumerRecordconsumerRecord) {
// 解析Kafka消息
OrderPaySuccessEvent event = JSON.parseObject(consumerRecord.value(), OrderPaySuccessEvent.class);
// TODO 完成解析成功后的業(yè)務(wù)操作
}
對代碼有潔癖的同志就比較難受了,每次解析的操作都差不多,但這又不是業(yè)務(wù)代碼……于是你就想:有沒有一個(gè)辦法讓系統(tǒng)能夠自動(dòng)解析成自己想要的參數(shù)對象呢?
不過也發(fā)現(xiàn)了一些難以解決的問題,比如:共用了一個(gè)groupId,使用Java線程池來管理,這樣工程會(huì)存在性能瓶頸。
專業(yè)的事還是交給專業(yè)的“人”來做吧,用Kafka組件本身來管理不同分組消費(fèi)會(huì)更靠譜。
Spring官方也發(fā)現(xiàn)這個(gè)問題,并且對此提出了解決方案——spring-messaging包下的HandlerMethodArgumentResolver接口。
我們從官方文檔中看到,Spring-kafka在2.4.2版本支持了對kafka消息進(jìn)行方法參數(shù)級(jí)別轉(zhuǎn)換。
其實(shí),spring-web包下也有這個(gè)同名接口,用于自動(dòng)解析Controller方法上的參數(shù),這塊網(wǎng)上資料比較多我就不詳細(xì)展開了,而spring-messaging包下面這個(gè)接口的非官方資料比較少,我這里給大家總結(jié)一下用法。
HandlerMethodArgumentResolver方法參數(shù)處理器接口很簡單,只有兩個(gè)方法:
public interface HandlerMethodArgumentResolver {
boolean supportsParameter(MethodParameter var1);
@Nullable
Object resolveArgument(MethodParameter var1, Message> var2) throws Exception;
}supportsParameter方法返回是否支持參數(shù)自動(dòng)解析,resolveArgument方法就是具體的解析邏輯,把MQ傳遞參數(shù)轉(zhuǎn)換為具體類型參數(shù)。
我們來看一下實(shí)際案例。
首先實(shí)現(xiàn)HandlerMethodArgumentResolver接口,定義為一個(gè)Spring的Component:
@Component
public class KafkaListenerMethodArgumentResolver implements HandlerMethodArgumentResolver {
@Override
public boolean supportsParameter(@NonNull MethodParameter parameter) {
// 默認(rèn)以com.xxx開頭的類,這樣可以不用在在參數(shù)前加@Payload注解
return parameter.getParameterType().getName().startsWith("com.xxx") || parameter.hasParameterAnnotation(Payload.class);
}
@Override
public Object resolveArgument(@NonNull MethodParameter parameter, @NonNull Message> message) {
Class> parameterType = parameter.getParameterType();
String messageContent = (String) message.getPayload();
Object body;
try {
// 這里定義自己的解析方法
body = JsonUtils.fromJson(messageContent, parameterType);
Objects.requireNonNull(body);
} catch (Throwable cause) {
throw new KafkaException("kafka 消息解析失敗: 非法JSON字符串", cause);
}
// 可選,定義解析后的參數(shù)校驗(yàn)邏輯
validate(parameter, body);
return body;
}
private void validate(MethodParameter parameter, Object target) {
for (Annotation ann : parameter.getParameterAnnotations()) {
Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
if (Objects.nonNull(validatedAnn) || ann.annotationType().getSimpleName().startsWith("Valid")) {
ValidationUtils.valid(target);
}
}
}
}
到這里,一個(gè)類就把參數(shù)自動(dòng)解析搞定了,接下來咱們看看怎么用。
/**
* 訂單支付成功通知
* @Author 公眾號(hào):架構(gòu)師修行錄
*/
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(@Payload OrderPaySuccessEvent orderPaySuccessEvent) {
// 已經(jīng)自動(dòng)解析Kafka消息為orderPaySuccessEvent參數(shù)
// TODO 完成解析成功后的業(yè)務(wù)操作
}
怎么樣,代碼是不是比原來簡潔多了,香不香!
如果你對技術(shù)有追求,不想一直寫業(yè)務(wù)代碼,不妨把項(xiàng)目中所有需要手動(dòng)解析參數(shù)的地方,替換成自定義方法參數(shù)解析器來實(shí)現(xiàn)~
分享題目:Kafka封裝之—方法參數(shù)解析器,用起來真香!
文章源于:http://fisionsoft.com.cn/article/cdhshod.html


咨詢
建站咨詢
