Keyboard shortcuts

Press ← or β†’ to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

mqtt-broker

A production-grade MQTT 3.1.1 broker implementation in Rust.

Features

  • πŸš€ High Performance β€” Async I/O with Tokio, handles 10,000+ concurrent connections
  • πŸ“‘ Full MQTT 3.1.1 β€” All 14 packet types with complete protocol compliance
  • 🎯 QoS 0/1/2 β€” Complete delivery guarantees with proper state machines
  • 🌳 Topic Wildcards β€” O(k) trie-based matching for + and # wildcards
  • πŸ’Ύ Retained Messages β€” Automatic delivery to new subscribers
  • πŸ”„ Session Persistence β€” Clean and persistent session support
  • ⏱️ Keep-Alive β€” Automatic connection timeout detection
  • πŸ›‘οΈ Production Ready β€” Comprehensive test suite with 176+ tests

What is MQTT?

MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and low-bandwidth, high-latency networks. It’s widely used in:

  • IoT (Internet of Things) β€” Sensor data collection, device control
  • Home Automation β€” Smart home devices, lighting, thermostats
  • Mobile Applications β€” Push notifications, real-time messaging
  • Industrial Systems β€” SCADA, telemetry, monitoring

Quick Example

# Terminal 1: Start the broker
mqtt-broker

# Terminal 2: Subscribe to temperature readings
mqtt-consumer "sensors/+/temperature"

# Terminal 3: Publish a temperature reading
mqtt-producer "sensors/living-room/temperature" "23.5Β°C"

Next Steps

Installation

Requirements

  • Rust 1.70+ β€” Install Rust
  • Git β€” For cloning the repository
# Clone the repository
git clone https://github.com/iliasichinava/mqtt-broker.git
cd mqtt-broker

# Build in release mode
cargo build --release

# The binaries are now in target/release/
ls target/release/mqtt-*

The following binaries will be built:

BinaryDescription
mqtt-brokerThe MQTT broker server
mqtt-producerCLI tool to publish messages
mqtt-consumerCLI tool to subscribe to topics

Install Globally

To install the binaries to your Cargo bin directory (~/.cargo/bin/):

cargo install --path .

Now you can run the commands from anywhere:

mqtt-broker --help
mqtt-producer --help
mqtt-consumer --help

From GitHub Directly

Install directly from the repository without cloning:

cargo install --git https://github.com/iliasichinava/mqtt-broker.git

Verify Installation

# Check broker version/help
mqtt-broker --help

# Expected output:
# MQTT Broker - Production-grade MQTT 3.1.1 broker
# 
# Usage: mqtt-broker [OPTIONS]
# ...

Next Steps

Now that you have mqtt-broker installed, head to the Quick Start guide to run your first pub/sub workflow.

Quick Start

This guide will get you up and running with mqtt-broker in 5 minutes.

Step 1: Start the Broker

Open a terminal and start the broker:

mqtt-broker

You should see:

╔══════════════════════════════════════════════════════════════╗
β•‘              MQTT Broker v0.1.0 Starting                     β•‘
╠══════════════════════════════════════════════════════════════╣
β•‘  Protocol: MQTT 3.1.1                                        β•‘
β•‘  Address:  0.0.0.0:1883                                      β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

The broker is now listening for connections on port 1883.

Step 2: Subscribe to a Topic

Open a second terminal and subscribe to a topic:

mqtt-consumer "hello/world"

Output:

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-consumer-12345'
Subscribed to 1 topic(s):
  hello/world -> SuccessQoS0

Waiting for messages... (Press Ctrl+C to exit)

The consumer is now waiting for messages on the hello/world topic.

Step 3: Publish a Message

Open a third terminal and publish a message:

mqtt-producer "hello/world" "Hello, MQTT!"

Output:

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12346'
Published to 'hello/world': Hello, MQTT!
Done.

Step 4: See the Message

Back in the consumer terminal, you should see:

[1] Topic: hello/world | QoS: AtMostOnce | Retain: false
    Payload: Hello, MQTT!

πŸŽ‰ Congratulations! You’ve just sent your first MQTT message.

Using Wildcards

MQTT supports powerful topic wildcards:

Single-Level Wildcard (+)

Subscribe to all rooms’ temperature:

mqtt-consumer "home/+/temperature"

This matches:

  • home/kitchen/temperature βœ…
  • home/bedroom/temperature βœ…
  • home/kitchen/humidity ❌

