summaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c95
1 files changed, 60 insertions, 35 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index fe9113bd5ba0..36adccc4f849 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -118,6 +118,7 @@ struct writequeue_entry {
int len;
int end;
int users;
+ bool dirty;
struct connection *con;
struct list_head msgs;
struct kref ref;
@@ -700,6 +701,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
+static void dlm_page_release(struct kref *kref)
+{
+ struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+ ref);
+
+ __free_page(e->page);
+ kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+ struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+ kref_put(&msg->entry->ref, dlm_page_release);
+ kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+ struct dlm_msg *msg, *tmp;
+
+ list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+ if (msg->orig_msg) {
+ msg->orig_msg->retransmit = false;
+ kref_put(&msg->orig_msg->ref, dlm_msg_release);
+ }
+
+ list_del(&msg->list);
+ kref_put(&msg->ref, dlm_msg_release);
+ }
+
+ list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
+ kref_put(&e->ref, dlm_page_release);
+}
+
static void dlm_close_sock(struct socket **sock)
{
if (*sock) {
@@ -714,6 +751,7 @@ static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx)
{
bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+ struct writequeue_entry *e;
if (tx && !closing && cancel_work_sync(&con->swork)) {
log_print("canceled swork for node %d", con->nodeid);
@@ -732,6 +770,26 @@ static void close_connection(struct connection *con, bool and_other,
close_connection(con->othercon, false, tx, rx);
}
+ /* if we send a writequeue entry only a half way, we drop the
+ * whole entry because reconnection and that we not start of the
+ * middle of a msg which will confuse the other end.
+ *
+ * we can always drop messages because retransmits, but what we
+ * cannot allow is to transmit half messages which may be processed
+ * at the other side.
+ *
+ * our policy is to start on a clean state when disconnects, we don't
+ * know what's send/received on transport layer in this case.
+ */
+ spin_lock(&con->writequeue_lock);
+ if (!list_empty(&con->writequeue)) {
+ e = list_first_entry(&con->writequeue, struct writequeue_entry,
+ list);
+ if (e->dirty)
+ free_entry(e);
+ }
+ spin_unlock(&con->writequeue_lock);
+
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_CONNECTED, &con->flags);
@@ -1026,41 +1084,6 @@ accept_err:
return result;
}
-static void dlm_page_release(struct kref *kref)
-{
- struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
- ref);
-
- __free_page(e->page);
- kfree(e);
-}
-
-static void dlm_msg_release(struct kref *kref)
-{
- struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
-
- kref_put(&msg->entry->ref, dlm_page_release);
- kfree(msg);
-}
-
-static void free_entry(struct writequeue_entry *e)
-{
- struct dlm_msg *msg, *tmp;
-
- list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
- if (msg->orig_msg) {
- msg->orig_msg->retransmit = false;
- kref_put(&msg->orig_msg->ref, dlm_msg_release);
- }
- list_del(&msg->list);
- kref_put(&msg->ref, dlm_msg_release);
- }
-
- list_del(&e->list);
- atomic_dec(&e->con->writequeue_cnt);
- kref_put(&e->ref, dlm_page_release);
-}
-
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
@@ -1072,6 +1095,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
e->offset += completed;
e->len -= completed;
+ /* signal that page was half way transmitted */
+ e->dirty = true;
if (e->len == 0 && e->users == 0)
free_entry(e);