前段时间项目用到mqtt的消息推送,整理一下代码,代码的原型是网上找的,具体哪个地址已经忘记了。
代码的实现是新建了一个MyMqttService,全部功能都在里面实现,包括连服务器,断线重连,订阅消息,处理消息,发布消息等基本操作。
首先添加依赖:
dependencies { implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' } 然后编辑AndroidManifest.xml,先添加权限:
<uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <uses-permission android:name="android.permission.WAKE_LOCK" /> 再注册service:
<service android:name="org.eclipse.paho.android.service.MqttService" /> <service android:name=".service.MyMqttService" android:enabled="true" android:exported="true"/> 接着进入正文MyMqttService.java,功能见注释吧:
package com.example.nan.mqtt.service; import android.app.Notification; import android.app.NotificationManager; import android.app.PendingIntent; import android.app.Service; import android.content.Context; import android.content.Intent; import android.graphics.BitmapFactory; import android.os.Bundle; import android.os.IBinder; import android.support.v4.app.NotificationCompat; import android.util.Log; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.JSONObject; /** * @author nan */ public class MyMqttService extends Service { private static final String TAG = "nlgMqttService"; private static final String TOPIC_TO_QA = "/s2c/task_quality/"; private static final String publishTopic = "exampleAndroidPublishTopic"; private MqttAndroidClient mqttAndroidClient; private NotificationManager mNotificationManager; public MyMqttService() { } @Override public IBinder onBind(Intent intent) { // TODO: Return the communication channel to the service. throw new UnsupportedOperationException("Not yet implemented"); } @Override public void onCreate() { super.onCreate(); Log.d(TAG, "MqttService onCreate executed"); //mqtt服务器的地址 final String serverUri = "tcp://192.168.10.10:1883"; //新建Client,以设备ID作为client ID mqttAndroidClient = new MqttAndroidClient(MyMqttService.this, serverUri, getIMEI()); mqttAndroidClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { //连接成功 if (reconnect) { Log.d(TAG, "connectComplete: " + serverURI); // Because Clean Session is true, we need to re-subscribe subscribeAllTopics(); } else { Log.d(TAG, "connectComplete: " + serverURI); } } @Override public void connectionLost(Throwable cause) { //连接断开 Log.d(TAG, "connectionLost: connection was lost"); } @Override public void messageArrived(String topic, MqttMessage message) { //订阅的消息送达,推送notify String payload = new String(message.getPayload()); Log.d(TAG, "Topic: " + topic + " ==> Payload: " + payload); if(mNotificationManager == null) { mNotificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE); } int roleId = SinSimApp.getApp().getRole(); Gson gson = new Gson(); ServerToClientMsg msg = gson.fromJson(payload, new TypeToken<ServerToClientMsg>(){}.getType()); if(msg != null) { //接受消息 if(topic != null) { if(topic.equals(TOPIC_TO_QA)) { Intent intent = new Intent(MyMqttService.this, ProcessToCheckoutActivity.class); PendingIntent pi = PendingIntent.getActivity(MyMqttService.this, 0, intent, 0); NotificationCompat.Builder builder = new NotificationCompat.Builder(MyMqttService.this, TOPIC_TO_QA); Notification notify = builder.setSmallIcon(R.mipmap.to_quality) .setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.to_quality)) .setDefaults(Notification.DEFAULT_SOUND|Notification.DEFAULT_VIBRATE)//响铃震动 .setContentTitle("快递来了") .setAutoCancel(true) .setContentIntent(pi) .setVisibility(Notification.VISIBILITY_PUBLIC) .setContentText("你的快递单号:" + msg.getOrderNum()) //不设置此项不会悬挂,false 不会出现悬挂 .build(); mNotificationManager.notify(2,notify); } } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { //即服务器成功delivery消息 } }); //新建连接设置 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //断开后,是否自动连接 mqttConnectOptions.setAutomaticReconnect(true); //是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息 mqttConnectOptions.setCleanSession(false); //设置超时时间,单位为秒 //mqttConnectOptions.setConnectionTimeout(2); //心跳时间,单位为秒。即多长时间确认一次Client端是否在线 //mqttConnectOptions.setKeepAliveInterval(2); //允许同时发送几条消息(未收到broker确认信息) //mqttConnectOptions.setMaxInflight(10); //选择MQTT版本 mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); try { Log.d(TAG, "onCreate: Connecting to " + serverUri); //开始连接 mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Log.d(TAG, "onSuccess: Success to connect to " + serverUri); DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); disconnectedBufferOptions.setBufferEnabled(true); disconnectedBufferOptions.setBufferSize(100); disconnectedBufferOptions.setPersistBuffer(false); disconnectedBufferOptions.setDeleteOldestMessages(false); mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); //成功连接以后开始订阅 subscribeAllTopics(); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { //连接失败 Log.d(TAG, "onFailure: Failed to connect to " + serverUri); exception.printStackTrace(); } }); } catch (MqttException ex) { ex.printStackTrace(); } //service绑定notification Intent intent = new Intent(this, SplashActivity.class); intent.putExtra(SinSimApp.FROM_NOTIFICATION, true); //这边设置“FLAG_UPDATE_CURRENT”是为了让后面的Activity接收pendingIntent中Extra的数据 PendingIntent pi = PendingIntent.getActivity(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT); Notification notification = new NotificationCompat.Builder(this) .setContentTitle("mqtt快递") .setContentText("mqtt快递管理系统") .setWhen(System.currentTimeMillis()) .setSmallIcon(R.mipmap.ic_launcher) .setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher)) .setContentIntent(pi) .build(); startForeground(1, notification); } //订阅所有消息 private void subscribeAllTopics() { subscribeToTopic(TOPIC_TO_QA); } /** * 订阅消息 */ public void subscribeToTopic(String subscriptionTopic) { try { mqttAndroidClient.subscribe(subscriptionTopic, 2, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Log.d(TAG, "onSuccess: Success to Subscribed!"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { Log.d(TAG, "onFailure: Failed to subscribe"); } }); } catch (MqttException ex) { Log.d(TAG, "subscribeToTopic: Exception whilst subscribing"); ex.printStackTrace(); } } /** * 发布消息 */ public void publishMessage(String msg) { try { MqttMessage message = new MqttMessage(); message.setPayload(msg.getBytes()); mqttAndroidClient.publish(publishTopic, message); Log.d(TAG, "publishMessage: Message Published: " + msg); } catch (MqttException e) { Log.d(TAG, "publishMessage: Error Publishing: " + e.getMessage()); e.printStackTrace(); } } @Override public int onStartCommand(Intent intent, int flags, int startId) { Log.d(TAG, "MqttService onStartCommand executed"); return super.onStartCommand(intent, flags, startId); } @Override public void onDestroy() { super.onDestroy(); try { if(mqttAndroidClient!=null){ //服务退出时client断开连接 mqttAndroidClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } Log.d(TAG, "MqttService onDestroy executed"); } }
调试过程中出现过一个小插曲:服务在有些时候会不停的断线重连。断线重连的设置是开了的: mqttConnectOptions.setAutomaticReconnect(true); 但是断开的原因找不到,当时还没有重写onDestory方法,就算退出应用也还在重连,一度怀疑service的开启与关闭的问题,还系统的重新学习了一下service的使用,学完以后也没有啥进展,然后重学mqtt的调用流程发挥了效果,在onDestory里面调用了disconnect()方法,完了以后在退出应用以后就不会重连了,但是重新开还是继续不停重连。到了晚上,奇怪的事情发生了,当夜深人静,独自加班的时候,居然再也复现不了了。为什么呢,心想可能平时给八阿哥上的香起了效果,那就开心的回家吧。下班的路上虽然开心的吃了块鸡排,但心里的结还是没有打开,为什么呢,是道德的沦丧还是人性的扭曲,让我独自加班还不饿给我复现问题。突然灵光一现,想到今天特么加班就我一个人,也就是一个人玩就是好的,玩的人多就会有问题,那么答案就来了,跟唯一性有关的只有clientID了,特么老子把clientID设置成用户id了,测试用的用户id就注册了3个,好几个人来回切着用,不出问题才怪。于是我默默的把clientID改成了设备id,困扰2天的问题就这么解决了。 --------------------- 作者:邦德总管 来源:CSDN 原文:https://blog.csdn.net/a5nan/article/details/79975488 版权声明:本文为博主原创文章,转载请附上博文链接!
|