Multi-Level Wildcard (#)

Subscribe to everything under home/:

mqtt-consumer "home/#"

This matches:

  • home/kitchen/temperature βœ…
  • home/bedroom/humidity βœ…
  • home/garage/door/status βœ…

Using QoS Levels

Specify Quality of Service for reliable delivery:

# QoS 1 - At least once delivery
mqtt-producer "sensors/critical" "alert!" --qos 1

# QoS 2 - Exactly once delivery
mqtt-producer "transactions/payment" "confirmed" --qos 2

Retained Messages

Retain the last message on a topic:

# Publish with retain flag
mqtt-producer "device/status" "online" --retain

# New subscribers will immediately receive "online"
mqtt-consumer "device/status"

Next Steps

Configuration

Broker Configuration

The broker can be configured via command-line arguments:

mqtt-broker [OPTIONS]

Available Options

OptionDefaultDescription
-h, --host <HOST>0.0.0.0Host address to bind to
-p, --port <PORT>1883Port to listen on
--max-clients <N>10000Maximum concurrent connections
--helpβ€”Print help information

Examples

# Listen on localhost only
mqtt-broker --host 127.0.0.1

# Use a different port
mqtt-broker --port 8883

# Limit connections
mqtt-broker --max-clients 1000

# Combine options
mqtt-broker --host 0.0.0.0 --port 1883 --max-clients 5000

Environment Variables

Enable debug logging with the RUST_LOG environment variable:

# Basic info logging
RUST_LOG=info mqtt-broker

# Debug logging
RUST_LOG=debug mqtt-broker

# Trace logging (very verbose)
RUST_LOG=trace mqtt-broker

# Module-specific logging
RUST_LOG=mqtt_broker::server=debug mqtt-broker

Client Configuration

Producer Options

mqtt-producer [OPTIONS] <TOPIC> <MESSAGE>
OptionDefaultDescription
-h, --host <HOST>127.0.0.1Broker host address
-p, --port <PORT>1883Broker port
-q, --qos <QOS>0QoS level (0, 1, 2)
-r, --retainfalseRetain the message
-c, --client <ID>autoClient ID

Consumer Options

mqtt-consumer [OPTIONS] <TOPIC>...
OptionDefaultDescription
-h, --host <HOST>127.0.0.1Broker host address
-p, --port <PORT>1883Broker port
-q, --qos <QOS>0Maximum QoS level
-c, --client <ID>autoClient ID
-n, --count <N>unlimitedExit after N messages

Default Values

The broker uses sensible defaults for production:

SettingDefaultNotes
Max packet size256 KBPer MQTT 3.1.1 spec
Connect timeout10 secondsTime to receive CONNECT
Keep-alive multiplier1.5xPer MQTT spec
Max inflight messages65535Per QoS 1/2 flows
Offline queue size1000Messages per session
Session expiry24 hoursFor persistent sessions

Planned Configuration

Future versions will support:

  • Configuration file (TOML/YAML)
  • TLS/SSL settings
  • Authentication backends
  • Access control lists (ACL)
  • Clustering configuration

mqtt-broker

The MQTT broker server.

Usage

mqtt-broker [OPTIONS]

Options

OptionShortDefaultDescription
--host <HOST>-h0.0.0.0Host address to bind to
--port <PORT>-p1883Port to listen on
--max-clients <N>β€”10000Maximum concurrent connections
--helpβ€”β€”Print help information

Examples

Start with defaults

mqtt-broker

Listens on 0.0.0.0:1883 (all interfaces, standard MQTT port).

Listen on localhost only

mqtt-broker --host 127.0.0.1

Only accepts connections from the local machine.

Custom port

mqtt-broker --port 8883

Useful when running multiple brokers or avoiding privileged ports.

Production settings

mqtt-broker --host 0.0.0.0 --port 1883 --max-clients 50000

With logging

RUST_LOG=info mqtt-broker

Log levels: error, warn, info, debug, trace

Signals

SignalAction
SIGINT (Ctrl+C)Graceful shutdown
SIGTERMGraceful shutdown

During graceful shutdown, the broker:

  1. Stops accepting new connections
  2. Sends DISCONNECT to all clients
  3. Waits for clients to disconnect (up to 30 seconds)
  4. Exits cleanly

Output

╔══════════════════════════════════════════════════════════════╗
β•‘              MQTT Broker v0.1.0 Starting                     β•‘
╠══════════════════════════════════════════════════════════════╣
β•‘  Protocol: MQTT 3.1.1                                        β•‘
β•‘  Address:  0.0.0.0:1883                                      β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•

2026-01-31T12:00:00Z  INFO mqtt_broker::server: Client connected: sensor-01 (192.168.1.100:54321)
2026-01-31T12:00:05Z  INFO mqtt_broker::server: Client connected: dashboard (192.168.1.101:54322)
2026-01-31T12:00:10Z  INFO mqtt_broker::server: Client disconnected: sensor-01

Exit Codes

CodeMeaning
0Clean shutdown
1Error (binding failed, invalid arguments, etc.)

mqtt-producer

Command-line tool for publishing MQTT messages.

Usage

mqtt-producer [OPTIONS] <TOPIC> <MESSAGE>

Arguments

ArgumentDescription
<TOPIC>Topic to publish to
<MESSAGE>Message payload (text)

Options

OptionShortDefaultDescription
--host <HOST>-h127.0.0.1Broker host address
--port <PORT>-p1883Broker port
--qos <QOS>-q0Quality of Service (0, 1, 2)
--retain-rfalseRetain message on broker
--client <ID>-cautoClient ID
--helpβ€”β€”Print help

Examples

Simple publish

mqtt-producer "sensors/temperature" "23.5"

Publish with QoS 1

mqtt-producer --qos 1 "sensors/humidity" "65%"

Waits for PUBACK confirmation from broker.

Publish with QoS 2

mqtt-producer --qos 2 "payments/transaction" '{"id": "tx-123", "amount": 99.99}'

Exactly-once delivery with full 4-way handshake.

Retained message

mqtt-producer --retain "device/status" "online"

New subscribers immediately receive this message.

Custom broker

mqtt-producer -h 192.168.1.100 -p 1883 "home/lights" "on"

With specific client ID

mqtt-producer --client "sensor-gateway-01" "sensors/batch" '{"temp": 23, "humidity": 65}'

JSON payload

mqtt-producer "events/user" '{"event": "login", "user": "alice", "timestamp": 1706745600}'

Multi-word message

mqtt-producer "notifications/alert" "Temperature exceeds threshold"

Output

Successful publish (QoS 0)

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Done.

Successful publish (QoS 1)

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Message acknowledged (QoS 1)
Done.

Successful publish (QoS 2)

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-producer-12345'
Published to 'sensors/temperature': 23.5
Message delivered exactly once (QoS 2)
Done.

Connection failure

Connecting to 127.0.0.1:1883...
Failed to connect: Connection refused (os error 61)

Exit Codes

CodeMeaning
0Message published successfully
1Error (connection failed, invalid arguments, timeout)

Tips

  • Use single quotes for JSON payloads to avoid shell escaping issues
  • Retained messages persist until replaced or cleared with empty payload
  • QoS 2 has higher latency but guarantees exactly-once delivery

mqtt-consumer

Command-line tool for subscribing to MQTT topics.

Usage

mqtt-consumer [OPTIONS] <TOPIC>...

Arguments

ArgumentDescription
<TOPIC>...One or more topic filters to subscribe to

Topic filters support wildcards:

  • + β€” Single-level wildcard
  • # β€” Multi-level wildcard

Options

OptionShortDefaultDescription
--host <HOST>-h127.0.0.1Broker host address
--port <PORT>-p1883Broker port
--qos <QOS>-q0Maximum QoS level
--client <ID>-cautoClient ID
--count <N>-nunlimitedExit after N messages
--helpβ€”β€”Print help

Examples

Subscribe to a specific topic

mqtt-consumer "sensors/temperature"

Subscribe with wildcards

# All topics under sensors/
mqtt-consumer "sensors/#"

# All temperature readings
mqtt-consumer "sensors/+/temperature"

# Multiple patterns
mqtt-consumer "sensors/#" "alerts/#"

Limit message count

# Exit after 10 messages
mqtt-consumer -n 10 "sensors/#"

With QoS 1

mqtt-consumer --qos 1 "critical/#"

Custom broker

mqtt-consumer -h 192.168.1.100 -p 1883 "home/#"

With specific client ID

mqtt-consumer --client "dashboard-main" "telemetry/#"

Output

Subscription confirmation

Connecting to 127.0.0.1:1883...
Connected as 'mqtt-consumer-12345'
Subscribed to 2 topic(s):
  sensors/# -> SuccessQoS0
  alerts/# -> SuccessQoS0

Waiting for messages... (Press Ctrl+C to exit)

Receiving messages

[1] Topic: sensors/living-room/temperature | QoS: AtMostOnce | Retain: false
    Payload: 23.5

[2] Topic: sensors/kitchen/humidity | QoS: AtMostOnce | Retain: false
    Payload: 65%

[3] Topic: alerts/fire | QoS: AtLeastOnce | Retain: false
    Payload: Smoke detected in garage

Retained message

[1] Topic: device/status | QoS: AtMostOnce | Retain: true
    Payload: online

The Retain: true indicates this message was stored on the broker.

With message limit

[1] Topic: sensors/temp | QoS: AtMostOnce | Retain: false
    Payload: 23.5

...

[10] Topic: sensors/temp | QoS: AtMostOnce | Retain: false
    Payload: 24.1

Received 10 message(s), exiting.
Total messages received: 10

Disconnection

^C
Received Ctrl+C, disconnecting...
Total messages received: 42

Wildcard Patterns

Single-level wildcard (+)

Matches exactly one topic level.

PatternMatchesDoesn’t Match
home/+/temperaturehome/kitchen/temperaturehome/temperature
home/bedroom/temperaturehome/floor1/kitchen/temperature
+/+/statusdevice/sensor/statusdevice/status

Multi-level wildcard (#)

Matches zero or more topic levels. Must be last in the filter.

PatternMatches
home/#home, home/kitchen, home/kitchen/temperature
sensors/temperature/#sensors/temperature, sensors/temperature/celsius
#Everything (all topics)

Exit Codes

CodeMeaning
0Clean exit (Ctrl+C or message limit reached)
1Error (connection failed, invalid arguments)

Tips

  • Use quotes around wildcards to prevent shell expansion: "sensors/#"
  • The # wildcard can only appear at the end of a filter
  • Subscribe to # to see all broker traffic (debugging)
  • Use -n 1 to receive just one message and exit

MQTT Protocol

MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for constrained devices and unreliable networks.

Version

This broker implements MQTT 3.1.1 (OASIS Standard, 2014).

Core Concepts

Publish-Subscribe Model

Unlike request-response protocols (HTTP), MQTT uses a publish-subscribe pattern:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Publisherβ”‚ ──────── β”‚ Broker β”‚ ──────── β”‚Subscriberβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   Pub    β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Sub   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
      β”‚                    β”‚                    β”‚
      β”‚  PUBLISH           β”‚                    β”‚
      β”‚  topic: temp       β”‚                    β”‚
      β”‚  payload: 23.5     β”‚                    β”‚
      β”‚ ─────────────────► β”‚                    β”‚
      β”‚                    β”‚  PUBLISH           β”‚
      β”‚                    β”‚  topic: temp       β”‚
      β”‚                    β”‚  payload: 23.5     β”‚
      β”‚                    β”‚ ─────────────────► β”‚
  • Publishers send messages to topics
  • Subscribers receive messages from topics they’ve subscribed to
  • Broker routes messages between publishers and subscribers

Decoupling

MQTT provides three types of decoupling:

  1. Space β€” Publishers and subscribers don’t need to know each other
  2. Time β€” They don’t need to be online simultaneously (with persistence)
  3. Synchronization β€” Operations are asynchronous

Packet Types

MQTT defines 14 packet types:

TypeDirectionPurpose
CONNECTClient β†’ BrokerInitiate connection
CONNACKBroker β†’ ClientAcknowledge connection
PUBLISHBothPublish a message
PUBACKBothQoS 1 acknowledgment
PUBRECBothQoS 2 received
PUBRELBothQoS 2 release
PUBCOMPBothQoS 2 complete
SUBSCRIBEClient β†’ BrokerSubscribe to topics
SUBACKBroker β†’ ClientAcknowledge subscription
UNSUBSCRIBEClient β†’ BrokerUnsubscribe from topics
UNSUBACKBroker β†’ ClientAcknowledge unsubscription
PINGREQClient β†’ BrokerKeep-alive ping
PINGRESPBroker β†’ ClientKeep-alive response
DISCONNECTClient β†’ BrokerClean disconnect

Connection Lifecycle

Client                                  Broker
   β”‚                                       β”‚
   β”‚  CONNECT                              β”‚
   β”‚  - client_id: "sensor-01"             β”‚
   β”‚  - clean_session: true                β”‚
   β”‚  - keep_alive: 60                     β”‚
   β”‚ ────────────────────────────────────► β”‚
   β”‚                                       β”‚
   β”‚                           CONNACK     β”‚
   β”‚                           - code: 0   β”‚
   β”‚ ◄──────────────────────────────────── β”‚
   β”‚                                       β”‚
   β”‚        ... publish/subscribe ...      β”‚
   β”‚                                       β”‚
   β”‚  PINGREQ                              β”‚
   β”‚ ────────────────────────────────────► β”‚
   β”‚                           PINGRESP    β”‚
   β”‚ ◄──────────────────────────────────── β”‚
   β”‚                                       β”‚
   β”‚  DISCONNECT                           β”‚
   β”‚ ────────────────────────────────────► β”‚
   β”‚                                       β”‚

Keep-Alive

The keep-alive mechanism ensures connections stay active:

  1. Client specifies keep-alive interval in CONNECT (e.g., 60 seconds)
  2. Client must send a packet within 1.5Γ— the interval
  3. If no data to send, client sends PINGREQ
  4. Broker responds with PINGRESP
  5. If broker receives nothing within 1.5Γ— interval, it closes connection

Wire Format

MQTT uses a compact binary format:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    Fixed Header                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Packet Type (4) β”‚  Flags (4)                       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Remaining Length (1-4 bytes, variable int)         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                 Variable Header                      β”‚
β”‚            (depends on packet type)                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                     Payload                          β”‚
β”‚            (depends on packet type)                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

The variable-length integer encoding allows:

  • 1 byte for values 0-127
  • 2 bytes for values up to 16,383
  • 3 bytes for values up to 2,097,151
  • 4 bytes for values up to 268,435,455 (256 MB max packet)

References

Topics & Wildcards

Topics are the addressing mechanism in MQTT. Publishers send messages to topics, and subscribers receive messages from topics they’ve subscribed to.

Topic Names

A topic name is the destination for a published message.

Rules

  • UTF-8 encoded string
  • At least 1 character
  • Maximum 65,535 bytes
  • Case-sensitive (Sensor/Temp β‰  sensor/temp)
  • Can contain spaces (but not recommended)
  • Cannot contain wildcards (+ or #)
  • Cannot contain null character

Best Practices

βœ… Good topics:
   home/kitchen/temperature
   sensors/device-01/status
   usa/california/sf/weather

❌ Avoid:
   /leading/slash           (creates empty first level)
   trailing/slash/          (creates empty last level)
   double//slash            (creates empty middle level)
   Spaces In Topic          (harder to work with)

Topic Levels

Topics are hierarchical, separated by /:

home / kitchen / temperature
 β”‚       β”‚           β”‚
 β”‚       β”‚           └── Level 3
 β”‚       └────────────── Level 2
 └────────────────────── Level 1

Topic Filters

A topic filter is used when subscribing. It can include wildcards to match multiple topics.

Single-Level Wildcard (+)

Matches exactly one topic level.

# Subscribe
mqtt-consumer "home/+/temperature"
Published TopicMatches?
home/kitchen/temperatureβœ… Yes
home/bedroom/temperatureβœ… Yes
home/temperature❌ No (missing level)
home/floor1/kitchen/temperature❌ No (extra level)

More examples:

FilterMatches
+/tennis/#sport/tennis/player1
sport/+/player1sport/tennis/player1, sport/golf/player1
+/+a/b, foo/bar (exactly 2 levels)

Multi-Level Wildcard (#)

Matches zero or more topic levels. Must be the last character.

# Subscribe to everything under home/
mqtt-consumer "home/#"
Published TopicMatches?
homeβœ… Yes
home/kitchenβœ… Yes
home/kitchen/temperatureβœ… Yes
home/floor1/kitchen/oven/statusβœ… Yes
office/kitchen❌ No

More examples:

FilterMatches
#Everything (all topics)
sport/#sport, sport/tennis, sport/tennis/player1
sport/tennis/#sport/tennis, sport/tennis/player1/ranking

Combining Wildcards

You can use both wildcards in one filter:

mqtt-consumer "+/sensors/+/temperature/#"

Matches:

  • home/sensors/kitchen/temperature
  • office/sensors/room1/temperature/celsius

System Topics

Topics starting with $ are reserved for broker internals:

$SYS/broker/version
$SYS/broker/uptime
$SYS/broker/clients/connected
  • Clients cannot publish to $ topics
  • Wildcard # does not match $ topics
  • Must explicitly subscribe: $SYS/#

Topic Design Patterns

Hierarchical (Location-based)

{country}/{state}/{city}/{building}/{floor}/{room}/{sensor}
usa/california/sf/office1/floor2/room201/temperature

Entity-based

{entity_type}/{entity_id}/{attribute}
device/sensor-001/temperature
user/alice/presence

Event-based

{domain}/{event_type}/{entity}
orders/created/order-123
payments/processed/tx-456

Command & Response

# Commands
device/{device_id}/command
device/{device_id}/command/reboot

# Responses
device/{device_id}/response
device/{device_id}/status

Implementation Details

This broker uses a trie data structure for O(k) topic matching, where k is the number of topic levels. This enables efficient matching even with millions of subscriptions.

Root
β”œβ”€β”€ home
β”‚   β”œβ”€β”€ kitchen
β”‚   β”‚   └── temperature [subscriber: client-1]
β”‚   └── + (wildcard)
β”‚       └── humidity [subscriber: client-2]
└── sensors
    └── # (multi-wildcard) [subscriber: client-3]

Quality of Service (QoS)

QoS levels define the delivery guarantee between a client and the broker.

Overview

LevelNameGuaranteeUse Case
0At most onceFire and forgetTelemetry, non-critical data
1At least onceGuaranteed, may duplicateImportant events
2Exactly onceGuaranteed, no duplicatesFinancial transactions

QoS 0 β€” At Most Once

The simplest level. Message is sent once with no acknowledgment.

Publisher                Broker                 Subscriber
    β”‚                       β”‚                       β”‚
    β”‚  PUBLISH (QoS 0)      β”‚                       β”‚
    β”‚ ────────────────────► β”‚                       β”‚
    β”‚                       β”‚  PUBLISH (QoS 0)      β”‚
    β”‚                       β”‚ ────────────────────► β”‚
    β”‚                       β”‚                       β”‚

Characteristics:

  • Fastest (no round trips)
  • No retransmission
  • Message may be lost
  • No duplicate detection needed

When to use:

  • Sensor readings at high frequency
  • Data where occasional loss is acceptable
  • When bandwidth/latency is critical

QoS 1 β€” At Least Once

Message is guaranteed to arrive, but may arrive multiple times.

Publisher                Broker                 Subscriber
    β”‚                       β”‚                       β”‚
    β”‚  PUBLISH (QoS 1)      β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ────────────────────► β”‚                       β”‚
    β”‚                       β”‚  PUBLISH (QoS 1)      β”‚
    β”‚                       β”‚  packet_id: 1         β”‚
    β”‚                       β”‚ ────────────────────► β”‚
    β”‚                       β”‚                       β”‚
    β”‚                       β”‚  PUBACK               β”‚
    β”‚                       β”‚  packet_id: 1         β”‚
    β”‚                       β”‚ ◄──────────────────── β”‚
    β”‚  PUBACK               β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ◄──────────────────── β”‚                       β”‚

Characteristics:

  • One round trip (PUBLISH β†’ PUBACK)
  • Message stored until acknowledged
  • Retransmitted if no PUBACK received
  • Duplicates possible on retransmit

When to use:

  • Important events that must arrive
  • When duplicates can be handled (idempotent operations)
  • Status updates, alerts

QoS 2 β€” Exactly Once

Most reliable level. Guarantees exactly one delivery.

Publisher                Broker                 Subscriber
    β”‚                       β”‚                       β”‚
    β”‚  PUBLISH (QoS 2)      β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ────────────────────► β”‚                       β”‚
    β”‚                       β”‚                       β”‚
    β”‚  PUBREC               β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ◄──────────────────── β”‚                       β”‚
    β”‚                       β”‚  PUBLISH (QoS 2)      β”‚
    β”‚                       β”‚ ────────────────────► β”‚
    β”‚                       β”‚  PUBREC               β”‚
    β”‚                       β”‚ ◄──────────────────── β”‚
    β”‚  PUBREL               β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ────────────────────► β”‚                       β”‚
    β”‚                       β”‚  PUBREL               β”‚
    β”‚                       β”‚ ────────────────────► β”‚
    β”‚                       β”‚  PUBCOMP              β”‚
    β”‚                       β”‚ ◄──────────────────── β”‚
    β”‚  PUBCOMP              β”‚                       β”‚
    β”‚  packet_id: 1         β”‚                       β”‚
    β”‚ ◄──────────────────── β”‚                       β”‚

Characteristics:

  • Two round trips (4-way handshake)
  • Highest latency
  • No duplicates
  • State must be tracked

When to use:

  • Financial transactions
  • Billing events
  • Critical commands
  • When duplicates are unacceptable

QoS Downgrade

The actual QoS is the minimum of:

  1. Publisher’s QoS
  2. Subscriber’s maximum QoS
Publisher QoS    Subscriber Max QoS    Actual Delivery
     2                   0                    0
     2                   1                    1
     1                   2                    1
     0                   2                    0

Example:

# Publisher sends with QoS 2
mqtt-producer --qos 2 "topic" "message"

# Subscriber subscribed with QoS 1
mqtt-consumer --qos 1 "topic"

# Message delivered with QoS 1 (minimum)

Packet Identifiers

QoS 1 and 2 use packet IDs to track message flows:

  • 16-bit unsigned integer (1-65535)
  • Unique per client per direction
  • Reusable after flow completes
  • 0 is reserved (not used)

State Machines

QoS 1 Publisher State

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     PUBLISH     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Idle      β”‚ ──────────────► β”‚  Awaiting   β”‚
β”‚             β”‚                 β”‚   PUBACK    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                 β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
       β–²                               β”‚
       β”‚            PUBACK             β”‚
       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

QoS 2 Publisher State

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     PUBLISH     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Idle      β”‚ ──────────────► β”‚  Awaiting   β”‚
β”‚             β”‚                 β”‚   PUBREC    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                 β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
       β–²                               β”‚ PUBREC
       β”‚                               β–Ό
       β”‚                        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
       β”‚          PUBCOMP       β”‚  Awaiting   β”‚
       └─────────────────────── β”‚   PUBCOMP   β”‚
                                β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
                                       β”‚ PUBREL
                                       β–Ό

Performance Considerations

AspectQoS 0QoS 1QoS 2
Packets124
LatencyLowestMediumHighest
BandwidthLowestMediumHighest
MemoryNoneModerateHigher
ReliabilityNoneHighHighest

Best Practices

  1. Default to QoS 0 for high-frequency telemetry
  2. Use QoS 1 for important data where duplicates are okay
  3. Use QoS 2 sparingly β€” only when exactly-once is required
  4. Design for idempotency β€” prefer QoS 1 + idempotent handlers over QoS 2
  5. Consider trade-offs β€” QoS 2 is 4Γ— the packets of QoS 0

Sessions

MQTT sessions allow state to persist between client connections.

Session Types

Clean Session (clean_session: true)

A fresh start every connection:

  • All subscriptions are cleared on disconnect
  • All queued messages are discarded
  • No state persists between connections
# Default behavior β€” clean session
mqtt-consumer "sensors/#"
# Disconnect and reconnect β†’ must resubscribe

Persistent Session (clean_session: false)

State persists across connections:

  • Subscriptions are maintained
  • QoS 1/2 messages are queued while offline
  • Queued messages delivered on reconnect

Session State

The broker stores the following for each session:

ComponentDescription
Client IDUnique identifier
SubscriptionsTopic filters and QoS levels
QoS 1 outboundMessages awaiting PUBACK
QoS 2 outboundMessages in QoS 2 flow
QoS 2 inboundReceived messages awaiting PUBREL
Offline queueMessages for offline client

Connection Flow

Clean Session

Client                                    Broker
   β”‚                                         β”‚
   β”‚  CONNECT                                β”‚
   β”‚  client_id: "sensor-01"                 β”‚
   β”‚  clean_session: true                    β”‚
   β”‚ ──────────────────────────────────────► β”‚
   β”‚                                         β”‚ Creates new session
   β”‚                             CONNACK     β”‚
   β”‚                     session_present: 0  β”‚
   β”‚ ◄────────────────────────────────────── β”‚

Persistent Session (First Connect)

Client                                    Broker
   β”‚                                         β”‚
   β”‚  CONNECT                                β”‚
   β”‚  client_id: "sensor-01"                 β”‚
   β”‚  clean_session: false                   β”‚
   β”‚ ──────────────────────────────────────► β”‚
   β”‚                                         β”‚ Creates new session
   β”‚                             CONNACK     β”‚
   β”‚                     session_present: 0  β”‚
   β”‚ ◄────────────────────────────────────── β”‚

Persistent Session (Reconnect)

Client                                    Broker
   β”‚                                         β”‚
   β”‚  CONNECT                                β”‚
   β”‚  client_id: "sensor-01"                 β”‚
   β”‚  clean_session: false                   β”‚
   β”‚ ──────────────────────────────────────► β”‚
   β”‚                                         β”‚ Finds existing session
   β”‚                             CONNACK     β”‚
   β”‚                     session_present: 1  β”‚
   β”‚ ◄────────────────────────────────────── β”‚
   β”‚                                         β”‚
   β”‚                                         β”‚ Delivers queued messages
   β”‚                             PUBLISH     β”‚
   β”‚                             PUBLISH     β”‚
   β”‚ ◄────────────────────────────────────── β”‚

Session Takeover

If a client connects with an ID that’s already connected:

  1. Broker disconnects the existing client
  2. New client takes over the session
  3. This prevents duplicate client IDs
Client A (sensor-01)          Broker          Client B (sensor-01)
        β”‚                        β”‚                     β”‚
        β”‚  ◄── Connected ──►     β”‚                     β”‚
        β”‚                        β”‚                     β”‚
        β”‚                        β”‚  CONNECT            β”‚
        β”‚                        β”‚  client_id: sensor-01
        β”‚                        β”‚ ◄─────────────────── β”‚
        β”‚                        β”‚                     β”‚
   [Disconnected]                β”‚  Takes over session β”‚
        β”‚                        β”‚                     β”‚
        β”‚                        β”‚  CONNACK            β”‚
        β”‚                        β”‚ ────────────────────►│

Offline Message Queuing

When a client with a persistent session disconnects:

  1. Broker keeps the session
  2. Incoming messages (matching subscriptions) are queued
  3. When client reconnects, queued messages are delivered
Subscriber                Broker                Publisher
(offline)                    β”‚                      β”‚
    X                        β”‚                      β”‚
    β”‚                        β”‚  PUBLISH "sensors/t" β”‚
    β”‚                        β”‚ ◄──────────────────── β”‚
    β”‚                        β”‚                      β”‚
    β”‚                   [Queues message for subscriber]
    β”‚                        β”‚                      β”‚
    β”‚  CONNECT               β”‚                      β”‚
    β”‚ ─────────────────────► β”‚                      β”‚
    β”‚                        β”‚                      β”‚
    β”‚  CONNACK (session: 1)  β”‚                      β”‚
    β”‚ ◄───────────────────── β”‚                      β”‚
    β”‚                        β”‚                      β”‚
    β”‚  PUBLISH "sensors/t"   β”‚  (queued message)    β”‚
    β”‚ ◄───────────────────── β”‚                      β”‚

Queue Limits

To prevent memory exhaustion, this broker limits:

SettingDefaultDescription
Max offline messages1000Per client
Max inflight65535QoS 1/2 in progress
Session expiry24 hoursAfter disconnect

When the queue is full, oldest messages are dropped (or new messages, depending on policy).

Session Expiry

Sessions don’t live forever:

  • After session_expiry time, inactive sessions are cleaned up
  • This prevents unbounded memory growth
  • Default: 24 hours after last disconnect

Client ID Rules

  • Must be 1-23 characters (per MQTT 3.1.1)
  • Characters: a-z, A-Z, 0-9
  • If empty, broker generates one (requires clean_session: true)
  • Must be unique across all connected clients

Best Practices

  1. Use persistent sessions for reliability β€” Critical IoT devices
  2. Use clean sessions for ephemeral clients β€” Dashboards, CLI tools
  3. Set appropriate expiry β€” Balance reliability vs. memory
  4. Handle session_present β€” Client should check CONNACK
  5. Use consistent client IDs β€” Enables session recovery
  6. Size queues appropriately β€” Based on expected offline duration

Implementation Details

This broker uses:

  • In-memory session storage (default)
  • LRU eviction for session expiry
  • Separate queues per QoS level
  • Atomic session takeover

Architecture Overview

This broker is built with modularity and testability as core principles.

High-Level Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                           TCP Listener                              β”‚
β”‚                         (tokio::net::TcpListener)                   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          Transport Layer                            β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”           β”‚
β”‚  β”‚    MqttCodec        β”‚         β”‚    Connection       β”‚           β”‚
β”‚  β”‚ (Framed<TcpStream>) β”‚ ───────►│   (async wrapper)   β”‚           β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          Codec Layer                                β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚     Decoder     β”‚  β”‚     Encoder     β”‚  β”‚    Packet Types    β”‚  β”‚
β”‚  β”‚ (MQTT 3.1.1)    β”‚  β”‚ (MQTT 3.1.1)    β”‚  β”‚ (16 packet types)  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          Server Layer                               β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚    MqttServer       β”‚         β”‚     ClientHandler           β”‚   β”‚
β”‚  β”‚  (accept loop)      │────────►│  (per-connection state)     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          Session Layer                              β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚     Session     β”‚  β”‚   QoS State     β”‚  β”‚  Packet ID Alloc   β”‚  β”‚
β”‚  β”‚  (client state) β”‚  β”‚   Machines      β”‚  β”‚   (1-65535)        β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                          Router Layer                               β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚     Router      β”‚  β”‚    Registry     β”‚  β”‚   Retain Store     β”‚  β”‚
β”‚  β”‚(msg distribution)β”‚  β”‚ (subscriptions) β”‚  β”‚ (retained msgs)    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                       Topic Matcher Layer                           β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚   TopicTrie     β”‚  β”‚  TopicFilter    β”‚  β”‚   Validation       β”‚  β”‚
β”‚  β”‚   (O(levels))   β”‚  β”‚  (+, #, exact)  β”‚  β”‚   (UTF-8, limits)  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        Persistence Layer                            β”‚
β”‚                                                                     β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚  Storage Trait  β”‚  β”‚  MemoryStore    β”‚  β”‚  (Future: Disk)    β”‚  β”‚
β”‚  β”‚   (abstract)    β”‚  β”‚  (default)      β”‚  β”‚                    β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Data Flow

Publish Flow (QoS 1)

Publisher               Broker                       Subscriber
    β”‚                      β”‚                             β”‚
    β”‚  PUBLISH (QoS 1)     β”‚                             β”‚
    β”‚ ────────────────────►│                             β”‚
    β”‚                      β”‚                             β”‚
    β”‚                      │──── Route to matching ─────►│
    β”‚                      β”‚     subscribers             β”‚
    β”‚                      β”‚                             β”‚
    β”‚                      β”‚                    PUBLISH  β”‚
    β”‚                      │─────────────────────────────►
    β”‚                      β”‚                             β”‚
    β”‚                      │◄──────────────────── PUBACK β”‚
    β”‚                      β”‚                             β”‚
    β”‚               PUBACK β”‚                             β”‚
    │◄─────────────────────│                             β”‚

Subscribe Flow

Client                  Broker
   β”‚                       β”‚
   β”‚  SUBSCRIBE            β”‚
   β”‚  topic: "sensors/#"   β”‚
   β”‚  qos: 1               β”‚
   β”‚ ─────────────────────►│
   β”‚                       β”‚
   β”‚                       β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚                       β”‚  β”‚ 1. Validate filter   β”‚
   β”‚                       β”‚  β”‚ 2. Add to trie       β”‚
   β”‚                       β”‚  β”‚ 3. Store in session  β”‚
   β”‚                       β”‚  β”‚ 4. Check retain      β”‚
   β”‚                       β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
   β”‚                       β”‚
   β”‚  SUBACK               β”‚
   β”‚  granted: [1]         β”‚
   │◄──────────────────────│
   β”‚                       β”‚
   β”‚  PUBLISH (retained)   β”‚
   │◄──────────────────────│

Concurrency Model

Task Structure

                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚      Main Task          β”‚
                    β”‚   (accept connections)  β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                β”‚
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β–Ό                   β–Ό                   β–Ό
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚  Client Task   β”‚  β”‚  Client Task   β”‚  β”‚  Client Task   β”‚
   β”‚  (handle one)  β”‚  β”‚  (handle one)  β”‚  β”‚  (handle one)  β”‚
   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Each client gets its own async task for:

  • Reading packets from socket
  • Processing MQTT protocol
  • Writing responses

Shared State

                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚    Shared State        β”‚
                    β”‚    (Arc<Router>)       β”‚
                    β”‚                        β”‚
                    β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
                    β”‚  β”‚ Subscription Trieβ”‚  β”‚
                    β”‚  β”‚   (RwLock)       β”‚  β”‚
                    β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
                    β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
                    β”‚  β”‚ Session Store    β”‚  β”‚
                    β”‚  β”‚   (RwLock)       β”‚  β”‚
                    β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
                    β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
                    β”‚  β”‚ Retain Store     β”‚  β”‚
                    β”‚  β”‚   (RwLock)       β”‚  β”‚
                    β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                           β–²    β–²    β–²
                           β”‚    β”‚    β”‚
      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚    └────────────────────┐
      β”‚                         β”‚                         β”‚
 Client Task 1            Client Task 2             Client Task 3

Lock granularity:

  • Read locks for subscription lookups (fast, parallel)
  • Write locks for subscribe/unsubscribe (rare)
  • Per-session locks for client state

Error Handling Strategy

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        Error Hierarchy                          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  MqttError (top-level)                                          β”‚
β”‚    β”œβ”€β”€ CodecError (packet encoding/decoding)                    β”‚
β”‚    β”‚     β”œβ”€β”€ InvalidPacketType                                  β”‚
β”‚    β”‚     β”œβ”€β”€ MalformedRemainingLength                           β”‚
β”‚    β”‚     β”œβ”€β”€ InvalidUtf8                                        β”‚
β”‚    β”‚     └── PacketTooLarge                                     β”‚
β”‚    β”‚                                                            β”‚
β”‚    β”œβ”€β”€ ProtocolError (MQTT violations)                          β”‚
β”‚    β”‚     β”œβ”€β”€ InvalidClientId                                    β”‚
β”‚    β”‚     β”œβ”€β”€ InvalidTopicFilter                                 β”‚
β”‚    β”‚     └── PacketIdInUse                                      β”‚
β”‚    β”‚                                                            β”‚
β”‚    β”œβ”€β”€ SessionError (session management)                        β”‚
β”‚    β”‚     β”œβ”€β”€ SessionNotFound                                    β”‚
β”‚    β”‚     └── SessionTakeover                                    β”‚
β”‚    β”‚                                                            β”‚
β”‚    └── IoError (network)                                        β”‚
β”‚          └── (std::io::Error wrapped)                           β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Memory Management

Per-Connection Memory

ComponentTypical Size
TCP buffer (read)8 KB
TCP buffer (write)8 KB
Session state~1 KB
Subscription entries~100 bytes each
Inflight messages~1 KB each

Global Memory

ComponentTypical Size
Subscription trieO(unique topic levels)
Session storeO(connected clients)
Retain storeO(retained messages)
Offline queuesO(queued messages)

Performance Characteristics

OperationTime ComplexityNotes
Topic matchO(topic levels)Trie traversal
SubscribeO(filter levels)Trie insertion
UnsubscribeO(filter levels)Trie removal
Publish routingO(subscribers)For each match
Session lookupO(1)HashMap
Packet decodeO(packet size)Linear scan
Packet encodeO(packet size)Linear write

Design Decisions

Why Trie for Topics?

  • Wildcards require prefix matching
  • O(levels) vs O(total subscriptions)
  • Memory efficient for shared prefixes
  • Natural hierarchy representation

Why In-Memory Storage?

  • Simplicity for learning/prototyping
  • Sufficient for most use cases
  • Trait abstraction allows disk later
  • Avoids I/O latency

Why One Task Per Client?

  • Simple mental model
  • No complex message passing
  • Backpressure handled naturally
  • Easy cleanup on disconnect

Why RwLock vs Channel?

  • Subscriptions are read-heavy
  • Parallel reads scale well
  • Writes are rare (subscribe/unsubscribe)
  • Simpler than actor model

Module Reference

This broker is organized into well-defined modules, each with a single responsibility.

Module Dependency Graph

                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚    bin/broker   β”‚
                    β”‚  bin/producer   β”‚
                    β”‚  bin/consumer   β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                             β”‚
                             β–Ό
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚     server      β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                             β”‚
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
          β–Ό                  β–Ό                  β–Ό
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚   session   β”‚    β”‚   router    β”‚    β”‚  transport  β”‚
   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
          β”‚                  β”‚                  β”‚
          β”‚           β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”           β”‚
          β”‚           β–Ό             β–Ό           β”‚
          β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
          β”‚    β”‚topic_matchβ”‚ β”‚persistenceβ”‚      β”‚
          β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
          β”‚                                     β”‚
          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β–Ό
                     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                     β”‚    codec    β”‚
                     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

codec β€” MQTT 3.1.1 Protocol

Location: src/codec/

Purpose: Encode and decode all 16 MQTT 3.1.1 packet types.

Key Types

#![allow(unused)]
fn main() {
// All packet types
pub enum MqttPacket {
    Connect(ConnectPacket),
    Connack(ConnackPacket),
    Publish(PublishPacket),
    Puback(PubackPacket),
    Pubrec(PubrecPacket),
    Pubrel(PubrelPacket),
    Pubcomp(PubcompPacket),
    Subscribe(SubscribePacket),
    Suback(SubackPacket),
    Unsubscribe(UnsubscribePacket),
    Unsuback(UnsubackPacket),
    Pingreq,
    Pingresp,
    Disconnect,
}

// Decoder
pub struct MqttDecoder;
impl MqttDecoder {
    pub fn decode(bytes: &mut BytesMut) -> Result<Option<MqttPacket>>;
}

// Encoder
pub struct MqttEncoder;
impl MqttEncoder {
    pub fn encode(packet: &MqttPacket, buf: &mut BytesMut) -> Result<()>;
}
}

Files

FileDescription
mod.rsModule re-exports
packet.rsPacket type definitions
types.rsCommon types (QoS, ConnectFlags)
decode.rsPacket decoding logic
encode.rsPacket encoding logic
varint.rsVariable-length integer codec
error.rsCodec error types
tests.rsUnit tests (60 tests)

topic_matcher β€” Wildcard Matching

Location: src/topic_matcher/

Purpose: Efficient topic filter matching with wildcard support.

Key Types

#![allow(unused)]
fn main() {
// Parsed topic filter
pub struct TopicFilter {
    pub pattern: String,
    pub levels: Vec<Level>,
}

pub enum Level {
    Exact(String),
    SingleWildcard,  // +
    MultiWildcard,   // #
}

// Trie-based matcher
pub struct TopicTrie<T> {
    // Insert subscription
    pub fn insert(&mut self, filter: &TopicFilter, value: T);
    
    // Find all matching subscriptions
    pub fn matches(&self, topic: &str) -> Vec<&T>;
    
    // Remove subscription
    pub fn remove(&mut self, filter: &TopicFilter) -> Option<T>;
}
}

Wildcard Rules

PatternMatchesDoesn’t Match
sensors/+/tempsensors/room1/tempsensors/temp
sensors/#sensors/a/b/cother/topic
+/+/+a/b/ca/b

Files

FileDescription
mod.rsModule re-exports
filter.rsTopicFilter parsing
trie.rsTrie implementation
validation.rsTopic/filter validation
tests.rsUnit tests (23 tests)

session β€” Connection State

Location: src/session/

Purpose: Manage client sessions and QoS state machines.

Key Types

#![allow(unused)]
fn main() {
// Client session
pub struct Session {
    pub client_id: String,
    pub clean_session: bool,
    pub subscriptions: HashMap<String, QoS>,
    pub connection_state: ConnectionState,
}

// QoS 1 state machine
pub enum Qos1State {
    AwaitingPuback { packet_id: u16, message: Message },
    Complete,
}

// QoS 2 state machine  
pub enum Qos2State {
    AwaitingPubrec { packet_id: u16, message: Message },
    AwaitingPubrel { packet_id: u16 },
    AwaitingPubcomp { packet_id: u16 },
    Complete,
}

// Packet ID allocator
pub struct PacketIdAllocator {
    pub fn allocate(&mut self) -> Option<u16>;
    pub fn release(&mut self, id: u16);
}
}

Files

FileDescription
mod.rsModule re-exports
session.rsSession management
connection.rsConnection state
qos.rsQoS state machines
packet_id.rsPacket ID allocation
tests.rsUnit tests (24 tests)

router β€” Message Distribution

Location: src/router/

Purpose: Route published messages to matching subscribers.

Key Types

#![allow(unused)]
fn main() {
// Central router
pub struct Router {
    subscriptions: TopicTrie<Subscriber>,
    retain_store: RetainStore,
    sessions: SessionStore,
}

impl Router {
    // Add subscription
    pub fn subscribe(&self, client_id: &str, filter: &str, qos: QoS);
    
    // Remove subscription
    pub fn unsubscribe(&self, client_id: &str, filter: &str);
    
    // Route message to subscribers
    pub fn route(&self, message: &PublishPacket) -> Vec<Delivery>;
    
    // Store retained message
    pub fn retain(&self, topic: &str, message: Option<Message>);
}

// Subscription registry
pub struct Registry {
    pub fn add(&self, filter: &str, subscriber: Subscriber);
    pub fn remove(&self, filter: &str, client_id: &str);
    pub fn matches(&self, topic: &str) -> Vec<Subscriber>;
}
}

Files

FileDescription
mod.rsModule re-exports
router.rsMain router logic
registry.rsSubscription registry
retain.rsRetained message store
tests.rsUnit tests (24 tests)

persistence β€” Storage Abstraction

Location: src/persistence/

Purpose: Abstract storage for sessions, messages, and retained data.

Key Types

#![allow(unused)]
fn main() {
// Storage trait
#[async_trait]
pub trait Storage: Send + Sync {
    // Session operations
    async fn store_session(&self, session: &Session) -> Result<()>;
    async fn load_session(&self, client_id: &str) -> Result<Option<Session>>;
    async fn delete_session(&self, client_id: &str) -> Result<()>;
    
    // Message queue operations
    async fn enqueue(&self, client_id: &str, message: Message) -> Result<()>;
    async fn dequeue(&self, client_id: &str) -> Result<Vec<Message>>;
    
    // Retained message operations
    async fn store_retained(&self, topic: &str, message: Option<Message>) -> Result<()>;
    async fn load_retained(&self, topic: &str) -> Result<Option<Message>>;
}

// In-memory implementation
pub struct MemoryStore {
    sessions: RwLock<HashMap<String, Session>>,
    queues: RwLock<HashMap<String, VecDeque<Message>>>,
    retained: RwLock<HashMap<String, Message>>,
}
}

Files

FileDescription
mod.rsModule re-exports
traits.rsStorage trait definitions
memory.rsIn-memory implementation
tests.rsUnit tests (20 tests)

transport β€” Network Layer

Location: src/transport/

Purpose: TCP connection handling and MQTT framing.

Key Types

#![allow(unused)]
fn main() {
// MQTT codec for tokio
pub struct MqttCodec;

impl Decoder for MqttCodec {
    type Item = MqttPacket;
    type Error = CodecError;
    
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>>;
}

impl Encoder<MqttPacket> for MqttCodec {
    type Error = CodecError;
    
    fn encode(&mut self, item: MqttPacket, dst: &mut BytesMut) -> Result<()>;
}

// Connection wrapper
pub struct Connection {
    inner: Framed<TcpStream, MqttCodec>,
}

impl Connection {
    pub async fn read(&mut self) -> Result<Option<MqttPacket>>;
    pub async fn write(&mut self, packet: MqttPacket) -> Result<()>;
}
}

Files

FileDescription
mod.rsModule re-exports
codec.rsTokio codec implementation
connection.rsConnection wrapper
tests.rsUnit tests (16 tests)

server β€” Broker Server

Location: src/server/

Purpose: Accept connections and handle MQTT protocol.

Key Types

#![allow(unused)]
fn main() {
// Server configuration
pub struct ServerConfig {
    pub bind_addr: SocketAddr,
    pub max_connections: usize,
    pub max_packet_size: usize,
}

// Main server
pub struct MqttServer {
    config: ServerConfig,
    router: Arc<Router>,
}

impl MqttServer {
    pub async fn run(&self) -> Result<()>;
}

// Per-client handler
pub struct ClientHandler {
    connection: Connection,
    session: Option<Session>,
    router: Arc<Router>,
}

impl ClientHandler {
    pub async fn run(&mut self) -> Result<()>;
    
    async fn handle_connect(&mut self, packet: ConnectPacket) -> Result<()>;
    async fn handle_publish(&mut self, packet: PublishPacket) -> Result<()>;
    async fn handle_subscribe(&mut self, packet: SubscribePacket) -> Result<()>;
    // ... other packet handlers
}
}

Files

FileDescription
mod.rsModule re-exports
server.rsMain server loop
handler.rsClient handler
client.rsClient state management
tests.rsIntegration tests (11 tests)

bin/ β€” Executables

Location: src/bin/

Purpose: CLI entry points.

Binaries

BinaryDescription
broker.rs β†’ mqtt-brokerStart the broker server
producer.rs β†’ mqtt-producerPublish messages
consumer.rs β†’ mqtt-consumerSubscribe and receive

Test Coverage

ModuleTestsCoverage
codec60Packet encode/decode roundtrips
topic_matcher23Wildcard matching, edge cases
session24State machines, packet IDs
router24Routing, subscriptions
persistence20Storage operations
transport16Framing, connection handling
server11Integration scenarios
Total178

Adding a New Module

  1. Create directory: src/mymodule/
  2. Add mod.rs with public exports
  3. Add tests.rs with unit tests
  4. Export from src/lib.rs: pub mod mymodule;
  5. Add to this documentation

Module Template

#![allow(unused)]
fn main() {
// src/mymodule/mod.rs
mod implementation;
mod tests;

pub use implementation::*;

// src/mymodule/implementation.rs
pub struct MyType {
    // ...
}

impl MyType {
    pub fn new() -> Self {
        // ...
    }
}

// src/mymodule/tests.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic() {
        let my = MyType::new();
        // assertions
    }
}
}

Building from Source

Build the MQTT broker and CLI tools from source.

Prerequisites

Rust Toolchain

Install Rust via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Minimum version: Rust 1.70+ (uses edition 2021)

Verify installation:

rustc --version
cargo --version

Platform Support

PlatformStatusNotes
Linux (x86_64)βœ…Primary development
macOS (x86_64)βœ…Fully supported
macOS (ARM64)βœ…Apple Silicon
Windows (x86_64)βœ…MSVC toolchain

Clone Repository

git clone https://github.com/yourusername/mqtt-broker.git
cd mqtt-broker

Build Commands

Debug Build

Fast compilation, includes debug symbols, no optimizations:

cargo build

Binaries are placed in target/debug/:

target/debug/
β”œβ”€β”€ mqtt-broker
β”œβ”€β”€ mqtt-consumer
└── mqtt-producer

Release Build

Optimized for production, slower to compile:

cargo build --release

Binaries are placed in target/release/:

target/release/
β”œβ”€β”€ mqtt-broker
β”œβ”€β”€ mqtt-consumer
└── mqtt-producer

Build Specific Binary

# Only the broker
cargo build --bin mqtt-broker --release

# Only the producer
cargo build --bin mqtt-producer --release

# Only the consumer
cargo build --bin mqtt-consumer --release

Build Library Only

cargo build --lib

Install Locally

Install to ~/.cargo/bin/ (should be in your PATH):

cargo install --path .

Now available anywhere:

mqtt-broker --help
mqtt-producer --help
mqtt-consumer --help

Build Configuration

Cargo.toml Features

Currently no optional features. All functionality is included by default.

Profile Settings

Release profile in Cargo.toml:

[profile.release]
opt-level = 3
lto = true
codegen-units = 1

This produces smaller, faster binaries but increases compile time.

Reducing Binary Size

For minimal binaries:

# Strip debug symbols
cargo build --release
strip target/release/mqtt-broker

# Or use build flags
RUSTFLAGS="-C link-arg=-s" cargo build --release

Cross Compilation

Install Target

# Linux on macOS
rustup target add x86_64-unknown-linux-gnu

# Windows on Linux/macOS
rustup target add x86_64-pc-windows-gnu

# ARM64 Linux
rustup target add aarch64-unknown-linux-gnu

Cross Build

# Linux
cargo build --release --target x86_64-unknown-linux-gnu

# Windows
cargo build --release --target x86_64-pc-windows-gnu

Note: Cross compilation may require additional linkers. Consider using cross:

cargo install cross
cross build --release --target x86_64-unknown-linux-gnu

Docker Build

Build Image

# Dockerfile
FROM rust:1.75 as builder
WORKDIR /usr/src/mqtt-broker
COPY . .
RUN cargo build --release

FROM debian:bookworm-slim
COPY --from=builder /usr/src/mqtt-broker/target/release/mqtt-broker /usr/local/bin/
EXPOSE 1883
CMD ["mqtt-broker"]

Build and run:

docker build -t mqtt-broker .
docker run -p 1883:1883 mqtt-broker

Multi-Architecture

docker buildx build --platform linux/amd64,linux/arm64 -t mqtt-broker .

Documentation Build

Generate API Docs

cargo doc --open

Opens documentation in your browser.

Build mdBook Docs

# Install mdBook
cargo install mdbook

# Build documentation site
cd docs
mdbook build

# Serve locally
mdbook serve --open

Troubleshooting

Compilation Errors

Missing dependencies:

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install build-essential

# macOS
xcode-select --install

Outdated Rust:

rustup update

Linker Errors

macOS:

xcode-select --install

Linux:

sudo apt-get install gcc

Slow Builds

Use incremental compilation (enabled by default) and consider:

# Use mold linker (Linux)
sudo apt-get install mold
RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build

# Use lld linker
RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build

Out of Memory

Reduce parallelism:

cargo build -j 2

Build Artifacts

After a successful build:

target/
β”œβ”€β”€ debug/
β”‚   β”œβ”€β”€ mqtt-broker        # Debug binary
β”‚   β”œβ”€β”€ mqtt-consumer
β”‚   β”œβ”€β”€ mqtt-producer
β”‚   β”œβ”€β”€ deps/              # Dependencies
β”‚   └── incremental/       # Incremental build cache
β”œβ”€β”€ release/
β”‚   β”œβ”€β”€ mqtt-broker        # Release binary
β”‚   β”œβ”€β”€ mqtt-consumer
β”‚   └── mqtt-producer
└── doc/                   # Generated documentation
    └── mqtt_broker/

Clean Build

Remove all build artifacts:

cargo clean

Remove only release artifacts:

cargo clean --release

Running Tests

Comprehensive testing is a core principle of this project. Here’s how to run and write tests.

Quick Start

Run all tests:

cargo test

Expected output:

running 178 tests
...
test result: ok. 178 passed; 0 failed; 0 ignored

Test Commands

Run All Tests

cargo test

Run Tests with Output

See println! output from tests:

cargo test -- --nocapture

Run Specific Test

# By name
cargo test test_publish_decode

# By partial name
cargo test publish

# By module
cargo test codec::

Run Tests in Specific Module

# Codec tests only
cargo test --package mqtt-broker --lib codec::tests

# Topic matcher tests
cargo test --package mqtt-broker --lib topic_matcher::tests

# Session tests
cargo test --package mqtt-broker --lib session::tests

Run Tests in Release Mode

cargo test --release

Run Ignored Tests

Some tests are marked #[ignore] (slow or require setup):

cargo test -- --ignored

Run All Including Ignored

cargo test -- --include-ignored

Test Organization

src/
β”œβ”€β”€ codec/
β”‚   └── tests.rs          # 60 tests
β”œβ”€β”€ topic_matcher/
β”‚   └── tests.rs          # 23 tests
β”œβ”€β”€ session/
β”‚   └── tests.rs          # 24 tests
β”œβ”€β”€ router/
β”‚   └── tests.rs          # 24 tests
β”œβ”€β”€ persistence/
β”‚   └── tests.rs          # 20 tests
β”œβ”€β”€ transport/
β”‚   └── tests.rs          # 16 tests
└── server/
    └── tests.rs          # 11 tests

tests/
β”œβ”€β”€ codec_proptest.rs     # Property-based tests
└── message_test.rs       # Integration tests

Test Categories

Unit Tests

Test individual functions and structs:

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_encode_decode_roundtrip() {
        let packet = ConnectPacket {
            client_id: "test".to_string(),
            // ...
        };
        
        let mut buf = BytesMut::new();
        MqttEncoder::encode(&MqttPacket::Connect(packet.clone()), &mut buf).unwrap();
        
        let decoded = MqttDecoder::decode(&mut buf).unwrap().unwrap();
        assert_eq!(decoded, MqttPacket::Connect(packet));
    }
}
}

Integration Tests

Test multiple components together (in tests/ directory):

#![allow(unused)]
fn main() {
// tests/message_test.rs
use mqtt_broker::*;

#[tokio::test]
async fn test_publish_flow() {
    let router = Router::new();
    let session = Session::new("client-1", false);
    
    // Subscribe
    router.subscribe("client-1", "sensors/#", QoS::AtLeastOnce);
    
    // Publish
    let message = PublishPacket {
        topic: "sensors/temp".to_string(),
        payload: b"25".to_vec(),
        // ...
    };
    
    let deliveries = router.route(&message);
    assert_eq!(deliveries.len(), 1);
}
}

Property-Based Tests

Using proptest for randomized testing:

#![allow(unused)]
fn main() {
// tests/codec_proptest.rs
use proptest::prelude::*;

proptest! {
    #[test]
    fn test_varint_roundtrip(value in 0u32..268435455) {
        let mut buf = BytesMut::new();
        encode_variable_int(value, &mut buf).unwrap();
        let decoded = decode_variable_int(&mut buf).unwrap();
        prop_assert_eq!(value, decoded);
    }
}
}

Run property tests:

cargo test proptest

Async Tests

Using #[tokio::test] for async code:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_server_accept() {
    let server = MqttServer::bind("127.0.0.1:0").await.unwrap();
    let addr = server.local_addr();
    
    tokio::spawn(async move {
        server.run().await.unwrap();
    });
    
    let client = TcpStream::connect(addr).await.unwrap();
    assert!(client.peer_addr().is_ok());
}
}

