MEDIUM: cli: handle payload in CLI proxy

The CLI proxy was not handling payload. To do that, we needed to keep a
connection active on a server and to transfer each new line over that
connection until we receive a empty line.

The CLI proxy handles the payload in the same way that the CLI do it.

Examples:

   $ echo -e "@1;add map #-1 <<\n$(cat data)\n" | socat /tmp/master-socket -

   $ socat /tmp/master-socket readline
   prompt
   master> @1
   25130> add map #-1 <<
   + test test
   + test2 test2
   + test3 test3
   +

   25130>
diff --git a/include/types/stream.h b/include/types/stream.h
index 52c0b97..8eb5115 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -164,6 +164,7 @@
 
 	int pcli_next_pid;                      /* next target PID to use for the CLI proxy */
 	int pcli_prompt;                        /* is there a prompt ?! */
+	int pcli_flags;                         /* flags for CLI proxy */
 
 	char *unique_id;                        /* custom unique ID */
 
diff --git a/src/cli.c b/src/cli.c
index 3af5c6a..9d210dd 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -1661,10 +1661,14 @@
 	if (!s->pcli_prompt)
 		return;
 
-	if (s->pcli_next_pid == 0)
-		chunk_appendf(msg, "master> ");
-	else
-		chunk_appendf(msg, "%d> ", s->pcli_next_pid);
+	if (s->pcli_flags & APPCTX_CLI_ST1_PAYLOAD) {
+		chunk_appendf(msg, "+ ");
+	} else {
+		if (s->pcli_next_pid == 0)
+			chunk_appendf(msg, "master> ");
+		else
+			chunk_appendf(msg, "%d> ", s->pcli_next_pid);
+	}
 	co_inject(oc, msg->area, msg->data);
 }
 
@@ -1815,26 +1819,39 @@
 	int argl; /* number of args */
 	char *p;
 	char *trim = NULL;
+	char *payload = NULL;
 	int wtrim = 0; /* number of words to trim */
 	int reql = 0;
 	int i = 0;
 
 	p = str;
 
-	/* Looks for the end of one command */
-	while (p+reql < end) {
-		/* handle escaping */
-		if (str[reql] == '\\') {
+	if (!(s->pcli_flags & APPCTX_CLI_ST1_PAYLOAD)) {
+
+		/* Looks for the end of one command */
+		while (p+reql < end) {
+			/* handle escaping */
+			if (p[reql] == '\\') {
+				reql++;
+				continue;
+			}
+			if (p[reql] == ';' || p[reql] == '\n') {
+				/* found the end of the command */
+				p[reql] = '\n';
+				reql++;
+				break;
+			}
 			reql++;
-			continue;
 		}
-		if (str[reql] == ';' || str[reql] == '\n') {
-			/* found the end of the command */
-			str[reql] = '\n';
+	} else {
+		while (p+reql < end) {
+			if (p[reql] == '\n') {
+				/* found the end of the line */
+				reql++;
+				break;
+			}
 			reql++;
-			break;
 		}
-		reql++;
 	}
 
 	/* set end to first byte after the end of the command */
@@ -1845,6 +1862,19 @@
 		return -1;
 	}
 
+	/* last line of the payload */
+	if ((s->pcli_flags & APPCTX_CLI_ST1_PAYLOAD) && (reql == 1)) {
+		s->pcli_flags &= ~APPCTX_CLI_ST1_PAYLOAD;
+		return reql;
+	}
+
+	payload = strstr(p, PAYLOAD_PATTERN);
+	if ((end - 1) == (payload + strlen(PAYLOAD_PATTERN))) {
+		/* if the payload pattern is at the end */
+		s->pcli_flags |= APPCTX_CLI_ST1_PAYLOAD;
+		return reql;
+	}
+
 	*(end-1) = '\0';
 
 	/* splits the command in words */
@@ -1952,8 +1982,15 @@
 
 		/* forward only 1 command */
 		channel_forward(req, to_forward);
-		/* we send only 1 command per request, and we write close after it */
-		channel_shutw_now(req);
+
+		if (!(s->pcli_flags & APPCTX_CLI_ST1_PAYLOAD)) {
+			/* we send only 1 command per request, and we write close after it */
+			channel_shutw_now(req);
+		} else {
+			pcli_write_prompt(s);
+		}
+
+		s->res.flags |= CF_WAKE_ONCE; /* need to be called again */
 
 		/* remove the XFER_DATA analysers, which forwards all
 		 * the data, we don't want to forward the next requests
@@ -1963,15 +2000,17 @@
 		req->analysers |= AN_REQ_FLT_END|CF_FLT_ANALYZE;
 		s->res.analysers |= AN_RES_WAIT_CLI;
 
-		if (next_pid > -1)
-			target_pid = next_pid;
-		else
-			target_pid = s->pcli_next_pid;
-		/* we can connect now */
-		s->target = pcli_pid_to_server(target_pid);
+		if (!(s->flags & SF_ASSIGNED)) {
+			if (next_pid > -1)
+				target_pid = next_pid;
+			else
+				target_pid = s->pcli_next_pid;
+			/* we can connect now */
+			s->target = pcli_pid_to_server(target_pid);
 
-		s->flags |= (SF_DIRECT | SF_ASSIGNED);
-		channel_auto_connect(req);
+			s->flags |= (SF_DIRECT | SF_ASSIGNED);
+			channel_auto_connect(req);
+		}
 
 	} else if (to_forward == 0) {
 		/* we trimmed things but we might have other commands to consume */
@@ -2011,6 +2050,13 @@
 	channel_dont_close(&s->res);
 	channel_dont_close(&s->req);
 
+	if (s->pcli_flags & APPCTX_CLI_ST1_PAYLOAD) {
+		s->req.analysers |= AN_REQ_WAIT_CLI;
+		s->res.analysers &= ~AN_RES_WAIT_CLI;
+		s->req.flags |= CF_WAKE_ONCE; /* need to be called again if there is some command left in the request */
+		return 0;
+	}
+
 	/* forward the data */
 	if (ci_data(rep)) {
 		c_adv(rep, ci_data(rep));
diff --git a/src/stream.c b/src/stream.c
index 67b0c8d..24df70a 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -202,6 +202,7 @@
 	s->flags |= SF_INITIALIZED;
 	s->pcli_next_pid = 0;
 	s->pcli_prompt = 0;
+	s->pcli_flags = 0;
 	s->unique_id = NULL;
 
 	if ((t = task_new(tid_bit)) == NULL)