add exception handler for connection with kafka
authorjingui.ren <jingui.ren@samsung.com>
Tue, 22 Jan 2019 06:55:16 +0000 (14:55 +0800)
committerjingui.ren <jingui.ren@samsung.com>
Tue, 22 Jan 2019 06:55:16 +0000 (14:55 +0800)
Change-Id: I13af2661f0ebcbd7c9bb2f6c709345647d7cc19e

depanneur

index 439330af8d66558616e46cf767847816962b5b7c..f8517e657e5131ecc1c2aec62cdd15a3a280ee06 100755 (executable)
--- a/depanneur
+++ b/depanneur
@@ -2539,19 +2539,69 @@ sub dir_wanted {
                $prune = 1;
        }
 }
+
+sub dealWithException {
+       my $error = shift;
+       if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
+           warn 'Error: (', $error->code, ') ',  $error->message, "\n";
+        if ( $error->isa( 'Kafka::Exception::Connection' ) ) {
+            # Specific treatment for 'Kafka::Connection' class error
+            warn 'kafka connection execption, wait for retry...';
+                       sleep(1);
+        } elsif ( $error->isa( 'Kafka::Exception::IO' ) ) {
+            # Specific treatment for 'Kafka::IO' class error
+            warn 'kafka io exception, wait for retry...';
+            sleep(1);
+        }else{
+                       warn 'other kafka exception, exit...';
+            $connection->close;
+            exit;
+               }
+       } else {
+           die $error;
+       }
+}
+
 sub writeToKafka {
     my $cur_level = shift;
     my $partition = 0;
     foreach my $package (@{$packages_level{$cur_level}}) {
-        $producer->send('tizen-unified',$partition,"$package");
-               $partition = ($partition+1)%($max_partitions);
+               try {
+            $producer->send('tizen-unified',$partition,"$package");
+                   $partition = ($partition+1)%($max_partitions);
+        } catch {
+               dealWithException($_);
+               };
        }
 }
 
 sub getCurOffset {
     my $topic = shift;
        my $partition = shift;
-       return $consumer->offset_latest($topic, $partition);    
+       my $curoffset;
+       try {
+               $curoffset = $consumer->offset_latest($topic, $partition);
+       } catch {
+               dealWithException($_);
+       };
+       return $curoffset;
+}
+
+sub getMessage {
+       my $partition = shift;
+       my $cur_offset = shift;
+       my $messages;
+       try {
+               $messages = $consumer->fetch(
+                       'tizen-unified-status',
+                       $partition,
+                       $cur_offset,
+                       $DEFAULT_MAX_BYTES
+               );
+       } catch {
+               dealWithException($_);
+       };
+       return $messages;
 }
 sub readFromKafka {
     my $cur_level = shift;
@@ -2565,13 +2615,9 @@ sub readFromKafka {
     my $succeed_num = 0;
     my $curIndex = 0;
        info("current offset: $cur_offset");
+       info("current building package number: $packageNum");
     while($fail_num + $succeed_num < $packageNum ) {
-        my $messages = $consumer->fetch(
-            'tizen-unified-status',
-            $partition,
-            $cur_offset,
-            $DEFAULT_MAX_BYTES
-        );
+        my $messages = getMessage($partition, $cur_offset);
         foreach my $message ( @$messages ) {
             if ( $message->valid ) {
                 $cur_offset = $message->next_offset;
@@ -2886,13 +2932,7 @@ if ($enable_cluster == 1) {
         $producer = Kafka::Producer->new( Connection => $connection );
         $consumer = Kafka::Consumer->new( Connection => $connection );
     } catch {
-        my $error = $_;
-        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
-            warn 'Error: (', $error->code, ') ',  $error->message, "\n";
-            exit;
-        } else {
-            die $error;
-        }
+       dealWithException($_); 
     };
 
        my $cur_level = 0;
@@ -2904,31 +2944,22 @@ if ($enable_cluster == 1) {
     # when a level packages building return status are all succeed,
     # then go to next level packages.
        while(defined $packages_level{$cur_level}) {
-       try {
-                       info("current level: $cur_level");      
-                       writeToKafka($cur_level);
-           @fail_packages = readFromKafka($cur_level);
-                       if( @fail_packages ) {
-                               foreach my $p ( @fail_packages ) {
-                                       print "$p ";
-               }
-                               print "\n";
-                               error("these @fail_packages packages build failed");
-                       }
-                       else {
-                               info("@{$packages_level{$cur_level}} packages build succeed");
-                       }
-                       $cur_level++;
-               } catch {
-                       my $error = $_;
-               if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
-                   warn 'Error: (', $error->code, ') ',  $error->message, "\n";
-                   exit;
-               } else {
-                   die $error;
-               }
+               info("current level: $cur_level");      
+               writeToKafka($cur_level);
+           @fail_packages = readFromKafka($cur_level);
+               if( @fail_packages ) {
+                       foreach my $p ( @fail_packages ) {
+                               print "$p ";
+               }
+                       print "\n";
+                       error("these @fail_packages packages build failed");
                }
+               else {
+                       info("@{$packages_level{$cur_level}} packages build succeed");
+               }
+               $cur_level++;
        }
+       $connection->close;
        exit $ret;
 }