Test Coverage

Using cargo-tarpaulin

# Install
cargo install cargo-tarpaulin

# Run coverage
cargo tarpaulin --out Html

# Open report
open tarpaulin-report.html

Using llvm-cov

# Install
rustup component add llvm-tools-preview
cargo install cargo-llvm-cov

# Run coverage
cargo llvm-cov --html

# Open report
open target/llvm-cov/html/index.html

Benchmarks

Criterion Benchmarks

# Run all benchmarks
cargo bench

# Run specific benchmark
cargo bench publish

Benchmark location: benches/

Example benchmark:

#![allow(unused)]
fn main() {
// benches/codec_bench.rs
use criterion::{criterion_group, criterion_main, Criterion};
use mqtt_broker::codec::*;

fn bench_publish_encode(c: &mut Criterion) {
    let packet = PublishPacket {
        topic: "sensors/temperature".to_string(),
        payload: vec![0u8; 1024],
        // ...
    };
    
    c.bench_function("publish_encode_1kb", |b| {
        b.iter(|| {
            let mut buf = BytesMut::with_capacity(2048);
            MqttEncoder::encode(&MqttPacket::Publish(packet.clone()), &mut buf)
        })
    });
}

criterion_group!(benches, bench_publish_encode);
criterion_main!(benches);
}

