1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
15 * The Original Code is the Netscape Portable Runtime (NSPR).
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1999-2000
20 * the Initial Developer. All Rights Reserved.
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
38 /***********************************************************************
42 ** Description: Test threadpool functionality.
44 ** Modification History:
56 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
61 #if defined(XP_UNIX) || defined (XP_OS2) || defined(XP_BEOS)
/* Debug-output switch tested by DPRINTF; off by default. */
71 static int _debug_on = 0;
/* Prefix for error messages; main() sets it from argv[0]. */
72 static char *program_name = NULL;
/* Forward declaration: serve_client_read() queues this job before it is defined. */
73 static void serve_client_write(void *arg);
75 #include "obsolete/prsem.h"
/*
 * Emit a debug message only when _debug_on is non-zero. The argument is a
 * complete, parenthesized printf argument list, hence the doubled
 * parentheses at every call site.
 * NOTE(review): this expands to a bare if statement, so an else that
 * follows an unbraced use would bind to it.
 */
81 #define DPRINTF(arg) if (_debug_on) printf arg
/* Capacity in bytes of the data array in struct buffer. */
84 #define BUF_DATA_SIZE (2 * 1024)
/* Default value of tcp_mesg_size. */
85 #define TCP_MESG_SIZE 1024
86 #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */
/* Default value of num_tcp_connections_per_client. */
89 #define NUM_TCP_CONNECTIONS_PER_CLIENT 10
/* Default value of num_tcp_mesgs_per_connection. */
90 #define NUM_TCP_MESGS_PER_CONNECTION 10
/* First port that TCP_Server tries to bind. */
91 #define TCP_SERVER_PORT 10000
/* Bound used by the bind retry loop in TCP_Server. */
92 #define SERVER_MAX_BIND_COUNT 100
/*
 * Replacement getcwd(): obtains the current directory as a wide string with
 * _wgetcwd() and converts it into buf (at most size bytes) using the ANSI
 * code page (CP_ACP).
 * NOTE(review): the results of _wgetcwd() and WideCharToMultiByte() are not
 * checked in the statements shown here; confirm failure is handled.
 */
95 char *getcwd(char *buf, size_t size)
97 wchar_t wpath[MAX_PATH];
98 _wgetcwd(wpath, MAX_PATH);
99 WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
/* Run-time test parameters, each initialized from its compile-time default. */
105 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
106 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
107 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
108 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
/* Forward declaration: TCP_Server() queues this accept job before it is defined. */
109 static void TCP_Server_Accept(void *arg);
/* Failure flag; main() returns 1 when it is non-zero. */
112 int failed_already=0;
/*
 * Fixed-size data buffer of BUF_DATA_SIZE bytes. Serve_Client() allocates
 * one with PR_NEW(buffer).
 */
