[MEDIUM] move connection establishment from backend to the SI.

Connection establishment was handled entirely by backend.c, which
normally only handles load-balancing algorithms. Since it is purely TCP,
it must move to proto_tcp.c. Also, instead of calling it directly, we now
call it via the stream interface (SI), which will later help us unify
session handling.
diff --git a/src/backend.c b/src/backend.c
index baa301a..7a6b7b8 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -18,8 +18,6 @@
 #include <string.h>
 #include <ctype.h>
 
-#include <netinet/tcp.h>
-
 #include <common/compat.h>
 #include <common/config.h>
 #include <common/debug.h>
@@ -32,16 +30,11 @@
 #include <proto/acl.h>
 #include <proto/backend.h>
 #include <proto/client.h>
-#include <proto/fd.h>
-#include <proto/httperr.h>
-#include <proto/log.h>
-#include <proto/port_range.h>
 #include <proto/proto_http.h>
 #include <proto/proto_tcp.h>
 #include <proto/queue.h>
 #include <proto/server.h>
 #include <proto/session.h>
-#include <proto/stream_sock.h>
 #include <proto/task.h>
 
 static inline void fwrr_remove_from_tree(struct server *s);
@@ -1791,7 +1784,7 @@
  */
 int connect_server(struct session *s)
 {
-	int fd, err;
+	int err;
 
 	if (!(s->flags & SN_ADDR_SET)) {
 		err = assign_server_address(s);
@@ -1799,234 +1792,16 @@
 			return SN_ERR_INTERNAL;
 	}
 
-	if ((fd = s->req->cons->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
-		qfprintf(stderr, "Cannot get a server socket.\n");
-
-		if (errno == ENFILE)
-			send_log(s->be, LOG_EMERG,
-				 "Proxy %s reached system FD limit at %d. Please check system tunables.\n",
-				 s->be->id, maxfd);
-		else if (errno == EMFILE)
-			send_log(s->be, LOG_EMERG,
-				 "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
-				 s->be->id, maxfd);
-		else if (errno == ENOBUFS || errno == ENOMEM)
-			send_log(s->be, LOG_EMERG,
-				 "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
-				 s->be->id, maxfd);
-		/* this is a resource error */
-		return SN_ERR_RESOURCE;
-	}
-
-	if (fd >= global.maxsock) {
-		/* do not log anything there, it's a normal condition when this option
-		 * is used to serialize connections to a server !
-		 */
-		Alert("socket(): not enough free sockets. Raise -n argument. Giving up.\n");
-		close(fd);
-		return SN_ERR_PRXCOND; /* it is a configuration limit */
-	}
-
-	if ((fcntl(fd, F_SETFL, O_NONBLOCK)==-1) ||
-	    (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1)) {
-		qfprintf(stderr,"Cannot set client socket to non blocking mode.\n");
-		close(fd);
+	if (!s->req->cons->connect)
 		return SN_ERR_INTERNAL;
-	}
-
-	if (s->be->options & PR_O_TCP_SRV_KA)
-		setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
-
-	if (s->be->options & PR_O_TCP_NOLING)
-		setsockopt(fd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
-
-	/* allow specific binding :
-	 * - server-specific at first
-	 * - proxy-specific next
-	 */
-	if (s->srv != NULL && s->srv->state & SRV_BIND_SRC) {
-		struct sockaddr_in *remote = NULL;
-		int ret, flags = 0;
-
-#if defined(CONFIG_HAP_CTTPROXY) || defined(CONFIG_HAP_LINUX_TPROXY)
-		switch (s->srv->state & SRV_TPROXY_MASK) {
-		case SRV_TPROXY_ADDR:
-			remote = (struct sockaddr_in *)&s->srv->tproxy_addr;
-			flags  = 3;
-			break;
-		case SRV_TPROXY_CLI:
-			flags |= 2;
-			/* fall through */
-		case SRV_TPROXY_CIP:
-			/* FIXME: what can we do if the client connects in IPv6 ? */
-			flags |= 1;
-			remote = (struct sockaddr_in *)&s->cli_addr;
-			break;
-		}
-#endif
-#ifdef SO_BINDTODEVICE
-		/* Note: this might fail if not CAP_NET_RAW */
-		if (s->srv->iface_name)
-			setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, s->srv->iface_name, s->srv->iface_len + 1);
-#endif
-
-		if (s->srv->sport_range) {
-			int attempts = 10; /* should be more than enough to find a spare port */
-			struct sockaddr_in src;
-
-			ret = 1;
-			src = s->srv->source_addr;
-
-			do {
-				/* note: in case of retry, we may have to release a previously
-				 * allocated port, hence this loop's construct.
-				 */
-				port_range_release_port(fdtab[fd].port_range, fdtab[fd].local_port);
-				fdtab[fd].port_range = NULL;
-
-				if (!attempts)
-					break;
-				attempts--;
-
-				fdtab[fd].local_port = port_range_alloc_port(s->srv->sport_range);
-				if (!fdtab[fd].local_port)
-					break;
-
-				fdtab[fd].port_range = s->srv->sport_range;
-				src.sin_port = htons(fdtab[fd].local_port);
-
-				ret = tcpv4_bind_socket(fd, flags, &src, remote);
-			} while (ret != 0); /* binding NOK */
-		}
-		else {
-			ret = tcpv4_bind_socket(fd, flags, &s->srv->source_addr, remote);
-		}
-
-		if (ret) {
-			port_range_release_port(fdtab[fd].port_range, fdtab[fd].local_port);
-			fdtab[fd].port_range = NULL;
-			close(fd);
-
-			if (ret == 1) {
-				Alert("Cannot bind to source address before connect() for server %s/%s. Aborting.\n",
-				      s->be->id, s->srv->id);
-				send_log(s->be, LOG_EMERG,
-					 "Cannot bind to source address before connect() for server %s/%s.\n",
-					 s->be->id, s->srv->id);
-			} else {
-				Alert("Cannot bind to tproxy source address before connect() for server %s/%s. Aborting.\n",
-				      s->be->id, s->srv->id);
-				send_log(s->be, LOG_EMERG,
-					 "Cannot bind to tproxy source address before connect() for server %s/%s.\n",
-					 s->be->id, s->srv->id);
-			}
-			return SN_ERR_RESOURCE;
-		}
-	}
-	else if (s->be->options & PR_O_BIND_SRC) {
-		struct sockaddr_in *remote = NULL;
-		int ret, flags = 0;
-
-#if defined(CONFIG_HAP_CTTPROXY) || defined(CONFIG_HAP_LINUX_TPROXY)
-		switch (s->be->options & PR_O_TPXY_MASK) {
-		case PR_O_TPXY_ADDR:
-			remote = (struct sockaddr_in *)&s->be->tproxy_addr;
-			flags  = 3;
-			break;
-		case PR_O_TPXY_CLI:
-			flags |= 2;
-			/* fall through */
-		case PR_O_TPXY_CIP:
-			/* FIXME: what can we do if the client connects in IPv6 ? */
-			flags |= 1;
-			remote = (struct sockaddr_in *)&s->cli_addr;
-			break;
-		}
-#endif
-#ifdef SO_BINDTODEVICE
-		/* Note: this might fail if not CAP_NET_RAW */
-		if (s->be->iface_name)
-			setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, s->be->iface_name, s->be->iface_len + 1);
-#endif
-		ret = tcpv4_bind_socket(fd, flags, &s->be->source_addr, remote);
-		if (ret) {
-			close(fd);
-			if (ret == 1) {
-				Alert("Cannot bind to source address before connect() for proxy %s. Aborting.\n",
-				      s->be->id);
-				send_log(s->be, LOG_EMERG,
-					 "Cannot bind to source address before connect() for proxy %s.\n",
-					 s->be->id);
-			} else {
-				Alert("Cannot bind to tproxy source address before connect() for proxy %s. Aborting.\n",
-				      s->be->id);
-				send_log(s->be, LOG_EMERG,
-					 "Cannot bind to tproxy source address before connect() for proxy %s.\n",
-					 s->be->id);
-			}
-			return SN_ERR_RESOURCE;
-		}
-	}
-
-#if defined(TCP_QUICKACK) && defined(SOL_TCP)
-	/* disabling tcp quick ack now allows the first request to leave the
-	 * machine with the first ACK. We only do this if there are pending
-	 * data in the buffer.
-	 */
-	if ((s->be->options2 & PR_O2_SMARTCON) && s->req->send_max)
-                setsockopt(fd, SOL_TCP, TCP_QUICKACK, (char *) &zero, sizeof(zero));
-#endif
 
-	if ((connect(fd, (struct sockaddr *)&s->srv_addr, sizeof(s->srv_addr)) == -1) &&
-	    (errno != EINPROGRESS) && (errno != EALREADY) && (errno != EISCONN)) {
+	err = s->req->cons->connect(s->req->cons, s->be, s->srv,
+				    (struct sockaddr *)&s->srv_addr,
+				    (struct sockaddr *)&s->cli_addr);
 
-		if (errno == EAGAIN || errno == EADDRINUSE) {
-			char *msg;
-			if (errno == EAGAIN) /* no free ports left, try again later */
-				msg = "no free ports";
-			else
-				msg = "local address already in use";
-
-			qfprintf(stderr,"Cannot connect: %s.\n",msg);
-			port_range_release_port(fdtab[fd].port_range, fdtab[fd].local_port);
-			fdtab[fd].port_range = NULL;
-			close(fd);
-			send_log(s->be, LOG_EMERG,
-				 "Connect() failed for server %s/%s: %s.\n",
-				 s->be->id, s->srv->id, msg);
-			return SN_ERR_RESOURCE;
-		} else if (errno == ETIMEDOUT) {
-			//qfprintf(stderr,"Connect(): ETIMEDOUT");
-			port_range_release_port(fdtab[fd].port_range, fdtab[fd].local_port);
-			fdtab[fd].port_range = NULL;
-			close(fd);
-			return SN_ERR_SRVTO;
-		} else {
-			// (errno == ECONNREFUSED || errno == ENETUNREACH || errno == EACCES || errno == EPERM)
-			//qfprintf(stderr,"Connect(): %d", errno);
-			port_range_release_port(fdtab[fd].port_range, fdtab[fd].local_port);
-			fdtab[fd].port_range = NULL;
-			close(fd);
-			return SN_ERR_SRVCL;
-		}
-	}
-
-	fdtab[fd].owner = s->req->cons;
-	fdtab[fd].state = FD_STCONN; /* connection in progress */
-	fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
-	fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
-	fdtab[fd].cb[DIR_RD].b = s->rep;
-	fdtab[fd].cb[DIR_WR].f = &stream_sock_write;
-	fdtab[fd].cb[DIR_WR].b = s->req;
-
-	fdtab[fd].peeraddr = (struct sockaddr *)&s->srv_addr;
-	fdtab[fd].peerlen = sizeof(s->srv_addr);
-
-	fd_insert(fd);
-	EV_FD_SET(fd, DIR_WR);  /* for connect status */
+	if (err != SN_ERR_NONE)
+		return err;
 
-	s->req->cons->state = SI_ST_CON;
-	s->req->cons->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
 	if (s->srv) {
 		s->flags |= SN_CURR_SESS;
 		s->srv->cur_sess++;
@@ -2036,7 +1811,6 @@
 			s->be->lbprm.server_take_conn(s->srv);
 	}
 
-	s->req->cons->exp = tick_add_ifset(now_ms, s->be->timeout.connect);
 	return SN_ERR_NONE;  /* connection is OK */
 }