SpringBoot + MQTT + EMQX:物联网设备上行数据实时接入与指令下发平台

今天咱们聊聊物联网开发中一个核心问题:设备数据的实时接入和指令下发。

物联网数据接入的挑战

在物联网项目开发中,我们经常遇到这样的需求:

  • 成千上万的设备需要同时连接到服务器
  • 设备数据需要实时传输,不能有明显延迟
  • 要支持设备指令下发,如远程控制、参数设置等
  • 设备可能分布在不同地区,网络状况复杂

传统的HTTP轮询方式不仅效率低,还会给服务器带来巨大压力。今天我们就用MQTT协议来解决这些问题。

解决方案思路

今天我们要解决的,就是如何用SpringBoot + MQTT + EMQX构建一个高效的物联网数据接入平台。

核心思路是:

  1. MQTT协议:轻量级、低延迟的消息传输协议
  2. EMQX Broker:高性能MQTT消息代理服务器
  3. 设备认证:确保只有合法设备可以连接
  4. 数据处理:实时处理设备上行数据
  5. 指令下发:支持向设备发送控制指令

技术选型

  • SpringBoot:快速搭建应用
  • MQTT:物联网通信协议
  • EMQX:MQTT消息代理
  • Redis:设备状态存储
  • WebSocket:前端实时数据展示

核心实现思路

1. EMQX配置

首先配置EMQX服务器:

# emqx.conf
node.name = emqx@127.0.0.1
node.cookie = emqxsecretcookie

listeners.tcp.default.bind = 0.0.0.0:1883
listeners.ssl.default.bind = 0.0.0.0:8883
listeners.ws.default.bind = 0.0.0.0:8083
listeners.wss.default.bind = 0.0.0.0:8084

# 认证配置
authentication {
  backend = built_in_database
  mechanism = password_based
}

# 授权配置
authorization {
  type = built_in_database
  cache = {
    enable = true
    ttl = 1min
  }
}

2. 项目依赖配置

在SpringBoot项目中添加MQTT依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

3. MQTT配置类

配置MQTT连接:

@Configuration
@EnableIntegration
public class MqttConfig {
    
    @Value("${mqtt.broker.url}")
    private String brokerUrl;
    
    @Value("${mqtt.username}")
    private String username;
    
    @Value("${mqtt.password}")
    private String password;
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setKeepAliveInterval(60);
        options.setConnectionTimeout(30);
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }
    
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = 
            new MqttPahoMessageHandler("serverOutbound", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("default/topic");
        return messageHandler;
    }
    
    @Bean
    @InboundChannelAdapter(channel = "mqttInputChannel", 
                          poller = @Poller(fixedDelay = "1000"))
    public MessageProducer mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = 
            new MqttPahoMessageDrivenChannelAdapter("serverInbound", 
                                                   mqttClientFactory(), 
                                                   "device/+/data",  // 订阅设备数据主题
                                                   "device/+/command"); // 订阅指令响应主题
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }
    
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
            String payload = (String) message.getPayload();
            
            // 处理接收到的消息
            mqttMessageHandler.handleMessage(topic, payload);
        };
    }
}

4. 设备消息处理器

创建设备消息处理服务:

@Service
@Slf4j
public class DeviceMessageHandler {
    
    @Autowired
    private DeviceService deviceService;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 处理设备上行数据
     */
    public void handleMessage(String topic, String payload) {
        try {
            log.info("接收到设备消息 - Topic: {}, Payload: {}", topic, payload);
            
            // 解析主题,获取设备ID
            String[] topicParts = topic.split("/");
            if (topicParts.length >= 3 && "data".equals(topicParts[2])) {
                String deviceId = topicParts[1];
                
                // 解析JSON数据
                DeviceData deviceData = JSON.parseObject(payload, DeviceData.class);
                deviceData.setDeviceId(deviceId);
                deviceData.setTimestamp(System.currentTimeMillis());
                
                // 保存设备数据
                deviceService.saveDeviceData(deviceData);
                
                // 更新设备在线状态
                updateDeviceStatus(deviceId, DeviceStatus.ONLINE);
                
                // 通知WebSocket客户端
                notifyWebSocketClients(deviceData);
                
            } else if (topicParts.length >= 3 && "command".equals(topicParts[2])) {
                // 处理指令响应
                String deviceId = topicParts[1];
                handleCommandResponse(deviceId, payload);
            }
        } catch (Exception e) {
            log.error("处理设备消息失败", e);
        }
    }
    
