MAJOR: session: introduce embryonic sessions

When an incoming connection request is accepted, a connection
structure is needed to store its state. However we don't want to
fully initialize a session until the data layer is about to be
ready.

As long as the connection is physically stored into the session,
it's not easy to split both allocations.

As such, we only initialize the minimum requirements of a session,
which results in what we call an embryonic session. Then once the
data layer is ready, we can complete the function's initialization.

Doing so avoids buffers allocation and ensures that a session only
sees ready connections.

The frontend's client timeout is used as the handshake timeout. It
is likely that another timeout will be used in the future.
diff --git a/include/proto/session.h b/include/proto/session.h
index 049fc93..d88a19e 100644
--- a/include/proto/session.h
+++ b/include/proto/session.h
@@ -47,6 +47,7 @@
 			 int section_type, struct proxy *curpx,
 			 struct track_ctr_prm *prm,
 			 struct proxy *defpx, char **err);
+int conn_session_initialize(struct connection *conn, int flag);
 
 /* Remove the refcount from the session to the tracked counters, and clear the
  * pointer to ensure this is only performed once. The caller is responsible for
diff --git a/include/types/connection.h b/include/types/connection.h
index 0e12bb5..b74c71a 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -75,6 +75,8 @@
 	/* below we have all handshake flags grouped into one */
 	CO_FL_HANDSHAKE     = CO_FL_SI_SEND_PROXY,
 
+	CO_FL_INIT_SESS     = 0x00000800,  /* initialize a session before using data */
+
 	/* when any of these flags is set, polling is defined by socket-layer
 	 * operations, as opposed to data-layer.
 	 */
diff --git a/include/types/session.h b/include/types/session.h
index cce55a4..0450163 100644
--- a/include/types/session.h
+++ b/include/types/session.h
@@ -50,7 +50,7 @@
 #define SN_FORCE_PRST	0x00000010	/* force persistence here, even if server is down */
 #define SN_MONITOR	0x00000020	/* this session comes from a monitoring system */
 #define SN_CURR_SESS	0x00000040	/* a connection is currently being counted on the server */
-/* unused: 0x00000080 */
+#define SN_INITIALIZED	0x00000080	/* the session was fully initialized */
 #define SN_REDISP	0x00000100	/* set if this session was redispatched from one server to another */
 #define SN_CONN_TAR	0x00000200	/* set if this session is turning around before reconnecting */
 #define SN_REDIRECTABLE	0x00000400	/* set if this session is redirectable (GET or HEAD) */
diff --git a/src/connection.c b/src/connection.c
index 0abbe42..9e3d79e 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -15,6 +15,7 @@
 
 #include <proto/connection.h>
 #include <proto/proto_tcp.h>
+#include <proto/session.h>
 #include <proto/stream_interface.h>
 
 /* I/O callback for fd-based connections. It calls the read/write handlers
@@ -25,7 +26,7 @@
 	struct connection *conn = fdtab[fd].owner;
 
 	if (unlikely(!conn))
-		goto leave;
+		return 0;
 
  process_handshake:
 	/* The handshake callbacks are called in sequence. If either of them is
@@ -47,6 +48,14 @@
 	if (!(conn->flags & CO_FL_POLL_SOCK))
 		__conn_sock_stop_both(conn);
 
+	/* Maybe we need to finish initializing an incoming session. The
+	 * function may fail and cause the connection to be destroyed, thus
+	 * we must not use it anymore and should immediately leave instead.
+	 */
+	if ((conn->flags & CO_FL_INIT_SESS) &&
+	    conn_session_initialize(conn, CO_FL_INIT_SESS) < 0)
+		return 0;
+
 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
 		conn->app_cb->recv(conn);
 
@@ -80,6 +89,13 @@
 	}
 
  leave:
+	/* we may need to release the connection which is an embryonic session */
+	if ((conn->flags & (CO_FL_ERROR|CO_FL_INIT_SESS)) == (CO_FL_ERROR|CO_FL_INIT_SESS)) {
+		conn->flags |= CO_FL_ERROR;
+		conn_session_complete(conn, CO_FL_INIT_SESS);
+		return 0;
+	}
+
 	if (conn->flags & CO_FL_NOTIFY_SI)
 		conn_notify_si(conn);
 
