From b8efb09f74c5ac163530a7383b7968ae0846fe76 Mon Sep 17 00:00:00 2001 From: Li Yi Date: Wed, 8 Aug 2012 23:18:51 +0800 Subject: [PATCH] using basicConsume/wait methods to get msg from rabbitmq queue more safety consume message from rabbitmq, and more exactly exception handling --- .../jenkins/plugins/obsevent/GlobalManagement.java | 2 +- .../jenkins/plugins/obsevent/ObsEventHandler.java | 62 +++++++++++++--------- .../plugins/obsevent/amqp/AmqpConnection.java | 25 +++++++-- .../plugins/obsevent/trigger/ObsEventTrigger.java | 14 +++-- .../obsevent/trigger/ObsEventTrigger/config.jelly | 2 +- 5 files changed, 71 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/intel/jenkins/plugins/obsevent/GlobalManagement.java b/src/main/java/com/intel/jenkins/plugins/obsevent/GlobalManagement.java index 2c6e4f7..b4a96b6 100644 --- a/src/main/java/com/intel/jenkins/plugins/obsevent/GlobalManagement.java +++ b/src/main/java/com/intel/jenkins/plugins/obsevent/GlobalManagement.java @@ -118,7 +118,7 @@ public class GlobalManagement extends ManagementLink implements StaplerProxy, De ch.close(); conn.close(); return FormValidation.ok(Messages.Success()); - } catch (Exception ex) { + } catch (IOException ex) { return FormValidation.error("Can not connect to " + amqpHost); } } diff --git a/src/main/java/com/intel/jenkins/plugins/obsevent/ObsEventHandler.java b/src/main/java/com/intel/jenkins/plugins/obsevent/ObsEventHandler.java index 89ce1dd..a5b1980 100644 --- a/src/main/java/com/intel/jenkins/plugins/obsevent/ObsEventHandler.java +++ b/src/main/java/com/intel/jenkins/plugins/obsevent/ObsEventHandler.java @@ -24,8 +24,11 @@ package com.intel.jenkins.plugins.obsevent; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; + import java.io.IOException; import org.slf4j.Logger; @@ -87,6 +90,32 @@ public class ObsEventHandler extends Thread { this.obsListener = obsListener; } + class TriggerConsumer implements Consumer { + + public void handleConsumeOk(String string) { + + } + + public void handleCancelOk(String string) { + + } + + public void handleShutdownSignal(String string, ShutdownSignalException sse) { + + } + + public void handleDelivery(String string, Envelope envlp, BasicProperties bp, byte[] bytes) throws IOException { + String gotEvent = new String(bytes); + JSONObject obGotEvent = (JSONObject) JSONSerializer.toJSON(gotEvent); + + if (obsListener != null) + obsListener.triggerBuild(obGotEvent); + else { + logger.debug("Queue {} listener has been quit", amqpQueue); + } + } + } + public void setListener(ObsEventListener obsListener) { this.obsListener = obsListener; } @@ -104,30 +133,15 @@ public class ObsEventHandler extends Thread { new Authentication(amqpVhost, amqpUser, amqpPass), new Queue(amqpQueue, "", amqpQueue)); - do { - GetResponse response = amqpConn.getResponse(false); - - while (response != null) { - AMQP.BasicProperties props = response.getProps(); - String gotEvent = new String(response.getBody()); + amqpConn.basicConsume(new TriggerConsumer()); - long deliveryTag = response.getEnvelope().getDeliveryTag(); - amqpConn.acknowledge(deliveryTag, false); - - JSONObject obGotEvent = (JSONObject) JSONSerializer.toJSON(gotEvent); - - if (obsListener != null) - obsListener.triggerBuild(obGotEvent); - else { - logger.debug("Queue {} listener has been quit", amqpQueue); - break; - } - - response = amqpConn.getResponse(false); - } + do { + amqpConn.channelWait(); } while (!isShutdownInProgress()); - } catch (Exception ex) { - logger.debug("Error occured when listening queue {}, {}", amqpQueue, ex.toString()); + } catch (IOException ex) { + logger.error("Error occured when registering consumer for queue {}, {}", amqpQueue, ex.toString()); + } catch (InterruptedException ex) { + logger.error("Error occured when listening queue {}, {}", amqpQueue, ex.toString()); } } diff --git a/src/main/java/com/intel/jenkins/plugins/obsevent/amqp/AmqpConnection.java b/src/main/java/com/intel/jenkins/plugins/obsevent/amqp/AmqpConnection.java index 5df6667..5084277 100644 --- a/src/main/java/com/intel/jenkins/plugins/obsevent/amqp/AmqpConnection.java +++ b/src/main/java/com/intel/jenkins/plugins/obsevent/amqp/AmqpConnection.java @@ -28,6 +28,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import java.io.IOException; import org.slf4j.Logger; @@ -46,6 +47,8 @@ public class AmqpConnection { private Channel channelSession; private Queue queue; + private String consumerTag = null; + public AmqpConnection(String host, Authentication authen, Queue queue) throws IOException { this(host, AMQP.PROTOCOL.PORT, authen, queue); } @@ -65,9 +68,15 @@ public class AmqpConnection { this.queue = queue; try { channelSession.queueDeclare(queue.getName(), true, false, false, null); - } catch (Exception ex) { + } catch (IOException ex) { channelSession.queueDeclarePassive(queue.getName()); } + try { + channelSession.exchangeDeclare(queue.getExchange(), "direct", true); + } catch (IOException ex) { + channelSession.exchangeDeclarePassive(queue.getExchange()); + } + channelSession.queueBind(queue.getName(), queue.getExchange(), queue.getRoutingKey()); } @@ -87,14 +96,21 @@ public class AmqpConnection { return channelSession != null && channelSession.isOpen(); } - public synchronized GetResponse getResponse(boolean autoAck) throws IOException { + public synchronized GetResponse basicGet(boolean autoAck) throws IOException { return channelSession.basicGet(queue.getName(), autoAck); } - public synchronized void acknowledge(long deliveryTag, boolean multiple) throws IOException { + public synchronized void basicAck(long deliveryTag, boolean multiple) throws IOException { channelSession.basicAck(deliveryTag, multiple); } + public synchronized void basicConsume(Consumer consumer) throws IOException { + consumerTag = channelSession.basicConsume(queue.getName(), true, consumer); + } + + public void channelWait() throws InterruptedException { + channelSession.wait(); + } /** * Disconnects the connection. */ @@ -102,6 +118,9 @@ public class AmqpConnection { if (isSessionOpen()) { logger.debug("Closing channel session."); + if (consumerTag != null) + channelSession.basicCancel(consumerTag); + channelSession.close(); channelSession = null; } diff --git a/src/main/java/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger.java b/src/main/java/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger.java index a3562d6..8b2bc91 100644 --- a/src/main/java/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger.java +++ b/src/main/java/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger.java @@ -41,12 +41,11 @@ import hudson.triggers.TriggerDescriptor; import java.util.ArrayList; import java.util.List; -import java.util.Hashtable; +import java.util.HashMap; import org.kohsuke.stapler.DataBoundConstructor; import org.kohsuke.stapler.QueryParameter; -import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.JSONSerializer; import net.sf.json.JSONException; @@ -59,6 +58,8 @@ import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + import com.intel.jenkins.plugins.obsevent.PluginImpl; import com.intel.jenkins.plugins.obsevent.Messages; import com.intel.jenkins.plugins.obsevent.ObsEventListener; @@ -70,13 +71,16 @@ import com.intel.jenkins.plugins.obsevent.utils.WriterUtil; import static com.intel.jenkins.plugins.obsevent.config.DefaultValues.*; public final class ObsEventTrigger extends Trigger implements ObsEventListener { - private static Hashtable metaData = new Hashtable(); + private static HashMap metaData = new HashMap(); + public static void addMetaData(String key, Object value) { metaData.put(key, value); } + public static Object getMetaData(String key) { return metaData.get(key); } + public static void removeMetaData(String key) { metaData.remove(key); } @@ -422,7 +426,7 @@ public final class ObsEventTrigger extends Trigger implements O ch.close(); conn.close(); return FormValidation.ok(Messages.Success()); - } catch (Exception e) { + } catch (IOException ex) { return FormValidation.error("Can not connect to " + amqpHost); } } @@ -443,7 +447,7 @@ public final class ObsEventTrigger extends Trigger implements O ch.close(); conn.close(); return FormValidation.ok(); - } catch (Exception e) { + } catch (IOException ex) { return FormValidation.error("Can not connect to " + value); } } diff --git a/src/main/resources/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger/config.jelly b/src/main/resources/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger/config.jelly index 358607a..f87a371 100755 --- a/src/main/resources/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger/config.jelly +++ b/src/main/resources/com/intel/jenkins/plugins/obsevent/trigger/ObsEventTrigger/config.jelly @@ -49,7 +49,7 @@
${%Custom Settings}
- +