    /**
     * 更新设备状态
     */
    private void updateDeviceStatus(String deviceId, DeviceStatus status) {
        String key = "device:status:" + deviceId;
        DeviceStatusInfo statusInfo = new DeviceStatusInfo();
        statusInfo.setDeviceId(deviceId);
        statusInfo.setStatus(status);
        statusInfo.setLastUpdateTime(System.currentTimeMillis());
        
        redisTemplate.opsForValue().set(key, statusInfo, Duration.ofHours(24));
    }
    
    /**
     * 通知WebSocket客户端
     */
    private void notifyWebSocketClients(DeviceData data) {
        // 发送实时数据到WebSocket
        webSocketService.sendToDeviceData(data);
    }
    
    /**
     * 处理指令响应
     */
    private void handleCommandResponse(String deviceId, String response) {
        String key = "command:response:" + deviceId;
        redisTemplate.opsForValue().set(key, response, Duration.ofMinutes(5));
    }
}

5. 设备服务实现

实现设备相关的业务逻辑:

@Service
@Transactional
public class DeviceService {
    
    @Autowired
    private DeviceRepository deviceRepository;
    
    @Autowired
    private DeviceDataRepository deviceDataRepository;
    
    @Autowired
    private MqttMessageSender mqttMessageSender;
    
    /**
     * 保存设备数据
     */
    public void saveDeviceData(DeviceData deviceData) {
        // 验证设备是否存在
        Device device = deviceRepository.findById(deviceData.getDeviceId())
            .orElseThrow(() -> new DeviceNotFoundException("设备不存在: " + deviceData.getDeviceId()));
        
        // 更新设备最后活动时间
        device.setLastActiveTime(new Date());
        deviceRepository.save(device);
        
        // 保存设备数据
        deviceDataRepository.save(deviceData);
        
        // 检查是否需要发送告警
        checkAndSendAlarm(deviceData);
    }
    
    /**
     * 发送指令到设备
     */
    public CommandResult sendCommandToDevice(String deviceId, DeviceCommand command) {
        // 验证设备状态
        Device device = deviceRepository.findById(deviceId)
            .orElseThrow(() -> new DeviceNotFoundException("设备不存在: " + deviceId));
        
        if (!DeviceStatus.ONLINE.equals(device.getStatus())) {
            return CommandResult.failure("设备不在线");
        }
        
        // 生成命令ID
        command.setId(UUID.randomUUID().toString());
        command.setDeviceId(deviceId);
        command.setTimestamp(System.currentTimeMillis());
        command.setStatus(CommandStatus.SENT);
        
        // 发送MQTT消息
        String topic = "device/" + deviceId + "/control";
        String payload = JSON.toJSONString(command);
        
        boolean success = mqttMessageSender.sendCommand(topic, payload);
        
        if (success) {
            // 保存命令记录
            deviceCommandRepository.save(command);
            return CommandResult.success(command.getId());
        } else {
            return CommandResult.failure("发送指令失败");
        }
    }
    
    /**
     * 检查并发送告警
     */
    private void checkAndSendAlarm(DeviceData deviceData) {
        // 根据业务规则检查是否需要告警
        if (deviceData.getTemperature() > 80) { // 温度过高告警
            Alarm alarm = new Alarm();
            alarm.setDeviceId(deviceData.getDeviceId());
            alarm.setType(AlarmType.HIGH_TEMPERATURE);
            alarm.setLevel(AlarmLevel.HIGH);
            alarm.setMessage("设备温度过高: " + deviceData.getTemperature() + "°C");
            alarm.setTimestamp(System.currentTimeMillis());
            
            alarmService.createAlarm(alarm);
        }
    }
    
