SpringBoot + MQTT + EMQX:物联网设备上行数据实时接入与指令下发平台
今天咱们聊聊物联网开发中一个核心问题:设备数据的实时接入和指令下发。
物联网数据接入的挑战
在物联网项目开发中,我们经常遇到这样的需求:
- 成千上万的设备需要同时连接到服务器
- 设备数据需要实时传输,不能有明显延迟
- 要支持设备指令下发,如远程控制、参数设置等
- 设备可能分布在不同地区,网络状况复杂
传统的HTTP轮询方式不仅效率低,还会给服务器带来巨大压力。今天我们就用MQTT协议来解决这些问题。
解决方案思路
今天我们要解决的,就是如何用SpringBoot + MQTT + EMQX构建一个高效的物联网数据接入平台。
核心思路是:
- MQTT协议:轻量级、低延迟的消息传输协议
- EMQX Broker:高性能MQTT消息代理服务器
- 设备认证:确保只有合法设备可以连接
- 数据处理:实时处理设备上行数据
- 指令下发:支持向设备发送控制指令
技术选型
- SpringBoot:快速搭建应用
- MQTT:物联网通信协议
- EMQX:MQTT消息代理
- Redis:设备状态存储
- WebSocket:前端实时数据展示
核心实现思路
1. EMQX配置
首先配置EMQX服务器:
# emqx.conf
node.name = emqx@127.0.0.1
node.cookie = emqxsecretcookie
listeners.tcp.default.bind = 0.0.0.0:1883
listeners.ssl.default.bind = 0.0.0.0:8883
listeners.ws.default.bind = 0.0.0.0:8083
listeners.wss.default.bind = 0.0.0.0:8084
# 认证配置
authentication {
backend = built_in_database
mechanism = password_based
}
# 授权配置
authorization {
type = built_in_database
cache = {
enable = true
ttl = 1min
}
}
2. 项目依赖配置
在SpringBoot项目中添加MQTT依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
3. MQTT配置类
配置MQTT连接:
@Configuration
@EnableIntegration
public class MqttConfig {
@Value("${mqtt.broker.url}")
private String brokerUrl;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(60);
options.setConnectionTimeout(30);
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("serverOutbound", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("default/topic");
return messageHandler;
}
@Bean
@InboundChannelAdapter(channel = "mqttInputChannel",
poller = @Poller(fixedDelay = "1000"))
public MessageProducer mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("serverInbound",
mqttClientFactory(),
"device/+/data", // 订阅设备数据主题
"device/+/command"); // 订阅指令响应主题
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
String payload = (String) message.getPayload();
// 处理接收到的消息
mqttMessageHandler.handleMessage(topic, payload);
};
}
}
4. 设备消息处理器
创建设备消息处理服务:
@Service
@Slf4j
public class DeviceMessageHandler {
@Autowired
private DeviceService deviceService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 处理设备上行数据
*/
public void handleMessage(String topic, String payload) {
try {
log.info("接收到设备消息 - Topic: {}, Payload: {}", topic, payload);
// 解析主题,获取设备ID
String[] topicParts = topic.split("/");
if (topicParts.length >= 3 && "data".equals(topicParts[2])) {
String deviceId = topicParts[1];
// 解析JSON数据
DeviceData deviceData = JSON.parseObject(payload, DeviceData.class);
deviceData.setDeviceId(deviceId);
deviceData.setTimestamp(System.currentTimeMillis());
// 保存设备数据
deviceService.saveDeviceData(deviceData);
// 更新设备在线状态
updateDeviceStatus(deviceId, DeviceStatus.ONLINE);
// 通知WebSocket客户端
notifyWebSocketClients(deviceData);
} else if (topicParts.length >= 3 && "command".equals(topicParts[2])) {
// 处理指令响应
String deviceId = topicParts[1];
handleCommandResponse(deviceId, payload);
}
} catch (Exception e) {
log.error("处理设备消息失败", e);
}
}
/**
* 更新设备状态
*/
private void updateDeviceStatus(String deviceId, DeviceStatus status) {
String key = "device:status:" + deviceId;
DeviceStatusInfo statusInfo = new DeviceStatusInfo();
statusInfo.setDeviceId(deviceId);
statusInfo.setStatus(status);
statusInfo.setLastUpdateTime(System.currentTimeMillis());
redisTemplate.opsForValue().set(key, statusInfo, Duration.ofHours(24));
}
/**
* 通知WebSocket客户端
*/
private void notifyWebSocketClients(DeviceData data) {
// 发送实时数据到WebSocket
webSocketService.sendToDeviceData(data);
}
/**
* 处理指令响应
*/
private void handleCommandResponse(String deviceId, String response) {
String key = "command:response:" + deviceId;
redisTemplate.opsForValue().set(key, response, Duration.ofMinutes(5));
}
}
5. 设备服务实现
实现设备相关的业务逻辑:
@Service
@Transactional
public class DeviceService {
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private DeviceDataRepository deviceDataRepository;
@Autowired
private MqttMessageSender mqttMessageSender;
/**
* 保存设备数据
*/
public void saveDeviceData(DeviceData deviceData) {
// 验证设备是否存在
Device device = deviceRepository.findById(deviceData.getDeviceId())
.orElseThrow(() -> new DeviceNotFoundException("设备不存在: " + deviceData.getDeviceId()));
// 更新设备最后活动时间
device.setLastActiveTime(new Date());
deviceRepository.save(device);
// 保存设备数据
deviceDataRepository.save(deviceData);
// 检查是否需要发送告警
checkAndSendAlarm(deviceData);
}
/**
* 发送指令到设备
*/
public CommandResult sendCommandToDevice(String deviceId, DeviceCommand command) {
// 验证设备状态
Device device = deviceRepository.findById(deviceId)
.orElseThrow(() -> new DeviceNotFoundException("设备不存在: " + deviceId));
if (!DeviceStatus.ONLINE.equals(device.getStatus())) {
return CommandResult.failure("设备不在线");
}
// 生成命令ID
command.setId(UUID.randomUUID().toString());
command.setDeviceId(deviceId);
command.setTimestamp(System.currentTimeMillis());
command.setStatus(CommandStatus.SENT);
// 发送MQTT消息
String topic = "device/" + deviceId + "/control";
String payload = JSON.toJSONString(command);
boolean success = mqttMessageSender.sendCommand(topic, payload);
if (success) {
// 保存命令记录
deviceCommandRepository.save(command);
return CommandResult.success(command.getId());
} else {
return CommandResult.failure("发送指令失败");
}
}
/**
* 检查并发送告警
*/
private void checkAndSendAlarm(DeviceData deviceData) {
// 根据业务规则检查是否需要告警
if (deviceData.getTemperature() > 80) { // 温度过高告警
Alarm alarm = new Alarm();
alarm.setDeviceId(deviceData.getDeviceId());
alarm.setType(AlarmType.HIGH_TEMPERATURE);
alarm.setLevel(AlarmLevel.HIGH);
alarm.setMessage("设备温度过高: " + deviceData.getTemperature() + "°C");
alarm.setTimestamp(System.currentTimeMillis());
alarmService.createAlarm(alarm);
}
}
/**
* 获取设备历史数据
*/
public List<DeviceData> getDeviceHistory(String deviceId, Date startTime, Date endTime) {
return deviceDataRepository.findByDeviceIdAndTimestampBetween(
deviceId, startTime, endTime);
}
}
6. MQTT消息发送器
创建MQTT消息发送器:
@Component
@Slf4j
public class MqttMessageSender {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
/**
* 发送指令到设备
*/
public boolean sendCommand(String topic, String payload) {
try {
IMqttAsyncClient client = mqttClientFactory.getClientInstance(
mqttClientFactory.getConnectionOptions().getServerURIs()[0],
"server_" + System.currentTimeMillis());
client.connect(mqttClientFactory.getConnectionOptions()).waitForCompletion();
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
message.setRetained(false);
IMqttToken token = client.publish(topic, message);
token.waitForCompletion();
client.disconnect().waitForCompletion();
client.close();
return true;
} catch (Exception e) {
log.error("发送MQTT消息失败", e);
return false;
}
}
/**
* 批量发送消息
*/
public void batchSend(List<MqttMessageInfo> messages) {
messages.parallelStream().forEach(msg -> {
sendCommand(msg.getTopic(), msg.getPayload());
});
}
}
7. WebSocket实时数据推送
实现实时数据推送功能:
@Component
@ServerEndpoint(value = "/websocket/device-data",
configurator = SpringConfigurator.class)
@Slf4j
public class DeviceDataWebSocket {
@Autowired
private DeviceService deviceService;
private static Set<DeviceDataWebSocket> webSocketSet = new CopyOnWriteArraySet<>();
private Session session;
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this);
log.info("设备数据WebSocket连接建立,当前连接数: {}", webSocketSet.size());
}
@OnClose
public void onClose() {
webSocketSet.remove(this);
log.info("设备数据WebSocket连接关闭,当前连接数: {}", webSocketSet.size());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息: {}", message);
// 可以处理客户端发送的消息
}
@OnError
public void onError(Session session, Throwable error) {
log.error("设备数据WebSocket发生错误", error);
}
/**
* 发送设备数据到所有客户端
*/
public static void sendToDeviceData(DeviceData data) {
String message = JSON.toJSONString(data);
for (DeviceDataWebSocket item : webSocketSet) {
try {
item.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送WebSocket消息失败", e);
}
}
}
/**
* 发送设备列表到客户端
*/
public void sendDeviceList(List<Device> devices) {
String message = JSON.toJSONString(devices);
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送设备列表失败", e);
}
}
}
8. REST API接口
提供REST API供管理端使用:
@RestController
@RequestMapping("/api/device")
public class DeviceController {
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceMessageHandler messageHandler;
/**
* 获取设备列表
*/
@GetMapping("/list")
public Result<List<Device>> getDeviceList() {
List<Device> devices = deviceService.getAllDevices();
return Result.success(devices);
}
/**
* 获取设备实时数据
*/
@GetMapping("/{deviceId}/realtime")
public Result<DeviceData> getRealtimeData(@PathVariable String deviceId) {
DeviceData data = deviceService.getLatestData(deviceId);
return Result.success(data);
}
/**
* 获取设备历史数据
*/
@GetMapping("/{deviceId}/history")
public Result<List<DeviceData>> getHistoryData(
@PathVariable String deviceId,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date startTime,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date endTime) {
List<DeviceData> data = deviceService.getDeviceHistory(deviceId, startTime, endTime);
return Result.success(data);
}
/**
* 发送控制指令
*/
@PostMapping("/{deviceId}/command")
public Result<CommandResult> sendCommand(
@PathVariable String deviceId,
@RequestBody DeviceCommand command) {
CommandResult result = deviceService.sendCommandToDevice(deviceId, command);
return Result.success(result);
}
/**
* 获取设备状态
*/
@GetMapping("/{deviceId}/status")
public Result<DeviceStatusInfo> getDeviceStatus(@PathVariable String deviceId) {
DeviceStatusInfo status = deviceService.getDeviceStatus(deviceId);
return Result.success(status);
}
}
9. 设备认证与安全
实现设备认证机制:
@Component
public class DeviceAuthenticator {
@Autowired
private DeviceRepository deviceRepository;
/**
* 验证设备连接
*/
public boolean authenticateDevice(String clientId, String username, String password) {
try {
Device device = deviceRepository.findByClientId(clientId);
if (device == null) {
return false;
}
// 验证用户名密码
return device.getUsername().equals(username) &&
BCrypt.checkpw(password, device.getPasswordHash());
} catch (Exception e) {
log.error("设备认证失败", e);
return false;
}
}
/**
* 生成设备连接凭证
*/
public DeviceCredentials generateCredentials(String deviceId) {
Device device = deviceRepository.findById(deviceId)
.orElseThrow(() -> new DeviceNotFoundException("设备不存在"));
String username = device.getClientId();
String password = generateRandomPassword();
String passwordHash = BCrypt.hashpw(password, BCrypt.gensalt());
// 更新设备密码
device.setPasswordHash(passwordHash);
deviceRepository.save(device);
return new DeviceCredentials(username, password);
}
private String generateRandomPassword() {
return UUID.randomUUID().toString().replace("-", "").substring(0, 16);
}
}
性能优化策略
1. 连接池优化
@Configuration
public class MqttConnectionPoolConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://localhost:1883"});
options.setKeepAliveInterval(60);
options.setConnectionTimeout(30);
options.setMaxInflight(1000); // 增加飞行窗口
options.setCleanSession(false); // 保持会话
factory.setConnectionOptions(options);
return factory;
}
}
2. 消息批处理
@Service
public class BatchMessageProcessor {
private final List<DeviceData> batchBuffer = new ArrayList<>();
private final Object lock = new Object();
@Scheduled(fixedRate = 1000) // 每秒处理一次
public void processBatch() {
synchronized (lock) {
if (!batchBuffer.isEmpty()) {
List<DeviceData> currentBatch = new ArrayList<>(batchBuffer);
batchBuffer.clear();
// 批量保存到数据库
deviceDataRepository.saveAll(currentBatch);
}
}
}
public void addMessage(DeviceData data) {
synchronized (lock) {
batchBuffer.add(data);
}
}
}
优势分析
相比传统的HTTP方式,MQTT方案的优势明显:
- 低延迟:实时双向通信,毫秒级响应
- 低功耗:轻量级协议,适合电池供电设备
- 高并发:单服务器可支持数万设备连接
- 可靠性:支持QoS等级,确保消息可靠传输
- 网络适应性:适合网络不稳定环境
注意事项
- 安全防护:设备认证、消息加密、访问控制
- 资源管理:连接数限制、内存使用监控
- 数据存储:历史数据归档、冷热数据分离
- 监控告警:设备状态监控、异常告警
- 扩展性:集群部署、负载均衡
总结
通过SpringBoot + MQTT + EMQX的技术组合,我们可以构建一个高效、可靠的物联网数据接入平台。这不仅能解决大量设备连接的问题,还能实现实时数据传输和指令下发。
在实际项目中,建议根据具体业务需求进行定制化开发,并充分考虑安全性、性能和可扩展性等因素。
服务端技术精选,专注分享后端开发实战技术,助力你的技术成长!
标题:SpringBoot + MQTT + EMQX:物联网设备上行数据实时接入与指令下发平台
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/12/1768283302219.html
0 评论