Optical
Aberration

Chat client using MQTT NIO

I recently wrote a MQTT client with Swift NIO. You can find it here. This article covers how to create a simple chat client using said library.

For those who don't know MQTT (Message Queuing Telemetry Transport) is a lightweight publish/subscribe messaging protocol used extensively in IoT (Internet of Things) technologies. The MQTT protocol defines two types of network entities: a message broker and a client. The broker receives all the messages from the clients and then routes these to the appropriate destination clients based on whether they are subscribed to the relevant message topic. The protocol is lightweight making it ideal for IoT devices which don't necessarily have a great deal of processing power.

Swift NIO is a low level event driven networking library written by Apple. It is described as being like Netty but written for Swift. I have had a fair amount of experience using Swift NIO with my work on Soto but I have never implemented a client for a specific protocol with it. Given this I thought I would use this as an excuse to learn a little more about both Swift NIO and MQTT. The result was MQTT NIO

Concept

We'll keep this fairly simple. We are going to create a console application that will allow you to connect to a MQTT server and particular topic, with a username. You will be able to send messages to a topic and receive all other messages published to the same topic. Originally I wrote an article detailing all the code in the chat client but ditched that in favour of something that concentrates just on the MQTT elements of client. You can see full source for the client here.

Client initialization

The MQTT client is setup with the default configuration but could be configured to connect to a websocket or TLS connection if required. You need an identifier for your client. If you connect with an identifier already used, the broker will disconnect the other client using that identifier. We prefix the identifier with "MQTTNIOChat-" to avoid clashing with other systems using the MQTT broker.

self.mqttClient = MQTTClient(
    host: command.servername,
    port: command.port,
    identifier: "MQTTNIOChat-\(command.username)",
    eventLoopGroupProvider: .createNew,
    configuration: .init()
)

Connection

The first thing we need to do is connect to the MQTT broker. In the sample I use a locally running broker but you could use your own, or a public broker. Please note if you use a public broker all your messages will visible to the rest of the world.

func setup() -> EventLoopFuture<Void> {
    // connect, subscribe and publish joined message
    self.mqttClient.connect(cleanSession: true).flatMap { hasSession -> EventLoopFuture<Void> in
        let subscription = MQTTSubscribeInfo(topicFilter: self.topicName, qos: .exactlyOnce)
        return self.mqttClient.subscribe(to: [subscription])
    }
    .flatMap { _ in
        self.addListeners()
        return self.sendMessage("Joined!")
    }
}

In mqttClient.connect we set cleanSession to true. This means if there is already an existing session for this user (identifier) then we will ditch it. The existing session will include subscriptions to topics we are maybe no longer interested in. We then subscribe to the topic we are interested in. In the subscription we set qos to .exactlyOnce to ensure we only receive published messages once. If we set it to .atLeastOnce we could receive messages twice if there are connection issues. We would need to keep a record of previously received messages to avoid outputting them twice. If we set it to .atMostOnce we could miss messages if there are connection issues.

Receiving messages

Once the subscription has gone through we call addListeners(). This adds a listener waiting for PUBLISH events from the broker. For every PUBLISH event the callback is called.

func addListeners() {
    self.mqttClient.addPublishListener(named: "ListenForChat") { result in
        switch result {
        case .failure:
            self.closePromise.fail(error)
        case .success(let publishInfo):
            if publishInfo.topicName == self.topicName {
                receiveMessage(publishInfo.payload)
            }
        }
    }
}

If the listener calls the callback with an error we fail the closePromise EventLoopPromise setup in the chat client object. If we receive a message packet and the message topic is equal to the topic we are subscribed to then we call the receiveMessage function. This parses the message packet (it is formatting in JSON), and as long as you didn't send it, outputs it to the console.

Sending messages

The main loop of the client reads from stdin using readLine and then creates a message packet and uses MQTTClient.publish to publish the packet to the MQTT broker. The sendMessage function is as follows.

func sendMessage(_ message: String) -> EventLoopFuture<Void> {
    let packet = ChatPacket(from: command.username, message: message)
    var buffer = ByteBufferAllocator().buffer(capacity: 0)
    try? JSONEncoder().encode(packet, into: &buffer)
    return self.mqttClient.publish(to: self.topicName, payload: buffer, qos: .exactlyOnce)
}

Construct ChatPacket, encode into JSON formatted ByteBuffer and then publish the ByteBuffer to the topic.

Errors

I run the mainloop on the global default system queue using DispatchQueue and leave the main thread waiting on a result from the closePromise. If there is an error the client will exit as the closePromise is fulfilled with an error. We can also catch a disconnection from the broker using a closeListener which fails the closePromise.

func addListeners() {
    ...
    self.mqttClient.addCloseListener(named: "CheckForClose") { result in
        self.closePromise.fail(MQTTChatClientError(message: "Lost connection"))
    }
}

Extending

This is a really simple example of how MQTT can be used. I probably spent more time trying to get the ANSI escape sequences working (for overwriting the prompt with new messages) than the chat functionality. It has no real user management or access control. You would need additional services for that. But it gives you a quick idea what an MQTT broker can be used for. Please go to https://github.com/adam-fowler/mqtt-nio-chat and checkout the full running example.