}
}
+sub getCurOffset {
+ my $topic = shift;
+ my $partition = shift;
+ return $consumer->offset_latest($topic, $partition);
+}
sub readFromKafka {
my $cur_level = shift;
my $partition = 0;
my @fail_packages;
my @succeed_packages;
- my $cur_offset = 0;
+ my $cur_offset = getCurOffset("tizen-unified-status", $partition);
my $packageNum = @{$packages_level{$cur_level}};
+ info("current offset: $cur_offset");
while(@fail_packages + @succeed_packages < $packageNum ) {
my $messages = $consumer->fetch(
'tizen-unified-status',
$cur_offset = $message->next_offset;
if($message->payload eq "failed") {
push(@fail_packages, $message->key);
+ info("package: $message->key build failed");
} else {
push(@succeed_packages, $message->key);
+ info("package: $message->key build success");
}
} else {
info($message->error);