new script for ruote_workitems job to dispatch obsevent
authorLi Yi <yi.a.li@intel.com>
Tue, 7 Aug 2012 08:12:47 +0000 (16:12 +0800)
committerLi Yi <yi.a.li@intel.com>
Tue, 7 Aug 2012 08:12:47 +0000 (16:12 +0800)
two slave job 'requests' and 'repomaker' will keep listening
to amqp queues which specified by job name for the dispatched events

obsevent-dispatcher.py [new file with mode: 0644]

diff --git a/obsevent-dispatcher.py b/obsevent-dispatcher.py
new file mode 100644 (file)
index 0000000..c05d2cf
--- /dev/null
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
+
+import amqplib.client_0_8 as amqp
+from common.envparas import export
+
+PARAM_LIST = ['AMQP_HOST',
+              'AMQP_VHOST',
+              'AMQP_USER',
+              'AMQP_PASSWD',
+              'AMQP_QUEUE',
+              'OBS_EVENT_STRING',
+              'REQUESTS_QUEUE_NAME',
+              'REPOMAKER_QUEUE_NAME']
+
+export(PARAM_LIST, locals())
+
+def send_to_queue(queue_name, msg_body):
+    """ send message to the specified AMQP queue
+    """
+    conn = amqp.Connection(host = AMQP_HOST,
+                           userid = AMQP_USER,
+                           password = AMQP_PASSWD,
+                           virtual_host = AMQP_VHOST)
+
+    chan = conn.channel()
+    chan.access_request('/data', active=True, write=True)
+
+    chan.queue_declare(queue=queue_name, durable=True, auto_delete=False)
+    chan.queue_bind(queue=queue_name, exchange="", routing_key=queue_name)
+
+    msg = amqp.Message(msg_body, content_encoding='UTF-8')
+    msg.properties['delivery_mode'] = 2
+
+    chan.basic_publish(msg, exchange="", routing_key=queue_name)
+
+    chan.close()
+    conn.close()
+
+def main():
+    event_dict = json.loads(OBS_EVENT_STRING)
+    requests_queue = REQUESTS_QUEUE_NAME or 'requests'
+    repomaker_queue = REPOMAKER_QUEUE_NAME or 'repomaker'
+
+    if event_dict and event_dict.has_key('fields'):
+        if event_dict['fields'].has_key('obsEvent'):
+            event_type = event_dict['fields']['obsEvent']['type']
+            if 'OBS_SRCSRV_REQUEST' in event_type:
+                send_to_queue(requests_queue, OBS_EVENT_STRING)
+                print "event string has been sent to %s queue." % requests_queue
+            elif 'OBS_REPO' in event_type:
+                send_to_queue(repomaker_queue, OBS_EVENT_STRING)
+                print "event string has been sent to %s queue" % repomaker_queue
+
+if __name__ == '__main__':
+    main()