Thursday, August 27, 2015

MQTT retain message flow in WSO2 MB





This article will explain basics of retain feature as it's defined in MQTT 3.1specification[1], Simple use case of retain feature and high level architecture of how MQTT retain messages handled in WSO2 message broker 3.0.0[2].

What is MQTT retain ?


MQTT retain is a method to keep certain messages (retain flagged) within broker so future subscribers for same topic can receive these messages. According to MQTT 3.1 specification[1] following are the main attributes of retain feature in MQTT.

  • Publishing client decides if a particular message should kept in broker for future subscribers. (set retain flag upon publishing the message)
  • When a new subscriber subscribed for given topic if there is a retained message for that topic it should be delivered to subscriber upon subscription.
  • Retain message should remove from broker if an empty payload received for a given topic with retain flag set to true.

All retain messages will honor QOS and other basic MQTT rules defined in specification[1].


Simple use-case for MQTT retain


MQTT is a light weight messaging protocol which mainly focused on IoT (Internet of things) developments. There can be instances where network connections or even sensors itself not available in practical scenarios. By using MQTT retain feature it's possible to keep a last known good value for future subscribers upon subscribing.

For example, Smart temperature monitor can check if the temperature outside operational range and send warnings with retain flag enabled (temporary warning message). Any newly joined equipment (subscriber) can take precautions to handle the situation even if the  temperature monitor is offline/ broken at the time of subscribing since it'll receive the retained warning message.

Once parameters are within desired range it can remove the retain enabled message from broker (Remove the warning) by sending empty payload message with retain enabled. This will remove the retain message and future subscribers won't receive it.


High level architecture of MQTT retain implementation in WSO2 MB 3.0.0


There are two paths in retain feature.  Namely there are as follows.
  • Retain Publish Path (Message flow path when MQTT message received with retain flag set by the broker)
  • Retain Delivery Path (Message flow path where retain message delivered to MQTT topic subscriber upon subscription)

Retain Publish Path


Following diagram shows start to end message flow path when a retain message hit on broker. 



Retain Publish Path

When a MQTT message received to the broker retain state will pass through disruptor before processing it.

In PersistenceStoreConnector MQTT message metadata will be converted to andes message metadata and retain state will be preserved in andes metadata for further processing.

Inside disruptor, onUpdateEvent retain message will put to the event list. Message writer will write to data stores once message writer event triggered.

After onUpdateEvent triggered messageWriter Event will be triggered. This event will call MessagingEngine to write retained messages to datasource.

MessagingEngine will call store retain messages methods in MessageStore interface and call relevant data source implementation to store retain messages. Unlike normal MQTT/AMQP messages, retain messages will be stored on separate retain specific table on MB message store. (MB_RETAINED_CONTENT, MB_RETAINED_METADATA)


Retain Delivery Path


Following diagram shows start to end message delivery path when a subscriber subscribe to a topic and that particular topic has retained messages stored on broker side.


Retain Delivery Path



Retain delivery path triggers when a MQTT subscriber subscribed to a topic. On state change event updateState() in AndesInboundStateEvent will call handleOpenSubscriptionEvent().
OpenSubscriptionEvent() will check if subscription instance is a MQTT and if it's MQTT check for retained messages for subscribed topic.

Retained message metadata will be retrieved(if there's any) from messagingEngine by calling getRetainedMessageByTopic method. This method will check if there's any wildcard matches[1] for subscribed topic as well.

If there's any retained message metadata found for subscribed topic contents of that message will send to the subscriber directly.

This concludes life cycle of a retained message once subscriber receives the message.



References

[1] http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf
[2] http://wso2.com/products/message-broker/