diff --git a/src/session.c b/src/session.c
index 6b784cf..eb32496 100644
--- a/src/session.c
+++ b/src/session.c
@@ -48,16 +48,19 @@
 struct pool_head *pool2_session;
 struct list sessions;
 
-/* This function is called from the protocol layer accept() in order to instanciate
- * a new session on behalf of a given listener and frontend. It returns a positive
- * value upon success, 0 if the connection can be ignored, or a negative value upon
- * critical failure. The accepted file descriptor is closed if we return <= 0.
+static struct task *expire_mini_session(struct task *t);
+int session_complete(struct session *s);
+
+/* This function is called from the protocol layer accept() in order to
+ * instanciate a new embryonic session on behalf of a given listener and
+ * frontend. It returns a positive value upon success, 0 if the connection
+ * can be ignored, or a negative value upon critical failure. The accepted
+ * file descriptor is closed if we return <= 0.
  */
 int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
 {
 	struct proxy *p = l->frontend;
 	struct session *s;
-	struct http_txn *txn;
 	struct task *t;
 	int ret;
 
@@ -67,7 +70,12 @@
 	if (unlikely((s = pool_alloc2(pool2_session)) == NULL))
 		goto out_close;
 
-	/* minimum session initialization required for monitor mode below */
+	/* minimum session initialization required for an embryonic session is
+	 * fairly low. We need very little to execute L4 ACLs, then we need a
+	 * task to make the client-side connection live on its own.
+	 *  - flags
+	 *  - stick-entry tracking
+	 */
 	s->flags = 0;
 	s->logs.logwait = p->to_log;
 	s->stkctr1_entry = NULL;
@@ -75,40 +83,25 @@
 	s->stkctr1_table = NULL;
 	s->stkctr2_table = NULL;
 
-	if (unlikely((t = task_new()) == NULL))
-		goto out_free_session;
+	s->listener = l;
+	s->fe  = p;
 
 	/* OK, we're keeping the session, so let's properly initialize the session */
-	LIST_ADDQ(&sessions, &s->list);
-	LIST_INIT(&s->back_refs);
-
-	s->unique_id = NULL;
-	s->term_trace = 0;
 	s->si[0].conn.t.sock.fd = cfd;
 	s->si[0].conn.ctrl = l->proto;
-	s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */
+	s->si[0].conn.flags = CO_FL_NONE;
 	s->si[0].conn.addr.from = *addr;
+	set_target_client(&s->si[0].conn.target, l);
+
 	s->logs.accept_date = date; /* user-visible date for logging */
 	s->logs.tv_accept = now;  /* corrected date for internal use */
 	s->uniq_id = totalconn;
-	p->feconn++;  /* beconn will be increased once assigned */
-
-	proxy_inc_fe_conn_ctr(l, p);	/* note: cum_beconn will be increased once assigned */
-
-	t->process = l->handler;
-	t->context = s;
-	t->nice = l->nice;
-	t->expire = TICK_ETERNITY;
-
-	s->task = t;
-	s->listener = l;
+	p->feconn++;
+	/* This session was accepted, count it now */
+	if (p->feconn > p->fe_counters.conn_max)
+		p->fe_counters.conn_max = p->feconn;
 
-	/* Note: initially, the session's backend points to the frontend.
-	 * This changes later when switching rules are executed or
-	 * when the default backend is assigned.
-	 */
-	s->be  = s->fe  = p;
-	s->req = s->rep = NULL; /* will be allocated later */
+	proxy_inc_fe_conn_ctr(l, p);
 
 	/* if this session comes from a known monitoring system, we want to ignore
 	 * it as soon as possible, which means closing it immediately for TCP, but
@@ -129,13 +122,182 @@
 		/* let's do a no-linger now to close with a single RST. */
 		setsockopt(cfd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
 		ret = 0; /* successful termination */
+		goto out_free_session;
+	}
+
+	/* Adjust some socket options */
+	if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1))
+		goto out_free_session;
+
+	if (unlikely((t = task_new()) == NULL))
+		goto out_free_session;
+
+	t->context = s;
+	t->nice = l->nice;
+	s->task = t;
+
+	/* add the various callbacks. Right now the data layer is present but
+	 * not initialized. Also note we need to be careful as the stream int
+	 * is not initialized yet.
+	 */
+	si_prepare_conn(&s->si[0], l->proto, l->data);
+
+	/* finish initialization of the accepted file descriptor */
+	fd_insert(cfd);
+	fdtab[cfd].owner = &s->si[0].conn;
+	fdtab[cfd].flags = 0;
+	fdtab[cfd].iocb = conn_fd_handler;
+	conn_data_want_recv(&s->si[0].conn);
+	if (conn_data_init(&s->si[0].conn) < 0)
 		goto out_free_task;
