消息推送模式
Aqara开发者平台支持通过以下两种方式主动推送设备或联动相关数据给第三方应用,以满足用户对消息实时性和消息持久化的要求。
基于消息队列获取消息
消息队列目前是基于开源的rocketmq进行相关改造实现的消息队列。Aqara开发者平台会将开放消息推送到MQ,订阅所需的消息进行消费处理。绿米会保留最近12小时产生的消息存放在MQ,以提供第三方应用进行消费处理。
私有化服务不支持此推送方式,请使用http推送方式。
基于HTTP获取消息
Aqara开发者平台通过调用第三方提供的http服务地址获取开放消息,在推送地址配置完成后,将不定期对接收推送消息的服务地址进行访问验证,以确保服务地址的可靠性及消息接收应答机制的准确性。同时也会对推送的失败情况进行数据统计,如:推送消息在5分钟内失败率超过5%时,将通过短信或邮件通知第三方应用及时处理,否则将在发出通知的半个小时后暂停对第三方应用相关的消息推送。若需重新开启,则需在控制台重新开启推送,才可重新接收到推送消息。
绿米会保留最近12小时推送失败的相关消息,并提供相应API进行查询。
请求说明示例
请求URL:在Aqara开发者平台-控制台-应用管理-消息推送页面配置的消息接收地址,如:https://xx.xx.com/aqara/app/message
请求方式:HTTP POST (application/json)
请求head参数:
名称 | 类型 | 是否必须 | 描述 |
---|---|---|---|
token | String | 是 | 授权账号有效的accessToken |
time | String | 是 | 请求时间 |
nonce | String | 是 | 请求随机数保障请求唯一性 |
appkey | String | 否 | 请求签名key,如果配置开启签名时返回 |
sign | String | 否 | 请求签名,如果配置开启签名时返回 |
Sign签名规则:
- header参数按照token、appkey、nonce、time字段顺序进行排序,并使用
&
符号拼接,拼接格式为appkey=xxx&nonce=xxx&time=xxx&token=xxx; - 对第一步产生的字符串末尾拼接appSecret;
- 对第二步产生的字符串,全部小写处理;
- 对第三步产生的字符串MD5 32位加密后,生成的数即为Sign的值。
注意:默认不开启签名,如需开启签名,需要开发者在Aqara开发者平台-控制台-应用管理-消息推送页面进行配置,完善 appKey(签名Key) 和 appSecret(签名秘钥)。
-------------------------------------- 第一步:拼接header参数 --------------------------------
appkey=532cad73c5493193d63d367016b98b27&Nonce=C6wuzd0Qguxzelhb&Time=1618914078668&token=4e693d54d75db580a56d1263
-------------------------------------- 第二步:拼接appKey参数 --------------------------------
appkey=532cad73c5493193d63d367016b98b27&Nonce=C6wuzd0Qguxzelhb&Time=1618914078668&token=4e693d54d75db580a56d1263gU7Qtxi4dWnYAdmudyxni52bWZ58b8uN
-------------------------------------- 第三步:字符串小写 -------------------------------------
appkey=532cad73c5493193d63d367016b98b27&nonce=c6wuzd0qguxzelhb&time=1618914078668&token=4e693d54d75db580a56d1263gu7qtxi4dwnyadmudyxni52bwz58b8un
-------------------------------------- 第三步:MD5 32位加密 ----------------------------------
0e27da9f4c88c27896d1393a64d70392
请求Body参数:具体请参考消息推送格式页面
请求响应格式
{
"code":0
}
注意:当第三方系统收到推送的HTTP消息时,请按以上示例返回,若返回格式不符合要求,Aqara系统会认为第三方接收消息系统服务异常,将在一定条件下停止对第三方系统进行消息推送。
配置方法
登录Aqara开发者平台,在右上角进入控制台;
单击应用管理,创建应用,进入应用详情;
选择“消息推送设置”页面,根据需要填写配置信息;
订阅模式:支持“全部接收模式”和“用户自定义订阅模式”。
全部接收模式,指配置完成后,将授权账号的设备数据全部推送给第三方服务器,且通过“订阅资源”接口订阅的信息会自动失效,同时订阅接口将拒绝访问
用户自定义订阅模式,指通过“订阅资源”接口订阅指定设备资源数据,只将这些数据推送到第三方服务器。
推送通道:支持“消息队列推送”和“HTTP推送”。
推送配置地址:若“推送通道”选择为“HTTP推送”时配置,输入需要接收消息的服务器地址,并且验证服务器是否连通。
密钥ID:若“推送通道”选择为“消息队列推送”时配置,选择对应密钥ID。
推送状态:推送配置地址验证成功后,可设置推送是否启用。
备注:填写消息推送的备注信息。
保存配置。
Java Demo
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
package com.lumi.aiot.cloud.utils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author: zzm
* @Date: 2021/5/27 10:41
*/
public class MqConsumer {
private final String mqAddress = "3rd-subscription.aqara.cn:9876";
private final String appId = "06699244921c39c91e012252";
private final String keyId = "K.761236781237217";
private final String appKey = "cqLCZDxt4PgMOPrFyHVYReygeyX1z2rP";
public static void main(String[] args) {
MqConsumer mqConsumer = new MqConsumer();
try {
mqConsumer.start();
Thread.sleep(1000000L);
} catch (Exception e) {
e.printStackTrace();
}
}
public void start() throws MQClientException {
AclClientRPCHook acl = new AclClientRPCHook(new SessionCredentials(keyId, appKey));
//设置消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(appId, acl, new AllocateMessageQueueAveragely());
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(mqAddress);
//设置消费者端消息拉取策略,表示从哪里开始消费
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
// System.currentTimeMillis() - (1000 * 10 * 1)));
//设置从10分钟前开始消费,配合setConsumeTimestamp一起使用,格式yyyyMMddhhmmss
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1000 * 60 * 10));
//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe(appId, "*");
//消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String topic = messageExt.getTopic();
String tag = messageExt.getTags();
String msg = new String(messageExt.getBody());
log.info("******device info******{}", msg);
}
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用start()方法启动consumer
consumer.start();
}
}