MEDIUM: listener: allocate the connection before queuing a new connection

Till now we would keep a per-thread queue of pending incoming connections
for which we would store:
  - the listener
  - the accepted FD
  - the source address
  - the source address' length

And these elements were first used in session_accept_fd() running on the
target thread to allocate a connection and duplicate them again. Doing
this induces various problems. The first one is that session_accept_fd()
may only run on file descriptors and cannot be reused for QUIC. The second
issue is that it induces lots of memory copies and that the listerner
queue thrashes a lot of cache, consuming 64 bytes per entry.

This patch changes this by allocating the connection before queueing it,
and by only placing the connection's pointer into the queue. Indeed, the
first two calls used to initialize the connection already store all the
information above, which can be retrieved from the connection pointer
alone. So we just have to pop one pointer from the target thread, and
pass it to session_accept_fd() which only needs the FD for the final
settings.

This starts to make the accept path a bit more transport-agnostic, and
saves memory and CPU cycles at the same time (1% connection rate increase
was noticed with 4 threads). Thanks to dividing the accept-queue entry
size from 64 to 8 bytes, its size could be increased from 256 to 1024
connections while still dividing the overall size by two. No single
queue full condition was met.

One minor drawback is that connection may be allocated from one thread's
pool to be used into another one. But this already happens a lot with
connection reuse so there is really nothing new here.
diff --git a/include/haproxy/listener-t.h b/include/haproxy/listener-t.h
index c29328c..89bf7bc 100644
--- a/include/haproxy/listener-t.h
+++ b/include/haproxy/listener-t.h
@@ -43,6 +43,7 @@
 struct xprt_ops;
 struct proxy;
 struct fe_counters;
+struct connection;
 
 /* listener state */
 enum li_state {
@@ -197,7 +198,7 @@
 	int maxconn;			/* maximum connections allowed on this listener */
 	unsigned int backlog;		/* if set, listen backlog */
 	int maxaccept;         /* if set, max number of connections accepted at once (-1 when disabled) */
-	int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */
+	int (*accept)(struct connection *conn); /* upper layer's accept() */
 	enum obj_type *default_target;  /* default target to use for accepted sessions or NULL */
 	/* cache line boundary */
 	struct mt_list wait_queue;	/* link element to make the listener wait for something (LI_LIMITED)  */
@@ -254,29 +255,14 @@
 	struct bind_kw kw[VAR_ARRAY];
 };
 
-/* This is used to create the accept queue, optimized to be 64 bytes long. */
-struct accept_queue_entry {
-	struct listener *listener;          // 8 bytes
-	int fd __attribute__((aligned(8))); // 4 bytes
-	int addr_len;                       // 4 bytes
-
-	union {
-		sa_family_t family;         // 2 bytes
-		struct sockaddr_in in;      // 16 bytes
-		struct sockaddr_in6 in6;    // 28 bytes
-	} addr; // this is normally 28 bytes
-	/* 20-bytes hole here */
-	char pad0[0] __attribute((aligned(64)));
-};
-
 /* The per-thread accept queue ring, must be a power of two minus 1 */
-#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
+#define ACCEPT_QUEUE_SIZE ((1<<10) - 1)
 
 struct accept_queue_ring {
 	unsigned int head;
 	unsigned int tail;
 	struct tasklet *tasklet;  /* tasklet of the thread owning this ring */
-	struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
+	struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
 };
 
 
diff --git a/include/haproxy/session.h b/include/haproxy/session.h
index 59945b2..6a24d8a 100644
--- a/include/haproxy/session.h
+++ b/include/haproxy/session.h
@@ -35,7 +35,7 @@
 
 struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type *origin);
 void session_free(struct session *sess);
-int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr);
+int session_accept_fd(struct connection *cli_conn);
 int conn_complete_session(struct connection *conn);
 
 /* Remove the refcount from the session to the tracked counters, and clear the
diff --git a/src/listener.c b/src/listener.c
index 5a375db..b7dd934 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -55,22 +55,18 @@
 struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
 
 /* dequeue and process a pending connection from the local accept queue (single
- * consumer). Returns the accepted fd or -1 if none was found. The listener is
- * placed into *li. The address is copied into *addr for no more than *addr_len
- * bytes, and the address length is returned into *addr_len.
+ * consumer). Returns the accepted connection or NULL if none was found.
  */
