#include <pulsecore/thread-mq.h>
#include <pulsecore/poll.h>
#include <pulsecore/rtpoll.h>
+#include <pulsecore/core-rtclock.h>
#include <pulsecore/time-smoother.h>
#include "raop-sink.h"
#include "raop-client.h"
#include "raop-util.h"
+#define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC)
+#define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3
+
struct userdata {
pa_core *core;
pa_module *module;
bool oob;
pa_raop_client *raop;
+ char *server;
pa_raop_protocol_t protocol;
pa_raop_encryption_t encryption;
pa_raop_codec_t codec;
+ bool autoreconnect;
+ /* if true, behaves like a null-sink when disconnected */
+ bool autonull;
size_t block_size;
+ pa_usec_t block_usec;
pa_memchunk memchunk;
pa_usec_t delay;
uint64_t write_count;
uint32_t latency;
+ /* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */
+ bool first;
};
enum {
- PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX
+ PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX,
+ PA_SINK_MESSAGE_DISCONNECT_REQUEST
};
static void userdata_free(struct userdata *u);
pa_assert(u->raop);
switch (code) {
+ /* Exception : for this message, we are in main thread, msg sent from the IO/thread
+ Done here, as alloc/free of rtsp_client is also done in this thread for other cases */
+ case PA_SINK_MESSAGE_DISCONNECT_REQUEST: {
+ if (u->sink->state == PA_SINK_RUNNING) {
+ /* Disconnect raop client, and restart the whole chain since
+ * the authentication token might be outdated */
+ pa_raop_client_disconnect(u->raop);
+ pa_raop_client_authenticate(u->raop, NULL);
+ }
+
+ return 0;
+ }
+
case PA_SINK_MESSAGE_GET_LATENCY: {
int64_t r = 0;
- if (pa_raop_client_can_stream(u->raop))
+ if (u->autonull || pa_raop_client_can_stream(u->raop))
r = sink_get_latency(u);
*((int64_t*) data) = r;
pa_module_unload_request(u->module, true);
}
+ if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) {
+ pa_usec_t now;
+ now = pa_rtclock_now();
+ pa_smoother_reset(u->smoother, now, false);
+
+ if (!pa_raop_client_is_alive(u->raop)) {
+ /* Connecting will trigger a RECORD and start steaming */
+ pa_raop_client_announce(u->raop);
+ }
+ }
+
return 0;
}
now = pa_rtclock_now();
u->write_count = 0;
u->start = now;
+ u->first = true;
pa_rtpoll_set_timer_absolute(u->rtpoll, now);
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
u->rtpoll_item = NULL;
}
- if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
+ if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
pa_rtpoll_set_timer_disabled(u->rtpoll);
- else if (u->sink->thread_info.state != PA_SINK_IDLE)
- pa_module_unload_request(u->module, true);
+
+ return 0;
+ }
+
+ if (u->autoreconnect) {
+ if (u->sink->thread_info.state != PA_SINK_IDLE) {
+ if (!u->autonull)
+ pa_rtpoll_set_timer_disabled(u->rtpoll);
+ pa_raop_client_authenticate(u->raop, NULL);
+ }
+ } else {
+ if (u->sink->thread_info.state != PA_SINK_IDLE)
+ pa_module_unload_request(u->module, true);
+ }
return 0;
}
now = pa_rtclock_now();
pa_smoother_reset(u->smoother, now, false);
+ /* If autonull is enabled, I/O thread is always eating chunks since
+ * it is emulating a null sink */
+ if (u->autonull) {
+ u->start = now;
+ u->write_count = 0;
+ u->first = true;
+ pa_rtpoll_set_timer_absolute(u->rtpoll, now);
+ }
+
if (!pa_raop_client_is_alive(u->raop)) {
- /* Connecting will trigger a RECORD and start steaming */
+ /* Connecting will trigger a RECORD and start streaming */
pa_raop_client_announce(u->raop);
} else if (!pa_raop_client_is_recording(u->raop)) {
/* RECORD alredy sent, simply start streaming */
static void thread_func(void *userdata) {
struct userdata *u = userdata;
size_t offset = 0;
+ pa_usec_t last_timing;
+ uint32_t check_timing_count;
pa_assert(u);
uint64_t position;
size_t index;
int ret;
+ bool canstream, sendstream, on_timeout;
/* Polling (audio data + control socket + timing socket). */
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
pa_sink_process_rewind(u->sink, 0);
}
+ on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll);
if (u->rtpoll_item) {
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
/* If !oob: streaming driven by pollds (POLLOUT) */
}
/* if oob: streaming managed by timing, pollfd for oob sockets */
- if (pollfd && u->oob && !pa_rtpoll_timer_elapsed(u->rtpoll)) {
+ if (pollfd && u->oob && !on_timeout) {
uint8_t packet[32];
ssize_t read;
for (i = 0; i < nbfds; i++) {
if (pollfd->revents & POLLERR) {
+ if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) {
+ pollfd->revents = 0;
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
+ PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
+ continue;
+ }
+
/* one of UDP fds is in faulty state, may have been disconnected, this is fatal */
goto fail;
}
pollfd->revents = 0;
read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
+ if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) {
+ last_timing = pa_rtclock_now();
+ check_timing_count = 1;
+ }
}
pollfd++;
}
}
- if (u->sink->thread_info.state != PA_SINK_RUNNING)
- continue;
- if (!pa_raop_client_can_stream(u->raop))
+ if (u->sink->thread_info.state != PA_SINK_RUNNING) {
continue;
+ }
- /* This assertion is meant to silence a complaint from Coverity about
- * pollfd being possibly NULL when we access it later. That's a false
- * positive, because we check pa_raop_client_can_stream() above, and if
- * that returns true, it means that the connection is up, and when the
- * connection is up, pollfd will be non-NULL. */
- pa_assert(pollfd);
+ if (u->first) {
+ last_timing = 0;
+ check_timing_count = 1;
+ intvl = 0;
+ u->first = false;
+ }
- if (u->memchunk.length <= 0) {
- if (u->memchunk.memblock)
- pa_memblock_unref(u->memchunk.memblock);
- pa_memchunk_reset(&u->memchunk);
+ canstream = pa_raop_client_can_stream(u->raop);
+ now = pa_rtclock_now();
+
+ if (u->oob && u->autoreconnect && on_timeout) {
+ if (!canstream) {
+ last_timing = 0;
+ } else if (last_timing != 0) {
+ pa_usec_t since = now - last_timing;
+ /* Incoming Timing packets should be received every 3 seconds in UDP mode
+ according to raop specifications.
+ Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds
+ We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */
+
+ if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) {
+ if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) {
+ uint32_t since_in_sec = since / PA_USEC_PER_SEC;
+ pa_log_warn(
+ "UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s",
+ check_timing_count,
+ UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server);
+ check_timing_count++;
+ } else {
+ /* Limit reached, then request disconnect */
+ check_timing_count = 1;
+ last_timing = 0;
+ if (pa_raop_client_is_alive(u->raop)) {
+ pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect");
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
+ PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
+ continue;
+ }
+ }
+ }
+ }
+ }
- /* Grab unencoded audio data from PulseAudio */
- pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
- offset = u->memchunk.index;
+ if (!u->autonull) {
+ if (!canstream) {
+ pa_log_debug("Can't stream, connection not established yet...");
+ continue;
+ }
+ /* This assertion is meant to silence a complaint from Coverity about
+ * pollfd being possibly NULL when we access it later. That's a false
+ * positive, because we check pa_raop_client_can_stream() above, and if
+ * that returns true, it means that the connection is up, and when the
+ * connection is up, pollfd will be non-NULL. */
+ pa_assert(pollfd);
}
- pa_assert(u->memchunk.length > 0);
-
- index = u->memchunk.index;
- if (pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
- if (errno == EINTR) {
- /* Just try again. */
- pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
- goto fail;
- } else if (errno != EAGAIN && !u->oob) {
- /* Buffer is full, wait for POLLOUT. */
- pollfd->events = POLLOUT;
- pollfd->revents = 0;
- } else {
- pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
- goto fail;
+ if (u->memchunk.length <= 0) {
+ if (intvl < now + u->block_usec) {
+ if (u->memchunk.memblock)
+ pa_memblock_unref(u->memchunk.memblock);
+ pa_memchunk_reset(&u->memchunk);
+
+ /* Grab unencoded audio data from PulseAudio */
+ pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
+ offset = u->memchunk.index;
}
- } else {
- u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
- position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
+ }
- now = pa_rtclock_now();
- estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
- pa_smoother_put(u->smoother, now, estimated);
-
- if (u->oob && !pollfd->revents) {
- /* Sleep until next packet transmission */
- intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
- pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
- } else if (!u->oob) {
- if (u->memchunk.length > 0) {
- pollfd->events = POLLOUT;
- pollfd->revents = 0;
+ if (u->memchunk.length > 0) {
+ index = u->memchunk.index;
+ sendstream = !u->autonull || (u->autonull && canstream);
+ if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
+ if (errno == EINTR) {
+ /* Just try again. */
+ pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
+ if (u->autoreconnect) {
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
+ 0, 0, NULL, NULL);
+ continue;
+ } else
+ goto fail;
+ } else if (errno != EAGAIN && !u->oob) {
+ /* Buffer is full, wait for POLLOUT. */
+ if (!u->oob) {
+ pollfd->events = POLLOUT;
+ pollfd->revents = 0;
+ }
} else {
+ pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
+ if (u->autoreconnect) {
+ pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
+ 0, 0, NULL, NULL);
+ continue;
+ } else
+ goto fail;
+ }
+ } else {
+ if (sendstream) {
+ u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
+ } else {
+ u->write_count += u->memchunk.length;
+ u->memchunk.length = 0;
+ }
+ position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
+
+ now = pa_rtclock_now();
+ estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
+ pa_smoother_put(u->smoother, now, estimated);
+
+ if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) {
+ /* Sleep until next packet transmission */
intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
- pollfd->revents = 0;
- pollfd->events = 0;
+ } else if (!u->oob) {
+ if (u->memchunk.length > 0) {
+ pollfd->events = POLLOUT;
+ pollfd->revents = 0;
+ } else {
+ intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
+ pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
+ pollfd->revents = 0;
+ pollfd->events = 0;
+ }
}
}
}
u->rtpoll = pa_rtpoll_new();
u->rtpoll_item = NULL;
u->latency = RAOP_DEFAULT_LATENCY;
+ u->autoreconnect = false;
+ u->server = pa_xstrdup(server);
+
+ if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) {
+ pa_log("Failed to parse autoreconnect argument");
+ goto fail;
+ }
+ /* Linked for now, potentially ready for additional parameter */
+ u->autonull = u->autoreconnect;
if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
pa_log("Failed to parse latency_msec argument");
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
pa_sink_set_rtpoll(u->sink, u->rtpoll);
- u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec);
+ u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect);
if (!(u->raop)) {
pa_log("Failed to create RAOP client object");
pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
u->block_size *= pa_frame_size(&ss);
pa_sink_set_max_request(u->sink, u->block_size);
+ u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec);
pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
if (u->card)
pa_card_free(u->card);
+ if (u->server)
+ pa_xfree(u->server);
pa_xfree(u);
}