Imported Upstream version 0.8~alpha1
[platform/upstream/syncevolution.git] / src / client-api / src / c++ / common / push / CTPService.cpp
1 /*
2  * Funambol is a mobile platform developed by Funambol, Inc. 
3  * Copyright (C) 2003 - 2007 Funambol, Inc.
4  * 
5  * This program is free software; you can redistribute it and/or modify it under
6  * the terms of the GNU Affero General Public License version 3 as published by
7  * the Free Software Foundation with the addition of the following permission
8  * added to Section 15 as permitted in Section 7(a): FOR ANY PART OF THE COVERED
9  * WORK IN WHICH THE COPYRIGHT IS OWNED BY FUNAMBOL, FUNAMBOL DISCLAIMS THE
10  * WARRANTY OF NON INFRINGEMENT  OF THIRD PARTY RIGHTS.
11  * 
12  * This program is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14  * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 
15  * details.
16  * 
17  * You should have received a copy of the GNU Affero General Public License
18  * along with this program; if not, see http://www.gnu.org/licenses or write to
19  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
20  * MA 02110-1301 USA.
21  * 
22  * You can contact Funambol, Inc. headquarters at 643 Bair Island Road, Suite
23  * 305, Redwood City, CA 94063, USA, or at email address info@funambol.com.
24  * 
25  * The interactive user interfaces in modified source and object code versions
26  * of this program must display Appropriate Legal Notices, as required under
27  * Section 5 of the GNU Affero General Public License version 3.
28  * 
29  * In accordance with Section 7(b) of the GNU Affero General Public License
30  * version 3, these Appropriate Legal Notices must retain the display of the
31  * "Powered by Funambol" logo. If the display of the logo is not reasonably
32  * feasible for technical reasons, the Appropriate Legal Notices must display
33  * the words "Powered by Funambol".
34  */
35
36 #include "base/fscapi.h"
37 #include "base/Log.h"
38 #include "base/util/utils.h"
39 #include "base/util/StringBuffer.h"
40
41 #include "push/CTPService.h"
42 #include "push/CTPParam.h"
43 #include "push/FThread.h"
44 #include "push/FSocket.h"
45
46 // Init static pointer.
47 CTPService* CTPService::pinstance = NULL;
48
49
50 /**
51  * Method to create the sole instance of CTPService
52  */
53 CTPService* CTPService::getInstance() {
54
55     if (pinstance == NULL) {
56         pinstance = new CTPService;
57     }
58     return pinstance;
59 }
60
61
62
63
64 /**
65  * Constructor: reads the CTPConfig from registry and init members.
66  */
67 CTPService::CTPService() : config(APPLICATION_URI) {
68
69     // Read config from registry
70     config.readCTPConfig();
71     LOG.debug("CTP config read");
72     
73
74     ctpSocket        = NULL;
75     ctpThread        = NULL;
76     receiverThread   = NULL;
77     heartbeatThread  = NULL;
78     cmdTimeoutThread = NULL;
79     receivedMsg      = NULL;
80     ctpState         = CTP_STATE_DISCONNECTED;
81     leaving          = false;
82
83     totalBytesSent     = 0;
84     totalBytesReceived = 0;
85 }
86
87
88 /**
89  * Denstructor: stop any active thread, and close socket
90  * connection if still active.
91  */
92 CTPService::~CTPService() {
93     
94     stopThread(ctpThread);
95     stopThread(receiverThread);
96     stopThread(heartbeatThread);
97     stopThread(cmdTimeoutThread);
98
99     closeConnection();
100
101     if (receivedMsg) {
102         delete receivedMsg;
103     }
104 }
105
106
107 /**
108  * Starts the CTP process.
109  * Creates the main CTP thread, passing handle stpThread (NULL if not created)
110  * @param stpThread  handle of STPThread: the CTPThread started here must
111  *                   wait until the STPThread has finished
112  * @return           handle of the ctpThread started
113  */
114 FThread* CTPService::startCTP() {
115
116     if (ctpThread) {
117         ctpThread->softTerminate();
118         // TODO Shall we wait until the old thread is still running?
119     }
120
121     setCtpState(CTP_STATE_DISCONNECTED);
122     leaving = false;
123     totalBytesSent     = 0;
124     totalBytesReceived = 0;
125
126     ctpThread = new CTPThread();
127     ctpThread->start();
128     return ctpThread;
129 }
130
131
132
133 /**
134  * Stops the CTP process.
135  * 1. terminates the heartbeat thread if running, to avoid sending
136  *    more READY msg at this point. 
137  * 2. starts the receiving thread if not already running, to catch 
138  *    the last OK msg from the Server
139  * 3. sends the BYE msg
140  * 4. waits for the OK msg, in case of timeout or error the 
141  *    receive thread is terminated
142  *
143  * @return  0 if ctp closed successfully
144  *          1 if ctpThread not running
145  *          2 if socket connection not available
146  *          3 if OK msg from the Server is not received (connection closed after a timeout)
147  *         -1 if errors occurred creating the receive thread
148  *         -2 if errors occurred waiting on receiverThread
149  *          
150  */
151 int32_t CTPService::stopCTP() {
152
153     if (!ctpThread) {
154         LOG.debug("No CTP thread available -> exiting.");
155         return 1;
156     }
157     if (!ctpSocket) {
158         LOG.debug("No socket connection -> exiting.");
159         return 2;
160     }
161
162     int ret = 0;
163     LOG.info("Closing CTP connection...");
164
165
166     // Terminate immediately the heartbeat and cmdTimeout threads to avoid sending 
167     // any READY msg now. Keep receiverThread alive, to receive the last OK msg.
168     if (stopThread(heartbeatThread)) {
169         LOG.debug("heartbeatThread killed");
170     }
171     if (stopThread(cmdTimeoutThread)) {
172         LOG.debug("cmdTimeoutThread killed");
173     }
174
175
176     // Start thread to receive messages from Server if not running
177     // If client authenticated, receiverThread is already running
178     if (!receiverThread) {
179         receiverThread = new ReceiverThread();
180         receiverThread->start();
181         // Just to be sure the receiveWorker has reached the 'recv' state
182         FThread::sleep(1000);
183     }
184
185
186     // Set leaving to true, so receive thread will exit after the OK msg.
187     leaving = true;
188     setCtpState(CTP_STATE_CLOSING);
189
190
191     int32_t timeout;
192     bool terminated;
193     //
194     // Send the BYE message
195     //
196     LOG.info("Sending [BYE] message...");
197     if (sendByeMsg()) {
198         LOG.error("Error sending the BYE message");
199         goto finally;
200     }
201
202     //
203     // Wait for OK msg: receive thread should exit after the last OK
204     // message sent by Server - timeout = ctpCmdTimeout (60sec).
205     //
206     timeout = config.getCtpCmdTimeout();
207     if (!timeout) {
208         timeout = 60;
209     }
210
211     terminated = receiverThread->wait(timeout * 1000);
212
213     if (terminated) {
214         LOG.debug("receiverThread terminated");
215         ret = 0;
216     } else {
217         // Timeout: kill thread -> out.
218         LOG.debug("Timeout - receiverThread will now be terminated");
219         receiverThread->softTerminate();
220         ret = 3;
221     }
222     delete receiverThread;
223     receiverThread = NULL;
224
225
226 finally:
227
228     // Close them if still running...
229     if (stopThread(receiverThread)) {
230         LOG.debug("receiverThread killed");
231     }
232     if (stopThread(ctpThread)) {
233         LOG.debug("ctpThread killed");
234     }
235
236     //
237     // Close socket connection
238     //
239     closeConnection();
240
241     return ret;
242 }
243
244
245 /**
246  * Opens the socket connection.
247  * 1. Initialize Winsock
248  * 2. Get the address of the host (use getaddrinfo())
249  * 3. Create a TCP/IP stream socket
250  * 4. Connect to the server
251  *
252  * @return  0 if no errors
253  */
254 int32_t CTPService::openConnection() {
255
256     int32_t ret = 0;
257
258     if (ctpSocket) {
259         // If an old CTP session is up, here we close threads and bring 
260         // down socket: we want to keep the latest CTP session
261         closeConnection();
262     }
263
264     LOG.info("--- Starting a new SOCKET connection ---");
265     ctpState = CTP_STATE_CONNECTING;
266     leaving  = false;
267     totalBytesSent     = 0;
268     totalBytesReceived = 0;
269
270     //
271     // Find the server
272     //
273     LOG.debug("Find the server address...");
274     LOG.info("HOSTNAME = '%s'  PORT = '%d'", config.getUrlTo().c_str(), config.getCtpPort());
275
276     //
277     // Create a TCP/IP stream socket and connect to the server
278     //
279     LOG.debug("Create SOCKET connection...");
280     StringBuffer url(config.getUrlTo().c_str());
281     ctpSocket = FSocket::createSocket(url, config.getCtpPort());
282
283     if (ctpSocket == NULL) {
284         LOG.error("Cannot create FSocket");
285         ret = -3;
286     } else {
287         LOG.info("Succesfully connected to %s!", url.c_str());
288         ctpState = CTP_STATE_CONNECTED;
289     }
290
291     if (ret != 0) {
292         ctpState = CTP_STATE_DISCONNECTED;
293     }
294     return ret;
295 }
296
297
298
299 /**
300  * Closes the socket connection.
301  * @return 0  if no errors
302  */
303 int32_t CTPService::closeConnection() {
304
305     int ret = 0;
306     if (ctpSocket) {
307         ctpSocket->close();
308         ctpSocket = NULL;
309         LOG.info("Socket connection closed");
310     }
311
312     // Just to be sure: close all ctp threads if still running
313     stopThread(cmdTimeoutThread);
314     stopThread(heartbeatThread);
315     stopThread(receiverThread);
316     
317     ctpState = CTP_STATE_DISCONNECTED;
318     LOG.debug("Total number of bytes sent = %d",     totalBytesSent);
319     LOG.debug("Total number of bytes received = %d", totalBytesReceived);
320     totalBytesSent     = 0;
321     totalBytesReceived = 0;
322
323     return ret;
324 }
325
326
327
328 /**
329  * Sends an AUTH message to the Server.
330  * A CTPMessage is filled with parameters from CTPConfig.
331  * 'sendMsg()' method is used to send the message with the ctpSocket.
332  * @return 0 if no errors
333  */
334 int32_t CTPService::sendAuthMsg(){
335
336     LOG.debug("Creating AUTH msg...");
337     ctpState = CTP_STATE_AUTHENTICATING;
338
339     // Fill CTPMessage members
340     CTPMessage authMsg;
341     authMsg.setGenericCommand(CM_AUTH);
342     authMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
343
344     // Fill parameters (read values from config)
345     CTPParam devId;
346     devId.setParamCode(P_DEVID);
347     devId.setValue(config.getDeviceId().c_str(), config.getDeviceId().length());
348     authMsg.addParam(&devId);
349
350     CTPParam username;
351     username.setParamCode(P_USERNAME);
352     username.setValue(config.getUsername().c_str(), config.getUsername().length());
353     authMsg.addParam(&username);
354
355     CTPParam cred;
356     cred.setParamCode(P_CRED);
357     // Create credentials from config props
358     StringBuffer credentials = createMD5Credentials();
359     cred.setValue(credentials.c_str(), credentials.length());
360     authMsg.addParam(&cred);
361
362     StringBuffer& fromValue = config.getUrlFrom();
363     if (fromValue.length() > 0) {
364         // FROM is used only after a JUMP status
365         CTPParam from;
366         from.setParamCode(P_FROM);
367         from.setValue(fromValue.c_str(), fromValue.length());
368         authMsg.addParam(&from);
369     }
370
371     LOG.info ("AUTH: devId='%s', user='%s', cred='%s'", config.getDeviceId().c_str(), 
372                                                         config.getUsername().c_str(),
373                                                         credentials.c_str() );
374
375     // Send message
376     return sendMsg(&authMsg);
377 }
378
379
380 /**
381  * Sends a READY message to the Server.
382  * A CTPMessage is filled with parameters from CTPConfig.
383  * 'sendMsg()' method is used to send the message with the ctpSocket.
384  * @return 0 if no errors
385  */
386 int32_t CTPService::sendReadyMsg() { 
387
388     // Fill CTPMessage members
389     CTPMessage readyMsg;
390     readyMsg.setGenericCommand(CM_READY);
391     readyMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
392
393     // Send message
394     return sendMsg(&readyMsg);
395 }
396
397
398 /**
399  * Sends a BYE message to the Server.
400  * A CTPMessage is filled with parameters from CTPConfig.
401  * 'sendMsg()' method is used to send the message with the ctpSocket.
402  * @return 0 if no errors
403  */
404 int32_t CTPService::sendByeMsg(){
405
406     // Fill CTPMessage members
407     CTPMessage byeMsg;
408     byeMsg.setGenericCommand(CM_BYE);
409     byeMsg.setProtocolVersion(CTP_PROTOCOL_VERSION);
410
411     // Send message
412     return sendMsg(&byeMsg);
413 }
414
415
416 /**
417  * Sends a generic CTPMessage to the Server.
418  * The socket must be already opened calling 'openConnection()'.
419  * The CTPMessage passed must be already filled with all 
420  * desired members, so a byte-array is formatted and sent to
421  * the Server.
422  * The 'ctpState' is set to CTP_STATE_WAITING_RESPONSE after the message 
423  * is sent, as we always wait for a Server response for each sent msg.
424  *
425  * @param message  the CTPMessage ready to be sent
426  * @return         0 if no errors
427  * @note           no timeout is set for the socket 'send' operation
428  */
429 int32_t CTPService::sendMsg(CTPMessage* message) {
430
431     if (!message) {
432         return 1;
433     }
434
435     int ret = 0;
436     char* msg = message->toByte();
437     int msgLength = message->getPackageLength();
438     if (!ctpSocket) {
439         LOG.error("sendMsg error: socket not initialized.");
440         return 2;
441     }
442
443     // Debug the message to send.
444     LOG.debug("Sending %d bytes:", msgLength);
445     hexDump(msg, msgLength);
446
447     ret = ctpSocket->writeBuffer((const int8_t* const) msg, msgLength);
448     if (ret != msgLength) {
449         LOG.error("CTPService::sendMsg - send() error (%d bytes sent)", ret);
450         return -1;
451     }
452     else {
453         LOG.debug("sendMsg - %d bytes sent", ret);
454         ctpState = CTP_STATE_WAITING_RESPONSE;          // We wait for a Server response every msg sent!
455         totalBytesSent += ret;
456         LOG.debug("Total bytes sent since beginning: %d", totalBytesSent);
457
458         // Will restore connection if no response in 60sec
459         stopThread(cmdTimeoutThread);
460         cmdTimeoutThread = new CmdTimeoutThread();
461         cmdTimeoutThread->start();
462     }
463     return 0;
464 }
465
466
467
468
469 /**
470  * Receive a CTP message through the socket connection.
471  * The message is parsed, a CTPMessage is filled and returned (the
472  * CTPMessage is internally owned by CTPService).
473  * The message could be split into more packages, so we read the first 2 bytes
474  * and keep receiving until the message is complete.
475  * The ctpState is set to CTP_STATE_READY after the msg is received successfully.
476  * 
477  * @return  the received CTPMessage (pointer to internally owned object)
478  * @note  this method calls winsock 'recv' function which is blocking
479  */
480 CTPMessage* CTPService::receiveStatusMsg() {
481
482     char buffer[MAX_MESSAGE_SIZE], msg[MAX_MESSAGE_SIZE];
483     int totalBytes     = 0;
484     int expectedLength = 0;
485
486     delete receivedMsg;
487     receivedMsg = NULL;
488
489     //
490     // Receive socket message: could be split into more pkg
491     //
492     while (1) {
493         LOG.info("Waiting for Server message...");
494         int pkgLen = ctpSocket->readBuffer((int8_t*)buffer, sizeof(buffer));
495
496         if (pkgLen <= 0) {
497             // Socket error -> exit
498             LOG.error("SOCKET recv() error");
499             goto finally;
500         } else {
501             if (totalBytes == 0) {      // first time
502                 expectedLength = extractMsgLength(buffer, pkgLen);
503                 if (!expectedLength) {
504                     goto finally;
505                 }
506                 expectedLength += 2;    // the first 2 bytes are the msg length
507             }
508             LOG.debug("Package received: %d bytes read (total = %d, expected = %d)",
509                       pkgLen, totalBytes+pkgLen, expectedLength);
510             // Check if msg too big
511             if (totalBytes+pkgLen >= MAX_MESSAGE_SIZE) {
512                 LOG.error("Message larger than %d bytes!", MAX_MESSAGE_SIZE);
513                 goto finally;
514             }
515
516             // Append bytes to the 'msg' array
517             memcpy(&msg[totalBytes], buffer, pkgLen);
518             totalBytes += pkgLen;
519
520             // Check if msg is complete
521             if (totalBytes < expectedLength) {
522                 LOG.debug("Message incomplete -> back to receive");
523                 continue;
524             }
525             else {
526                 LOG.debug("Message complete");
527                 ctpState = CTP_STATE_READY;             // ctpState back to 'ready'
528                 totalBytesReceived += totalBytes;
529                 
530                 // Debug the message received.
531                 LOG.debug("Received %d bytes:", totalBytes);
532                 hexDump(msg, totalBytes);
533                 LOG.debug("Total bytes received since beginning: %d", totalBytesReceived);
534                 break;
535             }
536         }
537     }
538
539     // Parse the message, receivedMsg is internally owned
540     receivedMsg = new CTPMessage(msg, totalBytes);
541     LOG.debug("status = 0x%02x", receivedMsg->getGenericCommand());
542
543 finally:
544     stopThread(cmdTimeoutThread);       // Msg received or error, anyway kill the cmdTimeoutThread.
545     return receivedMsg;
546 }
547
548
549
550 /**
551  * This method is called when the CTP process is connected and ready to receive
552  * notifications from the Server. Two threads are started here:
553  *
554  * 1. heartbeatWorker: used to send a READY message every 'ctpReady' seconds,
555  *                     as an heartbeat for the CTP connection.
556  * 2. receiveWorker  : keep listening to Server messages on the same socket.
557  *                     Starts the sync when a notification is received.
558  * Handles of both threads are internally owned by CTPService, so threads
559  * can be eventually terminated in case of errors.
560  *
561  * After the 2 threads have been started, we wait on the receiverThread 
562  * (timeout = ctpConnTimeout, default = INFINITE) to be able to terminate the
563  * CTP connection and restore it from scratch every 'ctpConnTimeout' seconds.
564  * 
565  * @return 0 if no errors
566  * @note this method is blocked on the receiverThread (which is blocked on socket recv())
567  *       and will exit only in case of:
568  *       - socket errors
569  *       - Server sends an error state
570  *       - CTP in 'leaving state' (Client is closing CTP)
571  */
572 int32_t CTPService::receive() {
573
574     // Safe checks...
575     if (!ctpSocket) {
576         LOG.error("CTPService::receive() error: no socket connection available");
577         return -3;
578     }
579     if (stopThread(receiverThread)) {
580         LOG.debug("receiverThread killed");
581     }
582     if (stopThread(heartbeatThread)) {
583         LOG.debug("heartbeatThread killed");
584     }
585
586     //
587     // Start thread to send 'ready' messages
588     //
589     heartbeatThread = new HeartbeatThread();
590     heartbeatThread->start();
591
592     //
593     // Start thread to receive messages from Server
594     //
595     receiverThread = new ReceiverThread();
596     receiverThread->start();
597
598     //
599     // Wait for receiverThread: it ends only in case of errors.
600     // Use 'ctpConnTimeout' as timeout on this thread.
601     //
602     int32_t ret = 0;
603     uint32_t timeout = getConfig()->getCtpConnTimeout() * 1000;
604
605     LOG.debug("Waiting for the receive thread to finish (timeout = %d sec)...", getConfig()->getCtpConnTimeout());
606     bool receiveTerminated;
607     if (timeout == 0) {
608         receiverThread->wait();
609         receiveTerminated = true;
610     } else {
611         receiveTerminated = receiverThread->wait(timeout);
612     }
613
614     if (receiveTerminated) {
615         LOG.debug("receiverThread terminated");
616         ret = 0;
617     } else {
618         LOG.debug("Timeout - receiverThread will now be terminated");
619         receiverThread->softTerminate();
620         ret = 1;
621     }
622
623     delete receiverThread;
624     receiverThread = NULL;
625
626     // Also terminate the heartbeatThread
627     if (stopThread(heartbeatThread)) {
628         LOG.debug("heartbeatThread killed");
629     }
630
631     return ret;
632 }
633
634
635
636 /**
637  * Utility to terminate a desired thread, setting its HANDLE to NULL.
638  * @param thread   the HANDLE of the thread to be stopped
639  * @param exitcode [optional] the desired exitcode
640  * @return         true if the thread has been effectively terminated
641  */
642 bool CTPService::stopThread(FThread* thread) {
643
644     if (thread) {
645         thread->softTerminate();
646         return thread->finished();
647     }
648     return false;
649 }
650
651
652 /**
653  * Formats a string for the 'cred' CTP param.
654  * User credentials are encoded using MD5schema: 
655  *   B64(MD5( B64(MD5("username":"password")):"clientNonce" ))
656  * User parameters are retrieved from CTPConfig.
657  * @return the credential string in b64 format.
658  */
659 StringBuffer CTPService::createMD5Credentials() {
660
661     char* credential = NULL;
662
663     const char*  username    = config.getAccessConfig().getUsername();
664     const char*  password    = config.getAccessConfig().getPassword();
665     StringBuffer clientNonce = config.getCtpNonce();
666
667     credential = MD5CredentialData(username, password, clientNonce.c_str());
668     if (credential) {
669         StringBuffer ret(credential);
670         delete [] credential;
671         return ret;
672     }
673
674     StringBuffer emptyRes;
675     return emptyRes;
676 }
677
678
679 ///// TODO FIXME where are these functions from????
680 bool checkStartSync(void);
681 bool checkStartSync() {
682     return false;
683 }
684
685
686
687 // TODO where should this go???
688 /**
689  * Print the message written in exadecimal code.
690  */
691 void CTPService::hexDump(char *buf, int len) {
692
693     if (LOG.getLevel() < LOG_LEVEL_DEBUG) {
694         return;
695     }
696
697     char* tmp = new char[len*8 + 3];
698     tmp[0] = '[';
699     int pos = 1;
700     for (int i=0; i<len; i++) {
701         sprintf(&tmp[pos], "%02x ", buf[i]);
702         pos += 3;
703     }
704     tmp[pos-1] = ']';
705     tmp[pos] = 0;
706     LOG.debug("%s", tmp);
707     delete [] tmp;
708 }
709
710 /**
711  * Utility to extract the message length from a CTP package.
712  * The message length is stored in the first 2 bytes of the CTP package.
713  * @param  package    the input package
714  * @param  packageLen the length of the passed package
715  * @return            the message length
716  */
717 int CTPService::extractMsgLength(const char* package, int packageLen) {
718
719     if (packageLen < 2) {
720         LOG.error("Unable to read the package length: not enough bytes received (%d)", packageLen);
721         return 0;
722     }
723
724     int messageLen  = (int)((unsigned char)package[0]);
725     int messageLen1 = (int)((unsigned char)package[1]);
726
727     messageLen <<= 8;
728     messageLen = messageLen | messageLen1;
729     return messageLen;
730 }
731
732
733
734 //////////////////////////////////////////////////////////////////////////////
735 // CmdTimeoutThread
736 //////////////////////////////////////////////////////////////////////////////
737 CmdTimeoutThread::CmdTimeoutThread() {
738 }
739
740 /**
741  * Thread used to check if a response arrived in ctpCmdTimeout seconds.
742  * If not, the CTP connection will be pulled down so that ctpThread
743  * will restore the whole CTP connection.
744  * This thread is started every time a message is sent.
745  * @param lpv  unused
746  */
747 void CmdTimeoutThread::run() {
748     LOG.debug("Starting cmdTimeoutWorker thread");
749
750     // TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, TRUE);
751
752     // Load the timeout (ctpCmdTimeout)
753     CTPService* ctpService = CTPService::getInstance();
754     int32_t timeout = ctpService->getConfig()->getCtpCmdTimeout();
755     if (!timeout) {
756         timeout = 180;      // 3 minutes max
757     }
758
759     FThread::sleep(timeout * 1000);
760
761     // Check if we were killed, then there is nothing to do
762     if (terminate) {
763         return;
764     }
765
766     if ( (ctpService->isLeaving() == false) &&
767          (ctpService->getCtpState() == CTPService::CTP_STATE_WAITING_RESPONSE) ) {
768         // Response not received -> close ctp connection so that
769         // the receiveThread will exit with error, so ctpThread will restore ctp.
770         LOG.info("No response received from Server after %d seconds: closing CTP", timeout);
771         ctpService->closeConnection();
772     }
773
774     // TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, FALSE);
775     LOG.debug("Exiting cmdTimeoutWorker thread");
776 }
777
778
779 //////////////////////////////////////////////////////////////////////////////
780 // CTPThread
781 //////////////////////////////////////////////////////////////////////////////
782 CTPThread::CTPThread() : FThread(),
783                          errorCode(0)
784 {
785 }
786
787 /**
788  * This is the main CTP thread. Manages the CTP connection using CTPService:
789  * 1. waits until the stpThread is finished, if successfully we exit this thread
790  *    (STP is working), otherwise continue with CTP (STP is not working)
791  * 2. Opens the CTP socket connection
792  * 3. Authenticate the client
793  * 4. Starts threads to receive messages and to send 'ready' msg.
794  * 
795  * @param lpv  the HANDLE of the stpThread running (NULL if not started)
796  * @return a value >= 0 if no errors:
797  *         0 if CTP closed correctly (Client closed CTP)
798  *         1 if STP is working correctly, CTP not started
799  *         2 if CTP client not authenticated (at the 2nd AUTH msg)
800  *         3 if CTP client unauthorized
801  *         4 if CTP client authentication forbidden
802  *         5 if STP not working but CTP not started either
803  *         6 if CTP already active in another instance
804  *         <0 if errors
805  *
806  * @note This thread exits only in case of:
807  *   - STPThread ended succesfully (STP is working, CTP not started)
808  *   - CTP client not authenticated
809  *   - CTP in 'leaving state' (Client is closing CTP)
810  *   - CTP started from another instance (code 6)
811  * In case of socket/Server errors, the whole CTP connection is pulled down
812  * and restored again in a while (after 'ctpRetry' seconds).
813  */
814 void CTPThread::run() {
815
816     LOG.debug("Starting ctpWorker thread");
817     int32_t ret = 0;
818
819     // Get the unique instance of CTPService.
820     CTPService* ctpService = CTPService::getInstance();
821
822     //TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, TRUE);
823
824     //
825     // Wait for STPThread to finish.
826     // - if handle is NULL, no STP has been done  -> go with CTP.
827     // - if STPThread returned error              -> go with CTP.
828     // - if STPThread returned ok                 -> exit now (STP is running).
829     //
830     int32_t timeout = ctpService->getConfig()->getNotifyTimeout();
831
832     // Refresh configuration, save the ctpRetry original value in a buffer
833     ctpService->getConfig()->readCTPConfig();
834     int32_t defaultCtpRetry = ctpService->getConfig()->getCtpRetry();
835
836
837     // Start the CTP connection process.
838     // *********************************
839     // Infinite cycle: always restore the connection if it's lost 
840     // or in case of errors. Exit only if leavingState flag is up.
841     //
842     bool restore = false;
843     bool jump    = false;
844     while (ctpService->isLeaving() == false) {
845
846         if (restore) {
847             // Restoring from a broken connection: close socket and wait some seconds.
848             LOG.debug("Restoring CTP connection...");
849             ctpService->closeConnection();
850
851             int32_t ctpRetry = ctpService->getConfig()->getCtpRetry();
852             int32_t maxCtpRetry = ctpService->getConfig()->getMaxCtpRetry();
853             int32_t sleepTime = ctpRetry < maxCtpRetry ? ctpRetry : maxCtpRetry;
854             LOG.info("CTP will be restored in %d seconds...", sleepTime);
855             FThread::sleep(sleepTime * 1000);
856
857             // CTP could have been restarted during the sleep time!
858             // So exit if the ctp is active.
859             if (ctpService->getCtpState() > CTPService::CTP_STATE_DISCONNECTED) {
860                 LOG.debug("CTP already active -> don't restore ctp");
861                 errorCode = 6;
862                 goto finally;
863             }
864
865             // Save the new value to config
866             sleepTime *= CTP_RETRY_INCREASE_FACTOR;     // Double the retry time
867             ctpService->getConfig()->setCtpRetry(sleepTime);
868             restore = false;
869         }
870         if (jump) {
871             // Restoring from a JUMP status: close socket and reconnect immediately.
872             LOG.debug("Restoring CTP connection from a JUMP...");
873             ctpService->closeConnection();
874             jump = false;
875         }
876
877
878         while (checkStartSync()) {
879             // Wait for sync to finish: CTP AUTH phase uses the same clientNonce
880             // used by sync authentication, so avoid interferences.
881             LOG.debug("Waiting for sync to finish...");
882             FThread::sleep(5000);
883         }
884
885         //
886         // Open socket connection
887         // ----------------------
888         LOG.debug("Open CTP connection...");
889         if (ctpService->openConnection()) {
890             restore = true;
891             continue;
892         }
893
894         //
895         // Authentication
896         // --------------
897         LOG.debug("Sending [AUTH] message...");
898         if (ctpService->sendAuthMsg()) {
899             restore = true;
900             continue;
901         }
902         
903         // Receiving AUTH status message
904         CTPMessage* authStatusMsg = ctpService->receiveStatusMsg();
905         if (!authStatusMsg) {
906             LOG.error("Error receiving AUTH status msg.");
907             restore = true;
908             continue;
909         }
910         char authStatus = authStatusMsg->getGenericCommand();
911         CTPParam* param;
912         char* buf = NULL;
913         switch (authStatus) {
914
915             case ST_NOT_AUTHENTICATED:
916                 //
917                 // Retry with new nonce received
918                 //
919                 LOG.info("Client not authenticated: retry with new nonce");
920                 if (saveNonceParam(authStatusMsg) == false) {
921                     // NONCE not found -> restore connection
922                     LOG.error("Error receiving NON_AUTHENTICATED Status message: NONCE param is missing");
923                     restore = true;
924                     continue;
925                 }
926
927                 // Send 2nd auth msg
928                 LOG.info("Sending CTP authentication message...");
929                 if (ctpService->sendAuthMsg()) {
930                     restore = true;
931                     continue;
932                 }
933
934                 // Check 2nd status received, only OK allowed
935                 authStatusMsg = ctpService->receiveStatusMsg();
936                 authStatus = authStatusMsg->getGenericCommand();
937                 if (authStatus == ST_OK) {
938                     // *** Authentication OK! *** 
939                     // Save nonce if any (go to case ST_OK)
940                 }
941                 else if (authStatus == ST_NOT_AUTHENTICATED) {
942                     LOG.info("CTP error: Client not authenticated. Please check your credentials.");
943                     //showInvalidCredentialsMsgBox(INVALID_CREDENTIALS, 10);      // code 401
944                     errorCode = 2;
945                     goto error;
946                 }
947                 else if (authStatus == ST_UNAUTHORIZED) {
948                     LOG.info("CTP error: Client unauthorized by the Server. Please check your credentials.");
949                     //showInvalidCredentialsMsgBox(PAYMENT_REQUIRED, 10);         // code 402
950                     errorCode = 2;
951                     goto error;
952                 }
953                 else {
954                     LOG.info("CTP error: received status '0x%02x'.", authStatus);
955                     errorCode = 2;
956                     goto error;
957                 }
958                 // no 'break': need to enter into case ST_OK...
959
960
961             case ST_OK:
962                 // *** Authentication OK! *** 
963                 LOG.info("Client authenticated successfully!");
964                 ctpService->setCtpState(CTPService::CTP_STATE_READY);
965                 ctpService->getConfig()->setCtpRetry(defaultCtpRetry);     // Restore the original ctpRetry time
966                 // Save nonce if any
967                 if (saveNonceParam(authStatusMsg) == false) {
968                     LOG.info("No new nonce received.");
969                 }
970                 break;
971
972
973             // --- note: JUMP not implemented Server side ----
974             case ST_JUMP:
975                 //
976                 // Jump to desired server 'to' and save the 'from' value
977                 //
978                 LOG.info("Server requested a JUMP");
979                 if (authStatusMsg->params.size() < 1) {
980                     // Expected FROM and TO params -> restore connection
981                     LOG.error("Error receiving JUMP Status message: some parameter is missing");
982                     restore = true;
983                     continue;
984                 }
985
986                 // Read FROM and TO parameters and update CTPConfig
987                 param = (CTPParam*)authStatusMsg->params.front();
988                 while (param) {
989                     int valueLen = param->getValueLength();
990                     void* value  = param->getValue();
991                     if (param->getParamCode() == P_FROM) {
992                         char* from = stringdup((char*)value, valueLen);
993                         ctpService->getConfig()->setUrlFrom(from);
994                         delete [] from;
995                     }
996                     else if (param->getParamCode() == P_TO) {
997                         char* url = stringdup((char*)value, valueLen);
998                         StringBuffer to = ctpService->getConfig()->getHostName(url);
999                         int port  = ctpService->getConfig()->getHostPort(url);
1000                         ctpService->getConfig()->setUrlTo(to);
1001                         ctpService->getConfig()->setCtpPort(port);
1002                         delete [] url;
1003                     }
1004                     else {
1005                         // Unexpected Status param -> restore connection
1006                         LOG.error("Error receiving JUMP Status message: unexpected \
1007                                    param '0x%02x'", (*param).getParamCode());
1008                         restore = true;
1009                         continue;
1010                     }
1011                     param = (CTPParam*)authStatusMsg->params.next();
1012                 }
1013
1014                 // Now restore the socket connection to the new Server address...
1015                 LOG.debug("JUMP status received: FROM %s TO %s:%d",
1016                                  ctpService->getConfig()->getUrlFrom().c_str(), 
1017                                  ctpService->getConfig()->getUrlTo().c_str(), 
1018                                  ctpService->getConfig()->getCtpPort() );
1019                 jump = true;
1020                 continue;
1021
1022
1023             case ST_UNAUTHORIZED:
1024                 // Not authorized -> save nonce if any, exit thread
1025                 LOG.info("Unauthorized by the Server, please check your credentials.");
1026                 //showInvalidCredentialsMsgBox(PAYMENT_REQUIRED, 10);         // code 402
1027                 if (saveNonceParam(authStatusMsg) == false) {
1028                     LOG.debug("No new nonce received.");
1029                 }
1030                 errorCode = 3;
1031                 goto error;
1032
1033
1034             case ST_FORBIDDEN:
1035                 // Authentication forbidden -> exit thread
1036                 LOG.info("Authentication forbidden by the Server, please check your credentials.");
1037                 //showInvalidCredentialsMsgBox(FORBIDDEN, 10);                // code 403
1038                 errorCode = 4;
1039                 goto error;
1040
1041             case ST_ERROR:
1042                 // Error -> restore connection
1043                 LOG.info("Received ERROR status from Server: restore ctp connection");
1044                 //printErrorStatus(authStatusMsg);
1045                 restore = true;
1046                 continue;
1047
1048             default:
1049                 // Unexpected status -> restore connection
1050                 LOG.error("Unexpected status received '0x%02x' -> restore ctp connection", authStatus);
1051                 restore = true;
1052                 continue;
1053         }
1054
1055         //
1056         // Creates the thread that will be blocked waiting for server msg
1057         // and waits until it ends (errors or ctpConnTimeout).
1058         //
1059         ctpService->receive();
1060
1061         // If here, connection was broken or ctpConnTimeout -> restore connection
1062         restore = true;
1063     }
1064     goto finally;
1065
1066
1067 error:
1068     // Restore the original ctpRetry time
1069     ctpService->getConfig()->setCtpRetry(defaultCtpRetry);
1070     ctpService->closeConnection();
1071
1072 finally:
1073     // TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, FALSE);
1074     LOG.debug("Exiting ctpWorker thread");
1075 }
1076
1077 /**
1078  * Utility to extract the nonce parameter from CTPMessage passed and
1079  * save it to 'CTPConfig::ctpNonce'.
1080  * The configuration (CTPConfig) is also saved to registry.
1081  * The nonce is expected to be the first parameter of the message.
1082  * @param authStatusMsg  the CTPMessage to analyze
1083  * @return               true if the nonce has been saved, 
1084  *                       false if nonce not found
1085  */
1086 bool CTPThread::saveNonceParam(CTPMessage* authStatusMsg) {
1087
1088     if (authStatusMsg->params.size() == 0) {
1089         return false;
1090     }
1091
1092     // Get nonce param
1093     int nonceLen = 0;
1094     void* nonce  = NULL;
1095     CTPParam* param = (CTPParam*)authStatusMsg->params.front();
1096     if (param && param->getParamCode() == P_NONCE) {
1097         nonceLen = (*param).getValueLength();
1098         nonce    = (*param).getValue();
1099     }
1100     else {
1101         return false;
1102     }
1103     if (!nonce || nonceLen==0) {
1104         return false;
1105     }
1106
1107
1108     // Nonce is encoded in b64.
1109     char* b64Nonce = new char[((nonceLen/3+1)<<2) + 32];
1110     int len = b64_encode(b64Nonce, nonce, nonceLen);
1111     b64Nonce[len] = 0;
1112
1113     //LOG.debug("New nonce received:");
1114     //hexDump((char*)nonce, nonceLen);
1115     LOG.debug("New nonce received: '%s'", b64Nonce);
1116
1117     // Save new nonce to config, and save config to registry!
1118     CTPService* ctpService = CTPService::getInstance();
1119     ctpService->getConfig()->setCtpNonce(b64Nonce);
1120     ctpService->getConfig()->saveCTPConfig();
1121
1122     delete [] b64Nonce;
1123     return true;
1124 }
1125
1126
1127
1128 //////////////////////////////////////////////////////////////////////////////
1129 // ReceiverThread
1130 //////////////////////////////////////////////////////////////////////////////
1131 ReceiverThread::ReceiverThread() : FThread(),
1132                                    errorCode(0)
1133 {
1134 }
1135
1136 void ReceiverThread::run() {
1137  
1138     LOG.debug("Starting receiveWorker thread");
1139     errorCode = 0;
1140     CTPService* ctpService = CTPService::getInstance();
1141
1142     //TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, TRUE);
1143
1144     // Keep the socket open, always in 'receive' state.
1145     // Exit only in case of errors or if we're leaving CTP.
1146     while (ctpService->isLeaving() == false) {
1147
1148         // Receive msg from Server
1149         CTPMessage* statusMsg = ctpService->receiveStatusMsg();
1150         if (!statusMsg) {
1151             // Error on receiving -> exit thread
1152             errorCode = -1;
1153             goto finally;
1154         }
1155
1156         char status = statusMsg->getGenericCommand();
1157         SyncNotification* sn = NULL;
1158         switch (status) {
1159
1160             case ST_OK:
1161                 // 'OK' to our 'READY' command -> back to recv
1162                 LOG.info("[OK] received -> back to receive state");
1163                 break;
1164
1165             case ST_SYNC:
1166                 //
1167                 // Start the sync!
1168                 // ---------------
1169                 LOG.info("[SYNC] notification received! Starting the sync");
1170                 sn = statusMsg->getSyncNotification();
1171                 //TODO startSyncFromSAN(sn);
1172
1173                 // Back to recv
1174                 LOG.debug("Back to receive state");
1175                 break;
1176
1177             case ST_ERROR:
1178                 LOG.debug("[ERROR] message received");
1179                 //printErrorStatus(statusMsg);
1180             default:
1181                 // Error from server -> exit thread (will try restoring the socket from scratch)
1182                 LOG.debug("Bad status received (code 0x%02x), exiting thread", status);
1183                 errorCode = -2;
1184                 goto finally;
1185         }
1186     }
1187
1188 finally:
1189     //TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, FALSE);
1190     LOG.debug("Exiting receiveWorker thread");
1191 }
1192
1193 //////////////////////////////////////////////////////////////////////////////
1194 // HeartbeatThread
1195 //////////////////////////////////////////////////////////////////////////////
1196 /**
1197  * Thread used to send 'READY' messages as a heartbeat, every 'ctpReady' seconds.
1198  * It ends only if CTP is in 'leaving state' (Client stops CTP).
1199  * @param lpv  unused
1200  * @return 0 if no errors
1201  */
1202 HeartbeatThread::HeartbeatThread() : FThread()
1203 {
1204 }
1205
1206 void HeartbeatThread::run() {
1207     LOG.debug("Starting Heartbeat thread");
1208     errorCode = 0;
1209
1210     //TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, TRUE);
1211
1212     // Load the sleep interval (ctpReady)
1213     CTPService* ctpService = CTPService::getInstance();
1214     int32_t sleepInterval = ctpService->getConfig()->getCtpReady();
1215
1216     // Send 'ready' message to Server and sleep ctpReady seconds
1217     while (ctpService->isLeaving() == false) {
1218         LOG.info("Sending [READY] message...");
1219         if (ctpService->sendReadyMsg()) {
1220             LOG.debug("Error sending READY msg");
1221             errorCode = 1;
1222             break;
1223         }
1224         LOG.debug("Next ready msg will be sent in %d seconds...", sleepInterval);
1225         FThread::sleep(sleepInterval * 1000);
1226     }
1227
1228     //TODO PowerPolicyNotify(PPN_UNATTENDEDMODE, FALSE);
1229     LOG.debug("Exiting heartbeatWorker thread");
1230 }
1231
1232