    /**
     * 获取设备历史数据
     */
    public List<DeviceData> getDeviceHistory(String deviceId, Date startTime, Date endTime) {
        return deviceDataRepository.findByDeviceIdAndTimestampBetween(
            deviceId, startTime, endTime);
    }
}

6. MQTT消息发送器

创建MQTT消息发送器:

@Component
@Slf4j
public class MqttMessageSender {
    
    @Autowired
    private MqttPahoClientFactory mqttClientFactory;
    
    /**
     * 发送指令到设备
     */
    public boolean sendCommand(String topic, String payload) {
        try {
            IMqttAsyncClient client = mqttClientFactory.getClientInstance(
                mqttClientFactory.getConnectionOptions().getServerURIs()[0], 
                "server_" + System.currentTimeMillis());
            
            client.connect(mqttClientFactory.getConnectionOptions()).waitForCompletion();
            
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(1);
            message.setRetained(false);
            
            IMqttToken token = client.publish(topic, message);
            token.waitForCompletion();
            
            client.disconnect().waitForCompletion();
            client.close();
            
            return true;
        } catch (Exception e) {
            log.error("发送MQTT消息失败", e);
            return false;
        }
    }
    
    /**
     * 批量发送消息
     */
    public void batchSend(List<MqttMessageInfo> messages) {
        messages.parallelStream().forEach(msg -> {
            sendCommand(msg.getTopic(), msg.getPayload());
        });
    }
}

7. WebSocket实时数据推送

实现实时数据推送功能:

@Component
@ServerEndpoint(value = "/websocket/device-data", 
                configurator = SpringConfigurator.class)
@Slf4j
public class DeviceDataWebSocket {
    
    @Autowired
    private DeviceService deviceService;
    
    private static Set<DeviceDataWebSocket> webSocketSet = new CopyOnWriteArraySet<>();
    
    private Session session;
    
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
        log.info("设备数据WebSocket连接建立,当前连接数: {}", webSocketSet.size());
    }
    
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        log.info("设备数据WebSocket连接关闭,当前连接数: {}", webSocketSet.size());
    }
    
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端的消息: {}", message);
        // 可以处理客户端发送的消息
    }
    
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("设备数据WebSocket发生错误", error);
    }
    
    /**
     * 发送设备数据到所有客户端
     */
    public static void sendToDeviceData(DeviceData data) {
        String message = JSON.toJSONString(data);
        for (DeviceDataWebSocket item : webSocketSet) {
            try {
                item.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("发送WebSocket消息失败", e);
            }
        }
    }
    
    /**
     * 发送设备列表到客户端
     */
    public void sendDeviceList(List<Device> devices) {
        String message = JSON.toJSONString(devices);
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("发送设备列表失败", e);
        }
    }
}

8. REST API接口

提供REST API供管理端使用:

@RestController
@RequestMapping("/api/device")
public class DeviceController {
    
    @Autowired
    private DeviceService deviceService;
    
    @Autowired
    private DeviceMessageHandler messageHandler;
    
    /**
     * 获取设备列表
     */
    @GetMapping("/list")
    public Result<List<Device>> getDeviceList() {
        List<Device> devices = deviceService.getAllDevices();
        return Result.success(devices);
    }
    
    /**
     * 获取设备实时数据
     */
    @GetMapping("/{deviceId}/realtime")
    public Result<DeviceData> getRealtimeData(@PathVariable String deviceId) {
        DeviceData data = deviceService.getLatestData(deviceId);
        return Result.success(data);
    }
    
    /**
     * 获取设备历史数据
     */
    @GetMapping("/{deviceId}/history")
    public Result<List<DeviceData>> getHistoryData(
            @PathVariable String deviceId,
            @RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date startTime,
            @RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date endTime) {
        
        List<DeviceData> data = deviceService.getDeviceHistory(deviceId, startTime, endTime);
        return Result.success(data);
    }
    
    /**
     * 发送控制指令
     */
    @PostMapping("/{deviceId}/command")
    public Result<CommandResult> sendCommand(
            @PathVariable String deviceId,
            @RequestBody DeviceCommand command) {
        
        CommandResult result = deviceService.sendCommandToDevice(deviceId, command);
        return Result.success(result);
    }
    
