阿里云消息队列的使用

Posted by Deadline on November 30, 2015

最近项目中涉及到和其他公司合作,有一些接口需要对接。 其中一个接口涉及到核心业务,我们这边处理完成后需要发送一些数据给对方。 为了不影响我们的业务流程,所以准备采用消息队列的方式来处理。

整个流程其实比较简单,就是我们处理完之后把要发送的数据放到消息队列中去,然后有个独立线程轮询队列将消息取出然后发送出去。

服务器都是在阿里云上,而且阿里云也提供了消息服务。阿里云还有很多其他的服务,感觉确实不错,节省各方面成本,比如key-value缓存、cdn等等。而且消息服务是每月前100w次免费,小公司用不了那么多,每月免费额度就够用了。

关于阿里云的消息服务查看官网文档就好了,其他的不多说啦。

接下来说说实际中遇到的问题吧

1.jar包冲突####

直接使用了提供的sdk,可以在 pom.xml 中直接添加依赖就行了。sdk 依赖 httpcore、httpclient等jar包。版本是 4.3.2。因为之前项目中已经有这些包了,不过版本是4.2.x的,所以新增的依赖不会去更新这些版本。当时也没注意,后来启动的时候一直报错,class def not found之类的,有人说是缺少http相关jar包,但是我们有呀,后来还是在 stackoverfollow上看到有人评论说 4.2.x 和4.3.x 变化较大,使用的时候需要注意版本。后来在 pom 中指定了 最新的版本,结果就不报错了。这种错误有点防不胜防啊。 (其实也提供了 restful 类型的接口来创建消息队列啊,创建和消费消息,如果是项目中自己有写相关工具类可以考虑用自己的,懒人还是用sdk吧,省得折腾)

2.网络问题####

由于公司网络不太好,消费线程总是异常 connect closed 。 队列有提供内网和外网地址,如果网络不好的话,可以传到测试环境上使用内网地址来测试,使用测试环境后就不再出现链接关闭的错误。

3.自己2的错误####

习惯性的将数据判空操作

if(null != queue.pop()){}

要判空的话也要用临时变量存起来,要不然你这样就相当于已经消费掉了。一开始没注意,然后后面发现总是取不到消息,返回去看代码才发现自己2了。

  Message message = queue.pop();
  if(null != message){}

其实SDK有提供异步回调方法的,只要实现 AsyncCallback 接口就行了。

public class ReceiveDeleteAsyncCallback<T> implements AsyncCallback<T> {


	public ReceiveDeleteAsyncCallback() {
	}

	@Override
	public void onSuccess(T message) {
		//mMessage = (Message) message;
		//System.out.println("pop succeed:"+mMessage.getMessageBodyAsString());
		//doDelete();
	}

	@Override
	public void onFail(Exception ex) {
		// TODO Auto-generated method stub

	}

}

最开始是想利用回调方式来进行发送任务的,但是这样的话感觉还是同步方式了,跟之前也没有啥区别,这边操作完将消息 put 进队列,然后马上唤起操作任务将消息 pop 出来发送掉。 这跟直接发送没啥大区别,解耦还是不够。最好的方式,就是操作完直接将消息 put 进队列就不管了,怎么处理跟我没啥关系。

后来考虑了下还是用最简单直接的办法 处理线程定时去轮询,有消息就发送。不过有个缺点就是无法及时发送,异步方式的话就能立即唤起任务来处理消息。不过我们这个任务对时间并不敏感所以也就无所谓啦,而且队列属性先进先出,所以消息的顺序也没有问题。

然后sdk也有多线程批量处理。 详情还是看看他们提供的 sample 吧。

然后关于轮询线程如何写参考我的另外一篇文章—如何设置一个随着Java Webapp启动而启动的任务


There are no comments on this post.