+
+	/* OK, now either we have a pending handshake to execute with and
+	 * then we must return to the I/O layer, or we can proceed with the
+	 * end of the session initialization. In case of handshake, we also
+	 * set the I/O timeout to the frontend's client timeout.
+	 */
+
+	if (s->si[0].conn.flags & CO_FL_HANDSHAKE) {
+		t->process = expire_mini_session;
+		t->expire = tick_add_ifset(now_ms, p->timeout.client);
+		task_queue(t);
+		s->si[0].conn.flags |= CO_FL_INIT_SESS;
+		return 1;
 	}
 
-	/* This session was accepted, count it now */
-	if (p->feconn > p->fe_counters.conn_max)
-		p->fe_counters.conn_max = p->feconn;
+	/* OK let's complete session initialization */
+	ret = session_complete(s);
+	if (ret > 0)
+		return ret;
+
+	/* Error unrolling */
+ out_free_task:
+	task_free(t);
+ out_free_session:
+	p->feconn--;
+	if (s->stkctr1_entry || s->stkctr2_entry)
+		session_store_counters(s);
+	pool_free2(pool2_session, s);
+ out_close:
+	if (ret < 0 && p->mode == PR_MODE_HTTP) {
+		/* critical error, no more memory, try to emit a 500 response */
+		struct chunk *err_msg = error_message(s, HTTP_ERR_500);
+		send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL);
+	}
+
+	if (fdtab[cfd].owner)
+		fd_delete(cfd);
+	else
+		close(cfd);
+	return ret;
+}
 