-int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
+struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 {
-	struct accept_queue_entry *e;
 	unsigned int pos, next;
-	struct listener *ptr;
-	int len;
-	int fd;
+	struct connection *ptr;
+	struct connection **e;
 
 	pos = ring->head;
 
 	if (pos == ring->tail)
-		return -1;
+		return NULL;
 
 	next = pos + 1;
 	if (next >= ACCEPT_QUEUE_SIZE)
@@ -80,42 +76,28 @@
 
 	/* wait for the producer to update the listener's pointer */
 	while (1) {
-		ptr = e->listener;
+		ptr = *e;
 		__ha_barrier_load();
 		if (ptr)
 			break;
 		pl_cpu_relax();
 	}
 
-	fd = e->fd;
-	len = e->addr_len;
-	if (len > *addr_len)
-		len = *addr_len;
-
-	if (likely(len > 0))
-		memcpy(addr, &e->addr, len);
-
 	/* release the entry */
-	e->listener = NULL;
+	*e = NULL;
 
 	__ha_barrier_store();
 	ring->head = next;
-
-	*addr_len = len;
-	*li = ptr;
-
-	return fd;
+	return ptr;
 }
 
 
-/* tries to push a new accepted connection <fd> into ring <ring> for listener
- * <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
- * succeeds, or zero if the ring is full. Supports multiple producers.
+/* tries to push a new accepted connection <conn> into ring <ring>. Returns
+ * non-zero if it succeeds, or zero if the ring is full. Supports multiple
+ * producers.
  */
-int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
-                         struct listener *li, const void *addr, int addr_len)
+int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
 {
-	struct accept_queue_entry *e;
 	unsigned int pos, next;
 
 	pos = ring->tail;
@@ -127,22 +109,8 @@
 			return 0; // ring full
 	} while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next)));
 
-
-	e = &ring->entry[pos];
-
-	if (addr_len > sizeof(e->addr))
-		addr_len = sizeof(e->addr);
-
-	if (addr_len)
-		memcpy(&e->addr, addr, addr_len);
-
-	e->addr_len = addr_len;
-	e->fd = fd;
-
+	ring->entry[pos] = conn;
 	__ha_barrier_store();
-	/* now commit the change */
-
-	e->listener = li;
 	return 1;
 }
 
