MINOR: connections: Introduce an unsubscribe method.

As we don't know how subscriptions are handled, we can't just assume that
LIST_DEL() is enough to unsubscribe, so introduce a dedicated unsubscribe
method for both muxes and connections.
diff --git a/src/connection.c b/src/connection.c
index c0da874..c8f1df1 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -358,12 +358,38 @@
 	return ret;
 }
 
+int conn_unsubscribe(struct connection *conn, int event_type, void *param)
+{
+	struct wait_list *sw; /* <param>, used as the subscriber's wait_list entry */
+
+	if (event_type & SUB_CAN_RECV) { /* caller asks to drop the RECV subscription */
+		sw = param;
+		if (sw->wait_reason & SUB_CAN_RECV) { /* nothing to do unless RECV is currently set */
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list); /* re-initialize the element once it is off its list */
+			sw->wait_reason &= ~SUB_CAN_RECV;
+			if (sw->wait_reason & SUB_CAN_SEND) /* SEND is still pending: queue on the send list */
+				LIST_ADDQ(&conn->send_wait_list, &sw->list);
+		}
+	}
+	if (event_type & SUB_CAN_SEND) { /* caller asks to drop the SEND subscription */
+		sw = param;
+		if (sw->wait_reason & SUB_CAN_SEND) { /* nothing to do unless SEND is currently set */
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list); /* re-initialize the element once it is off its list */
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			if (sw->wait_reason & SUB_CAN_RECV) /* RECV is still pending: queue on the recv list */
+				LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+		}
+	}
+	return 0; /* the only return path: 0 even when no matching subscription existed */
+}
+
 int conn_subscribe(struct connection *conn, int event_type, void *param)
 {
 	struct wait_list *sw;
 
-	switch (event_type) {
-	case SUB_CAN_RECV:
+	if (event_type & SUB_CAN_RECV) {
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_RECV)) {
 			sw->wait_reason |= SUB_CAN_RECV;
@@ -377,8 +403,9 @@
 			} else
 				LIST_ADDQ(&conn->recv_wait_list, &sw->list);
 		}
-		return 0;
-	case SUB_CAN_SEND:
+		event_type &= ~SUB_CAN_RECV;
+	}
+	if (event_type & SUB_CAN_SEND) {
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_SEND)) {
 			sw->wait_reason |= SUB_CAN_SEND;
@@ -392,11 +419,11 @@
 			} else
 				LIST_ADDQ(&conn->send_wait_list, &sw->list);
 		}
-		return 0;
-	default:
-		break;
+		event_type &= ~SUB_CAN_SEND;
 	}
-	return (-1);
+	if (event_type != 0)
+		return (-1);
+	return 0;
 }
 
 /* Drains possibly pending incoming data on the file descriptor attached to the