struct mainloop_source_io *s;
for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
- if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents)
+ if (s->header.dead || !s->pollfd || !s->pollfd->revents)
continue;
- assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback);
- s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata);
+ assert(s->pollfd->fd == s->fd && s->callback);
+ s->callback(&m->api, s, s->fd,
+ ((s->pollfd->revents & (POLLIN|POLLHUP|POLLERR)) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) |
+ ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata);
+ s->pollfd->revents = 0;
}
}
if (tmp == 0)
return 0;
- else if (tmp < t)
+ else if (t == -1 || tmp < t)
t = tmp;
}
s->header.dead = 1;
m->io_sources_scan_dead = 1;
+ m->rebuild_pollfds = 1;
}
/* Fixed sources */
mainloop_api->quit(mainloop_api, 1);
}
-static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
+static void do_write(size_t length) {
size_t l;
- assert(s && length);
-
- mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK);
-
- if (!buffer)
- return;
-
- assert(buffer_length);
+ assert(buffer && buffer_length);
l = length;
if (l > buffer_length)
l = buffer_length;
- pa_stream_write(s, buffer+buffer_index, l);
+ pa_stream_write(stream, buffer+buffer_index, l);
buffer_length -= l;
buffer_index += l;
}
}
+static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
+ assert(s && length);
+
+ mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT);
+
+ if (!buffer)
+ return;
+
+ do_write(length);
+}
+
static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) {
assert(c);
if (!s) {
fprintf(stderr, "Stream creation failed.\n");
mainloop_api->quit(mainloop_api, 1);
+ return;
}
stream = s;
}
static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
- size_t l;
+ size_t l, w = 0;
ssize_t r;
assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
return;
}
- if (!(l = pa_stream_writable_size(stream)))
+ if (!stream || !(l = w = pa_stream_writable_size(stream)))
l = 4096;
buffer = malloc(l);
assert(buffer);
buffer_length = r;
buffer_index = 0;
+
+ if (w)
+ do_write(w);
}
int main(int argc, char *argv[]) {
static void reply_info_free(struct reply_info *r) {
assert(r && r->pdispatch && r->pdispatch->mainloop);
- r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout);
+
+ if (r->pdispatch)
+ r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout);
if (r->previous)
r->previous->next = r->next;
if (ts)
tagstruct_free(ts);
- fprintf(stderr, "protocol-native: invalid packet.\n");
return -1;
}
#define DEFAULT_QUEUE_LENGTH 10240
#define DEFAULT_MAX_LENGTH 20480
#define DEFAULT_PREBUF 4096
-#define DEFAULT_TIMEOUT 5
+#define DEFAULT_TIMEOUT (5*60)
#define DEFAULT_SERVER "/tmp/polypaudio_native"
struct pa_context {
struct pa_stream {
struct pa_context *context;
struct pa_stream *next, *previous;
+ uint32_t device_index;
uint32_t channel;
int channel_valid;
enum pa_stream_direction direction;
struct pa_stream *s;
struct pa_context *c = userdata;
uint32_t bytes, channel;
- assert(pd && command == PA_COMMAND_REQUEST && t && s);
+ assert(pd && command == PA_COMMAND_REQUEST && t && c);
if (tagstruct_getu32(t, &channel) < 0 ||
tagstruct_getu32(t, &bytes) < 0 ||
- tagstruct_eof(t)) {
+ !tagstruct_eof(t)) {
c->errno = PA_ERROR_PROTOCOL;
return -1;
}
return -1;
}
+ fprintf(stderr, "Requested %u bytes\n", bytes);
+
s->requested_bytes += bytes;
if (s->requested_bytes && s->write_callback)
}
if (tagstruct_getu32(t, &s->channel) < 0 ||
- tagstruct_eof(t)) {
+ tagstruct_getu32(t, &s->device_index) < 0 ||
+ !tagstruct_eof(t)) {
s->context->errno = PA_ERROR_PROTOCOL;
ret = -1;
goto fail;
s->requested_bytes = 0;
s->channel = 0;
s->channel_valid = 0;
+ s->device_index = (uint32_t) -1;
s->direction = dir;
t = tagstruct_new(NULL, 0);
chunk.length = length;
pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
+ memblock_unref(chunk.memblock);
+ fprintf(stderr, "Sent %u bytes\n", length);
+
if (length < s->requested_bytes)
s->requested_bytes -= length;
else
size_t qlength;
struct sink_input *sink_input;
struct memblockq *memblockq;
+ size_t requested_bytes;
};
struct connection {
static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
struct playback_stream *s;
- assert(c && sink && s && name && qlen && maxlength && prebuf);
+ assert(c && sink && ss && name && qlen && maxlength && prebuf);
s = malloc(sizeof(struct playback_stream));
assert (s);
s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);
assert(s->memblockq);
+ s->requested_bytes = 0;
+
idxset_put(c->playback_streams, s, &s->index);
- request_bytes(s);
return s;
}
if (!(l = memblockq_missing_to(s->memblockq, s->qlength)))
return;
+ if (l <= s->requested_bytes)
+ return;
+
+ l -= s->requested_bytes;
+ s->requested_bytes += l;
+
t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, PA_COMMAND_REQUEST);
+ tagstruct_putu32(t, (uint32_t) -1); /* tag */
tagstruct_putu32(t, s->index);
tagstruct_putu32(t, l);
pstream_send_tagstruct(s->connection->pstream, t);
+
+ fprintf(stderr, "Requesting %u bytes\n", l);
}
/*** sinkinput callbacks ***/
assert(s->sink_input);
tagstruct_putu32(reply, s->sink_input->index);
pstream_send_tagstruct(c->pstream, reply);
+ request_bytes(s);
return 0;
}
/*** pstream callbacks ***/
-
static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
struct connection *c = userdata;
assert(p && packet && packet->data && c);
return -1;
}
+ if (chunk->length >= stream->requested_bytes)
+ stream->requested_bytes = 0;
+ else
+ stream->requested_bytes -= chunk->length;
+
memblockq_push(stream->memblockq, chunk, delta);
assert(stream->sink_input);
sink_notify(stream->sink_input->sink);
+ fprintf(stderr, "Recieved %u bytes.\n", chunk->length);
+
return 0;
}
struct item_info *i = item;
assert(i);
- if (i->type == PSTREAM_ITEM_PACKET) {
+ if (i->type == PSTREAM_ITEM_MEMBLOCK) {
assert(i->chunk.memblock);
memblock_unref(i->chunk.memblock);
} else {
- assert(i->type == PSTREAM_ITEM_MEMBLOCK);
+ assert(i->type == PSTREAM_ITEM_PACKET);
assert(i->packet);
packet_unref(i->packet);
}
void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
struct item_info *i;
- assert(p && channel && chunk);
+ assert(p && channel != (uint32_t) -1 && chunk);
i = malloc(sizeof(struct item_info));
assert(i);
l = PSTREAM_DESCRIPTOR_SIZE - p->write.index;
} else {
d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE;
- l = ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->write.index - PSTREAM_DESCRIPTOR_SIZE;
+ l = ntohl(p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PSTREAM_DESCRIPTOR_SIZE);
}
if ((r = iochannel_write(p->io, d, l)) < 0)
} else {
assert(p->read.data);
d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE;
- l = ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - p->read.index - PSTREAM_DESCRIPTOR_SIZE;
+ l = ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PSTREAM_DESCRIPTOR_SIZE);
}
if ((r = iochannel_read(p->io, d, l)) <= 0)
return -1;
error = 1;
- for (n = 0, c = (char*) (t->data+t->rindex+1); n < t->length-t->rindex-1; c++)
+ for (n = 0, c = (char*) (t->data+t->rindex+1); t->rindex+1+n < t->length; n++, c++)
if (!*c) {
error = 0;
break;
*s = (char*) (t->data+t->rindex+1);
- t->rindex += n+1;
+ t->rindex += n+2;
return 0;
}