Writing New Tests

Test Naming Convention

#![allow(unused)]
fn main() {
#[test]
fn test_<module>_<function>_<scenario>() {
    // ...
}

// Examples:
fn test_codec_publish_empty_payload() { }
fn test_topic_filter_multi_wildcard() { }
fn test_session_clean_session_true() { }
}

Test Structure (AAA Pattern)

#![allow(unused)]
fn main() {
#[test]
fn test_subscribe_adds_to_trie() {
    // Arrange
    let mut router = Router::new();
    
    // Act
    router.subscribe("client-1", "sensors/#", QoS::AtLeastOnce);
    
    // Assert
    let matches = router.matches("sensors/temp");
    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].client_id, "client-1");
}
}

Testing Error Cases

#![allow(unused)]
fn main() {
#[test]
fn test_invalid_topic_filter_returns_error() {
    let result = TopicFilter::parse("sensors/#/invalid");
    assert!(result.is_err());
    assert!(matches!(result, Err(TopicError::InvalidFilter(_))));
}
}

Testing Async Code

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_connection_timeout() {
    let result = tokio::time::timeout(
        Duration::from_millis(100),
        Connection::connect("127.0.0.1:9999")
    ).await;
    
    assert!(result.is_err()); // Timeout
}
}

CI Integration

