mqtt-broker
A production-grade MQTT 3.1.1 broker implementation in Rust.
Features
- π High Performance β Async I/O with Tokio, handles 10,000+ concurrent connections
- π‘ Full MQTT 3.1.1 β All 14 packet types with complete protocol compliance
- π― QoS 0/1/2 β Complete delivery guarantees with proper state machines
- π³ Topic Wildcards β O(k) trie-based matching for
+and#wildcards - πΎ Retained Messages β Automatic delivery to new subscribers
- π Session Persistence β Clean and persistent session support
- β±οΈ Keep-Alive β Automatic connection timeout detection
- π‘οΈ Production Ready β Comprehensive test suite with 176+ tests
What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and low-bandwidth, high-latency networks. Itβs widely used in:
- IoT (Internet of Things) β Sensor data collection, device control
- Home Automation β Smart home devices, lighting, thermostats
- Mobile Applications β Push notifications, real-time messaging
- Industrial Systems β SCADA, telemetry, monitoring
Quick Example
# Terminal 1: Start the broker
mqtt-broker
# Terminal 2: Subscribe to temperature readings
mqtt-consumer "sensors/+/temperature"
# Terminal 3: Publish a temperature reading
mqtt-producer "sensors/living-room/temperature" "23.5Β°C"
Next Steps
- Installation β Get mqtt-broker running
- Quick Start β Your first pub/sub in 5 minutes
- MQTT Concepts β Learn the protocol basics
Installation
Requirements
- Rust 1.70+ β Install Rust
- Git β For cloning the repository
From Source (Recommended)
# Clone the repository
git clone https://github.com/iliasichinava/mqtt-broker.git
cd mqtt-broker
# Build in release mode
cargo build --release
# The binaries are now in target/release/
ls target/release/mqtt-*
The following binaries will be built:
| Binary | Description |
|---|---|
mqtt-broker | The MQTT broker server |
mqtt-producer | CLI tool to publish messages |
mqtt-consumer | CLI tool to subscribe to topics |
Install Globally
To install the binaries to your Cargo bin directory (~/.cargo/bin/):
cargo install --path .
Now you can run the commands from anywhere:
mqtt-broker --help
mqtt-producer --help
mqtt-consumer --help
From GitHub Directly
Install directly from the repository without cloning:
cargo install --git https://github.com/iliasichinava/mqtt-broker.git
Verify Installation
# Check broker version/help
mqtt-broker --help
# Expected output:
# MQTT Broker - Production-grade MQTT 3.1.1 broker
#
# Usage: mqtt-broker [OPTIONS]
# ...
Next Steps
Now that you have mqtt-broker installed, head to the Quick Start guide to run your first pub/sub workflow.
Quick Start
This guide will get you up and running with mqtt-broker in 5 minutes.
Step 1: Start the Broker
Open a terminal and start the broker:
mqtt-broker
You should see:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β MQTT Broker v0.1.0 Starting β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£
β Protocol: MQTT 3.1.1 β
β Address: 0.0.0.0:1883 β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
The broker is now listening for connections on port 1883.
Step 2: Subscribe to a Topic
Open a second terminal and subscribe to a topic:
mqtt-consumer "hello/world"
Output:
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-consumer-12345'
Subscribed to 1 topic(s):
hello/world -> SuccessQoS0
Waiting for messages... (Press Ctrl+C to exit)
The consumer is now waiting for messages on the hello/world topic.
Step 3: Publish a Message
Open a third terminal and publish a message:
mqtt-producer "hello/world" "Hello, MQTT!"
Output:
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12346'
Published to 'hello/world': Hello, MQTT!
Done.
Step 4: See the Message
Back in the consumer terminal, you should see:
[1] Topic: hello/world | QoS: AtMostOnce | Retain: false
Payload: Hello, MQTT!
π Congratulations! Youβve just sent your first MQTT message.
Using Wildcards
MQTT supports powerful topic wildcards:
Single-Level Wildcard (+)
Subscribe to all roomsβ temperature:
mqtt-consumer "home/+/temperature"
This matches:
home/kitchen/temperatureβhome/bedroom/temperatureβhome/kitchen/humidityβ
Multi-Level Wildcard (#)
Subscribe to everything under home/:
mqtt-consumer "home/#"
This matches:
home/kitchen/temperatureβhome/bedroom/humidityβhome/garage/door/statusβ
Using QoS Levels
Specify Quality of Service for reliable delivery:
# QoS 1 - At least once delivery
mqtt-producer "sensors/critical" "alert!" --qos 1
# QoS 2 - Exactly once delivery
mqtt-producer "transactions/payment" "confirmed" --qos 2
Retained Messages
Retain the last message on a topic:
# Publish with retain flag
mqtt-producer "device/status" "online" --retain
# New subscribers will immediately receive "online"
mqtt-consumer "device/status"
Next Steps
- Configuration β Customize broker settings
- Topics & Wildcards β Deep dive into topic patterns
- Quality of Service β Understanding QoS levels
Configuration
Broker Configuration
The broker can be configured via command-line arguments:
mqtt-broker [OPTIONS]
Available Options
| Option | Default | Description |
|---|---|---|
-h, --host <HOST> | 0.0.0.0 | Host address to bind to |
-p, --port <PORT> | 1883 | Port to listen on |
--max-clients <N> | 10000 | Maximum concurrent connections |
--help | β | Print help information |
Examples
# Listen on localhost only
mqtt-broker --host 127.0.0.1
# Use a different port
mqtt-broker --port 8883
# Limit connections
mqtt-broker --max-clients 1000
# Combine options
mqtt-broker --host 0.0.0.0 --port 1883 --max-clients 5000
Environment Variables
Enable debug logging with the RUST_LOG environment variable:
# Basic info logging
RUST_LOG=info mqtt-broker
# Debug logging
RUST_LOG=debug mqtt-broker
# Trace logging (very verbose)
RUST_LOG=trace mqtt-broker
# Module-specific logging
RUST_LOG=mqtt_broker::server=debug mqtt-broker
Client Configuration
Producer Options
mqtt-producer [OPTIONS] <TOPIC> <MESSAGE>
| Option | Default | Description |
|---|---|---|
-h, --host <HOST> | 127.0.0.1 | Broker host address |
-p, --port <PORT> | 1883 | Broker port |
-q, --qos <QOS> | 0 | QoS level (0, 1, 2) |
-r, --retain | false | Retain the message |
-c, --client <ID> | auto | Client ID |
Consumer Options
mqtt-consumer [OPTIONS] <TOPIC>...
| Option | Default | Description |
|---|---|---|
-h, --host <HOST> | 127.0.0.1 | Broker host address |
-p, --port <PORT> | 1883 | Broker port |
-q, --qos <QOS> | 0 | Maximum QoS level |
-c, --client <ID> | auto | Client ID |
-n, --count <N> | unlimited | Exit after N messages |
Default Values
The broker uses sensible defaults for production:
| Setting | Default | Notes |
|---|---|---|
| Max packet size | 256 KB | Per MQTT 3.1.1 spec |
| Connect timeout | 10 seconds | Time to receive CONNECT |
| Keep-alive multiplier | 1.5x | Per MQTT spec |
| Max inflight messages | 65535 | Per QoS 1/2 flows |
| Offline queue size | 1000 | Messages per session |
| Session expiry | 24 hours | For persistent sessions |
Planned Configuration
Future versions will support:
- Configuration file (TOML/YAML)
- TLS/SSL settings
- Authentication backends
- Access control lists (ACL)
- Clustering configuration
mqtt-broker
The MQTT broker server.
Usage
mqtt-broker [OPTIONS]
Options
| Option | Short | Default | Description |
|---|---|---|---|
--host <HOST> | -h | 0.0.0.0 | Host address to bind to |
--port <PORT> | -p | 1883 | Port to listen on |
--max-clients <N> | β | 10000 | Maximum concurrent connections |
--help | β | β | Print help information |
Examples
Start with defaults
mqtt-broker
Listens on 0.0.0.0:1883 (all interfaces, standard MQTT port).
Listen on localhost only
mqtt-broker --host 127.0.0.1
Only accepts connections from the local machine.
Custom port
mqtt-broker --port 8883
Useful when running multiple brokers or avoiding privileged ports.
Production settings
mqtt-broker --host 0.0.0.0 --port 1883 --max-clients 50000
With logging
RUST_LOG=info mqtt-broker
Log levels: error, warn, info, debug, trace
Signals
| Signal | Action |
|---|---|
SIGINT (Ctrl+C) | Graceful shutdown |
SIGTERM | Graceful shutdown |
During graceful shutdown, the broker:
- Stops accepting new connections
- Sends DISCONNECT to all clients
- Waits for clients to disconnect (up to 30 seconds)
- Exits cleanly
Output
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β MQTT Broker v0.1.0 Starting β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£
β Protocol: MQTT 3.1.1 β
β Address: 0.0.0.0:1883 β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2026-01-31T12:00:00Z INFO mqtt_broker::server: Client connected: sensor-01 (192.168.1.100:54321)
2026-01-31T12:00:05Z INFO mqtt_broker::server: Client connected: dashboard (192.168.1.101:54322)
2026-01-31T12:00:10Z INFO mqtt_broker::server: Client disconnected: sensor-01
Exit Codes
| Code | Meaning |
|---|---|
0 | Clean shutdown |
1 | Error (binding failed, invalid arguments, etc.) |
mqtt-producer
Command-line tool for publishing MQTT messages.
Usage
mqtt-producer [OPTIONS] <TOPIC> <MESSAGE>
Arguments
| Argument | Description |
|---|---|
<TOPIC> | Topic to publish to |
<MESSAGE> | Message payload (text) |
Options
| Option | Short | Default | Description |
|---|---|---|---|
--host <HOST> | -h | 127.0.0.1 | Broker host address |
--port <PORT> | -p | 1883 | Broker port |
--qos <QOS> | -q | 0 | Quality of Service (0, 1, 2) |
--retain | -r | false | Retain message on broker |
--client <ID> | -c | auto | Client ID |
--help | β | β | Print help |
Examples
Simple publish
mqtt-producer "sensors/temperature" "23.5"
Publish with QoS 1
mqtt-producer --qos 1 "sensors/humidity" "65%"
Waits for PUBACK confirmation from broker.
Publish with QoS 2
mqtt-producer --qos 2 "payments/transaction" '{"id": "tx-123", "amount": 99.99}'
Exactly-once delivery with full 4-way handshake.
Retained message
mqtt-producer --retain "device/status" "online"
New subscribers immediately receive this message.
Custom broker
mqtt-producer -h 192.168.1.100 -p 1883 "home/lights" "on"
With specific client ID
mqtt-producer --client "sensor-gateway-01" "sensors/batch" '{"temp": 23, "humidity": 65}'
JSON payload
mqtt-producer "events/user" '{"event": "login", "user": "alice", "timestamp": 1706745600}'
Multi-word message
mqtt-producer "notifications/alert" "Temperature exceeds threshold"
Output
Successful publish (QoS 0)
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Done.
Successful publish (QoS 1)
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Message acknowledged (QoS 1)
Done.
Successful publish (QoS 2)
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Message delivered exactly once (QoS 2)
Done.
Connection failure
Connecting to 127.0.0.1:1883...
Failed to connect: Connection refused (os error 61)
Exit Codes
| Code | Meaning |
|---|---|
0 | Message published successfully |
1 | Error (connection failed, invalid arguments, timeout) |
Tips
- Use single quotes for JSON payloads to avoid shell escaping issues
- Retained messages persist until replaced or cleared with empty payload
- QoS 2 has higher latency but guarantees exactly-once delivery
mqtt-consumer
Command-line tool for subscribing to MQTT topics.
Usage
mqtt-consumer [OPTIONS] <TOPIC>...
Arguments
| Argument | Description |
|---|---|
<TOPIC>... | One or more topic filters to subscribe to |
Topic filters support wildcards:
+β Single-level wildcard#β Multi-level wildcard
Options
| Option | Short | Default | Description |
|---|---|---|---|
--host <HOST> | -h | 127.0.0.1 | Broker host address |
--port <PORT> | -p | 1883 | Broker port |
--qos <QOS> | -q | 0 | Maximum QoS level |
--client <ID> | -c | auto | Client ID |
--count <N> | -n | unlimited | Exit after N messages |
--help | β | β | Print help |
Examples
Subscribe to a specific topic
mqtt-consumer "sensors/temperature"
Subscribe with wildcards
# All topics under sensors/
mqtt-consumer "sensors/#"
# All temperature readings
mqtt-consumer "sensors/+/temperature"
# Multiple patterns
mqtt-consumer "sensors/#" "alerts/#"
Limit message count
# Exit after 10 messages
mqtt-consumer -n 10 "sensors/#"
With QoS 1
mqtt-consumer --qos 1 "critical/#"
Custom broker
mqtt-consumer -h 192.168.1.100 -p 1883 "home/#"
With specific client ID
mqtt-consumer --client "dashboard-main" "telemetry/#"
Output
Subscription confirmation
Connecting to 127.0.0.1:1883...
Connected as 'mqtt-consumer-12345'
Subscribed to 2 topic(s):
sensors/# -> SuccessQoS0
alerts/# -> SuccessQoS0
Waiting for messages... (Press Ctrl+C to exit)
Receiving messages
[1] Topic: sensors/living-room/temperature | QoS: AtMostOnce | Retain: false
Payload: 23.5
[2] Topic: sensors/kitchen/humidity | QoS: AtMostOnce | Retain: false
Payload: 65%
[3] Topic: alerts/fire | QoS: AtLeastOnce | Retain: false
Payload: Smoke detected in garage
Retained message
[1] Topic: device/status | QoS: AtMostOnce | Retain: true
Payload: online
The Retain: true indicates this message was stored on the broker.
With message limit
[1] Topic: sensors/temp | QoS: AtMostOnce | Retain: false
Payload: 23.5
...
[10] Topic: sensors/temp | QoS: AtMostOnce | Retain: false
Payload: 24.1
Received 10 message(s), exiting.
Total messages received: 10
Disconnection
^C
Received Ctrl+C, disconnecting...
Total messages received: 42
Wildcard Patterns
Single-level wildcard (+)
Matches exactly one topic level.
| Pattern | Matches | Doesnβt Match |
|---|---|---|
home/+/temperature | home/kitchen/temperature | home/temperature |
home/bedroom/temperature | home/floor1/kitchen/temperature | |
+/+/status | device/sensor/status | device/status |
Multi-level wildcard (#)
Matches zero or more topic levels. Must be last in the filter.
| Pattern | Matches |
|---|---|
home/# | home, home/kitchen, home/kitchen/temperature |
sensors/temperature/# | sensors/temperature, sensors/temperature/celsius |
# | Everything (all topics) |
Exit Codes
| Code | Meaning |
|---|---|
0 | Clean exit (Ctrl+C or message limit reached) |
1 | Error (connection failed, invalid arguments) |
Tips
- Use quotes around wildcards to prevent shell expansion:
"sensors/#" - The
#wildcard can only appear at the end of a filter - Subscribe to
#to see all broker traffic (debugging) - Use
-n 1to receive just one message and exit
MQTT Protocol
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and unreliable networks.
Version
This broker implements MQTT 3.1.1 (OASIS Standard, 2014).
Core Concepts
Publish-Subscribe Model
Unlike request-response protocols (HTTP), MQTT uses a publish-subscribe pattern:
ββββββββββββ ββββββββββ ββββββββββββ
β Publisherβ ββββββββ β Broker β ββββββββ βSubscriberβ
ββββββββββββ Pub ββββββββββ Sub ββββββββββββ
β β β
β PUBLISH β β
β topic: temp β β
β payload: 23.5 β β
β ββββββββββββββββββΊ β β
β β PUBLISH β
β β topic: temp β
β β payload: 23.5 β
β β ββββββββββββββββββΊ β
- Publishers send messages to topics
- Subscribers receive messages from topics theyβve subscribed to
- Broker routes messages between publishers and subscribers
Decoupling
MQTT provides three types of decoupling:
- Space β Publishers and subscribers donβt need to know each other
- Time β They donβt need to be online simultaneously (with persistence)
- Synchronization β Operations are asynchronous
Packet Types
MQTT defines 14 packet types:
| Type | Direction | Purpose |
|---|---|---|
| CONNECT | Client β Broker | Initiate connection |
| CONNACK | Broker β Client | Acknowledge connection |
| PUBLISH | Both | Publish a message |
| PUBACK | Both | QoS 1 acknowledgment |
| PUBREC | Both | QoS 2 received |
| PUBREL | Both | QoS 2 release |
| PUBCOMP | Both | QoS 2 complete |
| SUBSCRIBE | Client β Broker | Subscribe to topics |
| SUBACK | Broker β Client | Acknowledge subscription |
| UNSUBSCRIBE | Client β Broker | Unsubscribe from topics |
| UNSUBACK | Broker β Client | Acknowledge unsubscription |
| PINGREQ | Client β Broker | Keep-alive ping |
| PINGRESP | Broker β Client | Keep-alive response |
| DISCONNECT | Client β Broker | Clean disconnect |
Connection Lifecycle
Client Broker
β β
β CONNECT β
β - client_id: "sensor-01" β
β - clean_session: true β
β - keep_alive: 60 β
β βββββββββββββββββββββββββββββββββββββΊ β
β β
β CONNACK β
β - code: 0 β
β βββββββββββββββββββββββββββββββββββββ β
β β
β ... publish/subscribe ... β
β β
β PINGREQ β
β βββββββββββββββββββββββββββββββββββββΊ β
β PINGRESP β
β βββββββββββββββββββββββββββββββββββββ β
β β
β DISCONNECT β
β βββββββββββββββββββββββββββββββββββββΊ β
β β
Keep-Alive
The keep-alive mechanism ensures connections stay active:
- Client specifies keep-alive interval in CONNECT (e.g., 60 seconds)
- Client must send a packet within 1.5Γ the interval
- If no data to send, client sends PINGREQ
- Broker responds with PINGRESP
- If broker receives nothing within 1.5Γ interval, it closes connection
Wire Format
MQTT uses a compact binary format:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Fixed Header β
ββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββ€
β Packet Type (4) β Flags (4) β
ββββββββββββββββββββ΄βββββββββββββββββββββββββββββββββββ€
β Remaining Length (1-4 bytes, variable int) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Variable Header β
β (depends on packet type) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Payload β
β (depends on packet type) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
The variable-length integer encoding allows:
- 1 byte for values 0-127
- 2 bytes for values up to 16,383
- 3 bytes for values up to 2,097,151
- 4 bytes for values up to 268,435,455 (256 MB max packet)
References
Topics & Wildcards
Topics are the addressing mechanism in MQTT. Publishers send messages to topics, and subscribers receive messages from topics theyβve subscribed to.
Topic Names
A topic name is the destination for a published message.
Rules
- UTF-8 encoded string
- At least 1 character
- Maximum 65,535 bytes
- Case-sensitive (
Sensor/Tempβsensor/temp) - Can contain spaces (but not recommended)
- Cannot contain wildcards (
+or#) - Cannot contain null character
Best Practices
β
Good topics:
home/kitchen/temperature
sensors/device-01/status
usa/california/sf/weather
β Avoid:
/leading/slash (creates empty first level)
trailing/slash/ (creates empty last level)
double//slash (creates empty middle level)
Spaces In Topic (harder to work with)
Topic Levels
Topics are hierarchical, separated by /:
home / kitchen / temperature
β β β
β β βββ Level 3
β βββββββββββββββ Level 2
βββββββββββββββββββββββ Level 1
Topic Filters
A topic filter is used when subscribing. It can include wildcards to match multiple topics.
Single-Level Wildcard (+)
Matches exactly one topic level.
# Subscribe
mqtt-consumer "home/+/temperature"
| Published Topic | Matches? |
|---|---|
home/kitchen/temperature | β Yes |
home/bedroom/temperature | β Yes |
home/temperature | β No (missing level) |
home/floor1/kitchen/temperature | β No (extra level) |
More examples:
| Filter | Matches |
|---|---|
+/tennis/# | sport/tennis/player1 |
sport/+/player1 | sport/tennis/player1, sport/golf/player1 |
+/+ | a/b, foo/bar (exactly 2 levels) |
Multi-Level Wildcard (#)
Matches zero or more topic levels. Must be the last character.
# Subscribe to everything under home/
mqtt-consumer "home/#"
| Published Topic | Matches? |
|---|---|
home | β Yes |
home/kitchen | β Yes |
home/kitchen/temperature | β Yes |
home/floor1/kitchen/oven/status | β Yes |
office/kitchen | β No |
More examples:
| Filter | Matches |
|---|---|
# | Everything (all topics) |
sport/# | sport, sport/tennis, sport/tennis/player1 |
sport/tennis/# | sport/tennis, sport/tennis/player1/ranking |
Combining Wildcards
You can use both wildcards in one filter:
mqtt-consumer "+/sensors/+/temperature/#"
Matches:
home/sensors/kitchen/temperatureoffice/sensors/room1/temperature/celsius
System Topics
Topics starting with $ are reserved for broker internals:
$SYS/broker/version
$SYS/broker/uptime
$SYS/broker/clients/connected
- Clients cannot publish to
$topics - Wildcard
#does not match$topics - Must explicitly subscribe:
$SYS/#
Topic Design Patterns
Hierarchical (Location-based)
{country}/{state}/{city}/{building}/{floor}/{room}/{sensor}
usa/california/sf/office1/floor2/room201/temperature
Entity-based
{entity_type}/{entity_id}/{attribute}
device/sensor-001/temperature
user/alice/presence
Event-based
{domain}/{event_type}/{entity}
orders/created/order-123
payments/processed/tx-456
Command & Response
# Commands
device/{device_id}/command
device/{device_id}/command/reboot
# Responses
device/{device_id}/response
device/{device_id}/status
Implementation Details
This broker uses a trie data structure for O(k) topic matching, where k is the number of topic levels. This enables efficient matching even with millions of subscriptions.
Root
βββ home
β βββ kitchen
β β βββ temperature [subscriber: client-1]
β βββ + (wildcard)
β βββ humidity [subscriber: client-2]
βββ sensors
βββ # (multi-wildcard) [subscriber: client-3]
Quality of Service (QoS)
QoS levels define the delivery guarantee between a client and the broker.
Overview
| Level | Name | Guarantee | Use Case |
|---|---|---|---|
| 0 | At most once | Fire and forget | Telemetry, non-critical data |
| 1 | At least once | Guaranteed, may duplicate | Important events |
| 2 | Exactly once | Guaranteed, no duplicates | Financial transactions |
QoS 0 β At Most Once
The simplest level. Message is sent once with no acknowledgment.
Publisher Broker Subscriber
β β β
β PUBLISH (QoS 0) β β
β βββββββββββββββββββββΊ β β
β β PUBLISH (QoS 0) β
β β βββββββββββββββββββββΊ β
β β β
Characteristics:
- Fastest (no round trips)
- No retransmission
- Message may be lost
- No duplicate detection needed
When to use:
- Sensor readings at high frequency
- Data where occasional loss is acceptable
- When bandwidth/latency is critical
QoS 1 β At Least Once
Message is guaranteed to arrive, but may arrive multiple times.
Publisher Broker Subscriber
β β β
β PUBLISH (QoS 1) β β
β packet_id: 1 β β
β βββββββββββββββββββββΊ β β
β β PUBLISH (QoS 1) β
β β packet_id: 1 β
β β βββββββββββββββββββββΊ β
β β β
β β PUBACK β
β β packet_id: 1 β
β β βββββββββββββββββββββ β
β PUBACK β β
β packet_id: 1 β β
β βββββββββββββββββββββ β β
Characteristics:
- One round trip (PUBLISH β PUBACK)
- Message stored until acknowledged
- Retransmitted if no PUBACK received
- Duplicates possible on retransmit
When to use:
- Important events that must arrive
- When duplicates can be handled (idempotent operations)
- Status updates, alerts
QoS 2 β Exactly Once
Most reliable level. Guarantees exactly one delivery.
Publisher Broker Subscriber
β β β
β PUBLISH (QoS 2) β β
β packet_id: 1 β β
β βββββββββββββββββββββΊ β β
β β β
β PUBREC β β
β packet_id: 1 β β
β βββββββββββββββββββββ β β
β β PUBLISH (QoS 2) β
β β βββββββββββββββββββββΊ β
β β PUBREC β
β β βββββββββββββββββββββ β
β PUBREL β β
β packet_id: 1 β β
β βββββββββββββββββββββΊ β β
β β PUBREL β
β β βββββββββββββββββββββΊ β
β β PUBCOMP β
β β βββββββββββββββββββββ β
β PUBCOMP β β
β packet_id: 1 β β
β βββββββββββββββββββββ β β
Characteristics:
- Two round trips (4-way handshake)
- Highest latency
- No duplicates
- State must be tracked
When to use:
- Financial transactions
- Billing events
- Critical commands
- When duplicates are unacceptable
QoS Downgrade
The actual QoS is the minimum of:
- Publisherβs QoS
- Subscriberβs maximum QoS
Publisher QoS Subscriber Max QoS Actual Delivery
2 0 0
2 1 1
1 2 1
0 2 0
Example:
# Publisher sends with QoS 2
mqtt-producer --qos 2 "topic" "message"
# Subscriber subscribed with QoS 1
mqtt-consumer --qos 1 "topic"
# Message delivered with QoS 1 (minimum)
Packet Identifiers
QoS 1 and 2 use packet IDs to track message flows:
- 16-bit unsigned integer (1-65535)
- Unique per client per direction
- Reusable after flow completes
- 0 is reserved (not used)
State Machines
QoS 1 Publisher State
βββββββββββββββ PUBLISH βββββββββββββββ
β Idle β βββββββββββββββΊ β Awaiting β
β β β PUBACK β
βββββββββββββββ ββββββββ¬βββββββ
β² β
β PUBACK β
βββββββββββββββββββββββββββββββββ
QoS 2 Publisher State
βββββββββββββββ PUBLISH βββββββββββββββ
β Idle β βββββββββββββββΊ β Awaiting β
β β β PUBREC β
βββββββββββββββ ββββββββ¬βββββββ
β² β PUBREC
β βΌ
β βββββββββββββββ
β PUBCOMP β Awaiting β
ββββββββββββββββββββββββ β PUBCOMP β
ββββββββ¬βββββββ
β PUBREL
βΌ
Performance Considerations
| Aspect | QoS 0 | QoS 1 | QoS 2 |
|---|---|---|---|
| Packets | 1 | 2 | 4 |
| Latency | Lowest | Medium | Highest |
| Bandwidth | Lowest | Medium | Highest |
| Memory | None | Moderate | Higher |
| Reliability | None | High | Highest |
Best Practices
- Default to QoS 0 for high-frequency telemetry
- Use QoS 1 for important data where duplicates are okay
- Use QoS 2 sparingly β only when exactly-once is required
- Design for idempotency β prefer QoS 1 + idempotent handlers over QoS 2
- Consider trade-offs β QoS 2 is 4Γ the packets of QoS 0
Sessions
MQTT sessions allow state to persist between client connections.
Session Types
Clean Session (clean_session: true)
A fresh start every connection:
- All subscriptions are cleared on disconnect
- All queued messages are discarded
- No state persists between connections
# Default behavior β clean session
mqtt-consumer "sensors/#"
# Disconnect and reconnect β must resubscribe
Persistent Session (clean_session: false)
State persists across connections:
- Subscriptions are maintained
- QoS 1/2 messages are queued while offline
- Queued messages delivered on reconnect
Session State
The broker stores the following for each session:
| Component | Description |
|---|---|
| Client ID | Unique identifier |
| Subscriptions | Topic filters and QoS levels |
| QoS 1 outbound | Messages awaiting PUBACK |
| QoS 2 outbound | Messages in QoS 2 flow |
| QoS 2 inbound | Received messages awaiting PUBREL |
| Offline queue | Messages for offline client |
Connection Flow
Clean Session
Client Broker
β β
β CONNECT β
β client_id: "sensor-01" β
β clean_session: true β
β βββββββββββββββββββββββββββββββββββββββΊ β
β β Creates new session
β CONNACK β
β session_present: 0 β
β βββββββββββββββββββββββββββββββββββββββ β
Persistent Session (First Connect)
Client Broker
β β
β CONNECT β
β client_id: "sensor-01" β
β clean_session: false β
β βββββββββββββββββββββββββββββββββββββββΊ β
β β Creates new session
β CONNACK β
β session_present: 0 β
β βββββββββββββββββββββββββββββββββββββββ β
Persistent Session (Reconnect)
Client Broker
β β
β CONNECT β
β client_id: "sensor-01" β
β clean_session: false β
β βββββββββββββββββββββββββββββββββββββββΊ β
β β Finds existing session
β CONNACK β
β session_present: 1 β
β βββββββββββββββββββββββββββββββββββββββ β
β β
β β Delivers queued messages
β PUBLISH β
β PUBLISH β
β βββββββββββββββββββββββββββββββββββββββ β
Session Takeover
If a client connects with an ID thatβs already connected:
- Broker disconnects the existing client
- New client takes over the session
- This prevents duplicate client IDs
Client A (sensor-01) Broker Client B (sensor-01)
β β β
β βββ Connected βββΊ β β
β β β
β β CONNECT β
β β client_id: sensor-01
β β ββββββββββββββββββββ β
β β β
[Disconnected] β Takes over session β
β β β
β β CONNACK β
β β βββββββββββββββββββββΊβ
Offline Message Queuing
When a client with a persistent session disconnects:
- Broker keeps the session
- Incoming messages (matching subscriptions) are queued
- When client reconnects, queued messages are delivered
Subscriber Broker Publisher
(offline) β β
X β β
β β PUBLISH "sensors/t" β
β β βββββββββββββββββββββ β
β β β
β [Queues message for subscriber]
β β β
β CONNECT β β
β ββββββββββββββββββββββΊ β β
β β β
β CONNACK (session: 1) β β
β ββββββββββββββββββββββ β β
β β β
β PUBLISH "sensors/t" β (queued message) β
β ββββββββββββββββββββββ β β
Queue Limits
To prevent memory exhaustion, this broker limits:
| Setting | Default | Description |
|---|---|---|
| Max offline messages | 1000 | Per client |
| Max inflight | 65535 | QoS 1/2 in progress |
| Session expiry | 24 hours | After disconnect |
When the queue is full, oldest messages are dropped (or new messages, depending on policy).
Session Expiry
Sessions donβt live forever:
- After
session_expirytime, inactive sessions are cleaned up - This prevents unbounded memory growth
- Default: 24 hours after last disconnect
Client ID Rules
- Must be 1-23 characters (per MQTT 3.1.1)
- Characters:
a-z,A-Z,0-9 - If empty, broker generates one (requires
clean_session: true) - Must be unique across all connected clients
Best Practices
- Use persistent sessions for reliability β Critical IoT devices
- Use clean sessions for ephemeral clients β Dashboards, CLI tools
- Set appropriate expiry β Balance reliability vs. memory
- Handle session_present β Client should check CONNACK
- Use consistent client IDs β Enables session recovery
- Size queues appropriately β Based on expected offline duration
Implementation Details
This broker uses:
- In-memory session storage (default)
- LRU eviction for session expiry
- Separate queues per QoS level
- Atomic session takeover
Architecture Overview
This broker is built with modularity and testability as core principles.
High-Level Architecture
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β TCP Listener β
β (tokio::net::TcpListener) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Transport Layer β
β β
β βββββββββββββββββββββββ βββββββββββββββββββββββ β
β β MqttCodec β β Connection β β
β β (Framed<TcpStream>) β ββββββββΊβ (async wrapper) β β
β βββββββββββββββββββββββ βββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Codec Layer β
β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
β β Decoder β β Encoder β β Packet Types β β
β β (MQTT 3.1.1) β β (MQTT 3.1.1) β β (16 packet types) β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Server Layer β
β β
β βββββββββββββββββββββββ βββββββββββββββββββββββββββββββ β
β β MqttServer β β ClientHandler β β
β β (accept loop) ββββββββββΊβ (per-connection state) β β
β βββββββββββββββββββββββ βββββββββββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Session Layer β
β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
β β Session β β QoS State β β Packet ID Alloc β β
β β (client state) β β Machines β β (1-65535) β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Router Layer β
β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
β β Router β β Registry β β Retain Store β β
β β(msg distribution)β β (subscriptions) β β (retained msgs) β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Topic Matcher Layer β
β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
β β TopicTrie β β TopicFilter β β Validation β β
β β (O(levels)) β β (+, #, exact) β β (UTF-8, limits) β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Persistence Layer β
β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
β β Storage Trait β β MemoryStore β β (Future: Disk) β β
β β (abstract) β β (default) β β β β
β βββββββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Data Flow
Publish Flow (QoS 1)
Publisher Broker Subscriber
β β β
β PUBLISH (QoS 1) β β
β βββββββββββββββββββββΊβ β
β β β
β βββββ Route to matching ββββββΊβ
β β subscribers β
β β β
β β PUBLISH β
β βββββββββββββββββββββββββββββββΊ
β β β
β ββββββββββββββββββββββ PUBACK β
β β β
β PUBACK β β
ββββββββββββββββββββββββ β
Subscribe Flow
Client Broker
β β
β SUBSCRIBE β
β topic: "sensors/#" β
β qos: 1 β
β ββββββββββββββββββββββΊβ
β β
β β ββββββββββββββββββββββββ
β β β 1. Validate filter β
β β β 2. Add to trie β
β β β 3. Store in session β
β β β 4. Check retain β
β β ββββββββββββββββββββββββ
β β
β SUBACK β
β granted: [1] β
βββββββββββββββββββββββββ
β β
β PUBLISH (retained) β
βββββββββββββββββββββββββ
Concurrency Model
Task Structure
βββββββββββββββββββββββββββ
β Main Task β
β (accept connections) β
βββββββββββββ¬ββββββββββββββ
β
βββββββββββββββββββββΌββββββββββββββββββββ
βΌ βΌ βΌ
ββββββββββββββββββ ββββββββββββββββββ ββββββββββββββββββ
β Client Task β β Client Task β β Client Task β
β (handle one) β β (handle one) β β (handle one) β
ββββββββββββββββββ ββββββββββββββββββ ββββββββββββββββββ
Each client gets its own async task for:
- Reading packets from socket
- Processing MQTT protocol
- Writing responses
Shared State
ββββββββββββββββββββββββββ
β Shared State β
β (Arc<Router>) β
β β
β ββββββββββββββββββββ β
β β Subscription Trieβ β
β β (RwLock) β β
β ββββββββββββββββββββ β
β ββββββββββββββββββββ β
β β Session Store β β
β β (RwLock) β β
β ββββββββββββββββββββ β
β ββββββββββββββββββββ β
β β Retain Store β β
β β (RwLock) β β
β ββββββββββββββββββββ β
ββββββββββββββββββββββββββ
β² β² β²
β β β
ββββββββββββββββββββββ β ββββββββββββββββββββββ
β β β
Client Task 1 Client Task 2 Client Task 3
Lock granularity:
- Read locks for subscription lookups (fast, parallel)
- Write locks for subscribe/unsubscribe (rare)
- Per-session locks for client state
Error Handling Strategy
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Error Hierarchy β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β MqttError (top-level) β
β βββ CodecError (packet encoding/decoding) β
β β βββ InvalidPacketType β
β β βββ MalformedRemainingLength β
β β βββ InvalidUtf8 β
β β βββ PacketTooLarge β
β β β
β βββ ProtocolError (MQTT violations) β
β β βββ InvalidClientId β
β β βββ InvalidTopicFilter β
β β βββ PacketIdInUse β
β β β
β βββ SessionError (session management) β
β β βββ SessionNotFound β
β β βββ SessionTakeover β
β β β
β βββ IoError (network) β
β βββ (std::io::Error wrapped) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Memory Management
Per-Connection Memory
| Component | Typical Size |
|---|---|
| TCP buffer (read) | 8 KB |
| TCP buffer (write) | 8 KB |
| Session state | ~1 KB |
| Subscription entries | ~100 bytes each |
| Inflight messages | ~1 KB each |
Global Memory
| Component | Typical Size |
|---|---|
| Subscription trie | O(unique topic levels) |
| Session store | O(connected clients) |
| Retain store | O(retained messages) |
| Offline queues | O(queued messages) |
Performance Characteristics
| Operation | Time Complexity | Notes |
|---|---|---|
| Topic match | O(topic levels) | Trie traversal |
| Subscribe | O(filter levels) | Trie insertion |
| Unsubscribe | O(filter levels) | Trie removal |
| Publish routing | O(subscribers) | For each match |
| Session lookup | O(1) | HashMap |
| Packet decode | O(packet size) | Linear scan |
| Packet encode | O(packet size) | Linear write |
Design Decisions
Why Trie for Topics?
- Wildcards require prefix matching
- O(levels) vs O(total subscriptions)
- Memory efficient for shared prefixes
- Natural hierarchy representation
Why In-Memory Storage?
- Simplicity for learning/prototyping
- Sufficient for most use cases
- Trait abstraction allows disk later
- Avoids I/O latency
Why One Task Per Client?
- Simple mental model
- No complex message passing
- Backpressure handled naturally
- Easy cleanup on disconnect
Why RwLock vs Channel?
- Subscriptions are read-heavy
- Parallel reads scale well
- Writes are rare (subscribe/unsubscribe)
- Simpler than actor model
Module Reference
This broker is organized into well-defined modules, each with a single responsibility.
Module Dependency Graph
βββββββββββββββββββ
β bin/broker β
β bin/producer β
β bin/consumer β
ββββββββββ¬βββββββββ
β
βΌ
βββββββββββββββββββ
β server β
ββββββββββ¬βββββββββ
β
ββββββββββββββββββββΌβββββββββββββββββββ
βΌ βΌ βΌ
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β session β β router β β transport β
ββββββββ¬βββββββ ββββββββ¬βββββββ ββββββββ¬βββββββ
β β β
β ββββββββ΄βββββββ β
β βΌ βΌ β
β βββββββββββββ βββββββββββββ β
β βtopic_matchβ βpersistenceβ β
β βββββββββββββ βββββββββββββ β
β β
βββββββββββββββββββ¬ββββββββββββββββββββ
βΌ
βββββββββββββββ
β codec β
βββββββββββββββ
codec β MQTT 3.1.1 Protocol
Location: src/codec/
Purpose: Encode and decode all 16 MQTT 3.1.1 packet types.
Key Types
#![allow(unused)]
fn main() {
// All packet types
pub enum MqttPacket {
Connect(ConnectPacket),
Connack(ConnackPacket),
Publish(PublishPacket),
Puback(PubackPacket),
Pubrec(PubrecPacket),
Pubrel(PubrelPacket),
Pubcomp(PubcompPacket),
Subscribe(SubscribePacket),
Suback(SubackPacket),
Unsubscribe(UnsubscribePacket),
Unsuback(UnsubackPacket),
Pingreq,
Pingresp,
Disconnect,
}
// Decoder
pub struct MqttDecoder;
impl MqttDecoder {
pub fn decode(bytes: &mut BytesMut) -> Result<Option<MqttPacket>>;
}
// Encoder
pub struct MqttEncoder;
impl MqttEncoder {
pub fn encode(packet: &MqttPacket, buf: &mut BytesMut) -> Result<()>;
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
packet.rs | Packet type definitions |
types.rs | Common types (QoS, ConnectFlags) |
decode.rs | Packet decoding logic |
encode.rs | Packet encoding logic |
varint.rs | Variable-length integer codec |
error.rs | Codec error types |
tests.rs | Unit tests (60 tests) |
topic_matcher β Wildcard Matching
Location: src/topic_matcher/
Purpose: Efficient topic filter matching with wildcard support.
Key Types
#![allow(unused)]
fn main() {
// Parsed topic filter
pub struct TopicFilter {
pub pattern: String,
pub levels: Vec<Level>,
}
pub enum Level {
Exact(String),
SingleWildcard, // +
MultiWildcard, // #
}
// Trie-based matcher
pub struct TopicTrie<T> {
// Insert subscription
pub fn insert(&mut self, filter: &TopicFilter, value: T);
// Find all matching subscriptions
pub fn matches(&self, topic: &str) -> Vec<&T>;
// Remove subscription
pub fn remove(&mut self, filter: &TopicFilter) -> Option<T>;
}
}
Wildcard Rules
| Pattern | Matches | Doesnβt Match |
|---|---|---|
sensors/+/temp | sensors/room1/temp | sensors/temp |
sensors/# | sensors/a/b/c | other/topic |
+/+/+ | a/b/c | a/b |
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
filter.rs | TopicFilter parsing |
trie.rs | Trie implementation |
validation.rs | Topic/filter validation |
tests.rs | Unit tests (23 tests) |
session β Connection State
Location: src/session/
Purpose: Manage client sessions and QoS state machines.
Key Types
#![allow(unused)]
fn main() {
// Client session
pub struct Session {
pub client_id: String,
pub clean_session: bool,
pub subscriptions: HashMap<String, QoS>,
pub connection_state: ConnectionState,
}
// QoS 1 state machine
pub enum Qos1State {
AwaitingPuback { packet_id: u16, message: Message },
Complete,
}
// QoS 2 state machine
pub enum Qos2State {
AwaitingPubrec { packet_id: u16, message: Message },
AwaitingPubrel { packet_id: u16 },
AwaitingPubcomp { packet_id: u16 },
Complete,
}
// Packet ID allocator
pub struct PacketIdAllocator {
pub fn allocate(&mut self) -> Option<u16>;
pub fn release(&mut self, id: u16);
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
session.rs | Session management |
connection.rs | Connection state |
qos.rs | QoS state machines |
packet_id.rs | Packet ID allocation |
tests.rs | Unit tests (24 tests) |
router β Message Distribution
Location: src/router/
Purpose: Route published messages to matching subscribers.
Key Types
#![allow(unused)]
fn main() {
// Central router
pub struct Router {
subscriptions: TopicTrie<Subscriber>,
retain_store: RetainStore,
sessions: SessionStore,
}
impl Router {
// Add subscription
pub fn subscribe(&self, client_id: &str, filter: &str, qos: QoS);
// Remove subscription
pub fn unsubscribe(&self, client_id: &str, filter: &str);
// Route message to subscribers
pub fn route(&self, message: &PublishPacket) -> Vec<Delivery>;
// Store retained message
pub fn retain(&self, topic: &str, message: Option<Message>);
}
// Subscription registry
pub struct Registry {
pub fn add(&self, filter: &str, subscriber: Subscriber);
pub fn remove(&self, filter: &str, client_id: &str);
pub fn matches(&self, topic: &str) -> Vec<Subscriber>;
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
router.rs | Main router logic |
registry.rs | Subscription registry |
retain.rs | Retained message store |
tests.rs | Unit tests (24 tests) |
persistence β Storage Abstraction
Location: src/persistence/
Purpose: Abstract storage for sessions, messages, and retained data.
Key Types
#![allow(unused)]
fn main() {
// Storage trait
#[async_trait]
pub trait Storage: Send + Sync {
// Session operations
async fn store_session(&self, session: &Session) -> Result<()>;
async fn load_session(&self, client_id: &str) -> Result<Option<Session>>;
async fn delete_session(&self, client_id: &str) -> Result<()>;
// Message queue operations
async fn enqueue(&self, client_id: &str, message: Message) -> Result<()>;
async fn dequeue(&self, client_id: &str) -> Result<Vec<Message>>;
// Retained message operations
async fn store_retained(&self, topic: &str, message: Option<Message>) -> Result<()>;
async fn load_retained(&self, topic: &str) -> Result<Option<Message>>;
}
// In-memory implementation
pub struct MemoryStore {
sessions: RwLock<HashMap<String, Session>>,
queues: RwLock<HashMap<String, VecDeque<Message>>>,
retained: RwLock<HashMap<String, Message>>,
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
traits.rs | Storage trait definitions |
memory.rs | In-memory implementation |
tests.rs | Unit tests (20 tests) |
transport β Network Layer
Location: src/transport/
Purpose: TCP connection handling and MQTT framing.
Key Types
#![allow(unused)]
fn main() {
// MQTT codec for tokio
pub struct MqttCodec;
impl Decoder for MqttCodec {
type Item = MqttPacket;
type Error = CodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>>;
}
impl Encoder<MqttPacket> for MqttCodec {
type Error = CodecError;
fn encode(&mut self, item: MqttPacket, dst: &mut BytesMut) -> Result<()>;
}
// Connection wrapper
pub struct Connection {
inner: Framed<TcpStream, MqttCodec>,
}
impl Connection {
pub async fn read(&mut self) -> Result<Option<MqttPacket>>;
pub async fn write(&mut self, packet: MqttPacket) -> Result<()>;
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
codec.rs | Tokio codec implementation |
connection.rs | Connection wrapper |
tests.rs | Unit tests (16 tests) |
server β Broker Server
Location: src/server/
Purpose: Accept connections and handle MQTT protocol.
Key Types
#![allow(unused)]
fn main() {
// Server configuration
pub struct ServerConfig {
pub bind_addr: SocketAddr,
pub max_connections: usize,
pub max_packet_size: usize,
}
// Main server
pub struct MqttServer {
config: ServerConfig,
router: Arc<Router>,
}
impl MqttServer {
pub async fn run(&self) -> Result<()>;
}
// Per-client handler
pub struct ClientHandler {
connection: Connection,
session: Option<Session>,
router: Arc<Router>,
}
impl ClientHandler {
pub async fn run(&mut self) -> Result<()>;
async fn handle_connect(&mut self, packet: ConnectPacket) -> Result<()>;
async fn handle_publish(&mut self, packet: PublishPacket) -> Result<()>;
async fn handle_subscribe(&mut self, packet: SubscribePacket) -> Result<()>;
// ... other packet handlers
}
}
Files
| File | Description |
|---|---|
mod.rs | Module re-exports |
server.rs | Main server loop |
handler.rs | Client handler |
client.rs | Client state management |
tests.rs | Integration tests (11 tests) |
bin/ β Executables
Location: src/bin/
Purpose: CLI entry points.
Binaries
| Binary | Description |
|---|---|
broker.rs β mqtt-broker | Start the broker server |
producer.rs β mqtt-producer | Publish messages |
consumer.rs β mqtt-consumer | Subscribe and receive |
Test Coverage
| Module | Tests | Coverage |
|---|---|---|
| codec | 60 | Packet encode/decode roundtrips |
| topic_matcher | 23 | Wildcard matching, edge cases |
| session | 24 | State machines, packet IDs |
| router | 24 | Routing, subscriptions |
| persistence | 20 | Storage operations |
| transport | 16 | Framing, connection handling |
| server | 11 | Integration scenarios |
| Total | 178 |
Adding a New Module
- Create directory:
src/mymodule/ - Add
mod.rswith public exports - Add
tests.rswith unit tests - Export from
src/lib.rs:pub mod mymodule; - Add to this documentation
Module Template
#![allow(unused)]
fn main() {
// src/mymodule/mod.rs
mod implementation;
mod tests;
pub use implementation::*;
// src/mymodule/implementation.rs
pub struct MyType {
// ...
}
impl MyType {
pub fn new() -> Self {
// ...
}
}
// src/mymodule/tests.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic() {
let my = MyType::new();
// assertions
}
}
}
Building from Source
Build the MQTT broker and CLI tools from source.
Prerequisites
Rust Toolchain
Install Rust via rustup:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Minimum version: Rust 1.70+ (uses edition 2021)
Verify installation:
rustc --version
cargo --version
Platform Support
| Platform | Status | Notes |
|---|---|---|
| Linux (x86_64) | β | Primary development |
| macOS (x86_64) | β | Fully supported |
| macOS (ARM64) | β | Apple Silicon |
| Windows (x86_64) | β | MSVC toolchain |
Clone Repository
git clone https://github.com/yourusername/mqtt-broker.git
cd mqtt-broker
Build Commands
Debug Build
Fast compilation, includes debug symbols, no optimizations:
cargo build
Binaries are placed in target/debug/:
target/debug/
βββ mqtt-broker
βββ mqtt-consumer
βββ mqtt-producer
Release Build
Optimized for production, slower to compile:
cargo build --release
Binaries are placed in target/release/:
target/release/
βββ mqtt-broker
βββ mqtt-consumer
βββ mqtt-producer
Build Specific Binary
# Only the broker
cargo build --bin mqtt-broker --release
# Only the producer
cargo build --bin mqtt-producer --release
# Only the consumer
cargo build --bin mqtt-consumer --release
Build Library Only
cargo build --lib
Install Locally
Install to ~/.cargo/bin/ (should be in your PATH):
cargo install --path .
Now available anywhere:
mqtt-broker --help
mqtt-producer --help
mqtt-consumer --help
Build Configuration
Cargo.toml Features
Currently no optional features. All functionality is included by default.
Profile Settings
Release profile in Cargo.toml:
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
This produces smaller, faster binaries but increases compile time.
Reducing Binary Size
For minimal binaries:
# Strip debug symbols
cargo build --release
strip target/release/mqtt-broker
# Or use build flags
RUSTFLAGS="-C link-arg=-s" cargo build --release
Cross Compilation
Install Target
# Linux on macOS
rustup target add x86_64-unknown-linux-gnu
# Windows on Linux/macOS
rustup target add x86_64-pc-windows-gnu
# ARM64 Linux
rustup target add aarch64-unknown-linux-gnu
Cross Build
# Linux
cargo build --release --target x86_64-unknown-linux-gnu
# Windows
cargo build --release --target x86_64-pc-windows-gnu
Note: Cross compilation may require additional linkers. Consider using cross:
cargo install cross
cross build --release --target x86_64-unknown-linux-gnu
Docker Build
Build Image
# Dockerfile
FROM rust:1.75 as builder
WORKDIR /usr/src/mqtt-broker
COPY . .
RUN cargo build --release
FROM debian:bookworm-slim
COPY --from=builder /usr/src/mqtt-broker/target/release/mqtt-broker /usr/local/bin/
EXPOSE 1883
CMD ["mqtt-broker"]
Build and run:
docker build -t mqtt-broker .
docker run -p 1883:1883 mqtt-broker
Multi-Architecture
docker buildx build --platform linux/amd64,linux/arm64 -t mqtt-broker .
Documentation Build
Generate API Docs
cargo doc --open
Opens documentation in your browser.
Build mdBook Docs
# Install mdBook
cargo install mdbook
# Build documentation site
cd docs
mdbook build
# Serve locally
mdbook serve --open
Troubleshooting
Compilation Errors
Missing dependencies:
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install build-essential
# macOS
xcode-select --install
Outdated Rust:
rustup update
Linker Errors
macOS:
xcode-select --install
Linux:
sudo apt-get install gcc
Slow Builds
Use incremental compilation (enabled by default) and consider:
# Use mold linker (Linux)
sudo apt-get install mold
RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build
# Use lld linker
RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build
Out of Memory
Reduce parallelism:
cargo build -j 2
Build Artifacts
After a successful build:
target/
βββ debug/
β βββ mqtt-broker # Debug binary
β βββ mqtt-consumer
β βββ mqtt-producer
β βββ deps/ # Dependencies
β βββ incremental/ # Incremental build cache
βββ release/
β βββ mqtt-broker # Release binary
β βββ mqtt-consumer
β βββ mqtt-producer
βββ doc/ # Generated documentation
βββ mqtt_broker/
Clean Build
Remove all build artifacts:
cargo clean
Remove only release artifacts:
cargo clean --release
Running Tests
Comprehensive testing is a core principle of this project. Hereβs how to run and write tests.
Quick Start
Run all tests:
cargo test
Expected output:
running 178 tests
...
test result: ok. 178 passed; 0 failed; 0 ignored
Test Commands
Run All Tests
cargo test
Run Tests with Output
See println! output from tests:
cargo test -- --nocapture
Run Specific Test
# By name
cargo test test_publish_decode
# By partial name
cargo test publish
# By module
cargo test codec::
Run Tests in Specific Module
# Codec tests only
cargo test --package mqtt-broker --lib codec::tests
# Topic matcher tests
cargo test --package mqtt-broker --lib topic_matcher::tests
# Session tests
cargo test --package mqtt-broker --lib session::tests
Run Tests in Release Mode
cargo test --release
Run Ignored Tests
Some tests are marked #[ignore] (slow or require setup):
cargo test -- --ignored
Run All Including Ignored
cargo test -- --include-ignored
Test Organization
src/
βββ codec/
β βββ tests.rs # 60 tests
βββ topic_matcher/
β βββ tests.rs # 23 tests
βββ session/
β βββ tests.rs # 24 tests
βββ router/
β βββ tests.rs # 24 tests
βββ persistence/
β βββ tests.rs # 20 tests
βββ transport/
β βββ tests.rs # 16 tests
βββ server/
βββ tests.rs # 11 tests
tests/
βββ codec_proptest.rs # Property-based tests
βββ message_test.rs # Integration tests
Test Categories
Unit Tests
Test individual functions and structs:
#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encode_decode_roundtrip() {
let packet = ConnectPacket {
client_id: "test".to_string(),
// ...
};
let mut buf = BytesMut::new();
MqttEncoder::encode(&MqttPacket::Connect(packet.clone()), &mut buf).unwrap();
let decoded = MqttDecoder::decode(&mut buf).unwrap().unwrap();
assert_eq!(decoded, MqttPacket::Connect(packet));
}
}
}
Integration Tests
Test multiple components together (in tests/ directory):
#![allow(unused)]
fn main() {
// tests/message_test.rs
use mqtt_broker::*;
#[tokio::test]
async fn test_publish_flow() {
let router = Router::new();
let session = Session::new("client-1", false);
// Subscribe
router.subscribe("client-1", "sensors/#", QoS::AtLeastOnce);
// Publish
let message = PublishPacket {
topic: "sensors/temp".to_string(),
payload: b"25".to_vec(),
// ...
};
let deliveries = router.route(&message);
assert_eq!(deliveries.len(), 1);
}
}
Property-Based Tests
Using proptest for randomized testing:
#![allow(unused)]
fn main() {
// tests/codec_proptest.rs
use proptest::prelude::*;
proptest! {
#[test]
fn test_varint_roundtrip(value in 0u32..268435455) {
let mut buf = BytesMut::new();
encode_variable_int(value, &mut buf).unwrap();
let decoded = decode_variable_int(&mut buf).unwrap();
prop_assert_eq!(value, decoded);
}
}
}
Run property tests:
cargo test proptest
Async Tests
Using #[tokio::test] for async code:
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_server_accept() {
let server = MqttServer::bind("127.0.0.1:0").await.unwrap();
let addr = server.local_addr();
tokio::spawn(async move {
server.run().await.unwrap();
});
let client = TcpStream::connect(addr).await.unwrap();
assert!(client.peer_addr().is_ok());
}
}
Test Coverage
Using cargo-tarpaulin
# Install
cargo install cargo-tarpaulin
# Run coverage
cargo tarpaulin --out Html
# Open report
open tarpaulin-report.html
Using llvm-cov
# Install
rustup component add llvm-tools-preview
cargo install cargo-llvm-cov
# Run coverage
cargo llvm-cov --html
# Open report
open target/llvm-cov/html/index.html
Benchmarks
Criterion Benchmarks
# Run all benchmarks
cargo bench
# Run specific benchmark
cargo bench publish
Benchmark location: benches/
Example benchmark:
#![allow(unused)]
fn main() {
// benches/codec_bench.rs
use criterion::{criterion_group, criterion_main, Criterion};
use mqtt_broker::codec::*;
fn bench_publish_encode(c: &mut Criterion) {
let packet = PublishPacket {
topic: "sensors/temperature".to_string(),
payload: vec![0u8; 1024],
// ...
};
c.bench_function("publish_encode_1kb", |b| {
b.iter(|| {
let mut buf = BytesMut::with_capacity(2048);
MqttEncoder::encode(&MqttPacket::Publish(packet.clone()), &mut buf)
})
});
}
criterion_group!(benches, bench_publish_encode);
criterion_main!(benches);
}
Writing New Tests
Test Naming Convention
#![allow(unused)]
fn main() {
#[test]
fn test_<module>_<function>_<scenario>() {
// ...
}
// Examples:
fn test_codec_publish_empty_payload() { }
fn test_topic_filter_multi_wildcard() { }
fn test_session_clean_session_true() { }
}
Test Structure (AAA Pattern)
#![allow(unused)]
fn main() {
#[test]
fn test_subscribe_adds_to_trie() {
// Arrange
let mut router = Router::new();
// Act
router.subscribe("client-1", "sensors/#", QoS::AtLeastOnce);
// Assert
let matches = router.matches("sensors/temp");
assert_eq!(matches.len(), 1);
assert_eq!(matches[0].client_id, "client-1");
}
}
Testing Error Cases
#![allow(unused)]
fn main() {
#[test]
fn test_invalid_topic_filter_returns_error() {
let result = TopicFilter::parse("sensors/#/invalid");
assert!(result.is_err());
assert!(matches!(result, Err(TopicError::InvalidFilter(_))));
}
}
Testing Async Code
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_connection_timeout() {
let result = tokio::time::timeout(
Duration::from_millis(100),
Connection::connect("127.0.0.1:9999")
).await;
assert!(result.is_err()); // Timeout
}
}
CI Integration
Tests run automatically on GitHub Actions:
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo test --all-features
Troubleshooting
Tests Hang
Usually a deadlock or missing .await:
# Run with timeout
timeout 60 cargo test
# Run single-threaded
cargo test -- --test-threads=1
Flaky Tests
Add retry or increase timeouts:
#![allow(unused)]
fn main() {
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_publishes() {
// Use tokio::time::timeout
let result = tokio::time::timeout(
Duration::from_secs(5),
async_operation()
).await;
assert!(result.is_ok());
}
}
Port Already in Use
Tests using network ports can conflict:
#![allow(unused)]
fn main() {
// Use port 0 for automatic assignment
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
}
Debug Failing Tests
# Verbose output
cargo test test_name -- --nocapture
# With RUST_BACKTRACE
RUST_BACKTRACE=1 cargo test test_name
# With logging
RUST_LOG=debug cargo test test_name -- --nocapture
Contributing
Thank you for your interest in contributing to this MQTT broker project!
Getting Started
- Fork the repository on GitHub
- Clone your fork:
git clone https://github.com/yourusername/mqtt-broker.git cd mqtt-broker - Set upstream remote:
git remote add upstream https://github.com/originalowner/mqtt-broker.git - Create a feature branch:
git checkout -b feature/my-feature
Development Setup
Prerequisites
- Rust 1.70 or later
- Git
- (Optional) Docker for integration testing
Build and Test
# Build
cargo build
# Run tests
cargo test
# Run with logging
RUST_LOG=debug cargo run --bin mqtt-broker
Useful Commands
# Format code
cargo fmt
# Run linter
cargo clippy
# Check without building
cargo check
# Update dependencies
cargo update
# Generate docs
cargo doc --open
Code Style
Formatting
Run cargo fmt before committing:
cargo fmt
We use the default rustfmt configuration.
Linting
All code must pass cargo clippy:
cargo clippy -- -D warnings
Fix warnings before submitting PRs.
Naming Conventions
| Type | Convention | Example |
|---|---|---|
| Structs | PascalCase | TopicFilter |
| Traits | PascalCase | Storage |
| Functions | snake_case | encode_packet |
| Variables | snake_case | client_id |
| Constants | SCREAMING_SNAKE | MAX_PACKET_SIZE |
| Modules | snake_case | topic_matcher |
Documentation
All public items should have documentation:
#![allow(unused)]
fn main() {
/// Encodes an MQTT packet to bytes.
///
/// # Arguments
///
/// * `packet` - The packet to encode
/// * `buf` - Buffer to write encoded bytes to
///
/// # Returns
///
/// Returns `Ok(())` on success, or `CodecError` on failure.
///
/// # Example
///
/// ```rust
/// let packet = MqttPacket::Pingreq;
/// let mut buf = BytesMut::new();
/// MqttEncoder::encode(&packet, &mut buf)?;
/// ```
pub fn encode(packet: &MqttPacket, buf: &mut BytesMut) -> Result<(), CodecError> {
// ...
}
}
Making Changes
Types of Contributions
- π Bug fixes β Fix issues with tests
- β¨ Features β Add new functionality
- π Documentation β Improve docs, examples
- π§ͺ Tests β Add missing tests
- β‘ Performance β Optimize code
- π§ Refactoring β Improve code structure
Branch Naming
feature/description # New features
fix/description # Bug fixes
docs/description # Documentation
test/description # Test additions
refactor/description # Code refactoring
Examples:
feature/qos2-supportfix/packet-decode-crashdocs/session-examples
Commit Messages
Follow Conventional Commits:
type(scope): description
[optional body]
[optional footer]
Types:
feat: New featurefix: Bug fixdocs: Documentationtest: Testsrefactor: Code refactoringperf: Performance improvementchore: Maintenance
Examples:
feat(codec): add MQTT 5 property parsing
fix(session): prevent double-free on disconnect
docs(readme): add installation instructions
test(router): add wildcard matching edge cases
Pull Request Process
Before Submitting
-
Update your branch:
git fetch upstream git rebase upstream/main -
Run all checks:
cargo fmt cargo clippy -- -D warnings cargo test -
Add tests for new functionality
-
Update documentation if needed
PR Guidelines
- One feature per PR β Keep PRs focused
- Descriptive title β What does this PR do?
- Link issues β Reference related issues
- Include tests β All new code needs tests
- Update CHANGELOG β Add entry under βUnreleasedβ
PR Template
## Description
Brief description of the changes.
## Related Issues
Closes #123
## Changes
- Added X
- Fixed Y
- Updated Z
## Testing
- [ ] Unit tests pass
- [ ] Integration tests pass
- [ ] Manual testing completed
## Checklist
- [ ] Code formatted with `cargo fmt`
- [ ] No clippy warnings
- [ ] Documentation updated
- [ ] CHANGELOG updated
Testing Requirements
All PRs Must
- Pass existing tests β No regressions
- Add new tests β For new functionality
- Maintain coverage β Donβt decrease coverage
Test Guidelines
#![allow(unused)]
fn main() {
// Good: Descriptive name, tests one thing
#[test]
fn test_publish_with_empty_payload_succeeds() {
let packet = PublishPacket {
payload: vec![],
// ...
};
assert!(encode(&packet).is_ok());
}
// Good: Tests error case
#[test]
fn test_invalid_topic_returns_error() {
let result = TopicFilter::parse("invalid/#/topic");
assert!(matches!(result, Err(TopicError::InvalidFilter(_))));
}
}
Module Structure
When adding new modules:
src/mymodule/
βββ mod.rs # Public exports
βββ types.rs # Type definitions
βββ implementation.rs
βββ tests.rs # Unit tests
Export from src/lib.rs:
#![allow(unused)]
fn main() {
pub mod mymodule;
}
Error Handling
Use the projectβs error types:
#![allow(unused)]
fn main() {
// Good: Use existing error types
fn parse_topic(s: &str) -> Result<Topic, TopicError> {
// ...
}
// Good: Add new variant if needed
#[derive(Debug, Error)]
pub enum TopicError {
#[error("invalid topic: {0}")]
InvalidTopic(String),
#[error("topic too long: {length} > {max}")]
TooLong { length: usize, max: usize },
}
}
Getting Help
- Questions? Open a discussion on GitHub
- Bug found? Open an issue with reproduction steps
- Feature idea? Open an issue to discuss first
Code of Conduct
- Be respectful and constructive
- Focus on the code, not the person
- Assume good intentions
- Help others learn
Recognition
Contributors are recognized in:
- CHANGELOG.md (for each release)
- GitHub contributors page
Thank you for contributing! π