Upload Tizen:Base source
[toolchains/nspr.git] / mozilla / nsprpub / pr / tests / thrpool_server.c
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4  *
5  * The contents of this file are subject to the Mozilla Public License Version
6  * 1.1 (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  * http://www.mozilla.org/MPL/
9  *
10  * Software distributed under the License is distributed on an "AS IS" basis,
11  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12  * for the specific language governing rights and limitations under the
13  * License.
14  *
15  * The Original Code is the Netscape Portable Runtime (NSPR).
16  *
17  * The Initial Developer of the Original Code is
18  * Netscape Communications Corporation.
19  * Portions created by the Initial Developer are Copyright (C) 1999-2000
20  * the Initial Developer. All Rights Reserved.
21  *
22  * Contributor(s):
23  *
24  * Alternatively, the contents of this file may be used under the terms of
25  * either the GNU General Public License Version 2 or later (the "GPL"), or
26  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27  * in which case the provisions of the GPL or the LGPL are applicable instead
28  * of those above. If you wish to allow use of your version of this file only
29  * under the terms of either the GPL or the LGPL, and not to allow others to
30  * use your version of this file under the terms of the MPL, indicate your
31  * decision by deleting the provisions above and replace them with the notice
32  * and other provisions required by the GPL or the LGPL. If you do not delete
33  * the provisions above, a recipient may use your version of this file under
34  * the terms of any one of the MPL, the GPL or the LGPL.
35  *
36  * ***** END LICENSE BLOCK ***** */
37
38 /***********************************************************************
39 **
40 ** Name: thrpool.c
41 **
42 ** Description: Test threadpool functionality.
43 **
44 ** Modification History:
45 */
46 #include "primpl.h"
47
48 #include "plgetopt.h"
49
50 #include <stdio.h>
51 #include <string.h>
52 #include <errno.h>
53 #ifdef XP_UNIX
54 #include <sys/mman.h>
55 #endif
56 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
57 #include <pthread.h>
58 #endif
59
60 /* for getcwd */
61 #if defined(XP_UNIX) || defined (XP_OS2) || defined(XP_BEOS)
62 #include <unistd.h>
63 #elif defined(XP_PC)
64 #include <direct.h>
65 #endif
66
67 #ifdef WIN32
68 #include <process.h>
69 #endif
70
71 static int _debug_on = 0;
72 static char *program_name = NULL;
73 static void serve_client_write(void *arg);
74
75 #include "obsolete/prsem.h"
76
77 #ifdef XP_PC
78 #define mode_t int
79 #endif
80
81 #define DPRINTF(arg) if (_debug_on) printf arg
82
83
84 #define BUF_DATA_SIZE    (2 * 1024)
85 #define TCP_MESG_SIZE    1024
86 #define NUM_TCP_CLIENTS  10     /* for a listen queue depth of 5 */
87
88
89 #define NUM_TCP_CONNECTIONS_PER_CLIENT  10
90 #define NUM_TCP_MESGS_PER_CONNECTION    10
91 #define TCP_SERVER_PORT                         10000
92 #define SERVER_MAX_BIND_COUNT           100
93
94 #ifdef WINCE
95 char *getcwd(char *buf, size_t size)
96 {
97     wchar_t wpath[MAX_PATH];
98     _wgetcwd(wpath, MAX_PATH);
99     WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
100 }
101  
102 #define perror(s)
103 #endif
104
105 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
106 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
107 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
108 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
109 static void TCP_Server_Accept(void *arg);
110
111
112 int failed_already=0;
113 typedef struct buffer {
114     char    data[BUF_DATA_SIZE];
115 } buffer;
116
117
118 typedef struct Server_Param {
119     PRJobIoDesc iod;    /* socket to read from/write to    */
120     PRInt32     datalen;    /* bytes of data transfered in each read/write */
121     PRNetAddr   netaddr;
122     PRMonitor   *exit_mon;    /* monitor to signal on exit            */
123     PRInt32             *job_counterp;    /* counter to decrement, before exit        */
124     PRInt32             conn_counter;    /* counter to decrement, before exit        */
125         PRThreadPool *tp;
126 } Server_Param;
127
128 typedef struct Serve_Client_Param {
129     PRJobIoDesc iod;    /* socket to read from/write to    */
130     PRInt32    datalen;    /* bytes of data transfered in each read/write */
131     PRMonitor *exit_mon;    /* monitor to signal on exit            */
132     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
133         PRThreadPool *tp;
134 } Serve_Client_Param;
135
136 typedef struct Session {
137     PRJobIoDesc iod;    /* socket to read from/write to    */
138         buffer  *in_buf;
139         PRInt32 bytes;
140         PRInt32 msg_num;
141         PRInt32 bytes_read;
142     PRMonitor *exit_mon;    /* monitor to signal on exit            */
143     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
144         PRThreadPool *tp;
145 } Session;
146
147 static void
148 serve_client_read(void *arg)
149 {
150         Session *sp = (Session *) arg;
151     int rem;
152     int bytes;
153     int offset;
154         PRFileDesc *sockfd;
155         char *buf;
156         PRJob *jobp;
157
158         PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
159
160         sockfd = sp->iod.socket;
161         buf = sp->in_buf->data;
162
163     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
164         PR_ASSERT(sp->bytes_read < sp->bytes);
165
166         offset = sp->bytes_read;
167         rem = sp->bytes - offset;
168         bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
169         if (bytes < 0) {
170                 return;
171         }
172         sp->bytes_read += bytes;
173         sp->iod.timeout = PR_SecondsToInterval(60);
174         if (sp->bytes_read <  sp->bytes) {
175                 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
176                                                         PR_FALSE);
177                 PR_ASSERT(NULL != jobp);
178                 return;
179         }
180         PR_ASSERT(sp->bytes_read == sp->bytes);
181         DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
182
183         sp->iod.timeout = PR_SecondsToInterval(60);
184         jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
185                                                         PR_FALSE);
186         PR_ASSERT(NULL != jobp);
187
188     return;
189 }
190
191 static void
192 serve_client_write(void *arg)
193 {
194         Session *sp = (Session *) arg;
195     int bytes;
196         PRFileDesc *sockfd;
197         char *buf;
198         PRJob *jobp;
199
200         sockfd = sp->iod.socket;
201         buf = sp->in_buf->data;
202
203     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
204
205         bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
206         PR_ASSERT(bytes == sp->bytes);
207
208         if (bytes < 0) {
209                 return;
210         }
211         DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
212     sp->msg_num++;
213     if (sp->msg_num < num_tcp_mesgs_per_connection) {
214                 sp->bytes_read = 0;
215                 sp->iod.timeout = PR_SecondsToInterval(60);
216                 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
217                                                         PR_FALSE);
218                 PR_ASSERT(NULL != jobp);
219                 return;
220         }
221
222         DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
223     if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
224         fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
225     }
226
227     PR_Close(sockfd);
228     PR_EnterMonitor(sp->exit_mon);
229     --(*sp->job_counterp);
230     PR_Notify(sp->exit_mon);
231     PR_ExitMonitor(sp->exit_mon);
232
233     PR_DELETE(sp->in_buf);
234     PR_DELETE(sp);
235
236     return;
237 }
238
239 /*
240  * Serve_Client
241  *    Thread, started by the server, for serving a client connection.
242  *    Reads data from socket and writes it back, unmodified, and
243  *    closes the socket
244  */
245 static void PR_CALLBACK
246 Serve_Client(void *arg)
247 {
248     Serve_Client_Param *scp = (Serve_Client_Param *) arg;
249     buffer *in_buf;
250         Session *sp;
251         PRJob *jobp;
252
253         sp = PR_NEW(Session);
254         sp->iod = scp->iod;
255
256     in_buf = PR_NEW(buffer);
257     if (in_buf == NULL) {
258         fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
259         failed_already=1;
260         return;
261     }
262
263         sp->in_buf = in_buf;
264         sp->bytes = scp->datalen;
265         sp->msg_num = 0;
266         sp->bytes_read = 0;
267         sp->tp = scp->tp;
268         sp->exit_mon = scp->exit_mon;
269     sp->job_counterp = scp->job_counterp;
270
271         sp->iod.timeout = PR_SecondsToInterval(60);
272         jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
273                                                         PR_FALSE);
274         PR_ASSERT(NULL != jobp);
275         PR_DELETE(scp);
276 }
277
278 static void
279 print_stats(void *arg)
280 {
281     Server_Param *sp = (Server_Param *) arg;
282     PRThreadPool *tp = sp->tp;
283     PRInt32 counter;
284         PRJob *jobp;
285
286         PR_EnterMonitor(sp->exit_mon);
287         counter = (*sp->job_counterp);
288         PR_ExitMonitor(sp->exit_mon);
289
290         printf("PRINT_STATS: #client connections = %d\n",counter);
291
292
293         jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
294                                                 print_stats, sp, PR_FALSE);
295
296         PR_ASSERT(NULL != jobp);
297 }
298
299 static int job_counter = 0;
300 /*
301  * TCP Server
302  *    Server binds an address to a socket, starts a client process and
303  *        listens for incoming connections.
304  *    Each client connects to the server and sends a chunk of data
305  *    Starts a Serve_Client job for each incoming connection, to read
306  *    the data from the client and send it back to the client, unmodified.
307  *    Each client checks that data received from server is same as the
308  *    data it sent to the server.
309  *        Finally, the threadpool is shutdown
310  */
311 static void PR_CALLBACK
312 TCP_Server(void *arg)
313 {
314     PRThreadPool *tp = (PRThreadPool *) arg;
315     Server_Param *sp;
316     PRFileDesc *sockfd;
317     PRNetAddr netaddr;
318         PRMonitor *sc_mon;
319         PRJob *jobp;
320         int i;
321         PRStatus rval;
322
323     /*
324      * Create a tcp socket
325      */
326     if ((sockfd = PR_NewTCPSocket()) == NULL) {
327         fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
328         return;
329     }
330     memset(&netaddr, 0 , sizeof(netaddr));
331     netaddr.inet.family = PR_AF_INET;
332     netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
333     netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
334     /*
335      * try a few times to bind server's address, if addresses are in
336      * use
337      */
338         i = 0;
339     while (PR_Bind(sockfd, &netaddr) < 0) {
340         if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
341             netaddr.inet.port += 2;
342             if (i++ < SERVER_MAX_BIND_COUNT)
343                 continue;
344         }
345         fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
346         perror("PR_Bind");
347         failed_already=1;
348         return;
349     }
350
351     if (PR_Listen(sockfd, 32) < 0) {
352         fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
353         failed_already=1;
354         return;
355     }
356
357     if (PR_GetSockName(sockfd, &netaddr) < 0) {
358         fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
359         failed_already=1;
360         return;
361     }
362
363     DPRINTF((
364         "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
365         netaddr.inet.ip, netaddr.inet.port));
366
367         sp = PR_NEW(Server_Param);
368         if (sp == NULL) {
369                 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
370                 failed_already=1;
371                 return;
372         }
373         sp->iod.socket = sockfd;
374         sp->iod.timeout = PR_SecondsToInterval(60);
375         sp->datalen = tcp_mesg_size;
376         sp->exit_mon = sc_mon;
377         sp->job_counterp = &job_counter;
378         sp->conn_counter = 0;
379         sp->tp = tp;
380         sp->netaddr = netaddr;
381
382         /* create and cancel an io job */
383         jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
384                                                         PR_FALSE);
385         PR_ASSERT(NULL != jobp);
386         rval = PR_CancelJob(jobp);
387         PR_ASSERT(PR_SUCCESS == rval);
388
389         /*
390          * create the client process
391          */
392         {
393 #define MAX_ARGS 4
394                 char *argv[MAX_ARGS + 1];
395                 int index = 0;
396                 char port[32];
397         char path[1024 + sizeof("/thrpool_client")];
398
399         getcwd(path, sizeof(path));
400
401         (void)strcat(path, "/thrpool_client");
402 #ifdef XP_PC
403         (void)strcat(path, ".exe");
404 #endif
405         argv[index++] = path;
406                 sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
407         if (_debug_on)
408         {
409             argv[index++] = "-d";
410             argv[index++] = "-p";
411             argv[index++] = port;
412             argv[index++] = NULL;
413         } else {
414             argv[index++] = "-p";
415             argv[index++] = port;
416                         argv[index++] = NULL;
417                 }
418                 PR_ASSERT(MAX_ARGS >= (index - 1));
419         
420         DPRINTF(("creating client process %s ...\n", path));
421         if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
422                 fprintf(stderr,
423                                 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
424                 failed_already=1;
425                 return;
426                 }
427         }
428
429     sc_mon = PR_NewMonitor();
430     if (sc_mon == NULL) {
431         fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
432         failed_already=1;
433         return;
434     }
435
436         sp->iod.socket = sockfd;
437         sp->iod.timeout = PR_SecondsToInterval(60);
438         sp->datalen = tcp_mesg_size;
439         sp->exit_mon = sc_mon;
440         sp->job_counterp = &job_counter;
441         sp->conn_counter = 0;
442         sp->tp = tp;
443         sp->netaddr = netaddr;
444
445         /* create and cancel a timer job */
446         jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
447                                                 print_stats, sp, PR_FALSE);
448         PR_ASSERT(NULL != jobp);
449         rval = PR_CancelJob(jobp);
450         PR_ASSERT(PR_SUCCESS == rval);
451
452     DPRINTF(("TCP_Server: Accepting connections \n"));
453
454         jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
455                                                         PR_FALSE);
456         PR_ASSERT(NULL != jobp);
457         return;
458 }
459
460 static void
461 TCP_Server_Accept(void *arg)
462 {
463     Server_Param *sp = (Server_Param *) arg;
464     PRThreadPool *tp = sp->tp;
465     Serve_Client_Param *scp;
466         PRFileDesc *newsockfd;
467         PRJob *jobp;
468
469         if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
470                 PR_INTERVAL_NO_TIMEOUT)) == NULL) {
471                 fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
472                 failed_already=1;
473                 goto exit;
474         }
475         scp = PR_NEW(Serve_Client_Param);
476         if (scp == NULL) {
477                 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
478                 failed_already=1;
479                 goto exit;
480         }
481
482         /*
483          * Start a Serve_Client job for each incoming connection
484          */
485         scp->iod.socket = newsockfd;
486         scp->iod.timeout = PR_SecondsToInterval(60);
487         scp->datalen = tcp_mesg_size;
488         scp->exit_mon = sp->exit_mon;
489         scp->job_counterp = sp->job_counterp;
490         scp->tp = sp->tp;
491
492         PR_EnterMonitor(sp->exit_mon);
493         (*sp->job_counterp)++;
494         PR_ExitMonitor(sp->exit_mon);
495         jobp = PR_QueueJob(tp, Serve_Client, scp,
496                                                 PR_FALSE);
497
498         PR_ASSERT(NULL != jobp);
499         DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
500
501         /*
502          * single-threaded update; no lock needed
503          */
504     sp->conn_counter++;
505     if (sp->conn_counter <
506                         (num_tcp_clients * num_tcp_connections_per_client)) {
507                 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
508                                                                 PR_FALSE);
509                 PR_ASSERT(NULL != jobp);
510                 return;
511         }
512         jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
513                                                 print_stats, sp, PR_FALSE);
514
515         PR_ASSERT(NULL != jobp);
516         DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
517
518 exit:
519         PR_EnterMonitor(sp->exit_mon);
520     /* Wait for server jobs to finish */
521     while (0 != *sp->job_counterp) {
522         PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
523         DPRINTF(("TCP_Server: conn_counter = %d\n",
524                                                                                                 *sp->job_counterp));
525     }
526
527     PR_ExitMonitor(sp->exit_mon);
528     if (sp->iod.socket) {
529         PR_Close(sp->iod.socket);
530     }
531         PR_DestroyMonitor(sp->exit_mon);
532     printf("%30s","TCP_Socket_Client_Server_Test:");
533     printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
534         num_tcp_clients, num_tcp_connections_per_client);
535     printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
536         num_tcp_mesgs_per_connection, tcp_mesg_size);
537
538         DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
539         PR_ShutdownThreadPool(sp->tp);
540         PR_DELETE(sp);
541 }
542
543 /************************************************************************/
544
545 #define DEFAULT_INITIAL_THREADS         4
546 #define DEFAULT_MAX_THREADS                     100
547 #define DEFAULT_STACKSIZE                       (512 * 1024)
548
549 int main(int argc, char **argv)
550 {
551         PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
552         PRInt32 max_threads = DEFAULT_MAX_THREADS;
553         PRInt32 stacksize = DEFAULT_STACKSIZE;
554         PRThreadPool *tp = NULL;
555         PRStatus rv;
556         PRJob *jobp;
557
558     /*
559      * -d           debug mode
560      */
561     PLOptStatus os;
562     PLOptState *opt;
563
564         program_name = argv[0];
565     opt = PL_CreateOptState(argc, argv, "d");
566     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
567     {
568         if (PL_OPT_BAD == os) continue;
569         switch (opt->option)
570         {
571         case 'd':  /* debug mode */
572             _debug_on = 1;
573             break;
574         default:
575             break;
576         }
577     }
578     PL_DestroyOptState(opt);
579
580     PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
581     PR_STDIO_INIT();
582
583     PR_SetConcurrency(4);
584
585         tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
586     if (NULL == tp) {
587         printf("PR_CreateThreadPool failed\n");
588         failed_already=1;
589         goto done;
590         }
591         jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
592         rv = PR_JoinJob(jobp);          
593         PR_ASSERT(PR_SUCCESS == rv);
594
595         DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
596         rv = PR_JoinThreadPool(tp);
597         PR_ASSERT(PR_SUCCESS == rv);
598         DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
599
600 done:
601     PR_Cleanup();
602     if (failed_already) return 1;
603     else return 0;
604 }