MQTT on Pulsar (MoP)
MQTT-on-Pulsar (aka MoP) was developed to support MQTT protocol natively on Apache Pulsar.
Get started
Download or build MoP protocol handler
- clone this project from GitHub to your local.
git clone https://github.com/Macrometacorp/c8streams-mop.git
cd c8streams-mop
- build the project.
mvn clean install -DskipTests
- the nar file can be found at this location.
./mqtt-impl/target/mop-${version}.nar
- deploy the built artifacts (nar/pom) to Maven repo
./mqtt-impl/mvn clean deploy
Install MoP protocol handler
All what you need to do is to configure the Pulsar broker to run the Mop protocol handler as a plugin, that is, add configurations in Pulsar's configuration file, such as broker.conf or standalone.conf.
-
Set the configuration of the MoP protocol handler.
Add the following properties and set their values in Pulsar configuration file, such as
conf/broker.conforconf/standalone.conf.Property Set it to the following value Default value messagingProtocolsmqtt null protocolHandlerDirectoryLocation of MoP NAR file ./protocols Example
messagingProtocols=mqtt protocolHandlerDirectory=./protocols -
Set MQTT server listeners.
Note
The hostname in listeners should be the same as Pulsar broker's
advertisedAddress.Example
mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1
Restart Pulsar brokers to load MoP
After you have installed the MoP protocol handler to Pulsar broker, you can restart the Pulsar brokers to load MoP.
How to use Proxy
To use proxy, complete the following steps. For detailed steps, refer to Deploy a cluster on bare metal.
-
Prepare a ZooKeeper cluster.
-
Initialize the cluster metadata.
-
Prepare a BookKeeper cluster.
-
Copy the
pulsar-protocol-handler-mqtt-${version}.narto the$PULSAR_HOME/protocolsdirectory. -
Start broker.
broker config
messagingProtocols=mqtt protocolHandlerDirectory=./protocols brokerServicePort=6651 mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1 mqttProxyEnable=true mqttProxyPort=5682
Verify MoP
There are many MQTT client can be used to verify MoP such as http://workswithweb.com/mqttbox.html, https://www.hivemq.com/mqtt-toolbox. You can choose a cli tool or interface tool to verify the MoP.
Verify with fusesource mqtt-client
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.16</version>
</dependency>
Publish messages and consume messages:
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
connection.subscribe(topics);
// publish message
connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);
// receive message
Message received = connection.receive();