MEDIUM: streams: Add the ability to retry a request on L7 failure.

When running in HTX mode, if we sent the request but failed to get a usable
answer, because the server just closed its socket, because we hit a server
timeout, or because we got a 404, 408, 425, 500, 501, 502, 503 or 504 error,
attempt to retry the request, exactly as if we had just failed to connect to
the server.

To do so, add a new backend keyword, "retry-on".

It accepts a list of keywords, which can be "none" (never retry),
"conn-failure" (we failed to connect, or to do the SSL handshake),
"empty-response" (the server closed the connection without answering),
"response-timeout" (we timed out while waiting for the server response),
or "404", "408", "425", "500", "501", "502", "503" and "504".

The default is "conn-failure".
diff --git a/src/cfgparse-listen.c b/src/cfgparse-listen.c
index 5f44cfd..7c64be1 100644
--- a/src/cfgparse-listen.c
+++ b/src/cfgparse-listen.c
@@ -395,6 +395,7 @@
 		curproxy->except_mask = defproxy.except_mask;
 		curproxy->except_to = defproxy.except_to;
 		curproxy->except_mask_to = defproxy.except_mask_to;
+		curproxy->retry_type = defproxy.retry_type;
 
 		if (defproxy.fwdfor_hdr_len) {
 			curproxy->fwdfor_hdr_len  = defproxy.fwdfor_hdr_len;
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 48d53e9..dd99bbb 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2446,6 +2446,12 @@
 			}
 		}
 
