本文所使用的消息队列是阿里云的消息队列
消息队列通常被用来系统解耦以及异步处理,我们系统中就是用来给第三方接口发送消息的。
之前我有篇文章介绍过一些情况。
当时是用轮询的方式来处理的,但是处理的不是很好。如果启用了多线程,那每个线程都会轮询,这样很浪费资源。
于是参考官方文档做了些改进。
package aliyun.mns;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MessageReceiver {
public static final int WAIT_SECONDS = 30;
// if there are too many queues, a clear method could be involved after deleting the queue
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
protected int workerId;
public MessageReceiver(int id, MNSClient mnsClient, String queue) {
cloudQueue = mnsClient.getQueueRef(queue);
queueName = queue;
workerId = id;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
}
public boolean setPolling()
{
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling()
{
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
System.out.println("Everyone WakeUp and Work!");
}
}
public Message receiveMessage()
{
boolean polling = false;
while (true) {
synchronized (lockObj) {
Boolean p = sPollingMap.get(queueName);
if (p != null && p) {
try {
System.out.println("Thread" + workerId + " Have a nice sleep!");
polling = false;
lockObj.wait();
} catch (InterruptedException e) {
System.out.println("MessageReceiver Interrupted! QueueName is " + queueName);
return null;
}
}
}
try {
Message message;
if (!polling) {
message = cloudQueue.popMessage();
if (message == null) {
polling = true;
continue;
}
} else {
if (setPolling()) {
System.out.println("Thread" + workerId + " Polling!");
} else {
continue;
}
do {
System.out.println("Thread" + workerId + " KEEP Polling!");
message = cloudQueue.popMessage(WAIT_SECONDS);
} while (message == null);
clearPolling();
}
return message;
} catch (Exception e) {
// it could be network exception
System.out.println("Exception Happened when popMessage: " + e);
}
}
}
}
改进后如果队列中没有消息了的话,就只会有一个线程轮询,如果有消息了的话,又会唤起所有线程,支持多队列。
下面是具体的应用代码,只开了两个线程,然后去掉了业务代码。
@Repository
public class StartWithServer {
Logger logger = Logger.getLogger(getClass());
public StartWithServer() {
Runnable deal = new DealMeassage();
new Thread(deal).start();
}
public void work(int workId){
MessageReceiver receiver = new MessageReceiver(workId, client, queueName);
while (true) {
Message message = receiver.receiveMessage();
// Do your bussiness
client.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
}
}
public class DealMeassage implements Runnable {
@Override
public void run() {
Thread thread1 = new Thread(new Runnable() {
public void run() {
work(1);
}
});
Thread thread2 = new Thread(new Runnable() {
public void run() {
work(2);
}
});
thread1.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
There are no comments on this post.