1 /* queue.c - a string queue implementation
2 * Copyright 2009, 2011 Red Hat Inc., Durham, North Carolina.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 * Steve Grubb <sgrubb@redhat.com>
21 * Miloslav Trmač <mitr@redhat.com>
26 #include <arpa/inet.h>
41 int fd; /* -1 if !Q_IN_FILE */
42 /* NULL if !Q_IN_MEMORY. [i] contains a memory copy of the queue entry
43 "i", if known - it may be NULL even if entry exists. */
44 unsigned char **memory;
49 unsigned char buffer[]; /* Used only locally within q_peek() */
54 /* Compile-time expression verification */
55 #define verify(E) do { \
56 char verify__[(E) ? 1 : -1]; \
60 /* Like pread(), except that it handles partial reads, and returns 0 on
62 static int full_pread(int fd, void *buf, size_t size, off_t offset)
71 res = pread(fd, buf, run, offset);
75 errno = ENXIO; /* Any better value? */
78 buf = (unsigned char *)buf + res;
85 /* Like pwrite(), except that it handles partial writes, and returns 0 on
87 static int full_pwrite(int fd, const void *buf, size_t size, off_t offset)
96 res = pwrite(fd, buf, run, offset);
100 errno = ENXIO; /* Any better value? */
103 buf = (const unsigned char *)buf + res;
110 /* File format and utilities */
112 /* The mutable part of struct file_header */
114 uint32_t queue_head; /* 0-based index of the first non-empty entry */
115 uint32_t queue_length; /* [0, num_entries] */
118 /* All integer values are in network byte order (big endian) */
121 uint8_t magic[14]; /* See fh_magic below */
122 uint8_t version; /* File format version, see FH_VERSION* below */
123 uint8_t reserved; /* Must be 0 */
124 /* Total file size is (num_entries + 1) * entry_size. This must fit
125 into SIZE_MAX because the "len" parameter of posix_fallocate has
127 uint32_t num_entries; /* Total number of entries allocated */
132 /* Contains a '\0' byte to unambiguously mark the file as a binary file. */
133 static const uint8_t fh_magic[14] = "\0audisp-remote";
134 #define FH_VERSION_0 0x00
136 /* Return file position for ENTRY in Q */
137 static size_t entry_offset (const struct queue *q, size_t entry)
139 return (entry + 1) * q->entry_size;
142 /* Synchronize Q if required and return 0.
143 On error, return -1 and set errno. */
144 static int q_sync(struct queue *q)
146 if ((q->flags & Q_SYNC) == 0)
148 return fdatasync(q->fd);
151 /* Sync file's fh_state with Q, q_sync (Q), and return 0.
152 On error, return -1 and set errno. */
153 static int sync_fh_state (struct queue *q)
160 s.queue_head = htonl(q->queue_head);
161 s.queue_length = htonl(q->queue_length);
162 if (full_pwrite(q->fd, &s, sizeof(s), offsetof(struct file_header, s))
168 /* Queue implementation */
170 /* Open PATH for Q, update Q from it, and return 0.
171 On error, return -1 and set errno; Q->fd may be set even on error. */
172 static int q_open_file(struct queue *q, const char *path)
174 int open_flags, fd_flags;
176 struct file_header fh;
179 if ((q->flags & Q_CREAT) != 0)
180 open_flags |= O_CREAT;
181 if ((q->flags & Q_EXCL) != 0)
182 open_flags |= O_EXCL;
183 q->fd = open(path, open_flags, S_IRUSR | S_IWUSR);
187 fd_flags = fcntl(q->fd, F_GETFD);
190 if (fcntl(q->fd, F_SETFD, fd_flags | FD_CLOEXEC) == -1)
193 /* File locking in POSIX is pretty much broken... let's hope nobody
194 attempts to open a single file twice within the same process.
195 open() above has initialized the file offset to 0, so the lockf()
196 below affects the whole file. */
197 if (lockf(q->fd, F_TLOCK, 0) != 0) {
198 if (errno == EACCES || errno == EAGAIN)
199 errno = EBUSY; /* This makes more sense... */
203 if (fstat(q->fd, &st) != 0)
205 if (st.st_size == 0) {
206 verify(sizeof(fh.magic) == sizeof(fh_magic));
207 memcpy(fh.magic, fh_magic, sizeof(fh.magic));
208 fh.version = FH_VERSION_0;
210 fh.num_entries = htonl(q->num_entries);
211 fh.entry_size = htonl(q->entry_size);
212 fh.s.queue_head = htonl(0);
213 fh.s.queue_length = htonl(0);
214 if (full_pwrite(q->fd, &fh, sizeof(fh), 0) != 0)
218 #ifdef HAVE_POSIX_FALLOCATE
219 if (posix_fallocate(q->fd, 0,
220 (q->num_entries + 1) * q->entry_size) != 0)
224 uint32_t file_entries;
225 if (full_pread(q->fd, &fh, sizeof(fh), 0) != 0)
227 if (memcmp(fh.magic, fh_magic, sizeof(fh.magic)) != 0
228 || fh.version != FH_VERSION_0 || fh.reserved != 0
229 || fh.entry_size != htonl(q->entry_size)) {
233 file_entries = ntohl(fh.num_entries);
234 if (file_entries > SIZE_MAX / q->entry_size - 1
235 || ((uintmax_t)st.st_size
236 != (file_entries + 1) * q->entry_size)) {
241 /* Note that this may change q->num_entries! */
242 q->num_entries = ntohl(fh.num_entries);
243 q->queue_head = ntohl(fh.s.queue_head);
244 q->queue_length = ntohl(fh.s.queue_length);
245 if (q->queue_head >= q->num_entries
246 || q->queue_length > q->num_entries) {
253 /* Like q_open(), but does not handle Q_RESIZE, and NUM_ENTRIES is only used
254 when creating a new file. */
255 static struct queue *q_open_no_resize(int q_flags, const char *path,
256 size_t num_entries, size_t entry_size)
261 if ((q_flags & (Q_IN_MEMORY | Q_IN_FILE)) == 0) {
265 if (num_entries == 0 || num_entries > UINT32_MAX
266 || entry_size < 1 /* for trailing NUL */
267 || entry_size < sizeof(struct file_header) /* for Q_IN_FILE */
268 /* to allocate "struct queue" including its buffer*/
269 || entry_size > UINT32_MAX - sizeof(struct queue)) {
273 if (entry_size > SIZE_MAX
274 || num_entries > SIZE_MAX / entry_size - 1 /* for Q_IN_FILE */
275 || num_entries > SIZE_MAX / sizeof(*q->memory)) {
280 q = malloc(sizeof(*q) + entry_size);
286 q->num_entries = num_entries;
287 q->entry_size = entry_size;
291 if ((q_flags & Q_IN_MEMORY) != 0) {
292 size_t sz = num_entries * sizeof(*q->memory);
294 q->memory = malloc(sz);
295 if (q->memory == NULL)
297 memset(q->memory, 0, sz);
300 if ((q_flags & Q_IN_FILE) != 0 && q_open_file(q, path) != 0)
315 void q_close(struct queue *q)
318 close(q->fd); /* Also releases the file lock */
319 if (q->memory != NULL) {
322 for (i = 0; i < q->num_entries; i++)
329 /* Internal use only: add DATA to Q, but don't update fh_state. */
330 static int q_append_no_sync_fh_state(struct queue *q, const char *data)
332 size_t data_size, entry_index;
335 if (q->queue_length == q->num_entries) {
340 data_size = strlen(data) + 1;
341 if (data_size > q->entry_size) {
346 entry_index = (q->queue_head + q->queue_length) % q->num_entries;
347 if (q->memory != NULL) {
348 if (q->memory[entry_index] != NULL) {
349 errno = EIO; /* This is _really_ unexpected. */
352 copy = malloc(data_size);
355 memcpy(copy, data, data_size);
362 offset = entry_offset(q, entry_index);
363 if (full_pwrite(q->fd, data, data_size, offset) != 0) {
375 q->memory[entry_index] = copy;
382 int q_append(struct queue *q, const char *data)
386 r = q_append_no_sync_fh_state(q, data);
390 return sync_fh_state(q); /* Calls q_sync() */
393 int q_peek(struct queue *q, char *buf, size_t size)
395 const unsigned char *data;
398 if (q->queue_length == 0)
401 if (q->memory != NULL && q->memory[q->queue_head] != NULL) {
402 data = q->memory[q->queue_head];
403 data_size = strlen((char *)data) + 1;
404 } else if (q->fd != -1) {
405 const unsigned char *end;
407 if (full_pread(q->fd, q->buffer, q->entry_size,
408 entry_offset(q, q->queue_head)) != 0)
411 end = memchr(q->buffer, '\0', q->entry_size);
413 /* FIXME: silently drop this entry? */
417 data_size = (end - data) + 1;
419 if (q->memory != NULL) {
422 copy = malloc(data_size);
423 if (copy != NULL) { /* Silently ignore failures. */
424 memcpy(copy, data, data_size);
425 q->memory[q->queue_head] = copy;
429 errno = EIO; /* This is _really_ unexpected. */
433 if (size < data_size) {
437 memcpy(buf, data, data_size);
441 /* Internal use only: drop head of Q, but don't write this into the file */
442 static int q_drop_head_memory_only(struct queue *q)
444 if (q->queue_length == 0) {
449 if (q->memory != NULL) {
450 free(q->memory[q->queue_head]);
451 q->memory[q->queue_head] = NULL;
455 if (q->queue_head == q->num_entries)
461 int q_drop_head(struct queue *q)
465 r = q_drop_head_memory_only(q);
469 return sync_fh_state(q); /* Calls q_sync() */
472 size_t q_queue_length(const struct queue *q)
474 return q->queue_length;
477 struct queue *q_open(int q_flags, const char *path, size_t num_entries,
480 struct queue *q, *q2;
481 char *tmp_path, *buf;
485 q = q_open_no_resize(q_flags, path, num_entries, entry_size);
486 if (q == NULL || q->num_entries == num_entries)
489 if ((q->flags & Q_RESIZE) == 0) {
490 saved_errno = EINVAL;
494 if (q->queue_length > num_entries) {
495 saved_errno = ENOSPC;
499 buf = malloc(entry_size);
505 path_len = strlen(path);
506 tmp_path = malloc(path_len + 7);
507 if (tmp_path == NULL) {
511 memcpy(tmp_path, path, path_len);
512 memcpy(tmp_path + path_len, "XXXXXX", 7);
513 /* We really want tmpnam() here (safe due to the Q_EXCL below), but gcc
514 warns on any use of tmpnam(). */
515 fd = mkstemp(tmp_path);
518 goto err_errno_tmp_path;
520 if (close(fd) != 0 || unlink(tmp_path) != 0) {
522 goto err_errno_tmp_file;
525 q2 = q_open_no_resize(q_flags | Q_CREAT | Q_EXCL, tmp_path, num_entries,
529 goto err_errno_tmp_file;
531 if (q2->num_entries != num_entries) {
532 errno = EIO; /* This is _really_ unexpected. */
539 r = q_peek(q, buf, entry_size);
545 if (q_append_no_sync_fh_state(q2, buf) != 0)
547 if (q_drop_head_memory_only(q) != 0)
550 if (sync_fh_state(q2) != 0)
553 if (rename(tmp_path, path) != 0)