新聞中心
前言
1. 觀察者模式定義
觀察者模式,也可以稱之為發(fā)布訂閱模式,它在GoF 的《設(shè)計模式》中,是這么定義的:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically。
翻譯過來就是:觀察者模式定義對象間的一種一對多的依賴關(guān)系,當(dāng)一個對象的狀態(tài)發(fā)生改變時,所有依賴于它的對象都得到通知并被完成業(yè)務(wù)的更新。
觀察者模式屬于行為模式,一個對象(被觀察者)的狀態(tài)發(fā)生改變,所有的依賴對象(觀察者對象)都將得到通知,進行廣播通知。它的主要成員就是觀察者和被觀察者。
被觀察者(Observerable):目標(biāo)對象,狀態(tài)發(fā)生變化時,將通知所有的觀察者。
觀察者(observer):接受被觀察者的狀態(tài)變化通知,執(zhí)行預(yù)先定義的業(yè)務(wù)。
2. 觀察者模式的應(yīng)用場景
哪些場景我們可以考慮使用觀察者模式呢?
我們?nèi)粘I钪校鋵嵕陀杏^察者模式類似的例子。比如,我們訂閱了報社一年的報紙。每天報社印刷好報紙,就送到我們手中。我們就是觀察者,報社就是被觀察者。
而日常開發(fā)中,觀察者模式的使用場景主要表現(xiàn)在:完成一件事情后,通知處理某個邏輯。如,登陸成功發(fā)個IM消息,支付成功發(fā)個郵件消息或者發(fā)個抽獎消息,用戶評論成功給他發(fā)個積分等等。
舉個詳細點的例子吧,登陸注冊應(yīng)該是最常見的業(yè)務(wù)場景了,我們就拿注冊來說事,大家經(jīng)常會遇到類似的場景,就是用戶注冊成功后,我們給用戶發(fā)一條IM消息,又或者發(fā)個郵件等等,因此經(jīng)常有如下的代碼:
void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendEmail();
}這塊代碼會有什么問題呢?如果產(chǎn)品又加需求:現(xiàn)在注冊成功的用戶,再給用戶發(fā)一條短信通知。于是你又得改register方法的代碼了。這是不是違反了開閉原則啦。
void register(User user){
insertRegisterUser(user);
sendIMMessage();
sendMobileMessage();
sendEmail();
}并且,如果調(diào)發(fā)短信的接口失敗了,是不是又影響到用戶注冊了?!這時候,是不是得加個異步方法,異步發(fā)通知消息才好?其實這種場景,我們可以使用異步非阻塞的觀察者模式優(yōu)化的。
3. 如何實現(xiàn)一個簡單的觀察者模式
我們先來看下,簡單的觀察者模式如何實現(xiàn)??梢赃@么定義
- 一個主題接口Subject(聲明添加、刪除、通知觀察者方法)。
- 一個Observer觀察者接口。
- 一個創(chuàng)建主題的類ObserverableImpl?(即被觀察者),實現(xiàn)了Subject接口。
- 各個觀察者的差異化實現(xiàn)。
為了通俗易懂,可以這樣理解觀察者模式:就是被觀察者(ObserverableImpl)做了一件事情,或者說發(fā)布了一個主題(Subject),然后這件事情通知到各個相關(guān)的不同的人(不同的觀察者,Observer的差異化實現(xiàn)者)。
一個主題接口。
public interface Subject {
/**
* 添加觀察者
* @param observer
*/
void addServer(Observer observer);
/**
* 移除觀察者
* @param observer
*/
void removeServer(Observer observer);
/**
* 通知觀察者
* @param msg
*/
void notifyAllObservers(String msg);
}一個Observer接口。
/**
* 觀察者
*
*/
public interface Observer {
/**
* 更新消息
* @param msg
*/
void update(String msg);
}
一個創(chuàng)建主題的類ObserverableImpl(即被觀察者),同時有觀察者列表的屬性(其實就是說觀察者要事先注冊到被觀察者)。
public class ObserverableImpl implements Subject {
/**
* 存儲被觀察者
*/
private List observers = new ArrayList();
@Override
public void addServer(Observer observer) {
observers.add(observer);
}
@Override
public void removeServer(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyAllObservers(String msg) {
for (Observer observer : observers) {
observer.update(msg);
}
}
} 觀察者的差異化實現(xiàn),以及使用。
public class ObserverOneImpl implements Observer {
@Override
public void update(String msg) {
System.out.println("ObserverOne is notified,"+msg);
}
}
public class ObserverTwoImpl implements Observer {
@Override
public void update(String msg) {
System.out.println("ObserverTwo is notified,"+msg);
}
}
public class ObserverDemoTest {
public static void main(String[] args) {
Subject subject = new ObserverableImpl();
//添加觀察者
subject.addObserver(new ObserverOneImpl());
subject.addObserver(new ObserverTwoImpl());
//通知
subject.notifyAllObservers("關(guān)注公眾號:撿田螺的小男孩");
}
}
//輸出
ObserverOne is notified,關(guān)注公眾號:撿田螺的小男孩
ObserverTwo is notified,關(guān)注公眾號:撿田螺的小男孩就這樣,我們實現(xiàn)了觀察者模式啦,是不是很簡單?不過上面的代碼,只能算是觀察者模式的模板代碼,只能反映大體的設(shè)計思路。接下來,我們看下在工作中,是如何使用觀察者模式的。
4. 工作中,如何使用觀察者模式的
觀察者模式的實現(xiàn)有兩種方式,同步阻塞方式和異步非阻塞方式。第3小節(jié)就是一個同步阻塞方式的觀察者模式。我們來看下,日常工作的例子:用戶注冊成功發(fā)消息的例子,如何實現(xiàn)。本小節(jié)分同步阻塞、異步阻塞、spring觀察者模式三個方向探討。
- 同步阻塞方式的觀察模式
- 異步非阻塞方式的觀察者模式
- spring觀察者模式應(yīng)用
4.1 同步阻塞方式的觀察模式
我們可以把用戶注冊,當(dāng)做被觀察者實現(xiàn)的邏輯,然后發(fā)消息就是觀察者的實現(xiàn)邏輯。
假設(shè)有兩個觀察者,分 別是發(fā)QQ消息和手機消息,于是有以下代碼:
public interface RegisterObserver {
void sendMsg(String msg);
}
@Service
public class ObserverMobileImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("發(fā)送手機短信消息"+msg);
}
}
@Service
public class ObserverQQImpl implements RegisterObserver {
@Override
public void sendMsg(String msg) {
System.out.println("發(fā)送QQ消息"+msg);
}
}直接可以通過spring的ApplicationContextAware,初始化觀察者列表,然后用戶注冊成功,通知觀察者即可。代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
//觀察者列表
private CollectionregObservers;
@RequestMapping("register")
public String register(UserParam userParam) {
//注冊成功過(類似于被觀察者,做了某件事)
userService.addUser(userParam);
//然后就開始通知各個觀察者。
for(RegisterObserver temp:regObservers){
temp.sendMsg("注冊成功");
}
return "SUCCESS";
}
//利用spring的ApplicationContextAware,初始化所有觀察者
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}
可以發(fā)現(xiàn),觀察者模式,就是將不同的行為代碼解耦,也就是說將觀察者和被觀察者代碼解耦。但是這里大家會發(fā)現(xiàn),這是同步阻塞式的觀察者模式,是有缺點的,比如發(fā)QQ消息異常,就會影響用戶注冊,或者發(fā)消息因為某些原因耗時,就影響了用戶注冊,所以可以考慮異步非阻塞的觀察者模式。
4.2 異步非阻塞方式的觀察者模式
如何實現(xiàn)異步非阻塞,最簡單就是另開個線程嘛,即新開個線程或者線程池異步跑觀察者通知。代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
private CollectionregObservers;
private Executor executor = Executors.newFixedThreadPool(10);
@RequestMapping("register")
public String register(UserParam userParam) {
userService.addUser(userParam);
//異步通知每個觀察者
for (RegisterObserver temp : regObservers) {
executor.execute(() -> {
temp.sendMsg("注冊成功");
});
}
return "SUCCESS";
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
regObservers = new ArrayList<>(applicationContext.getBeansOfType(RegisterObserver.class).values());
}
}
線程池實現(xiàn)的異步非阻塞方式,還是可以的,但是異步執(zhí)行邏輯都耦合在了register()函數(shù)中,不是很優(yōu)雅,也增加了這部分業(yè)務(wù)代碼的維護成本。一般日常工作中,我們會用spring那一套觀察者模式等。
4.3 spring觀察者模式應(yīng)用
spring的觀察者模式使用也是比較簡單的,就是先定義個事件,繼承于ApplicationEvent:
public class MessageEvent extends ApplicationEvent {
public MessageEvent(Object source) {
super(source);
}
}然后定義一個事件監(jiān)聽器MessageListener,類似于觀察者,它實現(xiàn)ApplicationListener接口。
@Component
public class MessageListener implements ApplicationListener{
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用戶注冊成功,執(zhí)行監(jiān)聽事件"+messageEvent.getSource());
}
}
用戶注冊成功后,applicationEventPublisher(類似于被觀察者)發(fā)布事件即可,代碼如下:
@RestController
public class UserController implements ApplicationContextAware{
@Autowired
private UserService userService;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@RequestMapping("springListenRegister")
public String springListenRegister(UserParam userParam) {
System.out.println("開始注冊");
userService.addUser(userParam);
//用戶注冊成功,發(fā)布事件
applicationEventPublisher.publishEvent(new MessageEvent("666"));
return "SUCCESS";
}
運行結(jié)果:
開始注冊
用戶注冊成功,執(zhí)行監(jiān)聽事件666
這個也是同步阻塞的方式實現(xiàn)的,等下下個小節(jié)先介紹完spring觀察者模式的原理,田螺哥再來教大家如何抽取一個通用的異步非阻塞觀察者模式哈。
5. Spring觀察者模式原理
Spring 中實現(xiàn)的觀察者模式包含三部分:分別是Event事件(相當(dāng)于消息)、Listener監(jiān)聽者(相當(dāng)于觀察者)、Publisher發(fā)送者(相當(dāng)于被觀察者)。用個圖表示就是這樣:
這個ApplicationEvent是放到哪里的,監(jiān)聽者AppliactionListener是如何監(jiān)聽到的。接下來,我們來看下spring框架的觀察者原理是怎樣哈。
我們先來看下ApplicationEventPublisher源代碼(被觀察者/發(fā)布者)
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
void publishEvent(Object event);
}
ApplicationEventPublisher它只是一個函數(shù)式接口,我們再看下它接口方法的實現(xiàn)。它的具體實現(xiàn)類是AbstractApplicationContext,這個類代碼有點多,我把關(guān)鍵部分代碼貼出來了:
public abstract class AbstractApplicationContext extends ... {
//監(jiān)聽者(觀察者列表)
private final Set> applicationListeners;
//構(gòu)造器,初始化觀察者列表
public AbstractApplicationContext() {
this.applicationListeners = new LinkedHashSet();
//...
}
//發(fā)布事件
public void publishEvent(ApplicationEvent event) {
this.publishEvent(event, (ResolvableType)null);
}
public void publishEvent(Object event) {
this.publishEvent(event, (ResolvableType)null);
}
//發(fā)布事件接口實現(xiàn)
protected void publishEvent(Object event, ResolvableType eventType) {
//...
Object applicationEvent;
if (event instanceof ApplicationEvent) {
//如果event是ApplicationEvent對象,或者是它的子類
applicationEvent = (ApplicationEvent)event;
} else {
// 如果不是ApplicationEvent對象或者它的子類,則將其包裝成PayloadApplicationEvent事件,并獲取對應(yīng)的事件類型
applicationEvent = new PayloadApplicationEvent(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
//真正的消息發(fā)送,是通過它。獲取ApplicationEventMulticaster,調(diào)用multicastEvent方法廣播事件
this.getApplicationEventMulticaster().multicastEvent(
(ApplicationEvent)applicationEvent, eventType);
}
//如果當(dāng)前命名空間還有父親節(jié)點,也需要給父親推送該消息
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}
//添加觀察者(監(jiān)聽者)
public void addApplicationListener(ApplicationListener> listener) {
Assert.notNull(listener, "ApplicationListener must not be null");
if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.addApplicationListener(listener);
} else {
this.applicationListeners.add(listener);
}
}
//觀察者列表
public Collection> getApplicationListeners() {
return this.applicationListeners;
}
// 注冊監(jiān)聽器
protected void registerListeners() {
//把提前存儲好的監(jiān)聽器添加到監(jiān)聽器容器中到ApplicationEventMulticaster
for (ApplicationListener> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
//獲取類型是ApplicationListener的beanName集合,此處不會去實例化bean
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
Set earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
//如果存在earlyEventsToProcess,提前處理這些事件
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
} 通過以上代碼,我們可以發(fā)現(xiàn),真正的消息發(fā)送,實際上是通過事件廣播器ApplicationEventMulticaster 這個接口來完成的。multicastEvent是主要方法,這個方法的實現(xiàn)在類SimpleApplicationEventMulticaster中,我們一起來看下源碼:
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
...
//線程池
@Nullable
protected Executor getTaskExecutor() {
return this.taskExecutor;
}
public void setTaskExecutor(@Nullable Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 根據(jù)event類型獲取適合的監(jiān)聽器
Executor executor = getTaskExecutor();
for (ApplicationListener> listener : getApplicationListeners(event, type)) {
if (executor != null) {
//如果executor不為空,異步調(diào)用執(zhí)行監(jiān)聽器中的方法
executor.execute(() -> invokeListener(listener, event));
}
else {
//調(diào)用監(jiān)聽器的方法
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
//如果存在ErrorHandler,調(diào)用監(jiān)聽器方法(會用try...catch包一下)
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
//如果拋出異常則調(diào)用ErrorHandler來處理異常。
errorHandler.handleError(err);
}
}
else {
否則直接調(diào)用監(jiān)聽器方法
doInvokeListener(listener, event);
}
}
...
}可以發(fā)現(xiàn),默認情況下,spring實現(xiàn)的觀察者模式,同步阻塞的。如果想異步執(zhí)行事件,可以自定義SimpleApplicationEventMulticaster,然后構(gòu)造一下executor線程池就好啦。代碼如下:
/**
* 公眾號:撿田螺的小男孩
*/
@Component
public class ListenerConfig {
//把線程池賦值進去
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(simpleAsyncTaskExecutor());
return simpleApplicationEventMulticaster;
}
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
demo跑一下,運行結(jié)果:
注冊開始
當(dāng)前線程名稱http-nio-8080-exec-1
注冊結(jié)束
用戶注冊成功2,執(zhí)行監(jiān)聽事件666Sat Jun 18 11:44:07 GMT+08:00 2022
當(dāng)前線程名稱:SimpleAsyncTaskExecutor-20
當(dāng)前線程名稱:SimpleAsyncTaskExecutor-19
用戶注冊成功,執(zhí)行監(jiān)聽事件666Sat Jun 18 11:44:12 GMT+08:00 2022
如果手動新建SimpleApplicationEventMulticaster,并設(shè)置taskExecutor的話,所有的監(jiān)聽響應(yīng)事件,都是異步執(zhí)行的哦。而有些有些場景我們希望同步執(zhí)行的,這時候這種實現(xiàn)方式就不好了。
其實spring提供了@Async注解,可以用來實現(xiàn)異步。具體怎么實現(xiàn)呢?其實很簡單,只需要在配置類加上@EnableAsync,接著在需要異步執(zhí)行的監(jiān)聽實現(xiàn)方法。加上@Async即可。代碼實現(xiàn)如下:
/**
* 關(guān)注公眾號:撿田螺的小男孩
* 更多實戰(zhàn)干貨
*/
@Component
@EnableAsync //配置類加上```@EnableAsync```
public class ListenerConfig2 {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
return simpleApplicationEventMulticaster;
}
}
@Component
public class MessageAsyncListener3 implements ApplicationListener{
@Async //方法異步注解
@Override
public void onApplicationEvent(MessageEvent messageEvent) {
System.out.println("用戶注冊成功3,執(zhí)行監(jiān)聽事件" + messageEvent.getSource() + new Date());
System.out.println("當(dāng)前線程名稱:"+Thread.currentThread().getName());
}
}
日常開發(fā)中,異步執(zhí)行也可以自己手動通過線程池來開啟啦?;氐轿覀儽疚牡暮蠖怂季S主題,如果每個開發(fā),都自己定義觀察者模式的實現(xiàn),這種代碼會很混亂,所以最好是實現(xiàn)一個可擴展,通用的觀察者模板。
6. 基于spring觀察者模式,抽取一個模板
接下來的最后小節(jié),跟大家一起基于spring的觀察者模式,一步一步實現(xiàn)并抽取個模板哈。
我們要基于spring實現(xiàn)觀察者模式的話,就包括這三步:
定義Event事件(相當(dāng)于消息),一般定義一個Event對象,繼承ApplicationEvent。
定義Listener監(jiān)聽者(相當(dāng)于觀察者),實現(xiàn)接口ApplicationListener。
Publisher發(fā)送者(相當(dāng)于被觀察者),通過ApplicationEventPublisher發(fā)布。
6.1 定義Event事件對象
既然我們要抽取觀察者模板,那肯定不是每個人自己寫自己的Event,然后都去繼承ApplicationEvent。
我們可以自己定義一個項目相關(guān)的,通用的BaseEvent類,然后一些相關(guān)通用的信息屬性可以放進去,比如eventId或者流水號bizSeq什么的,都可以,看你們項目需要哈。以下代碼,我定義一個空空如也的BaseEvent。
/**
* 關(guān)注公眾號:撿田螺的小男孩
* 更多實戰(zhàn)干貨
* @desc : 事件基礎(chǔ)對象
*/
public class BaseEvent extends ApplicationEvent {
public BaseEvent(Object source) {
super(source);
}
public BaseEvent() {
this("");
}
}
如果你的觀察者模式,是注冊成功之后,發(fā)個消息的,你就可以聲明一個消息類事件對象RegisterMessageEvent,繼承通用的BaseEvent即可。然后屬性可以自定義就好,比如messageId。
public class RegisterMessageEvent extends BaseEvent{
private String msgId;
public RegisterMessageEvent(String msgId) {
super();
this.msgId = msgId;
}
public String getMsgId() {
return msgId;
分享名稱:后端思維篇:如何抽取一個觀察者模板
URL網(wǎng)址:http://fisionsoft.com.cn/article/cdcegeh.html


咨詢
建站咨詢