113 typedef struct buffer {
114 char data[BUF_DATA_SIZE];
/*
 * Server-wide state. TCP_Server() fills it in, and it is the argument of the
 * TCP_Server_Accept() and print_stats() jobs.
 */
118 typedef struct Server_Param {
119 PRJobIoDesc iod; /* socket to read from/write to */
120 PRInt32 datalen; /* bytes of data transferred in each read/write */
122 PRMonitor *exit_mon; /* monitor to signal on exit */
123 PRInt32 *job_counterp; /* counter to decrement, before exit */
124 PRInt32 conn_counter; /* connection count; compared with the expected total in TCP_Server_Accept */
/*
 * Per-connection arguments that TCP_Server_Accept() fills in and hands to a
 * Serve_Client() job.
 */
128 typedef struct Serve_Client_Param {
129 PRJobIoDesc iod; /* socket to read from/write to */
130 PRInt32 datalen; /* bytes of data transferred in each read/write */
131 PRMonitor *exit_mon; /* monitor to signal on exit */
132 PRInt32 *job_counterp; /* counter to decrement, before exit */
134 } Serve_Client_Param;
/*
 * Per-connection state that is passed as the job argument between
 * serve_client_read() and serve_client_write().
 */
136 typedef struct Session {
137 PRJobIoDesc iod; /* socket to read from/write to */
142 PRMonitor *exit_mon; /* monitor to signal on exit */
143 PRInt32 *job_counterp; /* counter to decrement, before exit */
/*
 * Read job for one client connection. arg is the Session of the connection.
 * Receives more of the current message into the session buffer, then queues
 * either another read job (message still incomplete) or a
 * serve_client_write() job (message complete).
 */
148 serve_client_read(void *arg)
150 Session *sp = (Session *) arg;
158 PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
160 sockfd = sp->iod.socket;
161 buf = sp->in_buf->data;
163 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
164 PR_ASSERT(sp->bytes_read < sp->bytes);
/* Resume where the previous read stopped and ask for the remainder. */
166 offset = sp->bytes_read;
167 rem = sp->bytes - offset;
168 bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
/* NOTE(review): confirm the PR_Recv result is checked for error or EOF before it is added to bytes_read. */
172 sp->bytes_read += bytes;
173 sp->iod.timeout = PR_SecondsToInterval(60);
/* Short read: queue this function again for the rest of the message. */
174 if (sp->bytes_read < sp->bytes) {
175 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
177 PR_ASSERT(NULL != jobp);
/* Whole message received: queue the write job that sends it back. */
180 PR_ASSERT(sp->bytes_read == sp->bytes);
181 DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
183 sp->iod.timeout = PR_SecondsToInterval(60);
184 jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
186 PR_ASSERT(NULL != jobp);
/*
 * Write job for one client connection. arg is the Session of the connection.
 * Sends the message held in the session buffer back to the client. It then
 * either queues the next read job or, when msg_num has reached
 * num_tcp_mesgs_per_connection, shuts the socket down, decrements the shared
 * job counter under the exit monitor and releases the buffer.
 */
192 serve_client_write(void *arg)
194 Session *sp = (Session *) arg;
200 sockfd = sp->iod.socket;
201 buf = sp->in_buf->data;
203 PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
/* NOTE(review): a short or failed send is guarded only by the assertion below; confirm that is sufficient. */
205 bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
206 PR_ASSERT(bytes == sp->bytes);
211 DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
/* More messages expected on this connection: queue the next read. */
213 if (sp->msg_num < num_tcp_mesgs_per_connection) {
215 sp->iod.timeout = PR_SecondsToInterval(60);
216 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
218 PR_ASSERT(NULL != jobp);
/* Last message sent: shut the connection down. */
222 DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
223 if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
224 fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
/* Decrement the job counter under the monitor and notify the thread waiting on it. */
228 PR_EnterMonitor(sp->exit_mon);
229 --(*sp->job_counterp);
230 PR_Notify(sp->exit_mon);
231 PR_ExitMonitor(sp->exit_mon);
233 PR_DELETE(sp->in_buf);
241 * Thread, started by the server, for serving a client connection.
242 * Reads data from socket and writes it back, unmodified, and
/*
 * Job entry point for one accepted connection. arg is the
 * Serve_Client_Param filled in by TCP_Server_Accept(). Allocates a Session
 * and a buffer, copies the message length, exit monitor and job counter
 * pointer into the Session, and queues the first serve_client_read() job
 * with a 60 second I/O timeout.
 */
245 static void PR_CALLBACK
246 Serve_Client(void *arg)
248 Serve_Client_Param *scp = (Serve_Client_Param *) arg;
253 sp = PR_NEW(Session);
256 in_buf = PR_NEW(buffer);
257 if (in_buf == NULL) {
258 fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
/* Each message on this connection is datalen bytes long. */
264 sp->bytes = scp->datalen;
268 sp->exit_mon = scp->exit_mon;
269 sp->job_counterp = scp->job_counterp;
271 sp->iod.timeout = PR_SecondsToInterval(60);
272 jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
274 PR_ASSERT(NULL != jobp);
/*
 * Timer job. arg is the Server_Param. Prints the current value of the shared
 * job counter and queues itself again to run after 500 milliseconds.
 */
279 print_stats(void *arg)
281 Server_Param *sp = (Server_Param *) arg;
282 PRThreadPool *tp = sp->tp;
/* Read the counter while holding the monitor that the other jobs use to update it. */
286 PR_EnterMonitor(sp->exit_mon);
287 counter = (*sp->job_counterp);
288 PR_ExitMonitor(sp->exit_mon);
290 printf("PRINT_STATS: #client connections = %d\n",counter);
/* Re-arm the timer. */
293 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
294 print_stats, sp, PR_FALSE);
296 PR_ASSERT(NULL != jobp);
/*
 * Count of client jobs still running: incremented in TCP_Server_Accept() and
 * decremented in serve_client_write(), both through job_counterp.
 * NOTE(review): declared int, but its address is stored in a PRInt32
 * pointer; confirm the two types match on every target.
 */
299 static int job_counter = 0;
302 * Server binds an address to a socket, starts a client process and
303 * listens for incoming connections.
304 * Each client connects to the server and sends a chunk of data
305 * Starts a Serve_Client job for each incoming connection, to read
306 * the data from the client and send it back to the client, unmodified.
307 * Each client checks that data received from server is same as the
308 * data it sent to the server.
309 * Finally, the threadpool is shut down
/*
 * Job that sets up the echo server. arg is the thread pool (PRThreadPool).
 * In the order of the statements below, it:
 *   - creates a TCP socket and binds it to PR_INADDR_ANY starting at
 *     TCP_SERVER_PORT, changing the port when the address is in use;
 *   - listens with a backlog of 32 and reads back the bound address;
 *   - allocates the Server_Param used by the accept and timer jobs;
 *   - queues an accept job and immediately cancels it;
 *   - starts the thrpool_client program from the current directory as a
 *     detached process, passing the server port with -p (one of the two
 *     argument lists also passes -d);
 *   - creates the exit monitor, fills in the Server_Param again, queues and
 *     cancels a 5000 millisecond timer job, and finally queues the accept
 *     job that serves connections.
 *
 * NOTE(review): the port is stored with PR_htons and then incremented with
 * += 2 on retry, so the increment is applied to the network-byte-order
 * value; the same raw value is passed to the debug printf with %d. Confirm
 * this is intended.
 * NOTE(review): exit_mon is first assigned from sc_mon before the
 * PR_NewMonitor call that appears further down; confirm sc_mon holds a
 * valid value at that point.
 * NOTE(review): path is sized 1024 plus the length of the client name, but
 * getcwd is given the whole buffer and two strcat calls follow; confirm the
 * appended names always fit.
 */
311 static void PR_CALLBACK
312 TCP_Server(void *arg)
314 PRThreadPool *tp = (PRThreadPool *) arg;
324 * Create a tcp socket
326 if ((sockfd = PR_NewTCPSocket()) == NULL) {
327 fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
330 memset(&netaddr, 0 , sizeof(netaddr));
331 netaddr.inet.family = PR_AF_INET;
332 netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
333 netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
335 * try a few times to bind server's address, if addresses are in
339 while (PR_Bind(sockfd, &netaddr) < 0) {
340 if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
341 netaddr.inet.port += 2;
342 if (i++ < SERVER_MAX_BIND_COUNT)
345 fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
351 if (PR_Listen(sockfd, 32) < 0) {
352 fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
357 if (PR_GetSockName(sockfd, &netaddr) < 0) {
358 fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
364 "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
365 netaddr.inet.ip, netaddr.inet.port));
367 sp = PR_NEW(Server_Param);
369 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
373 sp->iod.socket = sockfd;
374 sp->iod.timeout = PR_SecondsToInterval(60);
375 sp->datalen = tcp_mesg_size;
376 sp->exit_mon = sc_mon;
377 sp->job_counterp = &job_counter;
378 sp->conn_counter = 0;
380 sp->netaddr = netaddr;
382 /* create and cancel an io job */
383 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
385 PR_ASSERT(NULL != jobp);
386 rval = PR_CancelJob(jobp);
387 PR_ASSERT(PR_SUCCESS == rval);
390 * create the client process
394 char *argv[MAX_ARGS + 1];
397 char path[1024 + sizeof("/thrpool_client")];
399 getcwd(path, sizeof(path));
401 (void)strcat(path, "/thrpool_client");
403 (void)strcat(path, ".exe");
405 argv[index++] = path;
406 sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
409 argv[index++] = "-d";
410 argv[index++] = "-p";
411 argv[index++] = port;
412 argv[index++] = NULL;
414 argv[index++] = "-p";
415 argv[index++] = port;
416 argv[index++] = NULL;
418 PR_ASSERT(MAX_ARGS >= (index - 1));
420 DPRINTF(("creating client process %s ...\n", path));
421 if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
423 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
429 sc_mon = PR_NewMonitor();
430 if (sc_mon == NULL) {
431 fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
436 sp->iod.socket = sockfd;
437 sp->iod.timeout = PR_SecondsToInterval(60);
438 sp->datalen = tcp_mesg_size;
439 sp->exit_mon = sc_mon;
440 sp->job_counterp = &job_counter;
441 sp->conn_counter = 0;
443 sp->netaddr = netaddr;
445 /* create and cancel a timer job */
446 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
447 print_stats, sp, PR_FALSE);
448 PR_ASSERT(NULL != jobp);
449 rval = PR_CancelJob(jobp);
450 PR_ASSERT(PR_SUCCESS == rval);
452 DPRINTF(("TCP_Server: Accepting connections \n"));
454 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
456 PR_ASSERT(NULL != jobp);
/*
 * Accept job. arg is the Server_Param. Accepts one connection on the
 * listening socket, packages it in a Serve_Client_Param and queues a
 * Serve_Client() job for it. While conn_counter is below
 * num_tcp_clients * num_tcp_connections_per_client it queues itself again.
 * The remaining statements start the print_stats timer, wait on the exit
 * monitor until the job counter reaches zero, close the listening socket,
 * destroy the monitor, print the test summary and shut the thread pool down.
 */
461 TCP_Server_Accept(void *arg)
463 Server_Param *sp = (Server_Param *) arg;
464 PRThreadPool *tp = sp->tp;
465 Serve_Client_Param *scp;
466 PRFileDesc *newsockfd;
/* Accept without a timeout; the peer address is written into sp->netaddr. */
469 if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
470 PR_INTERVAL_NO_TIMEOUT)) == NULL) {
471 fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
475 scp = PR_NEW(Serve_Client_Param);
477 fprintf(stderr,"%s: PR_NEW failed\n", program_name);
483 * Start a Serve_Client job for each incoming connection
485 scp->iod.socket = newsockfd;
486 scp->iod.timeout = PR_SecondsToInterval(60);
487 scp->datalen = tcp_mesg_size;
488 scp->exit_mon = sp->exit_mon;
489 scp->job_counterp = sp->job_counterp;
/* Count the new client job under the monitor before it is queued. */
492 PR_EnterMonitor(sp->exit_mon);
493 (*sp->job_counterp)++;
494 PR_ExitMonitor(sp->exit_mon);
495 jobp = PR_QueueJob(tp, Serve_Client, scp,
498 PR_ASSERT(NULL != jobp);
/* NOTE(review): jobp is a pointer but is formatted with %lx here and in the timer message below; confirm this is acceptable on all targets. */
499 DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
502 * single-threaded update; no lock needed
/* More connections expected: queue another accept job. */
505 if (sp->conn_counter <
506 (num_tcp_clients * num_tcp_connections_per_client)) {
507 jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
509 PR_ASSERT(NULL != jobp);
/* Start the periodic statistics timer (500 milliseconds). */
512 jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
513 print_stats, sp, PR_FALSE);
515 PR_ASSERT(NULL != jobp);
516 DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
519 PR_EnterMonitor(sp->exit_mon);
520 /* Wait for server jobs to finish */
521 while (0 != *sp->job_counterp) {
522 PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
523 DPRINTF(("TCP_Server: conn_counter = %d\n",
527 PR_ExitMonitor(sp->exit_mon);
/* All client jobs are done: release the listening socket and the monitor. */
528 if (sp->iod.socket) {
529 PR_Close(sp->iod.socket);
531 PR_DestroyMonitor(sp->exit_mon);
/* NOTE(review): the summary below formats PRInt32 values with %ld; confirm PRInt32 is long on every target. */
532 printf("%30s","TCP_Socket_Client_Server_Test:");
533 printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
534 num_tcp_clients, num_tcp_connections_per_client);
535 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
536 num_tcp_mesgs_per_connection, tcp_mesg_size);
538 DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
539 PR_ShutdownThreadPool(sp->tp);
543 /************************************************************************/
/* Default thread-pool settings that main() passes to PR_CreateThreadPool(). */
545 #define DEFAULT_INITIAL_THREADS 4
546 #define DEFAULT_MAX_THREADS 100
/* Stack size argument for the pool: 512 * 1024. */
547 #define DEFAULT_STACKSIZE (512 * 1024)
549 int main(int argc, char **argv)
551 PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
552 PRInt32 max_threads = DEFAULT_MAX_THREADS;
553 PRInt32 stacksize = DEFAULT_STACKSIZE;
554 PRThreadPool *tp = NULL;
564 program_name = argv[0];
565 opt = PL_CreateOptState(argc, argv, "d");
566 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
568 if (PL_OPT_BAD == os) continue;
571 case 'd': /* debug mode */
578 PL_DestroyOptState(opt);
580 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
583 PR_SetConcurrency(4);
585 tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
587 printf("PR_CreateThreadPool failed\n");
591 jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
592 rv = PR_JoinJob(jobp);
593 PR_ASSERT(PR_SUCCESS == rv);
595 DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
596 rv = PR_JoinThreadPool(tp);
597 PR_ASSERT(PR_SUCCESS == rv);
598 DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
602 if (failed_already) return 1;