static bool pipeTraffic(nsjconf_t* nsjconf, int listenfd) {
std::vector<struct pollfd> fds;
- fds.reserve(nsjconf->pipes.size() * 2 + 1);
+ fds.reserve(nsjconf->pipes.size() * 3 + 1);
for (const auto& p : nsjconf->pipes) {
fds.push_back({
- .fd = p.first,
- .events = POLLIN,
+ .fd = p.sock_fd,
+ .events = POLLIN | POLLOUT,
.revents = 0,
});
fds.push_back({
- .fd = p.second,
+ .fd = p.pipe_in,
.events = POLLOUT,
.revents = 0,
});
+ fds.push_back({
+ .fd = p.pipe_out,
+ .events = POLLIN,
+ .revents = 0,
+ });
}
fds.push_back({
.fd = listenfd,
return true;
}
bool cleanup = false;
- for (size_t i = 0; i < fds.size() - 1; i += 2) {
- bool read_ready = fds[i].events == 0 || (fds[i].revents & POLLIN) == POLLIN;
- bool write_ready =
- fds[i + 1].events == 0 || (fds[i + 1].revents & POLLOUT) == POLLOUT;
- bool pair_closed = (fds[i].revents & (POLLHUP | POLLERR)) != 0 ||
- (fds[i + 1].revents & (POLLHUP | POLLERR)) != 0;
- if (read_ready && write_ready) {
- LOG_D("Read+write ready on %ld", i / 2);
- ssize_t rv = splice(fds[i].fd, nullptr, fds[i + 1].fd, nullptr,
- 4096, SPLICE_F_NONBLOCK);
- if (rv == -1 && errno != EAGAIN) {
- PLOG_E("splice fd pair #%ld {%d, %d}\n", i / 2, fds[i].fd,
- fds[i + 1].fd);
+ for (size_t i = 0; i < fds.size() - 1; ++i) {
+ if (fds[i].revents & POLLIN) {
+ fds[i].events &= ~POLLIN;
+ }
+ if (fds[i].revents & POLLOUT) {
+ fds[i].events &= ~POLLOUT;
+ }
+ }
+ for (size_t i = 0; i < fds.size() - 3; i += 3) {
+ const size_t pipe_no = i / 3;
+ int in, out;
+ const char* direction;
+ bool closed = false;
+ std::tuple<int, int, const char*> direction_map[] = {
+ {i, i + 1, "in"}, {i + 2, i, "out"}};
+ for (const auto& entry : direction_map) {
+ std::tie(in, out, direction) = entry;
+ bool in_ready = (fds[in].events & POLLIN) == 0 ||
+ (fds[in].revents & POLLIN) == POLLIN;
+ bool out_ready = (fds[out].events & POLLOUT) == 0 ||
+ (fds[out].revents & POLLOUT) == POLLOUT;
+ if (in_ready && out_ready) {
+ LOG_D("#%ld piping data %s", pipe_no, direction);
+ ssize_t rv = splice(fds[in].fd, nullptr, fds[out].fd,
+ nullptr, 4096, SPLICE_F_NONBLOCK);
+ if (rv == -1 && errno != EAGAIN) {
+ PLOG_E("splice fd pair #%ld {%d, %d}\n", pipe_no,
+ fds[in].fd, fds[out].fd);
+ }
+ if (rv == 0) {
+ closed = true;
+ }
+ fds[in].events |= POLLIN;
+ fds[out].events |= POLLOUT;
}
- if (rv == 0) {
- pair_closed = true;
+ if ((fds[in].revents & (POLLERR | POLLHUP)) != 0 ||
+ (fds[out].revents & (POLLERR | POLLHUP)) != 0) {
+ closed = true;
}
- fds[i].events = POLLIN;
- fds[i + 1].events = POLLOUT;
- } else if (read_ready) {
- LOG_D("Read ready on %ld", i / 2);
- fds[i].events = 0;
- } else if (write_ready) {
- LOG_D("Write ready on %ld", i / 2);
- fds[i + 1].events = 0;
}
- if (pair_closed) {
- LOG_D("Hangup on %ld", i / 2);
+ if (closed) {
+ LOG_D("#%ld connection closed", pipe_no);
cleanup = true;
- close(fds[i].fd);
- close(fds[i + 1].fd);
- nsjconf->pipes[i / 2] = {0, 0};
+ close(nsjconf->pipes[pipe_no].sock_fd);
+ close(nsjconf->pipes[pipe_no].pipe_in);
+ close(nsjconf->pipes[pipe_no].pipe_out);
+ nsjconf->pipes[pipe_no] = {};
}
}
if (cleanup) {
break;
}
}
- nsjconf->pipes.erase(
- std::remove(nsjconf->pipes.begin(), nsjconf->pipes.end(), std::pair<int, int>(0, 0)),
+ nsjconf->pipes.erase(std::remove(nsjconf->pipes.begin(), nsjconf->pipes.end(), pipemap_t{}),
nsjconf->pipes.end());
return false;
}
PLOG_E("pipe");
continue;
}
- nsjconf->pipes.emplace_back(connfd, in[1]);
- nsjconf->pipes.emplace_back(out[0], connfd);
+ nsjconf->pipes.push_back(
+ {.sock_fd = connfd, .pipe_in = in[1], .pipe_out = out[0]});
subproc::runChild(nsjconf, connfd, in[0], out[1], out[1]);
close(in[0]);
close(out[1]);