Refine worker code --first version distributed-worker
authorbiao.wang <biao716.wang@samsung.com>
Wed, 6 Mar 2019 02:40:47 +0000 (10:40 +0800)
committerbiao.wang <biao716.wang@samsung.com>
Wed, 6 Mar 2019 03:11:33 +0000 (11:11 +0800)
Change-Id: I97aeaa13dc7c354cc7d9fedcacc7a92811f058aa

tools/worker.py

index f7306071d959eb984550dcfac19858657cdb963e..f525bffa3d4f6d22f37b1aa101c2d7dcf8e336af 100755 (executable)
@@ -74,21 +74,22 @@ class GbsBuild(object):
                os.system("rm -rf %s" % self.result)
 
 class Consumer(Process):
-       def __init__(self, threadId, partition = 0):
+       def __init__(self, threadId, topic):
                super(Consumer,self).__init__()
-               self.partition = partition
                self.threadId = threadId
+               self.topic = topic
+
        def run(self):
                print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
                while True:
                        # while WorkerPool.taskQueue.empty() is True:
                        #       time.sleep(1)
                        # packageName = WorkerPool.taskQueue.get()
-                       self.messageConsumer = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
-                       resultDict = self.messageConsumer.poll(36000,1)
-
-                       while not resultDict:
-                               resultDict = self.messageConsumer.poll(36000, 1)
+                       self.messageConsumer = KafkaConsumer(self.topic, group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+                       resultDict = self.messageConsumer.poll(36000, 1)
+                       if not resultDict:
+                               self.messageConsumer.close()
+                               continue
 
                        ConsumerRecords = resultDict.values()
                        packageName = ConsumerRecords[0][0].value
@@ -117,57 +118,15 @@ class Consumer(Process):
                        gbsbuild.syncResult()
                        print "sync done"
 
-class LargeConsumer(Process):
-       def __init__(self, threadId, partition = 0):
-               super(LargeConsumer,self).__init__()
-               self.partition = partition
-               self.threadId = threadId
-
-       def run(self):
-               print "start large consume thread %d , os pid: %d" % (self.threadId, os.getpid())
-               while True:
-                       # while WorkerPool.taskQueue.empty() is True:
-                       #       time.sleep(1)
-                       # packageName = WorkerPool.taskQueue.get()
-                       self.messageConsumer = KafkaConsumer('tizen-unified-large-packages', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
-                       resultDict = self.messageConsumer.poll(36000, 1)
-
-                       while not resultDict:
-                               resultDict = self.messageConsumer.poll(36000, 1)
-
-                       ConsumerRecords = resultDict.values()
-                       packageName = ConsumerRecords[0][0].value
-                       self.messageConsumer.close()
-                       print "thread %d start %s package " % (self.threadId, packageName)
-                       gbsbuild = GbsBuild(packageName,self.threadId)
-                       print "thread %d building %s package" % (self.threadId, packageName)
-                       self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
-                       if gbsbuild.build() == "Success":
-                       #if True:
-                               result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
-                               if(result.get(60)):
-                                       print "send success"
-                               else:
-                                       print "send fail"
-                       else:
-                               result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0)
-                               if(result.get(60)):
-                                       print "send success"
-                               else:
-                                       print "send fail"
-                       self.messageHandler.close()
-                       gbsbuild.syncResult()
-                       print "sync done"
-
 class WorkerPool(object):
        capcaticy = cpu_count()
        curThreadNum = 0
        def __init__(self):
                if(WorkerPool.capcaticy == 8):
-                       self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy - 1)]
-                       self.consumers.append(LargeConsumer(WorkerPool.capcaticy - 1))
+                       self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy - 1)]
+                       self.consumers.append(Consumer(WorkerPool.capcaticy - 1, 'tizen-unified-large-packages'))
                else:
-                       self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
+                       self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy)]
        def start(self):
                #set daemon
                for i in range(0, WorkerPool.capcaticy):
@@ -180,3 +139,5 @@ class WorkerPool(object):
 wp = WorkerPool()
 wp.start()
 print "Done"
+
+