Tests run automatically on GitHub Actions:

# .github/workflows/ci.yml
name: CI

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
      - run: cargo test --all-features

Troubleshooting

Tests Hang

Usually a deadlock or missing .await:

# Run with timeout
timeout 60 cargo test

# Run single-threaded
cargo test -- --test-threads=1

Flaky Tests

Add retry or increase timeouts:

#![allow(unused)]
fn main() {
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_publishes() {
    // Use tokio::time::timeout
    let result = tokio::time::timeout(
        Duration::from_secs(5),
        async_operation()
    ).await;
    
    assert!(result.is_ok());
}
}

Port Already in Use

Tests using network ports can conflict:

#![allow(unused)]
fn main() {
// Use port 0 for automatic assignment
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
}

Debug Failing Tests

# Verbose output
cargo test test_name -- --nocapture

# With RUST_BACKTRACE
RUST_BACKTRACE=1 cargo test test_name

# With logging
RUST_LOG=debug cargo test test_name -- --nocapture

Contributing

Thank you for your interest in contributing to this MQTT broker project!

Getting Started

  1. Fork the repository on GitHub
  2. Clone your fork:
    git clone https://github.com/yourusername/mqtt-broker.git
    cd mqtt-broker
    
  3. Set upstream remote:
    git remote add upstream https://github.com/originalowner/mqtt-broker.git
    
  4. Create a feature branch:
    git checkout -b feature/my-feature
    

