$prune = 1;
}
}
+
+sub dealWithException {
+ my $error = shift;
+ if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
+ warn 'Error: (', $error->code, ') ', $error->message, "\n";
+ if ( $error->isa( 'Kafka::Exception::Connection' ) ) {
+ # Specific treatment for 'Kafka::Connection' class error
+ warn 'kafka connection execption, wait for retry...';
+ sleep(1);
+ } elsif ( $error->isa( 'Kafka::Exception::IO' ) ) {
+ # Specific treatment for 'Kafka::IO' class error
+ warn 'kafka io exception, wait for retry...';
+ sleep(1);
+ }else{
+ warn 'other kafka exception, exit...';
+ $connection->close;
+ exit;
+ }
+ } else {
+ die $error;
+ }
+}
+
sub writeToKafka {
my $cur_level = shift;
my $partition = 0;
foreach my $package (@{$packages_level{$cur_level}}) {
- $producer->send('tizen-unified',$partition,"$package");
- $partition = ($partition+1)%($max_partitions);
+ try {
+ $producer->send('tizen-unified',$partition,"$package");
+ $partition = ($partition+1)%($max_partitions);
+ } catch {
+ dealWithException($_);
+ };
}
}
sub getCurOffset {
my $topic = shift;
my $partition = shift;
- return $consumer->offset_latest($topic, $partition);
+ my $curoffset;
+ try {
+ $curoffset = $consumer->offset_latest($topic, $partition);
+ } catch {
+ dealWithException($_);
+ };
+ return $curoffset;
+}
+
+sub getMessage {
+ my $partition = shift;
+ my $cur_offset = shift;
+ my $messages;
+ try {
+ $messages = $consumer->fetch(
+ 'tizen-unified-status',
+ $partition,
+ $cur_offset,
+ $DEFAULT_MAX_BYTES
+ );
+ } catch {
+ dealWithException($_);
+ };
+ return $messages;
}
sub readFromKafka {
my $cur_level = shift;
my $succeed_num = 0;
my $curIndex = 0;
info("current offset: $cur_offset");
+ info("current building package number: $packageNum");
while($fail_num + $succeed_num < $packageNum ) {
- my $messages = $consumer->fetch(
- 'tizen-unified-status',
- $partition,
- $cur_offset,
- $DEFAULT_MAX_BYTES
- );
+ my $messages = getMessage($partition, $cur_offset);
foreach my $message ( @$messages ) {
if ( $message->valid ) {
$cur_offset = $message->next_offset;
$producer = Kafka::Producer->new( Connection => $connection );
$consumer = Kafka::Consumer->new( Connection => $connection );
} catch {
- my $error = $_;
- if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
- warn 'Error: (', $error->code, ') ', $error->message, "\n";
- exit;
- } else {
- die $error;
- }
+ dealWithException($_);
};
my $cur_level = 0;
# when a level packages building return status are all succeed,
# then go to next level packages.
while(defined $packages_level{$cur_level}) {
- try {
- info("current level: $cur_level");
- writeToKafka($cur_level);
- @fail_packages = readFromKafka($cur_level);
- if( @fail_packages ) {
- foreach my $p ( @fail_packages ) {
- print "$p ";
- }
- print "\n";
- error("these @fail_packages packages build failed");
- }
- else {
- info("@{$packages_level{$cur_level}} packages build succeed");
- }
- $cur_level++;
- } catch {
- my $error = $_;
- if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
- warn 'Error: (', $error->code, ') ', $error->message, "\n";
- exit;
- } else {
- die $error;
- }
+ info("current level: $cur_level");
+ writeToKafka($cur_level);
+ @fail_packages = readFromKafka($cur_level);
+ if( @fail_packages ) {
+ foreach my $p ( @fail_packages ) {
+ print "$p ";
+ }
+ print "\n";
+ error("these @fail_packages packages build failed");
}
+ else {
+ info("@{$packages_level{$cur_level}} packages build succeed");
+ }
+ $cur_level++;
}
+ $connection->close;
exit $ret;
}