os.system("rm -rf %s" % self.result)
class Consumer(Process):
- def __init__(self, threadId, partition = 0):
+ def __init__(self, threadId, topic):
super(Consumer,self).__init__()
- self.partition = partition
self.threadId = threadId
+ self.topic = topic
+
def run(self):
print "start consume thread %d , os pid: %d" % (self.threadId, os.getpid())
while True:
# while WorkerPool.taskQueue.empty() is True:
# time.sleep(1)
# packageName = WorkerPool.taskQueue.get()
- self.messageConsumer = KafkaConsumer('tizen-unified', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
- resultDict = self.messageConsumer.poll(36000,1)
-
- while not resultDict:
- resultDict = self.messageConsumer.poll(36000, 1)
+ self.messageConsumer = KafkaConsumer(self.topic, group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
+ resultDict = self.messageConsumer.poll(36000, 1)
+ if not resultDict:
+ self.messageConsumer.close()
+ continue
ConsumerRecords = resultDict.values()
packageName = ConsumerRecords[0][0].value
gbsbuild.syncResult()
print "sync done"
-class LargeConsumer(Process):
- def __init__(self, threadId, partition = 0):
- super(LargeConsumer,self).__init__()
- self.partition = partition
- self.threadId = threadId
-
- def run(self):
- print "start large consume thread %d , os pid: %d" % (self.threadId, os.getpid())
- while True:
- # while WorkerPool.taskQueue.empty() is True:
- # time.sleep(1)
- # packageName = WorkerPool.taskQueue.get()
- self.messageConsumer = KafkaConsumer('tizen-unified-large-packages', group_id='tizen-worker', bootstrap_servers='109.123.100.144:9092')
- resultDict = self.messageConsumer.poll(36000, 1)
-
- while not resultDict:
- resultDict = self.messageConsumer.poll(36000, 1)
-
- ConsumerRecords = resultDict.values()
- packageName = ConsumerRecords[0][0].value
- self.messageConsumer.close()
- print "thread %d start %s package " % (self.threadId, packageName)
- gbsbuild = GbsBuild(packageName,self.threadId)
- print "thread %d building %s package" % (self.threadId, packageName)
- self.messageHandler = KafkaProducer(bootstrap_servers="109.123.100.137:9092")
- if gbsbuild.build() == "Success":
- #if True:
- result = self.messageHandler.send("tizen-unified-status", value = "succeed", key = packageName, partition=0)
- if(result.get(60)):
- print "send success"
- else:
- print "send fail"
- else:
- result = self.messageHandler.send("tizen-unified-status", value = "failed", key = packageName, partition=0)
- if(result.get(60)):
- print "send success"
- else:
- print "send fail"
- self.messageHandler.close()
- gbsbuild.syncResult()
- print "sync done"
-
class WorkerPool(object):
capcaticy = cpu_count()
curThreadNum = 0
def __init__(self):
if(WorkerPool.capcaticy == 8):
- self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy - 1)]
- self.consumers.append(LargeConsumer(WorkerPool.capcaticy - 1))
+ self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy - 1)]
+ self.consumers.append(Consumer(WorkerPool.capcaticy - 1, 'tizen-unified-large-packages'))
else:
- self.consumers = [Consumer(i) for i in xrange(WorkerPool.capcaticy)]
+ self.consumers = [Consumer(i, 'tizen-unified') for i in xrange(WorkerPool.capcaticy)]
def start(self):
#set daemon
for i in range(0, WorkerPool.capcaticy):
wp = WorkerPool()
wp.start()
print "Done"
+
+