    /**
     * 获取设备状态
     */
    @GetMapping("/{deviceId}/status")
    public Result<DeviceStatusInfo> getDeviceStatus(@PathVariable String deviceId) {
        DeviceStatusInfo status = deviceService.getDeviceStatus(deviceId);
        return Result.success(status);
    }
}

9. 设备认证与安全

实现设备认证机制:

@Component
public class DeviceAuthenticator {
    
    @Autowired
    private DeviceRepository deviceRepository;
    
    /**
     * 验证设备连接
     */
    public boolean authenticateDevice(String clientId, String username, String password) {
        try {
            Device device = deviceRepository.findByClientId(clientId);
            if (device == null) {
                return false;
            }
            
            // 验证用户名密码
            return device.getUsername().equals(username) && 
                   BCrypt.checkpw(password, device.getPasswordHash());
        } catch (Exception e) {
            log.error("设备认证失败", e);
            return false;
        }
    }
    
    /**
     * 生成设备连接凭证
     */
    public DeviceCredentials generateCredentials(String deviceId) {
        Device device = deviceRepository.findById(deviceId)
            .orElseThrow(() -> new DeviceNotFoundException("设备不存在"));
        
        String username = device.getClientId();
        String password = generateRandomPassword();
        String passwordHash = BCrypt.hashpw(password, BCrypt.gensalt());
        
        // 更新设备密码
        device.setPasswordHash(passwordHash);
        deviceRepository.save(device);
        
        return new DeviceCredentials(username, password);
    }
    
    private String generateRandomPassword() {
        return UUID.randomUUID().toString().replace("-", "").substring(0, 16);
    }
}

性能优化策略

1. 连接池优化

@Configuration
public class MqttConnectionPoolConfig {
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://localhost:1883"});
        options.setKeepAliveInterval(60);
        options.setConnectionTimeout(30);
        options.setMaxInflight(1000); // 增加飞行窗口
        options.setCleanSession(false); // 保持会话
        factory.setConnectionOptions(options);
        return factory;
    }
}

2. 消息批处理

@Service
public class BatchMessageProcessor {
    
    private final List<DeviceData> batchBuffer = new ArrayList<>();
    private final Object lock = new Object();
    
    @Scheduled(fixedRate = 1000) // 每秒处理一次
    public void processBatch() {
        synchronized (lock) {
            if (!batchBuffer.isEmpty()) {
                List<DeviceData> currentBatch = new ArrayList<>(batchBuffer);
                batchBuffer.clear();
                
                // 批量保存到数据库
                deviceDataRepository.saveAll(currentBatch);
            }
        }
    }
    
    public void addMessage(DeviceData data) {
        synchronized (lock) {
            batchBuffer.add(data);
        }
    }
}

优势分析

相比传统的HTTP方式,MQTT方案的优势明显:

  1. 低延迟:实时双向通信,毫秒级响应
  2. 低功耗:轻量级协议,适合电池供电设备
  3. 高并发:单服务器可支持数万设备连接
  4. 可靠性:支持QoS等级,确保消息可靠传输
  5. 网络适应性:适合网络不稳定环境

注意事项

  1. 安全防护:设备认证、消息加密、访问控制
  2. 资源管理:连接数限制、内存使用监控
  3. 数据存储:历史数据归档、冷热数据分离
  4. 监控告警:设备状态监控、异常告警
  5. 扩展性:集群部署、负载均衡

总结

通过SpringBoot + MQTT + EMQX的技术组合,我们可以构建一个高效、可靠的物联网数据接入平台。这不仅能解决大量设备连接的问题,还能实现实时数据传输和指令下发。

在实际项目中,建议根据具体业务需求进行定制化开发,并充分考虑安全性、性能和可扩展性等因素。


服务端技术精选,专注分享后端开发实战技术,助力你的技术成长!


标题:SpringBoot + MQTT + EMQX:物联网设备上行数据实时接入与指令下发平台
作者:jiangyi
地址:http://www.jiangyi.space/articles/2026/01/12/1768283302219.html

    0 评论
avatar