Development Setup

Prerequisites

  • Rust 1.70 or later
  • Git
  • (Optional) Docker for integration testing

Build and Test

# Build
cargo build

# Run tests
cargo test

# Run with logging
RUST_LOG=debug cargo run --bin mqtt-broker

Useful Commands

# Format code
cargo fmt

# Run linter
cargo clippy

# Check without building
cargo check

# Update dependencies
cargo update

# Generate docs
cargo doc --open

Code Style

Formatting

Run cargo fmt before committing:

cargo fmt

We use the default rustfmt configuration.

Linting

All code must pass cargo clippy:

cargo clippy -- -D warnings

Fix warnings before submitting PRs.

Naming Conventions

TypeConventionExample
StructsPascalCaseTopicFilter
TraitsPascalCaseStorage
Functionssnake_caseencode_packet
Variablessnake_caseclient_id
ConstantsSCREAMING_SNAKEMAX_PACKET_SIZE
Modulessnake_casetopic_matcher

Documentation

All public items should have documentation:

#![allow(unused)]
fn main() {
/// Encodes an MQTT packet to bytes.
///
/// # Arguments
///
/// * `packet` - The packet to encode
/// * `buf` - Buffer to write encoded bytes to
///
/// # Returns
///
/// Returns `Ok(())` on success, or `CodecError` on failure.
///
/// # Example
///
/// ```rust
/// let packet = MqttPacket::Pingreq;
/// let mut buf = BytesMut::new();
/// MqttEncoder::encode(&packet, &mut buf)?;
/// ```
pub fn encode(packet: &MqttPacket, buf: &mut BytesMut) -> Result<(), CodecError> {
    // ...
}
}