@@ -150,25 +118,23 @@
 static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
 {
 	struct accept_queue_ring *ring = context;
+	struct connection *conn;
 	struct listener *li;
-	struct sockaddr_storage addr;
 	unsigned int max_accept;
-	int addr_len;
 	int ret;
-	int fd;
 
 	/* if global.tune.maxaccept is -1, then max_accept is UINT_MAX. It
 	 * is not really illimited, but it is probably enough.
 	 */
 	max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
 	for (; max_accept; max_accept--) {
-		addr_len = sizeof(addr);
-		fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
-		if (fd < 0)
+		conn = accept_queue_pop_sc(ring);
+		if (!conn)
 			break;
 
+		li = __objt_listener(conn->target);
 		_HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
-		ret = li->accept(li, fd, &addr);
+		ret = li->accept(conn);
 		if (ret <= 0) {
 			/* connection was terminated by the application */
 			continue;
@@ -722,6 +688,7 @@
 void listener_accept(int fd)
 {
 	struct listener *l = fdtab[fd].owner;
+	struct connection *cli_conn;
 	struct proxy *p;
 	unsigned int max_accept;
 	int next_conn = 0;
@@ -928,6 +895,25 @@
 		if (unlikely(master == 1))
 			fcntl(cfd, F_SETFD, FD_CLOEXEC);
 
+		/* we'll have to at least allocate a connection, assign the listener
+		 * to conn->target, set the source address, and set the fd.
+		 */
+		cli_conn = conn_new(&l->obj_type);
+		if (cli_conn) {
+			cli_conn->handle.fd = cfd;
+			cli_conn->flags |= CO_FL_ADDR_FROM_SET;
+			if (!sockaddr_alloc(&cli_conn->src, &addr, laddr)) {
+				conn_free(cli_conn);
+				cli_conn = NULL;
+			}
+		}
+
+		if (!cli_conn) {
+			/* no more memory, give up! */
+			close(cfd);
+			continue;
+		}
+
 		/* The connection was accepted, it must be counted as such */
 		if (l->counters)
 			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
@@ -948,6 +934,7 @@
 			send_log(p, LOG_EMERG,
 				 "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
 				 p->id);
+			conn_free(cli_conn);
 			close(cfd);
 			expire = tick_add(now_ms, 1000); /* try again in 1 second */
 			goto limit_global;
@@ -962,6 +949,7 @@
 		next_feconn = 0;
 		next_actconn = 0;
 
+
 #if defined(USE_THREAD)
 		mask = thread_mask(l->rx.settings->bind_thread) & all_threads_mask;
 		if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
@@ -1064,7 +1052,7 @@
 			 * when processing this loop.
 			 */
 			ring = &accept_queue_rings[t];
-			if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
+			if (accept_queue_push_mp(ring, cli_conn)) {
 				_HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
 				tasklet_wakeup(ring->tasklet);
 				continue;
@@ -1077,7 +1065,7 @@
 #endif // USE_THREAD
 
 		_HA_ATOMIC_ADD(&l->thr_conn[tid], 1);
-		ret = l->accept(l, cfd, &addr);
+		ret = l->accept(cli_conn);
 		if (unlikely(ret <= 0)) {
 			/* The connection was closed by stream_accept(). Either
 			 * we just have to ignore it (ret == 0) or it's a critical
diff --git a/src/session.c b/src/session.c
index f6e4fe4..2673482 100644
--- a/src/session.c
+++ b/src/session.c
@@ -129,29 +129,22 @@
 /* This function is called from the protocol layer accept() in order to
  * instantiate a new session on behalf of a given listener and frontend. It
  * returns a positive value upon success, 0 if the connection can be ignored,
- * or a negative value upon critical failure. The accepted file descriptor is
+ * or a negative value upon critical failure. The accepted connection is
  * closed if we return <= 0. If no handshake is needed, it immediately tries
- * to instantiate a new stream. The created connection's owner points to the
- * new session until the upper layers are created.
+ * to instantiate a new stream. The connection must already have been filled
+ * with the incoming connection handle (a fd), a target (the listener) and a
+ * source address.
  */
-int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr)
+int session_accept_fd(struct connection *cli_conn)
 {
-	struct connection *cli_conn;
+	struct listener *l = __objt_listener(cli_conn->target);
 	struct proxy *p = l->bind_conf->frontend;
+	int cfd = cli_conn->handle.fd;
 	struct session *sess;
 	int ret;
 
-
 	ret = -1; /* assume unrecoverable error by default */
 
-	if (unlikely((cli_conn = conn_new(&l->obj_type)) == NULL))
-		goto out_close;
-
-	if (!sockaddr_alloc(&cli_conn->src, addr, sizeof(*addr)))
-		goto out_free_conn;
-
-	cli_conn->handle.fd = cfd;
-	cli_conn->flags |= CO_FL_ADDR_FROM_SET;
 	cli_conn->proxy_netns = l->rx.settings->netns;
 
 	conn_prepare(cli_conn, l->rx.proto, l->bind_conf->xprt);
@@ -282,7 +275,6 @@
 	conn_stop_tracking(cli_conn);
 	conn_xprt_close(cli_conn);
 	conn_free(cli_conn);
- out_close:
 	listener_release(l);
 	if (ret < 0 && l->bind_conf->xprt == xprt_get(XPRT_RAW) &&
 	    p->mode == PR_MODE_HTTP && l->bind_conf->mux_proto == NULL) {