2 * Copyright (c) 2011 Joakim Johansson <jocke@tbricks.com>.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
31 # include <sys/time.h>
// Per-queue state: one workqueue handle and one statistics record for each
// of the WORKQUEUE_COUNT processing queues.
36 pthread_workqueue_t workqueues[WORKQUEUE_COUNT];
37 struct wq_statistics workqueue_statistics[WORKQUEUE_COUNT];
// One generator record (its workqueue and its event array) per generator queue.
38 struct wq_event_generator workqueue_generator[GENERATOR_WORKQUEUE_COUNT];
// Aggregate of the per-queue statistics, filled in by _gather_statistics().
40 struct wq_statistics global_statistics;
// Number of queues that had at least one sample when the aggregate was built.
41 unsigned int global_stats_used = 0;
// Mutex/condition pair the generator blocks on until a tick's events are done.
43 pthread_mutex_t generator_mutex;
44 pthread_cond_t generator_condition;
// Events still outstanding for the current tick: set by
// _generate_simulated_events() and decremented in _process_data().
45 static unsigned int events_processed;
// Percentile thresholds (in percent) reported by _print_statistics().
47 #define PERCENTILE_COUNT 8
48 double percentiles[PERCENTILE_COUNT] = {50.0, 80.0, 98.0, 99.0, 99.5, 99.8, 99.9, 99.99};
// gettime() timestamps taken right before and right after the event generation run.
49 mytime_t real_start, real_end;
54 #include <CoreServices/CoreServices.h>
55 #include <mach/mach.h>
56 #include <mach/mach_time.h>
// Mach timebase ratio (numer/denom) applied by gettime() below; it is filled
// in by the mach_timebase_info() call made during start-up.
58 static mach_timebase_info_data_t sTimebaseInfo;
60 // From http://developer.apple.com/library/mac/#qa/qa2004/qa1398.html
// Returns mach_absolute_time() scaled by sTimebaseInfo.numer / sTimebaseInfo.denom.
// NOTE(review): per the referenced Q&A the result is presumably nanoseconds - confirm.
// NOTE(review): this variant is non-static and returns unsigned long, while the
// other gettime() in this file is `static mytime_t` - confirm the difference is intended.
// NOTE(review): the multiplication is done before the division, so a large
// numer could overflow for big tick counts - confirm against the timebase in use.
61 unsigned long gettime(void)
63 return (mach_absolute_time() * sTimebaseInfo.numer / sTimebaseInfo.denom);
// Returns the current time in nanoseconds from a high-resolution clock.
// Three alternative bodies follow (CLOCK_MONOTONIC, the performance counter,
// CLOCK_HIGHRES).
// NOTE(review): the preprocessor conditionals that select between them, and the
// declarations of `ts`, `now` and `freq`, are not among the visible lines.
// NOTE(review): on failure each variant only logs to stderr; the return
// statement that follows appears to be reached anyway and would then be
// computed from a value that was never filled in - confirm.
68 static mytime_t gettime(void)
// Variant 1: monotonic clock, seconds and nanoseconds folded into one count.
72 if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
73 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);
74 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
// Variant 2: performance counter ticks converted to nanoseconds using the
// counter frequency, which is queried on every call.
78 if (!QueryPerformanceCounter(&now) )
79 fprintf(stderr, "Failed to get performance counter!\n");
80 if (!QueryPerformanceFrequency(&freq) )
81 fprintf(stderr, "Failed to get performance frequency!\n");
83 return (mytime_t)(now.QuadPart * NANOSECONDS_PER_SECOND / freq.QuadPart);
// Variant 3: same computation as variant 1, but reading CLOCK_HIGHRES.
86 if (clock_gettime(CLOCK_HIGHRES, &ts) != 0)
87 fprintf(stderr, "Failed to get high resolution clock! errno = %d\n", errno);
88 return ((ts.tv_sec * NANOSECONDS_PER_SECOND) + ts.tv_nsec);
// Delays the caller for about `nanoseconds` ns (performance-counter variant).
// The bulk of the delay is spent in Sleep(), which only has millisecond
// granularity; the remainder is spent polling the performance counter.
// nanoseconds: requested delay in nanoseconds.
// NOTE(review): `freq` is used below but its declaration is not among the visible lines.
96 static void my_sleep(unsigned long nanoseconds) {
97 LARGE_INTEGER start, end;
// Record the starting tick count and the counter frequency used for conversion.
100 QueryPerformanceCounter(&start);
101 QueryPerformanceFrequency(&freq);
103 // sleep with ms resolution ...
104 Sleep(nanoseconds / 1000000);
106 // ... and busy-wait afterwards, until the requested delay was reached
107 QueryPerformanceCounter(&end);
// Elapsed ticks are converted to nanoseconds and compared with the request.
108 while( (end.QuadPart - start.QuadPart) * NANOSECONDS_PER_SECOND / freq.QuadPart < nanoseconds ){
110 QueryPerformanceCounter(&end);
117 // real resolution on solaris is at best system clock tick, i.e. 100Hz unless having the
118 // high res system clock (1000Hz in that case)
// Delays the caller for `nanoseconds` ns using nanosleep(), retrying while the
// call fails with errno == EINTR.
// nanoseconds: requested delay in nanoseconds.
120 static void my_sleep(unsigned long nanoseconds)
// Two timespec buffers: t0 holds the interval to sleep, t1 is passed to
// nanosleep() as its second (remaining-time) argument.
122 struct timespec timeout0;
123 struct timespec timeout1;
// NOTE(review): `tmp` is not used in the visible lines - presumably the retry
// loop body (not shown here) swaps t0 and t1 through it; confirm.
124 struct timespec* tmp;
125 struct timespec* t0 = &timeout0;
126 struct timespec* t1 = &timeout1;
// Split the request into whole seconds and the leftover nanoseconds.
128 t0->tv_sec = nanoseconds / NANOSECONDS_PER_SECOND;
129 t0->tv_nsec = nanoseconds % NANOSECONDS_PER_SECOND;
// Keep going only while nanosleep() returns -1 with errno set to EINTR.
131 while ((nanosleep(t0, t1) == (-1)) && (errno == EINTR))
// Workqueue callback for one simulated event: measures the time elapsed since
// the event was stamped in _event_tick() and folds it into the statistics of
// the queue the event was assigned to.
// context: pointer to the struct wq_event being processed.
143 static void _process_data(void* context)
145 struct wq_event *event = (struct wq_event *) context;
146 mytime_t elapsed_time;
// Latency = current time minus the timestamp stored at generation time.
148 elapsed_time = gettime() - event->start_time;
// Running average is recomputed from the old average and count, then the
// total and the sample count are advanced.
// NOTE(review): these updates are not protected by a lock in the visible
// lines - confirm that items of one queue are never processed concurrently.
150 workqueue_statistics[event->queue_index].avg = ((workqueue_statistics[event->queue_index].count * workqueue_statistics[event->queue_index].avg) + elapsed_time) / (workqueue_statistics[event->queue_index].count + 1);
151 workqueue_statistics[event->queue_index].total += elapsed_time;
152 workqueue_statistics[event->queue_index].count += 1;
// Track the minimum; a stored min of 0 is treated as "not set yet".
154 if (elapsed_time < workqueue_statistics[event->queue_index].min ||
155 workqueue_statistics[event->queue_index].min == 0)
156 workqueue_statistics[event->queue_index].min = elapsed_time;
// Track the maximum.
158 if (elapsed_time > workqueue_statistics[event->queue_index].max)
159 workqueue_statistics[event->queue_index].max = elapsed_time;
// Histogram: elapsed_time / 1000 selects the bucket (the report labels the
// buckets in us).
161 if ((elapsed_time / 1000) < DISTRIBUTION_BUCKETS)
162 workqueue_statistics[event->queue_index].distribution[(int)(elapsed_time / 1000)] += 1;
// The last bucket collects samples beyond the histogram range.
// NOTE(review): presumably this is the else branch of the test above; the
// `else` line itself is not among the visible lines.
164 workqueue_statistics[event->queue_index].distribution[DISTRIBUTION_BUCKETS-1] += 1;
166 // allow generator thread to continue when all events have been processed
// NOTE(review): atomic_dec_nv() presumably returns the value after the
// decrement, so 0 means this was the last outstanding event - confirm.
167 if (atomic_dec_nv(&events_processed) == 0)
// The signal is sent while holding the same mutex the generator waits with.
169 pthread_mutex_lock(&generator_mutex);
170 pthread_cond_signal(&generator_condition);
171 pthread_mutex_unlock(&generator_mutex);
176 // Perform a small microburst for this tick
// Stamps EVENTS_GENERATED_PER_TICK events of one generator and submits each
// of them to a processing workqueue with _process_data() as the callback.
// context: the generator index carried in the pointer value itself (it is
//          cast to long, never dereferenced).
177 static void _event_tick(void* context)
179 struct wq_event *current_event;
180 long i, generator_workqueue = (long) context;
182 for (i = 0; i < EVENTS_GENERATED_PER_TICK; i++)
// Reuse the i-th slot of this generator's event array.
184 current_event = &workqueue_generator[generator_workqueue].wq_events[i];
185 current_event->start_time = gettime();
// The target queue is derived from the timestamp: start_time modulo WORKQUEUE_COUNT.
186 current_event->queue_index = (current_event->start_time % WORKQUEUE_COUNT);
// Submit the event; the result of the call is explicitly discarded.
// NOTE(review): if a submission ever failed, events_processed would not reach
// zero and the generator would presumably wait forever - confirm whether
// pthread_workqueue_additem_np() can fail here.
188 (void) pthread_workqueue_additem_np(workqueues[current_event->queue_index], _process_data, current_event, NULL, NULL);
// Drives the test for TOTAL_TICKS_TO_RUN ticks. Each tick waits out the rest
// of the time slice, starts event generation and then blocks until
// events_processed has dropped back to zero.
194 static void _generate_simulated_events()
196 unsigned long i = 0, tick;
// NOTE(review): `overhead` is used below but its declaration is not among the
// visible lines.
198 mytime_t start, current, overhead_start = 0, overhead_end = 0;
// NOTE(review): the values assigned here are overwritten by
// `start = current = overhead_end` at the top of the loop before being read.
200 start = current = gettime();
202 for (tick = 0; tick < TOTAL_TICKS_TO_RUN; tick++)
// A lap starts where the previous one ended; overhead is the time the
// previous lap spent between overhead_start and overhead_end (both are 0 on
// the first lap).
204 start = current = overhead_end;
205 overhead = overhead_end - overhead_start;
207 // wait until we have waited proper amount of time for current rate
208 // we should remove overhead of previous lap to not lag behind in data rate
209 // one call to gethrtime() alone is around 211ns on Nehalem 2.93
210 // use busy waiting in case the frequency is higher than the supported resolution of nanosleep()
// NOTE(review): only the warning is visible for this case; unless overhead is
// clamped on a line not shown here, EVENT_TIME_SLICE - overhead below would
// go negative or wrap - confirm.
212 if (overhead > EVENT_TIME_SLICE)
214 printf("Warning: Event processing overhead > event time slice, readjust test parameters.\n");
// Busy-wait when the tick rate exceeds the clock resolution (or when forced);
// the my_sleep() call further down is presumably the alternative branch.
217 if ((EVENT_GENERATION_FREQUENCY > SYSTEM_CLOCK_RESOLUTION) || FORCE_BUSY_LOOP)
// NOTE(review): nothing in the visible lines updates `current` inside this
// loop - presumably its body re-reads the clock; confirm.
219 while ((current - start) < (EVENT_TIME_SLICE - overhead))
// NOTE(review): presumably the else branch of the busy-loop test; the `else`
// line is not among the visible lines.
224 my_sleep(EVENT_TIME_SLICE - overhead);
// Everything from here to overhead_end counts as overhead for the next lap.
227 overhead_start = gettime();
229 events_processed = GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK; // number of items that will be processed
// Generators run on their own workqueues when
// LATENCY_RUN_GENERATOR_IN_MAIN_THREAD is 0; the generator index is passed in
// the pointer argument.
231 #if (LATENCY_RUN_GENERATOR_IN_MAIN_THREAD == 0)
232 for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
233 (void) pthread_workqueue_additem_np(workqueue_generator[i].wq, _event_tick, (void *) i, NULL, NULL);
// NOTE(review): presumably the #else branch, running a single generator inline
// on this thread with i still at its initial value 0 - confirm, and confirm
// that events_processed above is still correct when only one generator runs.
235 _event_tick((void *)i);
238 // wait for all events to be processed
// The predicate is re-checked in a loop around pthread_cond_wait().
239 pthread_mutex_lock(&generator_mutex);
240 while (events_processed > 0)
241 pthread_cond_wait(&generator_condition, &generator_mutex);
242 pthread_mutex_unlock(&generator_mutex);
244 overhead_end = gettime();
// Folds the statistics of one processing queue into global_statistics.
// queue_index: index into workqueue_statistics.
// NOTE(review): `i` is used below but its declaration is not among the visible lines.
250 static void _gather_statistics(unsigned long queue_index)
// Only queues with at least one sample are counted as used.
// NOTE(review): the braces are not visible; presumably everything below is
// inside this guard (otherwise the average could divide by zero) - confirm.
254 if (workqueue_statistics[queue_index].count > 0)
256 global_stats_used ++;
// Count-weighted mean of the current global average and this queue's average.
258 global_statistics.avg = ((global_statistics.count * global_statistics.avg) + (workqueue_statistics[queue_index].avg * workqueue_statistics[queue_index].count)) / (global_statistics.count + workqueue_statistics[queue_index].count);
259 global_statistics.total += workqueue_statistics[queue_index].total;
260 global_statistics.count += workqueue_statistics[queue_index].count;
// A global min of 0 is treated as "not set yet".
262 if (workqueue_statistics[queue_index].min < global_statistics.min || global_statistics.min == 0)
263 global_statistics.min = workqueue_statistics[queue_index].min;
265 if (workqueue_statistics[queue_index].max > global_statistics.max)
266 global_statistics.max = workqueue_statistics[queue_index].max;
// Histograms are summed bucket by bucket.
268 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
269 global_statistics.distribution[i] += workqueue_statistics[queue_index].distribution[i];
// Aggregates the per-queue statistics and prints the run summary, the latency
// histogram and the configured percentiles to stdout.
275 void _print_statistics()
// NOTE(review): total_events is initialised to 0 and never assigned in the
// visible lines, so the "events generated and processed" figure printed below
// would always be 0.0M - global_statistics.count looks like the intended
// source; confirm.
277 unsigned long i, j, total_events = 0, last_percentile = 0, accumulated_percentile = 0;
279 printf("Collecting statistics...\n");
// Build global_statistics from every processing queue.
281 for (i = 0; i < WORKQUEUE_COUNT; i++)
282 _gather_statistics(i);
// Run time is the difference of the two gettime() stamps converted to seconds.
284 printf("Test is done, run time was %.3f seconds, %.1fM events generated and processed.\n", (double)((double)(real_end - real_start) / (double) NANOSECONDS_PER_SECOND), total_events/1000000.0);
286 //FIXME - casting from mytime_t (u_long) to int will truncate the result
// NOTE(review): count and the distribution entries are printed with %d below;
// their declared types are not visible here - confirm they match.
287 printf("Global dispatch queue aggregate statistics for %d queues: %dM events, min = %d ns, avg = %.1f ns, max = %d ns\n",
288 global_stats_used, global_statistics.count/1000000, (int) global_statistics.min, global_statistics.avg, (int) global_statistics.max);
290 printf("\nDistribution:\n");
291 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
// i is unsigned long but is printed with %ld here.
293 printf("%3ld us: %d ", i, global_statistics.distribution[i]);
// The loop bound is the bucket's share of all samples scaled by 400.
// NOTE(review): the loop body is not among the visible lines - presumably it
// draws one bar character per iteration; confirm.
294 for (j=0; j<(((double) global_statistics.distribution[i] / (double) global_statistics.count) * 400.0); j++)
299 printf("\nPercentiles:\n");
// Walk the buckets while accumulating their counts. Whenever the accumulated
// share exceeds the next percentile threshold, report it against the previous
// bucket index.
301 for (i = 0; i < DISTRIBUTION_BUCKETS; i++)
// NOTE(review): nothing in the visible lines advances last_percentile inside
// this loop - presumably an increment is not shown here; confirm.
303 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
305 printf("%.2f < %ld us\n", percentiles[last_percentile], i-1);
308 accumulated_percentile += global_statistics.distribution[i];
// Thresholds first exceeded after the final bucket was added are reported as
// lying beyond the histogram range.
311 while ((last_percentile < PERCENTILE_COUNT) && ((100.0 * ((double) accumulated_percentile / (double) global_statistics.count)) > percentiles[last_percentile]))
313 printf("%.2f > %d us\n", percentiles[last_percentile], DISTRIBUTION_BUCKETS-1);
// Start-up and driver sequence: initialises the library and the global state,
// creates the generator and processing workqueues, prints the test parameters
// and runs the event generation between two gettime() stamps.
// NOTE(review): the enclosing function header and its tail are not among the
// visible lines (presumably this is the program entry point), and neither is
// the declaration of `i`.
// NOTE(review): every failure below is only logged to stderr; execution appears
// to continue regardless - confirm that is acceptable for this test program.
323 pthread_workqueue_attr_t attr;
// Fills the timebase ratio read by the Mach gettime() variant.
326 (void) mach_timebase_info(&sTimebaseInfo);
330 pthread_workqueue_init_np();
// Zero all global tables; the statistics code relies on 0 meaning "unset" for min.
333 memset(&workqueues, 0, sizeof(workqueues));
334 memset(&workqueue_statistics, 0, sizeof(workqueue_statistics));
335 memset(&global_statistics, 0, sizeof(global_statistics));
336 memset(&workqueue_generator, 0, sizeof(workqueue_generator));
338 pthread_mutex_init(&generator_mutex, NULL);
339 pthread_cond_init(&generator_condition, NULL);
// The attribute object is initialised once here and reused for every
// generator queue.
341 if (pthread_workqueue_attr_init_np(&attr) != 0)
342 fprintf(stderr, "Failed to set workqueue attributes\n");
344 for (i = 0; i < GENERATOR_WORKQUEUE_COUNT; i++)
// The loop index is passed as the queue priority.
// NOTE(review): valid priority values are not visible here - confirm that every
// index below GENERATOR_WORKQUEUE_COUNT is accepted.
346 if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0)
347 fprintf(stderr, "Failed to set workqueue priority\n");
349 if (pthread_workqueue_attr_setovercommit_np(&attr, 1) != 0)
350 fprintf(stderr, "Failed to set workqueue overcommit\n");
// Event slots for this generator; _event_tick() reuses them on every tick.
// NOTE(review): the malloc() result is passed to memset() without a NULL check.
352 workqueue_generator[i].wq_events = malloc(sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK);
353 memset(workqueue_generator[i].wq_events, 0, (sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK));
355 if (pthread_workqueue_create_np(&workqueue_generator[i].wq, &attr) != 0)
356 fprintf(stderr, "Failed to create workqueue\n");
// Processing queues: the attribute object is re-initialised on every
// iteration, and overcommit is not set in the visible lines.
359 for (i = 0; i < WORKQUEUE_COUNT; i++)
361 if (pthread_workqueue_attr_init_np(&attr) != 0)
362 fprintf(stderr, "Failed to set workqueue attributes\n");
364 if (pthread_workqueue_attr_setqueuepriority_np(&attr, i) != 0)
365 fprintf(stderr, "Failed to set workqueue priority\n");
367 if (pthread_workqueue_create_np(&workqueues[i], &attr) != 0)
368 fprintf(stderr, "Failed to create workqueue\n");
// Optional pause before the run, announced on stdout.
371 if (SLEEP_BEFORE_START > 0)
373 printf("Sleeping for %d seconds to allow for processor set configuration...\n",SLEEP_BEFORE_START);
374 sleep(SLEEP_BEFORE_START);
// Print the test parameters.
377 printf("%d workqueues, running for %d seconds at %d Hz, %d events per tick.\n",WORKQUEUE_COUNT, SECONDS_TO_RUN, EVENT_GENERATION_FREQUENCY, EVENTS_GENERATED_PER_TICK);
// The memory figure is computed from the per-generator event array size plus
// sizeof(workqueues), multiplied by the generator count.
379 printf("Running %d generator threads at %dK events/s, the aggregated data rate is %dK events/s. %.2f MB is used for %.2fK events.\n",
380 GENERATOR_WORKQUEUE_COUNT,AGGREGATE_DATA_RATE_PER_SECOND/1000, TOTAL_DATA_PER_SECOND/1000,
381 (double) GENERATOR_WORKQUEUE_COUNT * ((sizeof(struct wq_event) * EVENTS_GENERATED_PER_TICK + sizeof(workqueues))/(1024.0*1024.0)),
382 GENERATOR_WORKQUEUE_COUNT * EVENTS_GENERATED_PER_TICK/1000.0);
// Timestamps around the run; _print_statistics() reports their difference.
384 real_start = gettime();
386 _generate_simulated_events();
388 real_end = gettime();