Making Changes

Types of Contributions

  • πŸ› Bug fixes β€” Fix issues with tests
  • ✨ Features β€” Add new functionality
  • πŸ“š Documentation β€” Improve docs, examples
  • πŸ§ͺ Tests β€” Add missing tests
  • ⚑ Performance β€” Optimize code
  • πŸ”§ Refactoring β€” Improve code structure

Branch Naming

feature/description    # New features
fix/description        # Bug fixes
docs/description       # Documentation
test/description       # Test additions
refactor/description   # Code refactoring

Examples:

  • feature/qos2-support
  • fix/packet-decode-crash
  • docs/session-examples

Commit Messages

Follow Conventional Commits:

type(scope): description

[optional body]

[optional footer]

Types:

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation
  • test: Tests
  • refactor: Code refactoring
  • perf: Performance improvement
  • chore: Maintenance

Examples:

feat(codec): add MQTT 5 property parsing

fix(session): prevent double-free on disconnect

docs(readme): add installation instructions

test(router): add wildcard matching edge cases

Pull Request Process

Before Submitting

  1. Update your branch:

    git fetch upstream
    git rebase upstream/main
    
  2. Run all checks:

    cargo fmt
    cargo clippy -- -D warnings
    cargo test
    
  3. Add tests for new functionality

  4. Update documentation if needed

