编写和MQTT服务器通信的Android客户端程序(二)

客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:

1、新建android工程MQTTClient

2、MainActivity代码如下:

package ldw.mqttclient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.KeyEvent;
import android.widget.TextView;
import android.widget.Toast;

public class MainActivity extends Activity {

	private TextView resultTv;

	private String host = "tcp://127.0.0.1:1883";
	private String userName = "admin";
	private String passWord = "password";

	private Handler handler;

	private MqttClient client;

	private String myTopic = "test/topic";

	private MqttConnectOptions options;

	private ScheduledExecutorService scheduler;

	@Override
	protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		setContentView(R.layout.main);

		resultTv = (TextView) findViewById(R.id.result);

		init();

		handler = new Handler() {
			@Override
			public void handleMessage(Message msg) {
				super.handleMessage(msg);
				if(msg.what == 1) {
					Toast.makeText(MainActivity.this, (String) msg.obj,
							Toast.LENGTH_SHORT).show();
					System.out.println("-----------------------------");
				} else if(msg.what == 2) {
					Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
					try {
						client.subscribe(myTopic, 1);
					} catch (Exception e) {
						e.printStackTrace();
					}
				} else if(msg.what == 3) {
					Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
				}
			}
		};

		startReconnect();

	}

	private void startReconnect() {
		scheduler = Executors.newSingleThreadScheduledExecutor();
		scheduler.scheduleAtFixedRate(new Runnable() {

			@Override
			public void run() {
				if(!client.isConnected()) {
					connect();
				}
			}
		}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
	}

	private void init() {
		try {
                       //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
			client = new MqttClient(host, "test",
					new MemoryPersistence());
                       //MQTT的连接设置
			options = new MqttConnectOptions();
                       //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
			options.setCleanSession(true);
                       //设置连接的用户名
			options.setUserName(userName);
                       //设置连接的密码
			options.setPassword(passWord.toCharArray());
			// 设置超时时间 单位为秒
			options.setConnectionTimeout(10);
			// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
			options.setKeepAliveInterval(20);
                        //设置回调
			client.setCallback(new MqttCallback() {

				@Override
				public void connectionLost(Throwable cause) {
                                        //连接丢失后,一般在这里面进行重连
					System.out.println("connectionLost----------");
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
                                        //publish后会执行到这里
					System.out.println("deliveryComplete---------"
							+ token.isComplete());
				}

				@Override
				public void messageArrived(String topicName, MqttMessage message)
						throws Exception {
                                        //subscribe后得到的消息会执行到这里面
					System.out.println("messageArrived----------");
					Message msg = new Message();
					msg.what = 1;
					msg.obj = topicName+"---"+message.toString();
					handler.sendMessage(msg);
				}
			});
//			connect();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void connect() {
		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					client.connect(options);
					Message msg = new Message();
					msg.what = 2;
					handler.sendMessage(msg);
				} catch (Exception e) {
					e.printStackTrace();
					Message msg = new Message();
					msg.what = 3;
					handler.sendMessage(msg);
				}
			}
		}).start();
	}

	@Override
	public boolean onKeyDown(int keyCode, KeyEvent event) {
		if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
			try {
				client.disconnect();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		return super.onKeyDown(keyCode, event);
	}

	@Override
	protected void onDestroy() {
		super.onDestroy();
		try {
			scheduler.shutdown();
			client.disconnect();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

由于项目需要,我用到了心跳重连。根据这里的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。

3、新建j2se工程MQTTServer

4、Server代码如下:

import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Server extends JFrame {
	private static final long serialVersionUID = 1L;
	private JPanel panel;
	private JButton button;

	private MqttClient client;
	private String host = "tcp://127.0.0.1:1883";
//	private String host = "tcp://localhost:1883";
	private String userName = "test";
	private String passWord = "test";
	private MqttTopic topic;
	private MqttMessage message;

	private String myTopic = "test/topic";

	public Server() {

		try {
			client = new MqttClient(host, "Server",
					new MemoryPersistence());
			connect();
		} catch (Exception e) {
			e.printStackTrace();
		}

		Container container = this.getContentPane();
		panel = new JPanel();
		button = new JButton("发布话题");
		button.addActionListener(new ActionListener() {

			@Override
			public void actionPerformed(ActionEvent ae) {
				try {
					MqttDeliveryToken token = topic.publish(message);
					token.waitForCompletion();
					System.out.println(token.isComplete()+"========");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		panel.add(button);
		container.add(panel, "North");

	}

	private void connect() {

		MqttConnectOptions options = new MqttConnectOptions();
		options.setCleanSession(false);
		options.setUserName(userName);
		options.setPassword(passWord.toCharArray());
		// 设置超时时间
		options.setConnectionTimeout(10);
		// 设置会话心跳时间
		options.setKeepAliveInterval(20);
		try {
			client.setCallback(new MqttCallback() {

				@Override
				public void connectionLost(Throwable cause) {
					System.out.println("connectionLost-----------");
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
					System.out.println("deliveryComplete---------"+token.isComplete());
				}

				@Override
				public void messageArrived(String topic, MqttMessage arg1)
						throws Exception {
					System.out.println("messageArrived----------");

				}
			});

			topic = client.getTopic(myTopic);

			message = new MqttMessage();
			message.setQos(1);
			message.setRetained(true);
			System.out.println(message.isRetained()+"------ratained状态");
			message.setPayload("eeeeeaaaaaawwwwww---".getBytes());

			client.connect(options);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public static void main(String[] args) {
		Server s = new Server();
		s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
		s.setSize(600, 370);
		s.setLocationRelativeTo(null);
		s.setVisible(true);
	}
}

上面代码跟客户端的代码差不多,这里就不做解释了。

没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。

《编写和MQTT服务器通信的Android客户端程序(二)》上有102条评论

  1. 我又有个问题了,在使用apollo,明明订阅者A连上了,但是为什么有时候收不到积累的durable消息? 这个时候,发布者B再发布消息出去,A也没收到。

    1. 原来是因为username是一样才会错乱。注意不是clientId,username是验证时候的名字。apollo需要保证连上的username都是不一样才不会错乱。看来apollo还不是很成熟的

    2. 没订阅成功,或者订阅的主题和发送的主题不一致导致的,原因很多哦。。。

  2. 我又有个问题了,在使用apollo,明明订阅者A连上了,但是为什么有时候收不到积累的durable消息? 这个时候,发布者B再发布消息出去,A也没收到。

    1. 原来是因为username是一样才会错乱。注意不是clientId,username是验证时候的名字。apollo需要保证连上的username都是不一样才不会错乱。看来apollo还不是很成熟的

    2. 没订阅成功,或者订阅的主题和发送的主题不一致导致的,原因很多哦。。。

      1. jar包eclipse官网有的,把源码下下来然后导出个jar包就可以了,很久没弄了,我的是很老的jar包了

      1. jar包eclipse官网有的,把源码下下来然后导出个jar包就可以了,很久没弄了,我的是很老的jar包了

    1. 我为什么一直链接不上服务器 我进http://127.0.0.1:61680/console/index.html都可以 客户端和服务端都链接不上

    1. 我为什么一直链接不上服务器 我进http://127.0.0.1:61680/console/index.html都可以 客户端和服务端都链接不上

  3. 为什么连接在不断的丢失,导致不断的连接,不断的收到了重复的消息,这是怎么回事儿,是不是哪里设置的问题

  4. 为什么连接在不断的丢失,导致不断的连接,不断的收到了重复的消息,这是怎么回事儿,是不是哪里设置的问题

  5. 我初次连接通知服务器都能正常工作,然后关闭服务器,再次启动之后,再次连接通知服务器就报错了at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:773)at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:110)

  6. 我初次连接通知服务器都能正常工作,然后关闭服务器,再次启动之后,再次连接通知服务器就报错了at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:773)at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:110)

  7. 你好,我写了client1用来publish,client2去durable订阅,就是cleanSession设成false。先启动client2这时publish来的信息都是能接收的。然后强制关闭client2造成crash的测试case,再启动client2,就再也不能接收到信息。我登录http://127.0.0.1:61680/里面显示client2确实有durable的信息积压在topic里。请问这是为什么重连之后不能继续接收呢?

  8. 你好,我写了client1用来publish,client2去durable订阅,就是cleanSession设成false。先启动client2这时publish来的信息都是能接收的。然后强制关闭client2造成crash的测试case,再启动client2,就再也不能接收到信息。我登录http://127.0.0.1:61680/里面显示client2确实有durable的信息积压在topic里。请问这是为什么重连之后不能继续接收呢?

  9. android端代码太弱,这样会极不稳定,Activity推出的情况下肯定也是收不到消息的。应该写一个常驻服务Service,在这里实现逻辑。

  10. android端代码太弱,这样会极不稳定,Activity推出的情况下肯定也是收不到消息的。应该写一个常驻服务Service,在这里实现逻辑。

  11. “开始我使用的是mqtt-client,使用过后发现问题百出”,请问楼主是什么问题?因为我们这边现在就是用的mqtt-client

    1. 都是去年的事情了,很久没用过了,记得之前的问题好像是收不到消息

  12. “开始我使用的是mqtt-client,使用过后发现问题百出”,请问楼主是什么问题?因为我们这边现在就是用的mqtt-client

    1. 都是去年的事情了,很久没用过了,记得之前的问题好像是收不到消息

    1. 你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。

    2. 你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。

      1. LZ,调用相关方法是可以了,但有个问题想问一下,前几天什么安全公司发布了linux漏洞:美国时间1月27日,安全公司qualys发现名为 “幽灵”(GHOST)的高危漏洞。漏洞利用Linux GNU C Library (glibc) 2.18之前的版本中gethostbyname 函数处理漏洞。最严重可导致远程执行代码,控制受影响系统。glibc是GNU发布的libc库,即c运行库。glibc是linux系统中最底层的api,几乎其它任何运行库都会依赖于glibc。glibc除了封装linux操作系统所提供的系统服务外,它本身也提供了许多其它一些必要功能服务的实现。glibc 囊括了几乎所有的 UNIX 通行的标准。glibc库中的__nss_hostname_digits_dots()函数中发现了一个缓冲区溢出的漏洞,这个bug可以经过 gethostbyname*()函数被本地或者远程的触发。应用程序主要使用gethostbyname*()函数发起DNS请求,这个函数会将主机名称转换为ip地址。Qualys公司已测试可成功利用漏洞,针对名为Exim的邮件系统服务测试,通过发送特殊构造的邮件,即可获得该邮件服务器的权限,从而控制该服务器。我的服务器是ubuntu12.04,本来是可以发布消息的,昨天打了这个漏洞补丁之后,发现执行发布的java代码连接有问题了,估计ubuntu14.04也连接不上,我使用apollo 1.7,老版本应该没问题log如下:true——ratained状态 (32103) at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:63) at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:135) at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:339) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.connect(ApolloServerJFrame.java:104) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.(ApolloServerJFrame.java:39) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.main(ApolloServerJFrame.java:115)

    1. 你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。

    2. 你还没看懂哦,需要调用相关的方法才能接收到消息的,你看下我上面写的J2SE端 MqttDeliveryToken token = topic.publish(message);这段代码就是发送消息的。

      1. LZ,调用相关方法是可以了,但有个问题想问一下,前几天什么安全公司发布了linux漏洞:美国时间1月27日,安全公司qualys发现名为 “幽灵”(GHOST)的高危漏洞。漏洞利用Linux GNU C Library (glibc) 2.18之前的版本中gethostbyname 函数处理漏洞。最严重可导致远程执行代码,控制受影响系统。glibc是GNU发布的libc库,即c运行库。glibc是linux系统中最底层的api,几乎其它任何运行库都会依赖于glibc。glibc除了封装linux操作系统所提供的系统服务外,它本身也提供了许多其它一些必要功能服务的实现。glibc 囊括了几乎所有的 UNIX 通行的标准。glibc库中的__nss_hostname_digits_dots()函数中发现了一个缓冲区溢出的漏洞,这个bug可以经过 gethostbyname*()函数被本地或者远程的触发。应用程序主要使用gethostbyname*()函数发起DNS请求,这个函数会将主机名称转换为ip地址。Qualys公司已测试可成功利用漏洞,针对名为Exim的邮件系统服务测试,通过发送特殊构造的邮件,即可获得该邮件服务器的权限,从而控制该服务器。我的服务器是ubuntu12.04,本来是可以发布消息的,昨天打了这个漏洞补丁之后,发现执行发布的java代码连接有问题了,估计ubuntu14.04也连接不上,我使用apollo 1.7,老版本应该没问题log如下:true——ratained状态 (32103) at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:63) at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:135) at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:339) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.connect(ApolloServerJFrame.java:104) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.(ApolloServerJFrame.java:39) at cn.smartslim.mqtt.demo.paho.ApolloServerJFrame.main(ApolloServerJFrame.java:115)

  13. true——ratained状态无法连接至服务器 (32103) – java.net.ConnectException: Connection refused: connect at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:603) at java.lang.Thread.run(Thread.java:744)Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) … 2 more救命啊~~~~~

  14. true——ratained状态无法连接至服务器 (32103) – java.net.ConnectException: Connection refused: connect at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:603) at java.lang.Thread.run(Thread.java:744)Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) … 2 more救命啊~~~~~

  15. 我启动了apollo服务器端, 用上面两个客户端都连接成功了. 我想写服务器端的业务逻辑, 应该写在哪里啊?

    1. 服务器端肯定负责发送消息,把j2se的逻辑代码嵌入到业务系统中就可以 配置一下主题应该就可以

  16. 我启动了apollo服务器端, 用上面两个客户端都连接成功了. 我想写服务器端的业务逻辑, 应该写在哪里啊?

    1. 服务器端肯定负责发送消息,把j2se的逻辑代码嵌入到业务系统中就可以 配置一下主题应该就可以

  17. 您好 ,小白正在研究这个mqtt推送,代码给我发一下吧549372543@qq.com. 非常感谢啊

  18. 您好 ,小白正在研究这个mqtt推送,代码给我发一下吧549372543@qq.com. 非常感谢啊

  19. 使用的是mqtt-client-0.4.0这个包,但提示java.lang.NoClassDefFoundError: org.eclipse.paho.client.mqttv3.MqttClient
    请教下是什么问题???

  20. 使用的是mqtt-client-0.4.0这个包,但提示java.lang.NoClassDefFoundError: org.eclipse.paho.client.mqttv3.MqttClient
    请教下是什么问题???

  21. 设置cleansession=false后不能接收在线消息,设置cleansession=true不能接收离线消息,怎么能同时接收离线消息和在线消息呢?

  22. 设置cleansession=false后不能接收在线消息,设置cleansession=true不能接收离线消息,怎么能同时接收离线消息和在线消息呢?

  23. 我用的是apollo,但是在apollo重启后,客户端第一次连接到apollo,如果这个时候给这个客户端已订阅的topic发一条消息,客户端无法获取消息,但客户端在这个时候断开连接,然后第二次连接到apollo,就可以获取到先前发的这个topic,请教下这是什么问题?谢谢

  24. 我用的是apollo,但是在apollo重启后,客户端第一次连接到apollo,如果这个时候给这个客户端已订阅的topic发一条消息,客户端无法获取消息,但客户端在这个时候断开连接,然后第二次连接到apollo,就可以获取到先前发的这个topic,请教下这是什么问题?谢谢

  25. 新手小白,我想请问下,是不是服务器作为发布,客户端作为订阅,然后可以用mosquitto或者其他来做代理服务器。还有就是这个你写的server是做什么用的类。。

  26. 新手小白,我想请问下,是不是服务器作为发布,客户端作为订阅,然后可以用mosquitto或者其他来做代理服务器。还有就是这个你写的server是做什么用的类。。

评论已关闭。