论消息队列的正确使用姿势

Posted by Deadline on August 22, 2016

本文所使用的消息队列是阿里云的消息队列

消息队列通常用于系统解耦和异步处理,我们的系统用它来给第三方接口发送消息。
之前我有篇文章介绍过一些情况。
当时是用轮询的方式来处理的,但是处理得不是很好:如果启用了多线程,每个线程都会各自轮询,很浪费资源。于是参考官方文档做了些改进。

package aliyun.mns;

import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Receives messages from a single queue on behalf of one worker thread.
 *
 * <p>All receivers created for the same queue name share one lock object and one
 * "polling" flag. A receiver first tries a non-blocking {@code popMessage()}; when the
 * queue is empty, exactly one receiver becomes the long-poller and the others wait on
 * the shared lock until the poller obtains a message and wakes them up.
 */
public class MessageReceiver {


    /** Wait time passed to {@code cloudQueue.popMessage(int)} while long-polling. */
    public static final int WAIT_SECONDS = 30;

    // Queue name -> lock object shared by every receiver of that queue.
    // If there are too many queues, a clear method could be involved after deleting the queue.
    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    // Queue name -> whether some receiver is currently long-polling that queue.
    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;
    protected String queueName;
    protected CloudQueue cloudQueue;
    protected int workerId;

    /**
     * Creates a receiver bound to the given queue.
     *
     * @param id        worker id, only used in the log output
     * @param mnsClient client used to obtain the queue reference
     * @param queue     name of the queue to receive from
     */
    public MessageReceiver(int id, MNSClient mnsClient, String queue) {
        cloudQueue = mnsClient.getQueueRef(queue);
        queueName = queue;
        workerId = id;

        // sLockObjMap is a plain HashMap, so lookup-or-create must be done under a lock.
        synchronized (sLockObjMap) {
            lockObj = sLockObjMap.get(queueName);
            if (lockObj == null) {
                lockObj = new Object();
                sLockObjMap.put(queueName, lockObj);
            }
        }
    }

    /**
     * Tries to claim the long-poller role for this receiver's queue.
     *
     * @return {@code true} if the role was free and is now claimed, {@code false} if
     *         another receiver already holds it
     */
    public boolean setPolling()
    {
        synchronized (lockObj) {
            Boolean ret = sPollingMap.get(queueName);
            if (ret == null || !ret) {
                sPollingMap.put(queueName, true);
                return true;
            }
            return false;
        }
    }

    /**
     * Releases the long-poller role and wakes every receiver waiting on the queue's lock.
     */
    public void clearPolling()
    {
        synchronized (lockObj) {
            sPollingMap.put(queueName, false);
            lockObj.notifyAll();
            System.out.println("Everyone WakeUp and Work!");
        }
    }

    /**
     * Blocks until a message is available on the queue.
     *
     * @return the received message, or {@code null} if the calling thread was interrupted
     *         while waiting for the current long-poller; in that case the thread's
     *         interrupt status is set again before returning
     */
    public Message receiveMessage()
    {
        boolean polling = false;
        while (true) {
            synchronized (lockObj) {
                Boolean p = sPollingMap.get(queueName);
                if (p != null && p) {
                    try {
                        System.out.println("Thread" + workerId + " Have a nice sleep!");
                        polling = false;
                        // Deliberately not re-checked in a loop: after any wake-up the
                        // receiver simply performs one non-blocking pop below.
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        System.out.println("MessageReceiver Interrupted! QueueName is " + queueName);
                        // Preserve the interrupt so the caller can observe it.
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
            }

            try {
                if (!polling) {
                    Message message = cloudQueue.popMessage();
                    if (message == null) {
                        // Queue looks empty: try to become the long-poller next round.
                        polling = true;
                        continue;
                    }
                    return message;
                }

                if (!setPolling()) {
                    // Somebody else is already long-polling; go back and wait for it.
                    continue;
                }
                System.out.println("Thread" + workerId + " Polling!");
                return longPoll();
            } catch (Exception e) {
                // it could be network exception
                System.out.println("Exception Happened when popMessage: " + e);
            }
        }
    }

    /**
     * Long-polls the queue until a message arrives. Must only be called by the receiver
     * that currently holds the long-poller role.
     *
     * <p>The role is always released in {@code finally}. Without that, a failure inside
     * the loop would leave the polling flag set, and every receiver of the queue
     * (including this one) would end up waiting on the lock with nobody left to notify.
     */
    private Message longPoll()
    {
        try {
            Message message = null;
            do {
                System.out.println("Thread" + workerId + " KEEP Polling!");
                try {
                    message = cloudQueue.popMessage(WAIT_SECONDS);
                } catch (Exception e) {
                    // Keep the poller role and retry instead of waking all other receivers.
                    System.out.println("Exception Happened when popMessage: " + e);
                }
            } while (message == null);
            return message;
        } finally {
            clearPolling();
        }
    }
}

改进后,当队列中没有消息时,只会有一个线程进行长轮询,其余线程进入等待;一旦轮询线程取到消息,就会唤醒所有线程继续工作。同时,这个实现支持多个队列。

下面是具体的应用代码,只开了两个线程,然后去掉了业务代码。

/**
 * Starts the queue consumers when this bean is constructed.
 *
 * <p>The constructor launches a coordinator thread ({@link DealMeassage}) which in turn
 * starts two worker threads; each worker runs {@link #work(int)} with its own
 * {@code MessageReceiver}.
 *
 * <p>NOTE(review): {@code client} and {@code queueName} are not declared in this excerpt;
 * presumably they are fields that were removed together with the business code.
 */
@Repository
public class StartWithServer {

	Logger logger = Logger.getLogger(getClass());

	/** Launches the coordinator thread that starts the workers. */
	public StartWithServer() {

		Runnable deal = new DealMeassage();
		new Thread(deal).start();
	}

	/**
	 * Worker loop: receives a message, processes it, then deletes it from the queue.
	 * Returns when {@code receiveMessage()} yields {@code null}.
	 *
	 * @param workId id handed to the {@code MessageReceiver} created for this worker
	 */
	public void work(int workId){
		MessageReceiver receiver = new MessageReceiver(workId, client, queueName);
		while (true) {
			Message message = receiver.receiveMessage();
			if (message == null) {
				// receiveMessage() returns null when the worker was interrupted while
				// waiting; stop instead of dereferencing null below.
				logger.info("Worker " + workId + " got no message (interrupted), stopping.");
				return;
			}

			// Do your business

			client.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
		}

	}

	/** Coordinator task: starts both workers and waits for them to finish. */
	public class DealMeassage implements Runnable {

		@Override
		public void run() {

			Thread thread1 = new Thread(new Runnable() {
				@Override
				public void run() {
					work(1);
				}
			});

			Thread thread2 = new Thread(new Runnable() {
				@Override
				public void run() {
					work(2);
				}
			});

			thread1.start();
			// The second worker must be started as well; joining a thread that was never
			// started returns immediately, which left only one worker running.
			thread2.start();

			try {
				thread1.join();
				thread2.join();
			} catch (InterruptedException e) {
				logger.error("Interrupted while waiting for the workers to finish", e);
				// Preserve the interrupt status for whoever owns this thread.
				Thread.currentThread().interrupt();
			}
		}

	}

}


There are no comments on this post.