PR Guidelines

  • One feature per PR β€” Keep PRs focused
  • Descriptive title β€” What does this PR do?
  • Link issues β€” Reference related issues
  • Include tests β€” All new code needs tests
  • Update CHANGELOG β€” Add entry under β€œUnreleased”

PR Template

## Description

Brief description of the changes.

## Related Issues

Closes #123

## Changes

- Added X
- Fixed Y
- Updated Z

## Testing

- [ ] Unit tests pass
- [ ] Integration tests pass
- [ ] Manual testing completed

## Checklist

- [ ] Code formatted with `cargo fmt`
- [ ] No clippy warnings
- [ ] Documentation updated
- [ ] CHANGELOG updated

Testing Requirements

All PRs Must

  1. Pass existing tests β€” No regressions
  2. Add new tests β€” For new functionality
  3. Maintain coverage β€” Don’t decrease coverage

Test Guidelines

#![allow(unused)]
fn main() {
// Good: Descriptive name, tests one thing
#[test]
fn test_publish_with_empty_payload_succeeds() {
    let packet = PublishPacket {
        payload: vec![],
        // ...
    };
    assert!(encode(&packet).is_ok());
}

// Good: Tests error case
#[test]
fn test_invalid_topic_returns_error() {
    let result = TopicFilter::parse("invalid/#/topic");
    assert!(matches!(result, Err(TopicError::InvalidFilter(_))));
}
}

Module Structure

When adding new modules:

src/mymodule/
β”œβ”€β”€ mod.rs          # Public exports
β”œβ”€β”€ types.rs        # Type definitions
β”œβ”€β”€ implementation.rs
└── tests.rs        # Unit tests

Export from src/lib.rs:

#![allow(unused)]
fn main() {
pub mod mymodule;
}

Error Handling

Use the project’s error types:

#![allow(unused)]
fn main() {
// Good: Use existing error types
fn parse_topic(s: &str) -> Result<Topic, TopicError> {
    // ...
}

// Good: Add new variant if needed
#[derive(Debug, Error)]
pub enum TopicError {
    #[error("invalid topic: {0}")]
    InvalidTopic(String),
    
    #[error("topic too long: {length} > {max}")]
    TooLong { length: usize, max: usize },
}
}

Getting Help

  • Questions? Open a discussion on GitHub
  • Bug found? Open an issue with reproduction steps
  • Feature idea? Open an issue to discuss first

Code of Conduct

  • Be respectful and constructive
  • Focus on the code, not the person
  • Assume good intentions
  • Help others learn

Recognition

Contributors are recognized in:

  • CHANGELOG.md (for each release)
  • GitHub contributors page

Thank you for contributing! πŸŽ‰