}
}
-static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
+void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
BUG_ON(!con_flag_valid(con_flag));
clear_bit(con_flag, &con->flags);
}
-static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
+void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
BUG_ON(!con_flag_valid(con_flag));
set_bit(con_flag, &con->flags);
}
-static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
+bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
BUG_ON(!con_flag_valid(con_flag));
return test_bit(con_flag, &con->flags);
}
-static bool con_flag_test_and_clear(struct ceph_connection *con,
- unsigned long con_flag)
+bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
+ unsigned long con_flag)
{
BUG_ON(!con_flag_valid(con_flag));
return test_and_clear_bit(con_flag, &con->flags);
}
-static bool con_flag_test_and_set(struct ceph_connection *con,
- unsigned long con_flag)
+bool ceph_con_flag_test_and_set(struct ceph_connection *con,
+ unsigned long con_flag)
{
BUG_ON(!con_flag_valid(con_flag));
}
EXPORT_SYMBOL(ceph_pr_addr);
-static void encode_my_addr(struct ceph_messenger *msgr)
+void ceph_encode_my_addr(struct ceph_messenger *msgr)
{
memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
ceph_encode_banner_addr(&msgr->my_enc_addr);
* buffer. See net/ipv4/tcp_input.c:tcp_check_space()
* and net/core/stream.c:sk_stream_write_space().
*/
- if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
+ if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
if (sk_stream_is_writeable(sk)) {
dout("%s %p queueing write work\n", __func__, con);
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
case TCP_CLOSE_WAIT:
dout("%s TCP_CLOSE_WAIT\n", __func__);
con_sock_state_closing(con);
- con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
+ ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
queue_con(con);
break;
case TCP_ESTABLISHED:
/*
* initiate connection to a remote socket.
*/
-static int ceph_tcp_connect(struct ceph_connection *con)
+int ceph_tcp_connect(struct ceph_connection *con)
{
struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
struct socket *sock;
unsigned int noio_flag;
int ret;
+ dout("%s con %p peer_addr %s\n", __func__, con,
+ ceph_pr_addr(&con->peer_addr));
BUG_ON(con->sock);
/* sock_create_kern() allocates with GFP_KERNEL */
set_sock_callbacks(sock, con);
- dout("connect %s\n", ceph_pr_addr(&con->peer_addr));
-
con_sock_state_connecting(con);
ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
O_NONBLOCK);
/*
* Shutdown/close the socket for the given connection.
*/
-static int con_close_socket(struct ceph_connection *con)
+int ceph_con_close_socket(struct ceph_connection *con)
{
int rc = 0;
- dout("con_close_socket on %p sock %p\n", con, con->sock);
+ dout("%s con %p sock %p\n", __func__, con, con->sock);
if (con->sock) {
rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
sock_release(con->sock);
* received a socket close event before we had the chance to
* shut the socket down.
*/
- con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
+ ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
con_sock_state_closed(con);
return rc;
{
dout("%s con %p\n", __func__, con);
- con_close_socket(con);
+ ceph_con_close_socket(con);
if (con->in_msg) {
WARN_ON(con->in_msg->con != con);
ceph_msg_put(con->in_msg);
}
}
-static void ceph_con_reset_session(struct ceph_connection *con)
+void ceph_con_reset_session(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
con->state = CEPH_CON_S_CLOSED;
- con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */
- con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
- con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
- con_flag_clear(con, CEPH_CON_F_BACKOFF);
+ ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next
+ connect */
+ ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
+ ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
ceph_con_reset_protocol(con);
ceph_con_reset_session(con);
* We maintain a global counter to order connection attempts. Get
* a unique seq greater than @gt.
*/
-static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
+u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
u32 ret;
/*
* Discard messages that have been acked by the server.
*/
-static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
+void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
struct ceph_msg *msg;
u64 seq;
* reconnect_seq. This avoids gratuitously resending messages that
* the server had received and handled prior to reconnect.
*/
-static void ceph_con_discard_requeued(struct ceph_connection *con,
- u64 reconnect_seq)
+void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
struct ceph_msg *msg;
u64 seq;
cursor->need_crc = true;
}
-static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
- struct ceph_msg *msg, size_t length)
+void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
+ struct ceph_msg *msg, size_t length)
{
BUG_ON(!length);
BUG_ON(length > msg->data_length);
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
-static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset, size_t *length,
- bool *last_piece)
+struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
+ bool *last_piece)
{
struct page *page;
* Returns true if the result moves the cursor on to the next piece
* of the data item.
*/
-static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
- size_t bytes)
+void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{
bool new_piece;
con->out_msg_done = true;
}
-static void ceph_con_get_out_msg(struct ceph_connection *con);
-
/*
* Prepare headers for the next outgoing message.
*/
prepare_write_message_footer(con);
}
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
/*
&con->out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
/*
con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
/*
} else {
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
}
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
/*
&con->msgr->my_enc_addr);
con->out_more = 0;
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
static void __prepare_write_connect(struct ceph_connection *con)
con->auth->authorizer_buf);
con->out_more = 0;
- con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
static int prepare_write_connect(struct ceph_connection *con)
{
- unsigned int global_seq = get_global_seq(con->msgr, 0);
+ unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
int proto;
int ret;
return ret; /* done! */
}
-static u32 ceph_crc32c_page(u32 crc, struct page *page,
- unsigned int page_offset,
- unsigned int length)
+u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
+ unsigned int length)
{
char *kaddr;
return 0;
}
-static bool addr_is_blank(struct ceph_entity_addr *addr)
+bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{
struct sockaddr_storage ss = addr->in_addr; /* align */
struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
}
}
-static int addr_port(struct ceph_entity_addr *addr)
+int ceph_addr_port(const struct ceph_entity_addr *addr)
{
switch (get_unaligned(&addr->in_addr.ss_family)) {
case AF_INET:
return 0;
}
-static void addr_set_port(struct ceph_entity_addr *addr, int p)
+void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{
switch (get_unaligned(&addr->in_addr.ss_family)) {
case AF_INET:
port = CEPH_MON_PORT;
}
- addr_set_port(&addr[i], port);
+ ceph_addr_set_port(&addr[i], port);
addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
*/
if (memcmp(&con->peer_addr, &con->actual_peer_addr,
sizeof(con->peer_addr)) != 0 &&
- !(addr_is_blank(&con->actual_peer_addr) &&
+ !(ceph_addr_is_blank(&con->actual_peer_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
pr_warn("wrong peer, want %s/%u, got %s/%u\n",
ceph_pr_addr(&con->peer_addr),
/*
* did we learn our address?
*/
- if (addr_is_blank(my_addr)) {
+ if (ceph_addr_is_blank(my_addr)) {
memcpy(&my_addr->in_addr,
&con->peer_addr_for_me.in_addr,
sizeof(con->peer_addr_for_me.in_addr));
- addr_set_port(my_addr, 0);
- encode_my_addr(con->msgr);
+ ceph_addr_set_port(my_addr, 0);
+ ceph_encode_my_addr(con->msgr);
dout("process_banner learned my addr is %s\n",
ceph_pr_addr(my_addr));
}
dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
con->peer_global_seq,
le32_to_cpu(con->in_reply.global_seq));
- get_global_seq(con->msgr,
- le32_to_cpu(con->in_reply.global_seq));
+ ceph_get_global_seq(con->msgr,
+ le32_to_cpu(con->in_reply.global_seq));
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
le32_to_cpu(con->in_reply.connect_seq));
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
- con_flag_set(con, CEPH_CON_F_LOSSYTX);
+ ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
con->delay = 0; /* reset backoff memory */
/*
* read (part of) a message.
*/
-static int ceph_con_in_msg_alloc(struct ceph_connection *con,
- struct ceph_msg_header *hdr, int *skip);
-
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
* be careful not to do anything that waits on other incoming messages or it
* may deadlock.
*/
-static void process_message(struct ceph_connection *con)
+void ceph_con_process_message(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
do_next:
if (con->state == CEPH_CON_S_OPEN) {
- if (con_flag_test_and_clear(con,
+ if (ceph_con_flag_test_and_clear(con,
CEPH_CON_F_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
/* Nothing to do! */
- con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
dout("try_write nothing else to write.\n");
ret = 0;
out:
prepare_read_keepalive_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE:
- con_close_socket(con);
+ ceph_con_close_socket(con);
con->state = CEPH_CON_S_CLOSED;
goto out;
default:
}
if (con->in_tag == CEPH_MSGR_TAG_READY)
goto more;
- process_message(con);
+ ceph_con_process_message(con);
if (con->state == CEPH_CON_S_OPEN)
prepare_read_tag(con);
goto more;
static bool con_sock_closed(struct ceph_connection *con)
{
- if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
+ if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
return false;
#define CASE(x) \
{
int ret;
- if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
+ if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
return false;
ret = queue_con_delay(con, con->delay);
dout("%s: con %p FAILED to back off %lu\n", __func__,
con, con->delay);
BUG_ON(ret == -ENOENT);
- con_flag_set(con, CEPH_CON_F_BACKOFF);
+ ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
}
return true;
ceph_con_reset_protocol(con);
- if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
+ if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CEPH_CON_S_CLOSED;
return;
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
- !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
- con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+ ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
con->state = CEPH_CON_S_STANDBY;
} else {
/* retry after a delay. */
if (con->delay > MAX_DELAY_INTERVAL)
con->delay = MAX_DELAY_INTERVAL;
}
- con_flag_set(con, CEPH_CON_F_BACKOFF);
+ ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
queue_con(con);
}
}
{
u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
msgr->inst.addr.nonce = cpu_to_le32(nonce);
- encode_my_addr(msgr);
+ ceph_encode_my_addr(msgr);
}
/*
if (myaddr) {
memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
sizeof(msgr->inst.addr.in_addr));
- addr_set_port(&msgr->inst.addr, 0);
+ ceph_addr_set_port(&msgr->inst.addr, 0);
}
msgr->inst.addr.type = 0;
get_random_bytes(&msgr->inst.addr.nonce,
sizeof(msgr->inst.addr.nonce));
} while (!msgr->inst.addr.nonce);
- encode_my_addr(msgr);
+ ceph_encode_my_addr(msgr);
atomic_set(&msgr->stopping, 0);
write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CEPH_CON_S_PREOPEN;
con->connect_seq++;
- WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
- WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
+ WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
+ WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
}
}
/* if there wasn't anything waiting to send before, queue
* new work */
- if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0)
+ if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
dout("con_keepalive %p\n", con);
mutex_lock(&con->mutex);
clear_standby(con);
- con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
+ ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
mutex_unlock(&con->mutex);
- if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0)
+ if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);
* On error (ENOMEM, EAGAIN, ...),
* - con->in_msg == NULL
*/
-static int ceph_con_in_msg_alloc(struct ceph_connection *con,
- struct ceph_msg_header *hdr, int *skip)
+int ceph_con_in_msg_alloc(struct ceph_connection *con,
+ struct ceph_msg_header *hdr, int *skip)
{
int middle_len = le32_to_cpu(hdr->middle_len);
struct ceph_msg *msg;
return ret;
}
-static void ceph_con_get_out_msg(struct ceph_connection *con)
+void ceph_con_get_out_msg(struct ceph_connection *con)
{
struct ceph_msg *msg;