服务端测试:
EMQX Cloud:在线测试
MQTTX:需要下载电脑客户端
android:
模块(app)下的 build.gradle(dependencies 中添加):
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
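//Paho 的 android.service 内部使用 support-v4 中的 LocalBroadcastManager(该类已被弃用,新工程默认不再包含),因此需要手动添加 support-v4 依赖
//注意:com.android.support 系列库的最终版本是 28.0.0,若下面的版本号无法解析,请改用 28.0.0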
implementation 'com.android.support:support-v4:30.4.1'
工程下 gradle.properties
#开启 Jetifier,把第三方库(Paho)中对 support 库(LocalBroadcastManager)的引用自动迁移到 AndroidX(通常还需同时设置 android.useAndroidX=true)
android.enableJetifier=true
清单文件:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<service android:name="org.eclipse.paho.android.service.MqttService" />
<service
android:name=".MQTTService"
android:enabled="true"
android:exported="true"/>
/**
 * Listener through which a received MQTT message payload is handed to the UI layer.
 */
public interface IGetMessageCallBack {

    /**
     * Delivers one received message.
     *
     * @param message the message payload decoded as text
     */
    void setMessage(String message);
}
MqttServiceConnection
/**
 * {@link ServiceConnection} that keeps a reference to the bound {@link MQTTService} and
 * passes the registered {@link IGetMessageCallBack} on to it.
 */
public class MqttServiceConnection implements ServiceConnection {
    /** The bound service, or {@code null} while no service is connected. */
    private MQTTService mqttService;
    /** Callback to install on the service; may be {@code null}. */
    private IGetMessageCallBack iGetMessageCallBack;

    /**
     * Stores the service obtained from the binder and installs the current callback on it.
     *
     * @param componentName component name of the connected service
     * @param iBinder       binder returned by the service; cast to {@link MQTTService.CustomBinder}
     */
    @Override
    public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
        mqttService = ((MQTTService.CustomBinder) iBinder).getService();
        mqttService.setIGetMessageCallBack(iGetMessageCallBack);
    }

    /**
     * Drops the service reference so callers do not keep using a service that is gone.
     *
     * @param componentName component name of the disconnected service
     */
    @Override
    public void onServiceDisconnected(ComponentName componentName) {
        mqttService = null;
    }

    /**
     * @return the bound service, or {@code null} if no service is currently connected
     */
    public MQTTService getMqttService() {
        return mqttService;
    }

    /**
     * Registers the callback that should receive messages. If a service is already
     * connected the callback is forwarded to it immediately; otherwise it is installed
     * when {@link #onServiceConnected} runs.
     *
     * @param iGetMessageCallBack the callback to install; may be {@code null} to clear it
     */
    public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
        this.iGetMessageCallBack = iGetMessageCallBack;
        if (mqttService != null) {
            mqttService.setIGetMessageCallBack(iGetMessageCallBack);
        }
    }
}
MQTTService
public class MQTTService extends Service {
public static final String TAG = MQTTService.class.getSimpleName();
public static final String SN = "cloud5.0";//服务端发送给客户端的主题
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String host = "tcp://broker.emqx.io:1883";
// private String host = "tcp://101.133.167.135:1883";
private String userName = "fly";
private String passWord = "123";
private static String mTopic = "testtopic/2"; //服务端接收客户端的主题
private static String willTopic = "willTopic"; //遗嘱主题
private String clientId = "12343112"; //客户端标识
private IGetMessageCallBack iGetMessageCallBack;
private static final Integer qos = 2;
@Override
public void onCreate() {
super.onCreate();
Log.e(getClass().getName(), "onCreate");
init();
}
//客户端发消息给服务端
public static void publish(String msg) {
String topic = mTopic;
Boolean retained = false;
try {
if (client != null) {
client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
}
} catch (MqttException e) {
e.printStackTrace();
}
}
private void init() {
String sn = SN;
startConnect(sn);
// if (sn != null && !sn.isEmpty()) {
// startConnect(sn);
// } else {
// if (Device.getInstance().getSn() == null) {
// //TODO topic写死
// startConnect("11");
// SPUtils.put(SN, "kanisa_001");
// }else {
// startConnect(Device.getInstance().getSn());
// SPUtils.put(SN, Device.getInstance().getSn());
// }
// }
}
private void startConnect(String sn) {
Log.d(TAG, "设备号:" + sn);
mTopic = sn;
// 服务器地址(协议+地址+端口号)
String uri = host;
client = new MqttAndroidClient(getApplicationContext(), uri, clientId);
// 设置MQTT监听并且接受消息
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
// 清除缓存
conOpt.setCleanSession(true);
// 设置超时时间,单位:秒
conOpt.setConnectionTimeout(10);
// 心跳包发送间隔,单位:秒
conOpt.setKeepAliveInterval(30);
// 用户名
conOpt.setUserName(userName);
// 密码
conOpt.setPassword(passWord.toCharArray()); //将字符串转换为字符串数组
//设置断开后重新连接
conOpt.setAutomaticReconnect(true);
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + clientId + "\"}";
Log.d(getClass().getName(), "message是:" + message + " myTopic " + mTopic);
// 最后的遗嘱
// MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
//当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
//当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
try {
conOpt.setWill(willTopic, message.getBytes(), qos, false);
Log.d(getClass().getName(), "设置遗嘱主题" );
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
if (doConnect) {
doClientConnection();
}
}
@Override
public boolean onUnbind(Intent intent) {
client.unregisterResources();
return super.onUnbind(intent);
}
@Override
public void onDestroy() {
stopSelf();
try {
if (client != null)
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
/**
* 连接MQTT服务器
*/
private void doClientConnection() {
if (!client.isConnected() && isConnectIsNormal()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
// MQTT是否连接成功
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "连接成功 ");
try {
// 订阅myTopic话题
client.subscribe(mTopic, 2);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
// 连接失败,重连
Log.d(TAG, "连接失败 arg0:"+arg0.toString()+" arg1:"+arg1);
}
};
// MQTT监听并且接受消息
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
String str1 = new String(message.getPayload());
if (iGetMessageCallBack != null) {
iGetMessageCallBack.setMessage(str1);
}
String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
// 失去连接,重连
}
};
/**
* 判断网络是否连接
*/
private boolean isConnectIsNormal() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "MQTT当前网络名称:" + name);
return true;
} else {
Log.i(TAG, "MQTT 没有可用网络");
return false;
}
}
@Override
public IBinder onBind(Intent intent) {
Log.e(getClass().getName(), "onBind");
return new CustomBinder();
}
public class CustomBinder extends Binder {
public MQTTService getService() {
return MQTTService.this;
}
}
public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
this.iGetMessageCallBack = iGetMessageCallBack;
}
}
MainActivity:
public class MainActivity extends AppCompatActivity implements IGetMessageCallBack {
private static final String TAG = MainActivity.class.getSimpleName();
private MqttServiceConnection serviceConnection;
/**
 * Sets the activity's content view, then binds the MQTT service and wires up the views.
 *
 * @param savedInstanceState state bundle passed through to the superclass
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    // Service binding is started before the click handler is attached.
    initAndroidMQTT();
    initView();
}
/**
 * Wires the button so that a click publishes the trimmed contents of the text field
 * through {@code MQTTService.publish}.
 */
private void initView() {
    final EditText input = findViewById(R.id.editText);
    final Button sendButton = findViewById(R.id.btn);
    sendButton.setOnClickListener(view ->
            MQTTService.publish(input.getText().toString().trim()));
}
/**
 * Creates the service connection, registers this activity as the message callback and
 * binds to {@code MQTTService}, creating the service if necessary.
 *
 * NOTE(review): no matching unbindService call is visible in this part of the file —
 * confirm the connection is released when the activity is destroyed.
 */
private void initAndroidMQTT() {
    MqttServiceConnection connection = new MqttServiceConnection();
    connection.setIGetMessageCallBack(this);
    serviceConnection = connection;

    // Binding with BIND_AUTO_CREATE creates the service when it is not running yet.
    bindService(new Intent(this, MQTTService.class), serviceConnection, Context.BIND_AUTO_CREATE);
}
/**
 * Logs a message payload delivered through the {@code IGetMessageCallBack} callback.
 *
 * @param message the received payload text
 */
@Override
public void setMessage(String message) {
    final String logLine = "收到的推送数据:" + message;
    Log.d(TAG, logLine);
}