MEDIUM: connection: start to introduce a mux layer between xprt and data

For HTTP/2 and QUIC, we'll need to deal with multiplexed streams inside
a connection. After quite a long brainstorming, it appears that the
connection interface to the existing streams is appropriate just like
the connection interface to the lower layers. In fact we need to have
the mux layer in the middle of the connection, between the transport
and the data layer.

A mux can exist on two directions/sides. On the inbound direction, it
instantiates new streams from incoming connections, while on the outbound
direction it muxes streams into outgoing connections. The difference is
visible on the mux->init() call: in one case, an upper context is already
known (outgoing connection), and in the other case, the upper context is
not yet known (incoming connection) and will have to be allocated by the
mux. The session doesn't have to create the new streams anymore, as this
is performed by the mux itself.

This patch introduces this and creates a pass-through mux called
"mux_pt" which is used for all new connections and which only
calls the data layer's recv,send,wake() calls. One incoming stream
is immediately created when init() is called on the inbound direction.
There should not be any visible impact.

Note that the connection's mux is purposely not set until the session
is completed so that we don't accidentally run with the wrong mux. This
must not cause any issue as the xprt_done_cb function is always called
prior to using mux's recv/send functions.
diff --git a/src/backend.c b/src/backend.c
index 23b85ce..4d44e54 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -41,6 +41,7 @@
 #include <proto/lb_fwrr.h>
 #include <proto/lb_map.h>
 #include <proto/log.h>
+#include <proto/mux_pt.h>
 #include <proto/obj_type.h>
 #include <proto/payload.h>
 #include <proto/protocol.h>
@@ -1159,12 +1160,14 @@
 		/* set the correct protocol on the output stream interface */
 		if (srv) {
 			conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
+			conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
 		}
 		else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
 			/* proxies exclusively run on raw_sock right now */
 			conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
 			if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl)
 				return SF_ERR_INTERNAL;
+			conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
 		}
 		else
 			return SF_ERR_INTERNAL;  /* how did we get there ? */
diff --git a/src/checks.c b/src/checks.c
index c9e655d..b717d38 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -46,6 +46,7 @@
 #include <proto/stats.h>
 #include <proto/fd.h>
 #include <proto/log.h>
+#include <proto/mux_pt.h>
 #include <proto/queue.h>
 #include <proto/port_range.h>
 #include <proto/proto_http.h>
@@ -1562,6 +1563,7 @@
 	proto = protocol_by_family(conn->addr.to.ss_family);
 
 	conn_prepare(conn, proto, check->xprt);
+	conn_install_mux(conn, &mux_pt_ops, conn);
 	conn_attach(conn, check, &check_conn_cb);
 	conn->target = &s->obj_type;
 
@@ -2725,6 +2727,7 @@
 				xprt = xprt_get(XPRT_RAW);
 			}
 			conn_prepare(conn, proto, xprt);
+			conn_install_mux(conn, &mux_pt_ops, conn);
 
 			ret = SF_ERR_INTERNAL;
 			if (proto->connect)
diff --git a/src/connection.c b/src/connection.c
index d235ec5..129c741 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -105,7 +105,7 @@
 		 * both of which will be detected below.
 		 */
 		flags = 0;
-		conn->data->send(conn);
+		conn->mux->send(conn);
 	}
 
 	/* The data transfer starts here and stops on error and handshakes. Note
@@ -119,7 +119,7 @@
 		 * both of which will be detected below.
 		 */
 		flags = 0;
-		conn->data->recv(conn);
+		conn->mux->recv(conn);
 	}
 
 	/* It may happen during the data phase that a handshake is
@@ -169,7 +169,7 @@
 	if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
 	     ((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
 	      (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) &&
-	    conn->data->wake(conn) < 0)
+	    conn->mux->wake(conn) < 0)
 		return;
 
 	/* remove the events before leaving */