+		if ((curproxy->retry_type &~ PR_RE_CONN_FAILED) &&
+		    !(curproxy->options2 & PR_O2_USE_HTX)) {
+			ha_warning("Proxy '%s' : retry-on with any other keywords than 'conn-failure' will be ignored, requires 'option http-use-htx'.\n", curproxy->id);
+			err_code |= ERR_WARN;
+			curproxy->retry_type &= PR_RE_CONN_FAILED;
+		}
 		if (curproxy->email_alert.set) {
 		    if (!(curproxy->email_alert.mailers.name && curproxy->email_alert.from && curproxy->email_alert.to)) {
 			    ha_warning("config : 'email-alert' will be ignored for %s '%s' (the presence any of "
diff --git a/src/proto_htx.c b/src/proto_htx.c
index ee9a271..d8363a2 100644
--- a/src/proto_htx.c
+++ b/src/proto_htx.c
@@ -1386,6 +1386,45 @@
 	return 0;
 }
 
+/* Reset the stream and the backend stream_interface to a state suitable for attempting a new connection. */
+/* Uses up one of si->conn_retries; returns 0 if we can attempt to retry, -1 if no retries are left */
+static __inline int do_l7_retry(struct stream *s, struct stream_interface *si)
+{
+	struct channel *req, *res;
+	int co_data;
+
+	si->conn_retries--;
+	if (si->conn_retries < 0)
+		return -1;
+
+	req = &s->req;
+	res = &s->res;
+	/* Clear the write error/timeout/shutdown flags of the request, and the read-side ones of the response */
+	req->flags &= ~(CF_WRITE_ERROR | CF_WRITE_TIMEOUT | CF_SHUTW | CF_SHUTW_NOW);
+	res->flags &= ~(CF_READ_ERROR | CF_READ_TIMEOUT | CF_SHUTR | CF_EOI | CF_READ_NULL | CF_SHUTR_NOW);
+	res->analysers = 0;
+	si->flags &= ~(SI_FL_ERR | SI_FL_EXP | SI_FL_RXBLK_SHUT);
+	si->state = SI_ST_REQ; /* a new connection is requested */
+	si->exp = TICK_ETERNITY;
+	res->rex = TICK_ETERNITY;
+	res->to_forward = 0;
+	res->analyse_exp = TICK_ETERNITY;
+	res->total = 0;
+	s->flags &= ~(SF_ASSIGNED | SF_ADDR_SET | SF_ERR_SRVTO | SF_ERR_SRVCL);
+	si_release_endpoint(&s->si[1]);
+	b_free(&req->buf);
+	/* Hand the request saved in the L7 buffer back to the (just freed) request channel buffer */
+	/* b_data() of the L7 buffer was set to the request's co_data when it was saved, so read it before overwriting it */
+	co_data = b_data(&si->l7_buffer);
+	b_set_data(&si->l7_buffer, b_size(&si->l7_buffer));
+	b_xfer(&req->buf, &si->l7_buffer, b_data(&si->l7_buffer));
+
+	co_set_data(req, co_data);
+	b_reset(&res->buf);
+	co_set_data(res, 0);
+	return 0;
+}
+
 /* This stream analyser waits for a complete HTTP response. It returns 1 if the
  * processing can continue on next analysers, or zero if it either needs more
  * data or wants to immediately abort the response (eg: timeout, error, ...). It
@@ -1406,6 +1445,7 @@
 	struct http_txn *txn = s->txn;
 	struct http_msg *msg = &txn->rsp;
 	struct htx *htx;
+	struct stream_interface *si_b = &s->si[1];
 	struct connection *srv_conn;
 	struct htx_sl *sl;
 	int n;
@@ -1453,6 +1493,17 @@
 			if (txn->flags & TX_NOT_FIRST)
 				goto abort_keep_alive;
 
+			if (si_b->flags & SI_FL_L7_RETRY) {
+				/* If we arrive here, then CF_READ_ERROR was
+				 * set by si_cs_recv() because we matched a
+				 * status, otherwise it would have removed
+				 * the SI_FL_L7_RETRY flag, so it's ok not
+				 * to check s->be->retry_type.
+				 */
+				if (co_data(rep) || do_l7_retry(s, si_b) == 0)
+					return 0;
+			}
+
 			_HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
 			if (objt_server(s->target)) {
 				_HA_ATOMIC_ADD(&__objt_server(s->target)->counters.failed_resp, 1);
@@ -1484,6 +1535,11 @@
 
 		/* 2: read timeout : return a 504 to the client. */
 		else if (rep->flags & CF_READ_TIMEOUT) {
+			if ((si_b->flags & SI_FL_L7_RETRY) &&
+			    (s->be->retry_type & PR_RE_TIMEOUT)) {
+				if (co_data(rep) || do_l7_retry(s, si_b) == 0)
+					return 0;
+			}
 			_HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
 			if (objt_server(s->target)) {
 				_HA_ATOMIC_ADD(&__objt_server(s->target)->counters.failed_resp, 1);
@@ -1527,6 +1583,12 @@
 			if (txn->flags & TX_NOT_FIRST)
 				goto abort_keep_alive;
 
+			if ((si_b->flags & SI_FL_L7_RETRY) &&
+			    (s->be->retry_type & PR_RE_DISCONNECTED)) {
+				if (co_data(rep) || do_l7_retry(s, si_b) == 0)
+					return 0;
+			}
+
 			_HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
 			if (objt_server(s->target)) {
 				_HA_ATOMIC_ADD(&__objt_server(s->target)->counters.failed_resp, 1);
diff --git a/src/proxy.c b/src/proxy.c
index a3f355f..6e804a9 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -501,6 +501,62 @@
 	}
 }
 
+/* This function parses a "retry-on" statement: each keyword found in <args> sets one PR_RE_* bit in
+ * <curpx>->retry_type, while "none", which is only accepted alone, leaves it empty. Returns 0 on success,
+ * or -1 after filling <err> (no keyword given, proxy without backend capability, or unknown keyword). */
+static int
+proxy_parse_retry_on(char **args, int section, struct proxy *curpx,
+                               struct proxy *defpx, const char *file, int line,
+                               char **err)
+{
+	int i;
+
+	if (!(*args[1])) {
+		memprintf(err, "'%s' needs at least one keyword to specify when to retry", args[0]);
+		return -1;
+	}
+	if (!(curpx->cap & PR_CAP_BE)) {
+		memprintf(err, "'%s' only available in backend or listen section", args[0]);
+		return -1;
+	}
+	curpx->retry_type = 0; /* the listed keywords replace any previous value, they do not add to it */
+	for (i = 1; *(args[i]); i++) {
+		if (!strcmp(args[i], "conn-failure"))
+			curpx->retry_type |= PR_RE_CONN_FAILED;
+		else if (!strcmp(args[i], "empty-response"))
+			curpx->retry_type |= PR_RE_DISCONNECTED;
+		else if (!strcmp(args[i], "response-timeout"))
+			curpx->retry_type |= PR_RE_TIMEOUT;
+		else if (!strcmp(args[i], "404"))
+			curpx->retry_type |= PR_RE_404;
+		else if (!strcmp(args[i], "408"))
+			curpx->retry_type |= PR_RE_408;
+		else if (!strcmp(args[i], "425"))
+			curpx->retry_type |= PR_RE_425;
+		else if (!strcmp(args[i], "500"))
+			curpx->retry_type |= PR_RE_500;
+		else if (!strcmp(args[i], "501"))
+			curpx->retry_type |= PR_RE_501;
+		else if (!strcmp(args[i], "502"))
+			curpx->retry_type |= PR_RE_502;
+		else if (!strcmp(args[i], "503"))
+			curpx->retry_type |= PR_RE_503;
+		else if (!strcmp(args[i], "504"))
+			curpx->retry_type |= PR_RE_504;
+		else if (!strcmp(args[i], "none")) {
+			/* "none" must be both the first and the last keyword */
+			if (i != 1 || *args[i + 1]) {
+				memprintf(err, "'%s' 'none' keyword only usable alone", args[0]);
+				return -1;
+			}
+		} else {
+			memprintf(err, "'%s': unknown keyword '%s'", args[0], args[i]);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
 /* This function inserts proxy <px> into the tree of known proxies. The proxy's
  * name is used as the storing key so it must already have been initialized.
  */
@@ -823,6 +879,9 @@
 	/* HTX is the default mode, for HTTP and TCP */
 	p->options2 |= PR_O2_USE_HTX;
 
+	/* Default to only allow L4 retries */
+	p->retry_type = PR_RE_CONN_FAILED;
+
 	HA_SPIN_INIT(&p->lock);
 }
 
@@ -1590,6 +1649,7 @@
 	{ CFG_LISTEN, "rate-limit", proxy_parse_rate_limit },
 	{ CFG_LISTEN, "max-keep-alive-queue", proxy_parse_max_ka_queue },
 	{ CFG_LISTEN, "declare", proxy_parse_declare },
+	{ CFG_LISTEN, "retry-on", proxy_parse_retry_on },
 	{ 0, NULL, NULL },
 }};
 
diff --git a/src/stream.c b/src/stream.c
index b3573c8..8c2ea55 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -323,6 +323,7 @@
 	if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
 		goto out_fail_accept;
 
+	s->si[1].l7_buffer = BUF_NULL;
 	/* finish initialization of the accepted file descriptor */
 	if (appctx)
 		si_want_get(&s->si[0]);
@@ -475,6 +476,7 @@
 	tasklet_free(s->si[0].wait_event.task);
 	tasklet_free(s->si[1].wait_event.task);
 
+	b_free(&s->si[1].l7_buffer);
 	if (must_free_sess) {
 		sess->origin = NULL;
 		session_free(sess);
@@ -769,7 +771,7 @@
 
 	/* ensure that we have enough retries left */
 	si->conn_retries--;
-	if (si->conn_retries < 0) {
+	if (si->conn_retries < 0 || !(s->be->retry_type & PR_RE_CONN_FAILED)) {
 		if (!si->err_type) {
 			si->err_type = SI_ET_CONN_ERR;
 		}
@@ -2322,6 +2324,8 @@
 				 */
 				si_b->state = SI_ST_REQ; /* new connection requested */
 				si_b->conn_retries = s->be->conn_retries;
+				if (s->be->retry_type &~ PR_RE_CONN_FAILED)
+					si_b->flags |= SI_FL_L7_RETRY;
 			}
 		}
 		else {
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 1e50c1f..731df38 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -30,8 +30,10 @@
 #include <proto/applet.h>
 #include <proto/channel.h>
 #include <proto/connection.h>
+#include <proto/http_htx.h>
 #include <proto/mux_pt.h>
 #include <proto/pipe.h>
+#include <proto/proxy.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
@@ -685,6 +687,34 @@
 		if (oc->flags & CF_STREAMER)
 			send_flag |= CO_SFL_STREAMER;
 
+		if ((si->flags & SI_FL_L7_RETRY) && !b_data(&si->l7_buffer)) {
+			/* If we want to be able to do L7 retries, copy
+			 * the data we're about to send, so that we are able
+			 * to resend them if needed
+			 */
+			/* Try to allocate a buffer if we had none.
+			 * If it fails, the next test will just
+			 * disable the l7 retries by clearing
+			 * the SI_FL_L7_RETRY flag.
+			 */
+			if (!(oc->flags & CF_EOI))
+				si->flags &= ~SI_FL_L7_RETRY;
+			else {
+				if (b_is_null(&si->l7_buffer))
+					b_alloc(&si->l7_buffer);
+				if (b_is_null(&si->l7_buffer))
+					si->flags &= ~SI_FL_L7_RETRY;
+				else {
+					memcpy(b_orig(&si->l7_buffer),
+					       b_orig(&oc->buf),
+					       b_size(&oc->buf));
+					si->l7_buffer.head = co_data(oc);
+					b_add(&si->l7_buffer, co_data(oc));
+				}
+
+			}
+		}
+
 		ret = cs->conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
 		if (ret > 0) {
 			did_send = 1;
@@ -1268,6 +1298,27 @@
 			break;
 		}
 
+		if (si->flags & SI_FL_L7_RETRY) {
+			struct htx *htx;
+			struct htx_sl *sl;
+
+			htx = htxbuf(&ic->buf);
+			if (htx) {
+				sl = http_find_stline(htx);
+				if (sl && l7_status_match(si_strm(si)->be,
+				    sl->info.res.status)) {
+					/* If we got a status for which we would
+					 * like to retry the request, empty
+					 * the buffer and pretend there's an
+					 * error on the channel.
+					 */
+					ic->flags |= CF_READ_ERROR;
+					htx_reset(htx);
+					return 1;
+				}
+			}
+			si->flags &= ~SI_FL_L7_RETRY;
+		}
 		cur_read += ret;
 
 		/* if we're allowed to directly forward data, we must update ->o */