+/* This function kills an existing embryonic session. It stops the connection's
+ * data layer, releases assigned resources, resumes the listener if it was
+ * disabled and finally kills the file descriptor.
+ */
+static void kill_mini_session(struct session *s)
+{
+	/* kill the connection now */
+	conn_data_close(&s->si[0].conn);
+
+	s->fe->feconn--;
+	if (s->stkctr1_entry || s->stkctr2_entry)
+		session_store_counters(s);
+
+	if (!(s->listener->options & LI_O_UNLIMITED))
+		actconn--;
+	jobs--;
+	s->listener->nbconn--;
+	if (s->listener->state == LI_FULL)
+		resume_listener(s->listener);
+
+	/* Dequeues all of the listeners waiting for a resource */
+	if (!LIST_ISEMPTY(&global_listener_queue))
+		dequeue_all_listeners(&global_listener_queue);
+
+	if (!LIST_ISEMPTY(&s->fe->listener_queue) &&
+	    (!s->fe->fe_sps_lim || freq_ctr_remain(&s->fe->fe_sess_per_sec, s->fe->fe_sps_lim, 0) > 0))
+		dequeue_all_listeners(&s->fe->listener_queue);
+
+	task_delete(s->task);
+	task_free(s->task);
+
+	if (fdtab[s->si[0].conn.t.sock.fd].owner)
+		fd_delete(s->si[0].conn.t.sock.fd);
+	else
+		close(s->si[0].conn.t.sock.fd);
+
+	pool_free2(pool2_session, s);
+}
+
+/* Finish initializing a session from a connection. Returns <0 if the
+ * connection was killed.
+ */
+int conn_session_initialize(struct connection *conn, int flag)
+{
+	struct session *s = container_of(conn, struct session, si[0].conn);
+
+	if (session_complete(s) > 0) {
+		conn->flags &= ~flag;
+		return 0;
+	}
+
+	/* kill the connection now */
+	kill_mini_session(s);
+	return -1;
+}
+
+/* Manages embryonic sessions timeout. It is only called when the timeout
+ * strikes and performs the required cleanup.
+ */
+static struct task *expire_mini_session(struct task *t)
+{
+	struct session *s = t->context;
+
+	if (!(t->state & TASK_WOKEN_TIMER))
+		return t;
+
+	kill_mini_session(s);
+	return NULL;
+}
+
+/* This function is called from the I/O handler which detects the end of
+ * handshake, in order to complete initialization of a valid session. It must
+ * be called with an embryonic session. It returns a positive value upon
+ * success, 0 if the connection can be ignored, or a negative value upon
+ * critical failure. The accepted file descriptor is closed if we return <= 0.
+ */
+int session_complete(struct session *s)
+{
+	struct listener *l = s->listener;
+	struct proxy *p = s->fe;
+	struct http_txn *txn;
+	struct task *t = s->task;
+	int ret;
+
+	ret = -1; /* assume unrecoverable error by default */
+
+	/* OK, we're keeping the session, so let's properly initialize the session */
+	LIST_ADDQ(&sessions, &s->list);
+	LIST_INIT(&s->back_refs);
+	s->flags |= SN_INITIALIZED;
+
+	s->unique_id = NULL;
+	s->term_trace = 0;
+
+	t->process = l->handler;
+	t->context = s;
+	t->expire = TICK_ETERNITY;
+
+	/* Note: initially, the session's backend points to the frontend.
+	 * This changes later when switching rules are executed or
+	 * when the default backend is assigned.
+	 */
+	s->be  = s->fe;
+	s->req = s->rep = NULL; /* will be allocated later */
+
+	/* Let's count a session now */
 	proxy_inc_fe_sess_ctr(l, p);
 	if (s->stkctr1_entry) {
 		void *ptr;
@@ -170,16 +332,12 @@
 	s->si[0].err_loc   = NULL;
 	s->si[0].release   = NULL;
 	s->si[0].send_proxy_ofs = 0;
-	set_target_client(&s->si[0].conn.target, l);
 	s->si[0].exp       = TICK_ETERNITY;
 	s->si[0].flags     = SI_FL_NONE;
 
 	if (likely(s->fe->options2 & PR_O2_INDEPSTR))
 		s->si[0].flags |= SI_FL_INDEP_STR;
 
-	/* add the various callbacks */
-	si_prepare_conn(&s->si[0], l->proto, l->data);
-
 	/* pre-initialize the other side's stream interface to an INIT state. The
 	 * callbacks will be initialized before attempting to connect.
 	 */
@@ -207,10 +365,6 @@
 	/* init store persistence */
 	s->store_count = 0;
 
-	/* Adjust some socket options */
-	if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1))
-		goto out_free_task;
-
 	if (unlikely((s->req = pool_alloc2(pool2_channel)) == NULL))
 		goto out_free_task; /* no memory */
 
@@ -273,13 +427,7 @@
 	txn->rsp.buf = s->rep;
 
 	/* finish initialization of the accepted file descriptor */
-	fd_insert(cfd);
-	fdtab[cfd].owner = &s->si[0].conn;
-	fdtab[cfd].flags = 0;
-	fdtab[cfd].iocb = conn_fd_handler;
 	conn_data_want_recv(&s->si[0].conn);
-	if (conn_data_init(&s->si[0].conn) < 0)
-		goto out_free_rep;
 
 	if (p->accept && (ret = p->accept(s)) <= 0) {
 		/* Either we had an unrecoverable error (<0) or work is
@@ -289,6 +437,9 @@
 		goto out_free_rep;
 	}
 
+	/* we want the connection handler to notify the stream interface about updates. */
+	s->si[0].conn.flags |= CO_FL_NOTIFY_SI;
+
 	/* it is important not to call the wakeup function directly but to
 	 * pass through task_wakeup(), because this one knows how to apply
 	 * priorities to tasks.
@@ -302,24 +453,6 @@
  out_free_req:
 	pool_free2(pool2_channel, s->req);
  out_free_task:
-	p->feconn--;
-	if (s->stkctr1_entry || s->stkctr2_entry)
-		session_store_counters(s);
-	task_free(t);
-	LIST_DEL(&s->list);
- out_free_session:
-	pool_free2(pool2_session, s);
- out_close:
-	if (ret < 0 && s->fe->mode == PR_MODE_HTTP) {
-		/* critical error, no more memory, try to emit a 500 response */
-		struct chunk *err_msg = error_message(s, HTTP_ERR_500);
-		send(cfd, err_msg->str, err_msg->len, MSG_DONTWAIT|MSG_NOSIGNAL);
-	}
-
-	if (fdtab[cfd].owner)
-		fd_delete(cfd);
-	else
-		close(cfd);
 	return ret;
 }