diff --git a/src/mux_pt.c b/src/mux_pt.c
new file mode 100644
index 0000000..603c360
--- /dev/null
+++ b/src/mux_pt.c
@@ -0,0 +1,61 @@
+/*
+ * Pass-through mux-demux for connections
+ *
+ * Copyright 2017 Willy Tarreau <w@1wt.eu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <common/config.h>
+#include <proto/connection.h>
+#include <proto/stream.h>
+
+/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is
+ * assumed that no data layer has yet been instantiated so the mux is
+ * attached to an incoming connection and will instantiate a new stream. If
+ * conn->mux_ctx exists, it is assumed that it is an outgoing connection
+ * requested for this context. Returns < 0 on error.
+ */
+static int mux_pt_init(struct connection *conn)
+{
+	if (!conn->mux_ctx)
+		return stream_create_from_conn(conn);
+	return 0;
+}
+
+/* callback to be used by default for the pass-through mux. It calls the data
+ * layer wake() callback if it is set, otherwise returns 0.
+ */
+static int mux_pt_wake(struct connection *conn)
+{
+	return conn->data->wake ? conn->data->wake(conn) : 0;
+}
+
+/* callback to be used by default for the pass-through mux. It simply calls the
+ * data layer recv() callback which must be set.
+ */
+static void mux_pt_recv(struct connection *conn)
+{
+	conn->data->recv(conn);
+}
+
+/* callback to be used by default for the pass-through mux. It simply calls the
+ * data layer send() callback which must be set.
+ */
+static void mux_pt_send(struct connection *conn)
+{
+	conn->data->send(conn);
+}
+
+/* The mux operations */
+const struct mux_ops mux_pt_ops = {
+	.init = mux_pt_init,
+	.recv = mux_pt_recv,
+	.send = mux_pt_send,
+	.wake = mux_pt_wake,
+	.name = "PASS",
+};
diff --git a/src/peers.c b/src/peers.c
index 2ca08fe..d7705ea 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -38,6 +38,7 @@
 #include <proto/frontend.h>
 #include <proto/log.h>
 #include <proto/hdr_idx.h>
+#include <proto/mux_pt.h>
 #include <proto/proto_tcp.h>
 #include <proto/proto_http.h>
 #include <proto/proxy.h>
@@ -1912,6 +1913,7 @@
 		goto out_free_strm;
 
 	conn_prepare(conn, peer->proto, peer->xprt);
+	conn_install_mux(conn, &mux_pt_ops, conn);
 	si_attach_conn(&s->si[1], conn);
 
 	conn->target = s->target = &s->be->obj_type;
diff --git a/src/session.c b/src/session.c
index 3f48878..b0bf453 100644
--- a/src/session.c
+++ b/src/session.c
@@ -21,6 +21,7 @@
 #include <proto/connection.h>
 #include <proto/listener.h>
 #include <proto/log.h>
+#include <proto/mux_pt.h>
 #include <proto/proto_http.h>
 #include <proto/proxy.h>
 #include <proto/session.h>
@@ -406,7 +407,7 @@
 		goto fail;
 
 	session_count_new(sess);
-	if (stream_create_from_conn(conn) < 0)
+	if (conn_install_mux(conn, &mux_pt_ops, NULL) < 0)
 		goto fail;
 
 	/* the embryonic session's task is not needed anymore */
diff --git a/src/stream.c b/src/stream.c
index 889908f..4808cfa 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -2871,10 +2871,11 @@
 
 		if ((conn = objt_conn(strm->si[0].end)) != NULL) {
 			chunk_appendf(&trash,
-			              "  co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
+			              "  co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
 				      conn,
 				      conn_get_ctrl_name(conn),
 				      conn_get_xprt_name(conn),
+				      conn_get_mux_name(conn),
 				      conn_get_data_name(conn),
 			              obj_type_name(conn->target),
 			              obj_base_ptr(conn->target));
@@ -2899,10 +2900,11 @@
 
 		if ((conn = objt_conn(strm->si[1].end)) != NULL) {
 			chunk_appendf(&trash,
-			              "  co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
+			              "  co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
 				      conn,
 				      conn_get_ctrl_name(conn),
 				      conn_get_xprt_name(conn),
+				      conn_get_mux_name(conn),
 				      conn_get_data_name(conn),
 			              obj_type_name(conn->target),
 			              obj_base_ptr(conn->target));