MAJOR: peers: peers protocol version 2.0

This patch does'nt add any new feature: the functional behavior
is the same than version 1.0.

Technical differences:

In this version all updates on different stick tables are
multiplexed on the same tcp session. There is only one established
tcp session per peer whereas in first version there was one established
tcp session per peer and per stick table.

Messages format was reviewed to be more evolutive and to support
further types of data exchange such as SSL sessions or other sticktable's
data types (currently only the sticktable's server id is supported).
diff --git a/include/proto/peers.h b/include/proto/peers.h
index 27b3875..782b66e 100644
--- a/include/proto/peers.h
+++ b/include/proto/peers.h
@@ -28,6 +28,7 @@
 #include <types/stream.h>
 #include <types/peers.h>
 
+void peers_init_sync(struct peers *peers);
 void peers_register_table(struct peers *, struct stktable *table);
 void peers_setup_frontend(struct proxy *fe);
 
diff --git a/include/types/peers.h b/include/types/peers.h
index fbe2506..320e128 100644
--- a/include/types/peers.h
+++ b/include/types/peers.h
@@ -35,40 +35,23 @@
 #include <common/tools.h>
 #include <eb32tree.h>
 
-struct peer_session {
-	struct shared_table *table;   /* shared table */
-	struct peer *peer;	      /* current peer */
-	struct stream *stream;        /* current transport stream */
-	struct appctx *appctx;        /* the appctx running it */
-	unsigned int flags; 	      /* peer session flags */
-	unsigned int statuscode;      /* current/last session status code */
-	unsigned int update;	      /* current peer acked update */
-	unsigned int pushack;	      /* last commited update to ack */
-	unsigned int lastack;	      /* last acked update */
-	unsigned int lastpush;	      /* last pushed update */
-	unsigned int confirm;	      /* confirm message counter */
-	unsigned int pushed;	      /* equal to last pushed data or to table local update in case of total push
-				       * or to teaching_origin if teaching is ended */
-	unsigned int reconnect;	      /* next connect timer */
-	unsigned int teaching_origin; /* resync teaching origine update */
-	struct peer_session *next;
-};
-
 struct shared_table {
 	struct stktable *table;		    /* stick table to sync */
-	struct task *sync_task;		    /* main sync task */
-	struct sig_handler *sighandler;     /* signal handler */
-	struct peer_session *local_session; /* local peer session */
-	struct peer_session *sessions;	    /* peer sessions list */
-	unsigned int flags;		    /* current table resync state */
-	unsigned int resync_timeout;	    /* resync timeout timer */
+	int local_id;
+	int remote_id;
+	int flags;
+	uint64_t remote_data;
+	unsigned int last_acked;
+	unsigned int last_pushed;
+	unsigned int last_get;
+	unsigned int teaching_origin;
+	unsigned int update;
 	struct shared_table *next;	    /* next shared table in list */
 };
 
 struct peer {
 	int local;		  /* proxy state */
 	char *id;
-	struct peers *peers;
 	struct {
 		const char *file; /* file where the section appears */
 		int line;	  /* line where the section appears */
@@ -78,6 +61,15 @@
 	struct protocol *proto;	       /* peer address protocol */
 	struct xprt_ops *xprt;         /* peer socket operations at transport layer */
 	void *sock_init_arg;           /* socket operations's opaque init argument if needed */
+	unsigned int flags; 	      /* peer session flags */
+	unsigned int statuscode;      /* current/last session status code */
+	unsigned int reconnect;	      /* next connect timer */
+	unsigned int confirm;         /* confirm message counter */
+	struct stream *stream;        /* current transport stream */
+	struct appctx *appctx;        /* the appctx running it */
+	struct shared_table *remote_table;
+	struct shared_table *last_local_table;
+	struct shared_table *tables;
 	struct peer *next;	  /* next peer in the list */
 };
 
@@ -85,15 +77,19 @@
 struct peers {
 	int state;			 /* proxy state */
 	char *id;			 /* peer section name */
+	struct task *sync_task;		 /* main sync task */
+	struct sig_handler *sighandler;	 /* signal handler */
 	struct peer *remote;		 /* remote peers list */
+	struct peer *local;		 /* local peer list */
 	struct proxy *peers_fe;		 /* peer frontend */
 	struct {
 		const char *file;	 /* file where the section appears */
 		int line;		 /* line where the section appears */
 	} conf;				 /* config information */
-	struct shared_table *tables;	 /* registered shared tables */
 	time_t last_change;
 	struct peers *next;		 /* next peer section */
+	unsigned int flags;		 /* current peers section resync state */
+	unsigned int resync_timeout;	 /* resync timeout timer */
 	int count;			 /* total of peers */
 };
 
diff --git a/src/cfgparse.c b/src/cfgparse.c
index bb31188..6d89b3d 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -1986,7 +1986,6 @@
 		curpeers->count++;
 		newpeer->next = curpeers->remote;
 		curpeers->remote = newpeer;
-		newpeer->peers = curpeers;
 		newpeer->conf.file = strdup(file);
 		newpeer->conf.line = linenum;
 
@@ -2030,6 +2029,7 @@
 		if (strcmp(newpeer->id, localpeer) == 0) {
 			/* Current is local peer, it define a frontend */
 			newpeer->local = 1;
+			peers->local = newpeer;
 
 			if (!curpeers->peers_fe) {
 				if ((curpeers->peers_fe  = calloc(1, sizeof(struct proxy))) == NULL) {
@@ -8397,6 +8397,7 @@
 				curpeers->peers_fe = NULL;
 			}
 			else {
+				peers_init_sync(curpeers);
 				last = &curpeers->next;
 				continue;
 			}
diff --git a/src/peers.c b/src/peers.c
index 468a96d..3174929 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1,5 +1,5 @@
 /*
- * Stick table synchro management.
+ * Peer synchro management.
  *
  * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
  *
@@ -52,34 +52,79 @@
 /*******************************/
 
 /******************************/
-/* Current table resync state */
+/* Current peers section resync state */
 /******************************/
-#define	SHTABLE_F_RESYNC_LOCAL		0x00000001 /* Learn from local finished or no more needed */
-#define	SHTABLE_F_RESYNC_REMOTE		0x00000002 /* Learn from remote finished or no more needed */
-#define	SHTABLE_F_RESYNC_ASSIGN		0x00000004 /* A peer was assigned to learn our lesson */
-#define	SHTABLE_F_RESYNC_PROCESS	0x00000008 /* The assigned peer was requested for resync */
-#define	SHTABLE_F_DONOTSTOP		0x00010000 /* Main table sync task block process during soft stop
+#define	PEERS_F_RESYNC_LOCAL		0x00000001 /* Learn from local finished or no more needed */
+#define	PEERS_F_RESYNC_REMOTE		0x00000002 /* Learn from remote finished or no more needed */
+#define	PEERS_F_RESYNC_ASSIGN		0x00000004 /* A peer was assigned to learn our lesson */
+#define	PEERS_F_RESYNC_PROCESS		0x00000008 /* The assigned peer was requested for resync */
+#define	PEERS_F_DONOTSTOP		0x00010000 /* Main table sync task block process during soft stop
 						      to push data to new process */
 
-#define	SHTABLE_RESYNC_STATEMASK	(SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
-#define	SHTABLE_RESYNC_FROMLOCAL	0x00000000
-#define	SHTABLE_RESYNC_FROMREMOTE	SHTABLE_F_RESYNC_LOCAL
-#define	SHTABLE_RESYNC_FINISHED		(SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
+#define	PEERS_RESYNC_STATEMASK		(PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
+#define	PEERS_RESYNC_FROMLOCAL		0x00000000
+#define	PEERS_RESYNC_FROMREMOTE		PEERS_F_RESYNC_LOCAL
+#define	PEERS_RESYNC_FINISHED		(PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
+
+/***********************************/
+/* Current shared table sync state */
+/***********************************/
+#define SHTABLE_F_TEACH_STAGE1		0x00000001 /* Teach state 1 complete */
+#define SHTABLE_F_TEACH_STAGE2		0x00000002 /* Teach state 2 complete */
 
 /******************************/
 /* Remote peer teaching state */
 /******************************/
 #define	PEER_F_TEACH_PROCESS		0x00000001 /* Teach a lesson to current peer */
-#define	PEER_F_TEACH_STAGE1		0x00000002 /* Teach state 1 complete */
-#define	PEER_F_TEACH_STAGE2		0x00000004 /* Teach stage 2 complete */
 #define	PEER_F_TEACH_FINISHED		0x00000008 /* Teach conclude, (wait for confirm) */
 #define	PEER_F_TEACH_COMPLETE		0x00000010 /* All that we know already taught to current peer, used only for a local peer */
 #define	PEER_F_LEARN_ASSIGN		0x00000100 /* Current peer was assigned for a lesson */
 #define	PEER_F_LEARN_NOTUP2DATE		0x00000200 /* Learn from peer finished but peer is not up to date */
 
-#define	PEER_TEACH_RESET		~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
+#define	PEER_TEACH_RESET		~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
 #define	PEER_LEARN_RESET		~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
 
+/*****************************/
+/* Sync message class        */
+/*****************************/
+enum {
+	PEER_MSG_CLASS_CONTROL = 0,
+	PEER_MSG_CLASS_ERROR,
+	PEER_MSG_CLASS_STICKTABLE = 10,
+	PEER_MSG_CLASS_RESERVED = 255,
+};
+
+/*****************************/
+/* control message types     */
+/*****************************/
+enum {
+	PEER_MSG_CTRL_RESYNCREQ = 0,
+	PEER_MSG_CTRL_RESYNCFINISHED,
+	PEER_MSG_CTRL_RESYNCPARTIAL,
+	PEER_MSG_CTRL_RESYNCCONFIRM,
+};
+
+/*****************************/
+/* error message types       */
+/*****************************/
+enum {
+	PEER_MSG_ERR_PROTOCOL = 0,
+	PEER_MSG_ERR_SIZELIMIT,
+};
+
+
+/*******************************/
+/* stick table sync mesg types */
+/* Note: ids >= 128 contains   */
+/* id message cotains data     */
+/*******************************/
+enum {
+	PEER_MSG_STKT_UPDATE = 128,
+	PEER_MSG_STKT_INCUPDATE,
+	PEER_MSG_STKT_DEFINE,
+	PEER_MSG_STKT_SWITCH,
+	PEER_MSG_STKT_ACK,
+};
 
 /**********************************/
 /* Peer Session IO handler states */
@@ -90,13 +135,14 @@
 	PEER_SESS_ST_GETVERSION,     /* Validate supported protocol version */
 	PEER_SESS_ST_GETHOST,        /* Validate host ID correspond to local host id */
 	PEER_SESS_ST_GETPEER,        /* Validate peer ID correspond to a known remote peer id */
-	PEER_SESS_ST_GETTABLE,       /* Search into registered table for a table with same id and validate type and size */
 	/* after this point, data were possibly exchanged */
 	PEER_SESS_ST_SENDSUCCESS,    /* Send ret code 200 (success) and wait for message */
 	PEER_SESS_ST_CONNECT,        /* Initial state for session create on a connect, push presentation into buffer */
 	PEER_SESS_ST_GETSTATUS,      /* Wait for the welcome message */
 	PEER_SESS_ST_WAITMSG,        /* Wait for data messages */
 	PEER_SESS_ST_EXIT,           /* Exit with status code */
+	PEER_SESS_ST_ERRPROTO,       /* Send error proto message before exit */
+	PEER_SESS_ST_ERRSIZE,        /* Send error size message before exit */
 	PEER_SESS_ST_END,            /* Killed session */
 };
 
@@ -115,66 +161,240 @@
 #define	PEER_SESS_SC_ERRVERSION		502 /* unknown protocol version */
 #define	PEER_SESS_SC_ERRHOST		503 /* bad host name */
 #define	PEER_SESS_SC_ERRPEER		504 /* unknown peer */
-#define	PEER_SESS_SC_ERRTYPE		505 /* table key type mismatch */
-#define	PEER_SESS_SC_ERRSIZE		506 /* table key size mismatch */
-#define	PEER_SESS_SC_ERRTABLE		507 /* unknown table */
 
 #define PEER_SESSION_PROTO_NAME         "HAProxyS"
 
 struct peers *peers = NULL;
 static void peer_session_forceshutdown(struct stream * stream);
 
+int intencode(uint64_t i, char **str) {
+	int idx = 0;
+	unsigned char *msg;
+
+	if (!*str)
+		return 0;
+
+	msg = (unsigned char *)*str;
+	if (i < 240) {
+		msg[0] = (unsigned char)i;
+		*str = (char *)&msg[idx+1];
+		return (idx+1);
+	}
+
+	msg[idx] =(unsigned char)i | 240;
+	i = (i - 240) >> 4;
+	while (i >= 128) {
+		msg[++idx] = (unsigned char)i | 128;
+		i = (i - 128) >> 7;
+	}
+	msg[++idx] = (unsigned char)i;
+	*str = (char *)&msg[idx+1];
+	return (idx+1);
+}
+
+
+/* This function returns the decoded integer or 0
+   if decode failed
+   *str point on the beginning of the integer to decode
+   at the end of decoding *str point on the end of the
+   encoded integer or to null if end is reached */
+uint64_t intdecode(char **str, char *end) {
+	uint64_t i;
+	int idx = 0;
+	unsigned char *msg;
+
+	if (!*str)
+		return 0;
+
+	msg = (unsigned char *)*str;
+	if (msg >= (unsigned char *)end) {
+		*str = NULL;
+		return 0;
+	}
+
+	if (msg[idx] < 240) {
+		*str = (char *)&msg[idx+1];
+		return msg[idx];
+	}
+	i = msg[idx];
+	do {
+		idx++;
+		if (msg >= (unsigned char *)end) {
+			*str = NULL;
+			return 0;
+		}
+		i += (uint64_t)msg[idx] <<  (4 + 7*(idx-1));
+	}
+	while (msg[idx] > 128);
+	*str = (char *)&msg[idx+1];
+	return i;
+}
 
 /*
- * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
- * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
+ * This prepare the data update message on the stick session <ts>, <st> is the considered
+ * stick table.
+ *  <msg> is a buffer of <size> to recieve data message content
+ * If function returns 0, the caller should consider we were unable to encode this message (TODO:
+ * check size)
  */
-static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
+static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier)
 {
 	uint32_t netinteger;
-	int len;
+	unsigned short datalen;
+	char *cursor, *datamsg;
+
+	cursor = datamsg = msg + 1 + 5;
+
 	/* construct message */
-	if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
-		msg[0] = 0x80 + ts->upd.key - ps->lastpush;
-		len = sizeof(char);
+
+	/* check if we need to send the update identifer */
+	if (st->last_pushed && ts->upd.key > st->last_pushed && (ts->upd.key - st->last_pushed) == 1) {
+		use_identifier = 0;
 	}
-	else {
-		msg[0] = 'D';
+
+	/* encode update identifier if needed */
+	if (use_identifier)  {
 		netinteger = htonl(ts->upd.key);
-		memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
-		len = sizeof(char) + sizeof(netinteger);
+		memcpy(cursor, &netinteger, sizeof(netinteger));
+		cursor += sizeof(netinteger);
 	}
 
-	if (ps->table->table->type == STKTABLE_TYPE_STRING) {
+	/* encode the key */
+	if (st->table->type == STKTABLE_TYPE_STRING) {
 		int stlen = strlen((char *)ts->key.key);
 
-		netinteger = htonl(strlen((char *)ts->key.key));
-		memcpy(&msg[len], &netinteger, sizeof(netinteger));
-		memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
-		len += sizeof(netinteger) + stlen;
-
+		intencode(stlen, &cursor);
+		memcpy(cursor, ts->key.key, stlen);
+		cursor += stlen;
 	}
-	else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
+	else if (st->table->type == STKTABLE_TYPE_INTEGER) {
 		netinteger = htonl(*((uint32_t *)ts->key.key));
-		memcpy(&msg[len], &netinteger, sizeof(netinteger));
-		len += sizeof(netinteger);
+		memcpy(cursor, &netinteger, sizeof(netinteger));
+		cursor += sizeof(netinteger);
 	}
 	else {
-		memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
-		len += ps->table->table->key_size;
+		memcpy(cursor, ts->key.key, st->table->key_size);
+		cursor += st->table->key_size;
 	}
 
-	if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
-		netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
+	/* encode values */
+	if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) {
+		int srvid;
+
+		srvid = stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id);
+		intencode(srvid, &cursor);
+	}
+
+	/* Compute datalen */
+	datalen = (cursor - datamsg);
+
+	/*  prepare message header */
+	msg[0] = PEER_MSG_CLASS_STICKTABLE;
+	if (use_identifier)
+		msg[1] = PEER_MSG_STKT_UPDATE;
 	else
-		netinteger = 0;
+		msg[1] = PEER_MSG_STKT_INCUPDATE;
+
+	cursor = &msg[2];
+	intencode(datalen, &cursor);
+
+	/* move data after header */
+	memmove(cursor, datamsg, datalen);
+
+	/* return header size + data_len */
+	return (cursor - msg) + datalen;
+}
+
+/*
+ * This prepare the switch table message to targeted share table <st>.
+ *  <msg> is a buffer of <size> to recieve data message content
+ * If function returns 0, the caller should consider we were unable to encode this message (TODO:
+ * check size)
+ */
+static int peer_prepare_switchmsg(struct shared_table *st, char *msg, size_t size)
+{
+	int len;
+	unsigned short datalen;
+	char *cursor, *datamsg;
+	uint64_t data = 0;
+
+	cursor = datamsg = msg + 2 + 5;
+
+	/* Encode data */
+
+	/* encode local id */
+	intencode(st->local_id, &cursor);
+
+	/* encode table name */
+	len = strlen(st->table->id);
+	intencode(len, &cursor);
+	memcpy(cursor, st->table->id, len);
+	cursor += len;
+
+	/* encode table type */
+
+	intencode(st->table->type, &cursor);
+
+	/* encode table key size */
+	intencode(st->table->key_size, &cursor);
+
+	/* encode available data types in table */
+	if (st->table->data_ofs[STKTABLE_DT_SERVER_ID]) {
+		data |= 1 << STKTABLE_DT_SERVER_ID;
+	}
+	intencode(data, &cursor);
+
+	/* Compute datalen */
+	datalen = (cursor - datamsg);
 
-	memcpy(&msg[len], &netinteger , sizeof(netinteger));
-	len += sizeof(netinteger);
+	/*  prepare message header */
+	msg[0] = PEER_MSG_CLASS_STICKTABLE;
+	msg[1] = PEER_MSG_STKT_DEFINE;
+	cursor = &msg[2];
+	intencode(datalen, &cursor);
 
-	return len;
+	/* move data after header */
+	memmove(cursor, datamsg, datalen);
+
+	/* return header size + data_len */
+	return (cursor - msg) + datalen;
 }
 
+/*
+ * This prepare the acknowledge message on the stick session <ts>, <st> is the considered
+ * stick table.
+ *  <msg> is a buffer of <size> to recieve data message content
+ * If function returns 0, the caller should consider we were unable to encode this message (TODO:
+ * check size)
+ */
+static int peer_prepare_ackmsg(struct shared_table *st, char *msg, size_t size)
+{
+	unsigned short datalen;
+	char *cursor, *datamsg;
+	uint32_t netinteger;
+
+	cursor = datamsg = trash.str + 2 + 5;
+
+	intencode(st->remote_id, &cursor);
+	netinteger = htonl(st->last_get);
+	memcpy(cursor, &netinteger, sizeof(netinteger));
+	cursor += sizeof(netinteger);
+
+	/* Compute datalen */
+	datalen = (cursor - datamsg);
+
+	/*  prepare message header */
+	msg[0] = PEER_MSG_CLASS_STICKTABLE;
+	msg[1] = PEER_MSG_STKT_DEFINE;
+	cursor = &msg[2];
+	intencode(datalen, &cursor);
+
+	/* move data after header */
+	memmove(cursor, datamsg, datalen);
+
+	/* return header size + data_len */
+	return (cursor - msg) + datalen;
+}
 
 /*
  * Callback to release a session with a peer
@@ -183,30 +403,31 @@
 {
 	struct stream_interface *si = appctx->owner;
 	struct stream *s = si_strm(si);
-	struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
+	struct peer *peer = (struct peer *)appctx->ctx.peers.ptr;
+	struct peers *peers = (struct peers *)strm_fe(s)->parent;
 
 	/* appctx->ctx.peers.ptr is not a peer session */
 	if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
 		return;
 
 	/* peer session identified */
-	if (ps) {
-		if (ps->stream == s) {
-			ps->stream = NULL;
-			ps->appctx = NULL;
-			if (ps->flags & PEER_F_LEARN_ASSIGN) {
+	if (peer) {
+		if (peer->stream == s) {
+			peer->stream = NULL;
+			peer->appctx = NULL;
+			if (peer->flags & PEER_F_LEARN_ASSIGN) {
 				/* unassign current peer for learning */
-				ps->flags &= ~(PEER_F_LEARN_ASSIGN);
-				ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
+				peer->flags &= ~(PEER_F_LEARN_ASSIGN);
+				peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
 
 				/* reschedule a resync */
-				ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+				peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
 			}
 			/* reset teaching and learning flags to 0 */
-			ps->flags &= PEER_TEACH_RESET;
-			ps->flags &= PEER_LEARN_RESET;
+			peer->flags &= PEER_TEACH_RESET;
+			peer->flags &= PEER_LEARN_RESET;
 		}
-		task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
+		task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
 	}
 }
 
@@ -249,7 +470,7 @@
 				bo_skip(si_oc(si), reql);
 
 				/* test version */
-				if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
+				if (strcmp(PEER_SESSION_PROTO_NAME " 2.0", trash.str) != 0) {
 					appctx->st0 = PEER_SESS_ST_EXIT;
 					appctx->st1 = PEER_SESS_SC_ERRVERSION;
 					/* test protocol */
@@ -310,7 +531,7 @@
 
 				bo_skip(si_oc(si), reql);
 
-				/* parse line "<peer name> <pid>" */
+				/* parse line "<peer name> <pid> <relative_pid>" */
 				p = strchr(trash.str, ' ');
 				if (!p) {
 					appctx->st0 = PEER_SESS_ST_EXIT;
@@ -331,125 +552,25 @@
 					appctx->st1 = PEER_SESS_SC_ERRPEER;
 					goto switchstate;
 				}
-
-				appctx->ctx.peers.ptr = curpeer;
-				appctx->st0 = PEER_SESS_ST_GETTABLE;
-				/* fall through */
-			}
-			case PEER_SESS_ST_GETTABLE: {
-				struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
-				struct shared_table *st;
-				struct peer_session *ps = NULL;
-				unsigned long key_type;
-				size_t key_size;
-				char *p;
-
-				reql = bo_getline(si_oc(si), trash.str, trash.size);
-				if (reql <= 0) { /* closed or EOL not found */
-					if (reql == 0)
-						goto out;
-					appctx->ctx.peers.ptr = NULL;
-					appctx->st0 = PEER_SESS_ST_END;
-					goto switchstate;
-				}
-				/* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
-				appctx->ctx.peers.ptr = NULL;
 
-				if (trash.str[reql-1] != '\n') {
-					/* Incomplete line, we quit */
-					appctx->st0 = PEER_SESS_ST_END;
-					goto switchstate;
-				}
-				else if (reql > 1 && (trash.str[reql-2] == '\r'))
-					trash.str[reql-2] = 0;
-				else
-					trash.str[reql-1] = 0;
-
-				bo_skip(si_oc(si), reql);
-
-				/* Parse line "<table name> <type> <size>" */
-				p = strchr(trash.str, ' ');
-				if (!p) {
-					appctx->st0 = PEER_SESS_ST_EXIT;
-					appctx->st1 = PEER_SESS_SC_ERRPROTO;
-					goto switchstate;
-				}
-				*p = 0;
-				key_type = (unsigned long)atol(p+1);
-
-				p = strchr(p+1, ' ');
-				if (!p) {
-					appctx->ctx.peers.ptr = NULL;
-					appctx->st0 = PEER_SESS_ST_EXIT;
-					appctx->st1 = PEER_SESS_SC_ERRPROTO;
-					goto switchstate;
-				}
-
-				key_size = (size_t)atoi(p);
-				for (st = curpeers->tables; st; st = st->next) {
-					/* If table name matches */
-					if (strcmp(st->table->id, trash.str) == 0) {
-						/* Check key size mismatches, except for strings
-						 * which may be truncated as long as they fit in
-						 * a buffer.
-						 */
-						if (key_size != st->table->key_size &&
-						    (key_type != STKTABLE_TYPE_STRING ||
-						     1 + 4 + 4 + key_size - 1 >= trash.size)) {
-							appctx->st0 = PEER_SESS_ST_EXIT;
-							appctx->st1 = PEER_SESS_SC_ERRSIZE;
-							goto switchstate;
-						}
-
-						/* If key type mismatches */
-						if (key_type != st->table->type) {
-							appctx->st0 = PEER_SESS_ST_EXIT;
-							appctx->st1 = PEER_SESS_SC_ERRTYPE;
-							goto switchstate;
-						}
-
-						/* lookup peer stream of current peer */
-						for (ps = st->sessions; ps; ps = ps->next) {
-							if (ps->peer == curpeer) {
-								/* If stream already active, replaced by new one */
-								if (ps->stream && ps->stream != s) {
-									if (ps->peer->local) {
-										/* Local connection, reply a retry */
-										appctx->st0 = PEER_SESS_ST_EXIT;
-										appctx->st1 = PEER_SESS_SC_TRYAGAIN;
-										goto switchstate;
-									}
-									peer_session_forceshutdown(ps->stream);
-								}
-								ps->stream = s;
-								ps->appctx = appctx;
-								break;
-							}
-						}
-						break;
+				if (curpeer->stream && curpeer->stream != s) {
+					if (curpeer->local) {
+						/* Local connection, reply a retry */
+						appctx->st0 = PEER_SESS_ST_EXIT;
+						appctx->st1 = PEER_SESS_SC_TRYAGAIN;
+						goto switchstate;
 					}
-				}
-
-				/* If table not found */
-				if (!st){
-					appctx->st0 = PEER_SESS_ST_EXIT;
-					appctx->st1 = PEER_SESS_SC_ERRTABLE;
-					goto switchstate;
+					peer_session_forceshutdown(curpeer->stream);
 				}
-
-				/* If no peer session for current peer */
-				if (!ps) {
-					appctx->st0 = PEER_SESS_ST_EXIT;
-					appctx->st1 = PEER_SESS_SC_ERRPEER;
-					goto switchstate;
-				}
-
-				appctx->ctx.peers.ptr = ps;
+				curpeer->stream = s;
+				curpeer->appctx = appctx;
+				appctx->ctx.peers.ptr = curpeer;
 				appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
 				/* fall through */
 			}
 			case PEER_SESS_ST_SENDSUCCESS: {
-				struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
+				struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
+				struct shared_table *st;
 
 				repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
 				repl = bi_putblk(si_ic(si), trash.str, repl);
@@ -461,55 +582,57 @@
 				}
 
 				/* Register status code */
-				ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
+				curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
 
 				/* Awake main task */
-				task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
-
-				/* Init cursors */
-				ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
-				ps->pushed = ps->update;
+				task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
 
 				/* Init confirm counter */
-				ps->confirm = 0;
+				curpeer->confirm = 0;
+
+				/* Init cursors */
+				for (st = curpeer->tables; st ; st = st->next) {
+					st->last_get = st->last_acked = 0;
+					st->teaching_origin = st->last_pushed = st->update;
+				}
 
 				/* reset teaching and learning flags to 0 */
-				ps->flags &= PEER_TEACH_RESET;
-				ps->flags &= PEER_LEARN_RESET;
+				curpeer->flags &= PEER_TEACH_RESET;
+				curpeer->flags &= PEER_LEARN_RESET;
 
 				/* if current peer is local */
-				if (ps->peer->local) {
-					/* if table need resyncfrom local and no process assined  */
-					if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
-					    !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
-						/* assign local peer for a lesson, consider lesson already requested */
-						ps->flags |= PEER_F_LEARN_ASSIGN;
-						ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
-					}
+                                if (curpeer->local) {
+                                        /* if current host need resyncfrom local and no process assined  */
+                                        if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
+                                            !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+                                                /* assign local peer for a lesson, consider lesson already requested */
+                                                curpeer->flags |= PEER_F_LEARN_ASSIGN;
+                                                peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+                                        }
 
-				}
-				else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
-					 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
-					/* assign peer for a lesson  */
-					ps->flags |= PEER_F_LEARN_ASSIGN;
-					ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
-				}
+                                }
+                                else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
+                                         !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+                                        /* assign peer for a lesson  */
+                                        curpeer->flags |= PEER_F_LEARN_ASSIGN;
+                                        peers->flags |= PEERS_F_RESYNC_ASSIGN;
+                                }
+
+
 				/* switch to waiting message state */
 				appctx->st0 = PEER_SESS_ST_WAITMSG;
 				goto switchstate;
 			}
 			case PEER_SESS_ST_CONNECT: {
-				struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
+				struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
 
 				/* Send headers */
 				repl = snprintf(trash.str, trash.size,
-				                PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
-				                ps->peer->id,
+				                PEER_SESSION_PROTO_NAME " 2.0\n%s\n%s %d %d\n",
+				                curpeer->id,
 				                localpeer,
 				                (int)getpid(),
-				                ps->table->table->id,
-				                ps->table->table->type,
-				                (int)ps->table->table->key_size);
+						relative_pid);
 
 				if (repl >= trash.size) {
 					appctx->st0 = PEER_SESS_ST_END;
@@ -529,10 +652,11 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_GETSTATUS: {
-				struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
+				struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
+				struct shared_table *st;
 
 				if (si_ic(si)->flags & CF_WRITE_PARTIAL)
-					ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
+					curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
 
 				reql = bo_getline(si_oc(si), trash.str, trash.size);
 				if (reql <= 0) { /* closed or EOL not found */
@@ -554,40 +678,40 @@
 				bo_skip(si_oc(si), reql);
 
 				/* Register status code */
-				ps->statuscode = atoi(trash.str);
+				curpeer->statuscode = atoi(trash.str);
 
 				/* Awake main task */
-				task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
+				task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
 
 				/* If status code is success */
-				if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
+				if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
 					/* Init cursors */
-					ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
-					ps->pushed = ps->update;
+					for (st = curpeer->tables; st ; st = st->next) {
+						st->last_get = st->last_acked = 0;
+						st->teaching_origin = st->last_pushed = st->update;
+					}
 
 					/* Init confirm counter */
-					ps->confirm = 0;
+                                        curpeer->confirm = 0;
 
-					/* reset teaching and learning flags to 0 */
-					ps->flags &= PEER_TEACH_RESET;
-					ps->flags &= PEER_LEARN_RESET;
+                                        /* reset teaching and learning flags to 0 */
+                                        curpeer->flags &= PEER_TEACH_RESET;
+                                        curpeer->flags &= PEER_LEARN_RESET;
 
-					/* If current peer is local */
-					if (ps->peer->local) {
-						/* Init cursors to push a resync */
-						ps->teaching_origin = ps->pushed = ps->table->table->update;
-						/* flag to start to teach lesson */
-						ps->flags |= PEER_F_TEACH_PROCESS;
+                                        /* If current peer is local */
+                                        if (curpeer->local) {
+                                                /* flag to start to teach lesson */
+                                                curpeer->flags |= PEER_F_TEACH_PROCESS;
 
-					}
-					else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
-					            !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
-						/* If peer is remote and resync from remote is needed,
-						   and no peer currently assigned */
+                                        }
+                                        else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
+                                                    !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+                                                /* If peer is remote and resync from remote is needed,
+                                                   and no peer currently assigned */
 
-						/* assign peer for a lesson */
-						ps->flags |= PEER_F_LEARN_ASSIGN;
-						ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
+                                                /* assign peer for a lesson */
+                                                curpeer->flags |= PEER_F_LEARN_ASSIGN;
+						peers->flags |= PEERS_F_RESYNC_ASSIGN;
 					}
 
 				}
@@ -600,256 +724,403 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_WAITMSG: {
-				struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
+				struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
 				struct stksess *ts, *newts = NULL;
-				char c;
+				uint32_t msg_len = 0;
+				char *msg_cur = trash.str;
+				char *msg_end = trash.str;
+				unsigned char msg_head[7];
 				int totl = 0;
 
-				reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
+				reql = bo_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl);
 				if (reql <= 0) /* closed or EOL not found */
 					goto incomplete;
 
 				totl += reql;
 
-				if ((c & 0x80) || (c == 'D')) {
-					/* Here we have data message */
-					unsigned int pushack;
-					int srvid;
-					uint32_t netinteger;
+				if (msg_head[1] >= 128) {
+					/* Read and Decode message length */
+					reql = bo_getblk(si_oc(si), (char *)&msg_head[2], sizeof(unsigned char), totl);
+					if (reql <= 0) /* closed */
+						goto incomplete;
 
-					/* Compute update remote version */
-					if (c & 0x80) {
-						pushack = ps->pushack + (unsigned int)(c & 0x7F);
+					totl += reql;
+
+					if (msg_head[2] < 240) {
+						msg_len = msg_head[2];
 					}
 					else {
-						reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
-						if (reql <= 0) /* closed or EOL not found */
-							goto incomplete;
+						int i;
+						char *cur;
+						char *end;
 
-						totl += reql;
-						pushack = ntohl(netinteger);
+						for (i = 3 ; i < sizeof(msg_head) ; i++) {
+							reql = bo_getblk(si_oc(si), (char *)&msg_head[i], sizeof(char), totl);
+							if (reql <= 0) /* closed */
+								goto incomplete;
+
+							totl += reql;
+
+							if (!(msg_head[i] & 0x80))
+								break;
+						}
+
+						if (i == sizeof(msg_head)) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+
+						}
+						end = (char *)msg_head + sizeof(msg_head);
+						cur = (char *)&msg_head[2];
+						msg_len = intdecode(&cur, end);
+						if (!cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 					}
 
-					/* Read key. The string keys are read in two steps, the first step
-					 * consists in reading whatever fits into the table directly into
-					 * the pre-allocated key. The second step consists in simply
-					 * draining all exceeding data. This can happen for example after a
-					 * config reload with a smaller key size for the stick table than
-					 * what was previously set, or when facing the impossibility to
-					 * allocate a new stksess (for example when the table is full with
-					 * "nopurge").
-					 */
-					if (ps->table->table->type == STKTABLE_TYPE_STRING) {
-						unsigned int to_read, to_store;
 
-						/* read size first */
-						reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
-						if (reql <= 0) /* closed or EOL not found */
-							goto incomplete;
+					/* Read message content */
+					if (msg_len) {
+						if (msg_len > trash.size) {
+							/* Status code is not success, abort */
+							appctx->st0 = PEER_SESS_ST_ERRSIZE;
+							goto switchstate;
+						}
 
+						reql = bo_getblk(si_oc(si), trash.str, msg_len, totl);
+						if (reql <= 0) /* closed */
+							goto incomplete;
 						totl += reql;
 
-						to_store = 0;
-						to_read = ntohl(netinteger);
+						msg_end += msg_len;
+					}
+				}
 
-						if (to_read + totl > si_ob(si)->size) {
-							/* impossible to read a key this large, abort */
-							reql = -1;
-							goto incomplete;
+				if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
+					if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
+						struct shared_table *st;
+						/* Reset message: remote need resync */
+
+						/* prepare tables fot a global push */
+						for (st = curpeer->tables; st; st = st->next) {
+							st->teaching_origin = st->last_pushed = st->table->update;
+							st->flags = 0;
 						}
 
-						newts = stksess_new(ps->table->table, NULL);
-						if (newts)
-							to_store = MIN(to_read, ps->table->table->key_size - 1);
+						/* reset teaching flags to 0 */
+						curpeer->flags &= PEER_TEACH_RESET;
 
-						/* we read up to two blocks, the first one goes into the key,
-						 * the rest is drained into the trash.
-						 */
-						if (to_store) {
-							reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
-							if (reql <= 0) /* closed or incomplete */
-								goto incomplete;
-							newts->key.key[reql] = 0;
-							totl += reql;
-							to_read -= reql;
-						}
-						if (to_read) {
-							reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
-							if (reql <= 0) /* closed or incomplete */
-								goto incomplete;
-							totl += reql;
+						/* flag to start to teach lesson */
+						curpeer->flags |= PEER_F_TEACH_PROCESS;
+
+
+					}
+					else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
+
+						if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
+							curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
+							peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+							peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
 						}
+						curpeer->confirm++;
 					}
-					else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
-						reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
-						if (reql <= 0) /* closed or EOL not found */
-							goto incomplete;
-						newts = stksess_new(ps->table->table, NULL);
-						if (newts) {
-							netinteger = ntohl(netinteger);
-							memcpy(newts->key.key, &netinteger, sizeof(netinteger));
+					else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
+
+						if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
+							curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
+							peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+
+							curpeer->flags |= PEER_F_LEARN_NOTUP2DATE;
+							peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+							task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
 						}
-						totl += reql;
+						curpeer->confirm++;
 					}
-					else {
-						/* type ip or binary */
-						newts = stksess_new(ps->table->table, NULL);
-						reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
-						if (reql <= 0) /* closed or EOL not found */
-							goto incomplete;
-						totl += reql;
+					else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM)  {
+
+						/* If stopping state */
+						if (stopping) {
+							/* Close session, push resync no more needed */
+							curpeer->flags |= PEER_F_TEACH_COMPLETE;
+							appctx->st0 = PEER_SESS_ST_END;
+							goto switchstate;
+						}
+
+						 /* reset teaching flags to 0 */
+						curpeer->flags &= PEER_TEACH_RESET;
 					}
+				}
+				else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
+					if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
+						int table_id_len;
+						struct shared_table *st;
+						int table_type;
+						int table_keylen;
+						int table_id;
+						uint64_t table_data;
 
-					/* read server id */
-					reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
-					if (reql <= 0) /* closed or EOL not found */
-						goto incomplete;
+						table_id = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-					totl += reql;
-					srvid = ntohl(netinteger);
+						table_id_len = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-					/* update entry */
-					if (newts) {
-						/* lookup for existing entry */
-						ts = stktable_lookup(ps->table->table, newts);
-						if (ts) {
-							 /* the entry already exist, we can free ours */
-							stktable_touch(ps->table->table, ts, 0);
-							stksess_free(ps->table->table, newts);
-							newts = NULL;
+						curpeer->remote_table = NULL;
+						if (!table_id_len || (msg_cur + table_id_len) >= msg_end) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
 						}
-						else {
-							struct eb32_node *eb;
 
-							/* create new entry */
-							ts = stktable_store(ps->table->table, newts, 0);
-							newts = NULL; /* don't reuse it */
+						for (st = curpeer->tables; st; st = st->next) {
+							/* Reset IDs */
+							if (st->remote_id == table_id)
+								st->remote_id = 0;
 
-							ts->upd.key= (++ps->table->table->update)+(2^31);
-							eb = eb32_insert(&ps->table->table->updates, &ts->upd);
-							if (eb != &ts->upd) {
-								eb32_delete(eb);
-								eb32_insert(&ps->table->table->updates, &ts->upd);
+							if (!curpeer->remote_table
+							    && (table_id_len == strlen(st->table->id))
+							    && (memcmp(st->table->id, msg_cur, table_id_len) == 0)) {
+								curpeer->remote_table = st;
 							}
 						}
 
-						/* update entry */
-						if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
-							stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
-						ps->pushack = pushack;
-					}
-
-				}
-				else if (c == 'R') {
-					/* Reset message: remote need resync */
+						if (!curpeer->remote_table) {
+							goto ignore_msg;
+						}
 
-					/* reinit counters for a resync */
-					ps->lastpush = 0;
-					ps->teaching_origin = ps->pushed = ps->table->table->update;
+						msg_cur += table_id_len;
+						if (msg_cur >= msg_end) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-					/* reset teaching flags to 0 */
-					ps->flags &= PEER_TEACH_RESET;
+						table_type = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-					/* flag to start to teach lesson */
-					ps->flags |= PEER_F_TEACH_PROCESS;
-				}
-				else if (c == 'F') {
-					/* Finish message, all known updates have been pushed by remote */
-					/* and remote is up to date */
+						table_keylen = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-					/* If resync is in progress with remote peer */
-					if (ps->flags & PEER_F_LEARN_ASSIGN) {
+						table_data = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
 
-						/* unassign current peer for learning  */
-						ps->flags &= ~PEER_F_LEARN_ASSIGN;
-						ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
+						if (curpeer->remote_table->table->type != table_type
+						    || curpeer->remote_table->table->key_size != table_keylen) {
+							curpeer->remote_table = NULL;
+							goto ignore_msg;
+						}
 
-						/* Consider table is now up2date, resync resync no more needed from local neither remote */
-						ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
+						curpeer->remote_table->remote_data = table_data;
+						curpeer->remote_table->remote_id = table_id;
 					}
-					/* Increase confirm counter to launch a confirm message */
-					ps->confirm++;
-				}
-				else if (c == 'c') {
-					/* confirm message, remote peer is now up to date with us */
+					else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
+						struct shared_table *st;
+						int table_id;
 
-					/* If stopping state */
-					if (stopping) {
-						/* Close session, push resync no more needed */
-						ps->flags |= PEER_F_TEACH_COMPLETE;
-						appctx->st0 = PEER_SESS_ST_END;
-						goto switchstate;
+						table_id = intdecode(&msg_cur, msg_end);
+						if (!msg_cur) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
+						curpeer->remote_table = NULL;
+						for (st = curpeer->tables; st; st = st->next) {
+							if (st->remote_id == table_id) {
+								curpeer->remote_table = st;
+								break;
+							}
+						}
+
 					}
+					else if (msg_head[1] == PEER_MSG_STKT_UPDATE
+						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE) {
+						struct shared_table *st = curpeer->remote_table;
+						uint32_t update;
 
-					/* reset teaching flags to 0 */
-					ps->flags &= PEER_TEACH_RESET;
-				}
-				else if (c == 'C') {
-					/* Continue message, all known updates have been pushed by remote */
-					/* but remote is not up to date */
+						/* Here we have data message */
+						if (!st)
+							goto ignore_msg;
 
-					/* If resync is in progress with current peer */
-					if (ps->flags & PEER_F_LEARN_ASSIGN) {
+						if (msg_head[1] == PEER_MSG_STKT_UPDATE) {
+							if (msg_len < sizeof(update)) {
+								/* malformed message */
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+							memcpy(&update, msg_cur, sizeof(update));
+							msg_cur += sizeof(update);
+							st->last_get = htonl(update);
+						}
+						else {
+							st->last_get++;
+						}
 
-						/* unassign current peer   */
-						ps->flags &= ~PEER_F_LEARN_ASSIGN;
-						ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
+						newts = stksess_new(st->table, NULL);
+						if (!newts)
+							goto ignore_msg;
 
-						/* flag current peer is not up 2 date to try from an other */
-						ps->flags |= PEER_F_LEARN_NOTUP2DATE;
+						if (st->table->type == STKTABLE_TYPE_STRING) {
+							unsigned int to_read, to_store;
 
-						/* reschedule a resync */
-						ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
-						task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
-					}
-					ps->confirm++;
-				}
-				else if (c == 'A') {
-					/* ack message */
-					uint32_t netinteger;
+							to_read = intdecode(&msg_cur, msg_end);
+							if (!msg_cur) {
+								/* malformed message */
+								stksess_free(st->table, newts);
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+							to_store = MIN(to_read, st->table->key_size - 1);
+							if (msg_cur + to_store > msg_end) {
+								/* malformed message */
+								stksess_free(st->table, newts);
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
 
-					reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
-					if (reql <= 0) /* closed or EOL not found */
-						goto incomplete;
+							memcpy(newts->key.key, msg_cur, to_store);
+							newts->key.key[to_store] = 0;
+							msg_cur += to_read;
+						}
+						else if (st->table->type == STKTABLE_TYPE_INTEGER) {
+							unsigned int netinteger;
 
-					totl += reql;
+							if (msg_cur + sizeof(netinteger) > msg_end) {
+								/* malformed message */
+								stksess_free(st->table, newts);
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+							memcpy(&netinteger, msg_cur, sizeof(netinteger));
+							netinteger = ntohl(netinteger);
+							memcpy(newts->key.key, &netinteger, sizeof(netinteger));
+							msg_cur += sizeof(netinteger);
+						}
+						else {
+							if (msg_cur + st->table->key_size > msg_end) {
+								/* malformed message */
+								stksess_free(st->table, newts);
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+							memcpy(newts->key.key, msg_cur, st->table->key_size);
+							msg_cur += st->table->key_size;
+						}
+
+						/* lookup for existing entry */
+						ts = stktable_lookup(st->table, newts);
+						if (ts) {
+							/* the entry already exist, we can free ours */
+							stktable_touch(st->table, ts, 0);
+							stksess_free(st->table, newts);
+							newts = NULL;
+						}
+						else {
+							struct eb32_node *eb;
+
+							/* create new entry */
+							ts = stktable_store(st->table, newts, 0);
+							newts = NULL; /* don't reuse it */
+
+							ts->upd.key= (++st->table->update)+(2^31);
+							eb = eb32_insert(&st->table->updates, &ts->upd);
+							if (eb != &ts->upd) {
+								eb32_delete(eb);
+								eb32_insert(&st->table->updates, &ts->upd);
+							}
+						}
+
+						if ((1 << STKTABLE_DT_SERVER_ID) & st->remote_data) {
+							int srvid;
+
+							srvid = intdecode(&msg_cur, msg_end);
+							if (!msg_cur) {
+								/* malformed message */
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+
+							if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) {
+								stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
+							}
+						}
+					}
+					else if (msg_head[1] == PEER_MSG_STKT_ACK) {
+						/* ack message */
+						uint32_t table_id ;
+						uint32_t update;
+						struct shared_table *st;
+
+						table_id = intdecode(&msg_cur, msg_end);
+						if (!msg_cur || (msg_cur + sizeof(update) > msg_end)) {
+							/* malformed message */
+							appctx->st0 = PEER_SESS_ST_ERRPROTO;
+							goto switchstate;
+						}
+						memcpy(&update, msg_cur, sizeof(update));
+						update = ntohl(update);
 
-					/* Consider remote is up to date with "acked" version */
-					ps->update = ntohl(netinteger);
+						for (st = curpeer->tables; st; st = st->next) {
+							if (st->local_id == table_id) {
+								st->update = update;
+								break;
+							}
+						}
+					}
 				}
-				else {
-					/* Unknown message */
-					appctx->st0 = PEER_SESS_ST_END;
+				else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
+					appctx->st0 = PEER_SESS_ST_ERRPROTO;
 					goto switchstate;
 				}
 
+ignore_msg:
 				/* skip consumed message */
 				bo_skip(si_oc(si), totl);
-
 				/* loop on that state to peek next message */
 				goto switchstate;
 
 incomplete:
 				/* we get here when a bo_getblk() returns <= 0 in reql */
 
-				/* first, we may have to release newts */
-				if (newts) {
-					stksess_free(ps->table->table, newts);
-					newts = NULL;
-				}
-
 				if (reql < 0) {
 					/* there was an error */
 					appctx->st0 = PEER_SESS_ST_END;
 					goto switchstate;
 				}
 
-				/* Nothing to read, now we start to write */
 
 				/* Confirm finished or partial messages */
-				while (ps->confirm) {
+				while (curpeer->confirm) {
+					unsigned char msg[2];
+
 					/* There is a confirm messages to send */
-					repl = bi_putchr(si_ic(si), 'c');
+					msg[0] = PEER_MSG_CLASS_CONTROL;
+					msg[1] = PEER_MSG_CTRL_RESYNCCONFIRM;
+
+					/* message to buffer */
+					repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
 					if (repl <= 0) {
 						/* no more write possible */
 						if (repl == -1)
@@ -857,175 +1128,313 @@
 						appctx->st0 = PEER_SESS_ST_END;
 						goto switchstate;
 					}
-					ps->confirm--;
+					curpeer->confirm--;
 				}
 
+
 				/* Need to request a resync */
-				if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
-					(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
-					!(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
-					/* Current peer was elected to request a resync */
+                                if ((curpeer->flags & PEER_F_LEARN_ASSIGN) &&
+                                        (peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+                                        !(peers->flags & PEERS_F_RESYNC_PROCESS)) {
+					unsigned char msg[2];
 
-					repl = bi_putchr(si_ic(si), 'R');
-					if (repl <= 0) {
-						/* no more write possible */
-						if (repl == -1)
-							goto full;
-						appctx->st0 = PEER_SESS_ST_END;
-						goto switchstate;
-					}
-					ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
-				}
+                                        /* Current peer was elected to request a resync */
+					msg[0] = PEER_MSG_CLASS_CONTROL;
+					msg[1] = PEER_MSG_CTRL_RESYNCREQ;
 
-				/* It remains some updates to ack */
-				if (ps->pushack != ps->lastack) {
-					uint32_t netinteger;
+					/* message to buffer */
+					repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
+                                        if (repl <= 0) {
+                                                /* no more write possible */
+                                                if (repl == -1)
+                                                        goto full;
+                                                appctx->st0 = PEER_SESS_ST_END;
+                                                goto switchstate;
+                                        }
+                                        peers->flags |= PEERS_F_RESYNC_PROCESS;
+                                }
 
-					trash.str[0] = 'A';
-					netinteger = htonl(ps->pushack);
-					memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
+				/* Nothing to read, now we start to write */
 
-					repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
-					if (repl <= 0) {
-						/* no more write possible */
-						if (repl == -1)
-							goto full;
-						appctx->st0 = PEER_SESS_ST_END;
-						goto switchstate;
-					}
-					ps->lastack = ps->pushack;
-				}
+				if (curpeer->tables) {
+					struct shared_table *st;
+					struct shared_table *last_local_table;
 
-				if (ps->flags & PEER_F_TEACH_PROCESS) {
-					/* current peer was requested for a lesson */
+					last_local_table = curpeer->last_local_table;
+					if (!last_local_table)
+						last_local_table = curpeer->tables;
+					st = last_local_table->next;
 
-					if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
-						/* lesson stage 1 not complete */
-						struct eb32_node *eb;
+					while (1) {
+						if (!st)
+							 st = curpeer->tables;
 
-						eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
-						while (1) {
+						/* It remains some updates to ack */
+						if (st->last_get != st->last_acked) {
 							int msglen;
-							struct stksess *ts;
 
-							if (!eb) {
-								/* flag lesson stage1 complete */
-								ps->flags |= PEER_F_TEACH_STAGE1;
-								eb = eb32_first(&ps->table->table->updates);
-								if (eb)
-									ps->pushed = eb->key - 1;
-								break;
+							msglen = peer_prepare_ackmsg(st, trash.str, trash.size);
+							if (!msglen) {
+								/* internal error: message does not fit in trash */
+								appctx->st0 = PEER_SESS_ST_END;
+								goto switchstate;
 							}
 
-							ts = eb32_entry(eb, struct stksess, upd);
-							msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
-							if (msglen) {
-								/* message to buffer */
-								repl = bi_putblk(si_ic(si), trash.str, msglen);
-								if (repl <= 0) {
-									/* no more write possible */
-									if (repl == -1)
-										goto full;
-									appctx->st0 = PEER_SESS_ST_END;
-									goto switchstate;
+							/* message to buffer */
+							repl = bi_putblk(si_ic(si), trash.str, msglen);
+							if (repl <= 0) {
+								/* no more write possible */
+								if (repl == -1) {
+									goto full;
 								}
-								ps->lastpush = ps->pushed = ts->upd.key;
+								appctx->st0 = PEER_SESS_ST_END;
+								goto switchstate;
 							}
-							eb = eb32_next(eb);
+							st->last_acked = st->last_get;
 						}
-					} /* !TEACH_STAGE1 */
 
-					if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
-						/* lesson stage 2 not complete */
-						struct eb32_node *eb;
+						if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
+							if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
+							    ((int)(st->last_pushed - st->table->localupdate) < 0)) {
+								struct eb32_node *eb;
+								int new_pushed;
 
-						eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
-						while (1) {
-							int msglen;
-							struct stksess *ts;
+								if (st != curpeer->last_local_table) {
+									int msglen;
 
-							if (!eb || eb->key > ps->teaching_origin) {
-								/* flag lesson stage1 complete */
-								ps->flags |= PEER_F_TEACH_STAGE2;
-								ps->pushed = ps->teaching_origin;
-								break;
-							}
+									msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
 
-							ts = eb32_entry(eb, struct stksess, upd);
-							msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
-							if (msglen) {
-								/* message to buffer */
-								repl = bi_putblk(si_ic(si), trash.str, msglen);
-								if (repl <= 0) {
-									/* no more write possible */
-									if (repl == -1)
-										goto full;
-									appctx->st0 = PEER_SESS_ST_END;
-									goto switchstate;
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									curpeer->last_local_table = st;
 								}
-								ps->lastpush = ps->pushed = ts->upd.key;
+
+								/* We force new pushed to 1 to force identifier in update message */
+								new_pushed = 1;
+								eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+								while (1) {
+									uint32_t msglen;
+									struct stksess *ts;
+
+									/* push local updates */
+									if (!eb) {
+										eb = eb32_first(&st->table->updates);
+										if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
+											st->last_pushed = st->table->localupdate;
+											break;
+										}
+									}
+
+									if ((int)(eb->key - st->table->localupdate) > 0) {
+										st->last_pushed = st->table->localupdate;
+										break;
+									}
+
+									ts = eb32_entry(eb, struct stksess, upd);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									st->last_pushed = ts->upd.key;
+									/* identifier may not needed in next update message */
+									new_pushed = 0;
+
+									eb = eb32_next(eb);
+								}
 							}
-							eb = eb32_next(eb);
 						}
-					} /* !TEACH_STAGE2 */
+						else {
+							if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
+								struct eb32_node *eb;
+								int new_pushed;
 
-					if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
-						/* process final lesson message */
-						repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
-						if (repl <= 0) {
-							/* no more write possible */
-							if (repl == -1)
-								goto full;
-							appctx->st0 = PEER_SESS_ST_END;
-							goto switchstate;
-						}
+								if (st != curpeer->last_local_table) {
+									int msglen;
+
+									msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
 
-						/* flag finished message sent */
-						ps->flags |= PEER_F_TEACH_FINISHED;
-					} /* !TEACH_FINISHED */
-				} /* TEACH_PROCESS */
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									curpeer->last_local_table = st;
+								}
 
-				if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
-				     (int)(ps->pushed - ps->table->table->localupdate) < 0) {
-					/* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
-					struct eb32_node *eb;
+								/* We force new pushed to 1 to force identifier in update message */
+								new_pushed = 1;
+								eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+								while (1) {
+									uint32_t msglen;
+									struct stksess *ts;
 
-					eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
-					while (1) {
-						int msglen;
-						struct stksess *ts;
+									/* push local updates */
+									if (!eb) {
+										st->flags |= SHTABLE_F_TEACH_STAGE1;
+										eb = eb32_first(&st->table->updates);
+										if (eb)
+											st->last_pushed = eb->key - 1;
+										break;
+									}
 
-						/* push local updates */
-						if (!eb) {
-							eb = eb32_first(&ps->table->table->updates);
-							if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
-								ps->pushed = ps->table->table->localupdate;
-								break;
+									ts = eb32_entry(eb, struct stksess, upd);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									st->last_pushed = ts->upd.key;
+									/* identifier may not needed in next update message */
+									new_pushed = 0;
+
+									eb = eb32_next(eb);
+								}
 							}
-						}
 
-						if ((int)(eb->key - ps->table->table->localupdate) > 0) {
-							ps->pushed = ps->table->table->localupdate;
-							break;
-						}
+							if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
+								struct eb32_node *eb;
+								int new_pushed;
 
-						ts = eb32_entry(eb, struct stksess, upd);
-						msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
-						if (msglen) {
-							/* message to buffer */
-							repl = bi_putblk(si_ic(si), trash.str, msglen);
-							if (repl <= 0) {
-								/* no more write possible */
-								if (repl == -1)
-									goto full;
-								appctx->st0 = PEER_SESS_ST_END;
-								goto switchstate;
+								if (st != curpeer->last_local_table) {
+									int msglen;
+
+									msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									curpeer->last_local_table = st;
+								}
+
+								/* We force new pushed to 1 to force identifier in update message */
+								new_pushed = 1;
+								eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+								while (1) {
+									uint32_t msglen;
+									struct stksess *ts;
+
+									/* push local updates */
+									if (!eb || eb->key > st->teaching_origin) {
+										st->flags |= SHTABLE_F_TEACH_STAGE2;
+										eb = eb32_first(&st->table->updates);
+										if (eb)
+											st->last_pushed = eb->key - 1;
+										break;
+									}
+
+									ts = eb32_entry(eb, struct stksess, upd);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									if (!msglen) {
+										/* internal error: message does not fit in trash */
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+
+									/* message to buffer */
+									repl = bi_putblk(si_ic(si), trash.str, msglen);
+									if (repl <= 0) {
+										/* no more write possible */
+										if (repl == -1) {
+											goto full;
+										}
+										appctx->st0 = PEER_SESS_ST_END;
+										goto switchstate;
+									}
+									st->last_pushed = ts->upd.key;
+									/* identifier may not needed in next update message */
+									new_pushed = 0;
+
+									eb = eb32_next(eb);
+								}
 							}
-							ps->lastpush = ps->pushed = ts->upd.key;
 						}
-						eb = eb32_next(eb);
+
+						if (st == last_local_table)
+							break;
+						st = st->next;
 					}
-				} /* ! LEARN_ASSIGN */
+				}
+
+
+				if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) {
+					unsigned char msg[2];
+
+                                        /* Current peer was elected to request a resync */
+					msg[0] = PEER_MSG_CLASS_CONTROL;
+					msg[1] = ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED) ? PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
+					/* process final lesson message */
+					repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
+					if (repl <= 0) {
+						/* no more write possible */
+						if (repl == -1)
+							goto full;
+						appctx->st0 = PEER_SESS_ST_END;
+						goto switchstate;
+					}
+					/* flag finished message sent */
+					curpeer->flags |= PEER_F_TEACH_FINISHED;
+				}
+
 				/* noting more to do */
 				goto out;
 			}
@@ -1035,7 +1444,29 @@
 				if (bi_putblk(si_ic(si), trash.str, repl) == -1)
 					goto full;
 				appctx->st0 = PEER_SESS_ST_END;
+				goto switchstate;
+			case PEER_SESS_ST_ERRSIZE: {
+				unsigned char msg[2];
+
+				msg[0] = PEER_MSG_CLASS_ERROR;
+				msg[1] = PEER_MSG_ERR_SIZELIMIT;
+
+				if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1)
+					goto full;
+				appctx->st0 = PEER_SESS_ST_END;
+				goto switchstate;
+			}
+			case PEER_SESS_ST_ERRPROTO: {
+				unsigned char msg[2];
+
+				msg[0] = PEER_MSG_CLASS_ERROR;
+				msg[1] = PEER_MSG_ERR_PROTOCOL;
+
+				if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1)
+					goto full;
+				appctx->st0 = PEER_SESS_ST_END;
 				/* fall through */
+			}
 			case PEER_SESS_ST_END: {
 				si_shutw(si);
 				si_shutr(si);
@@ -1065,7 +1496,8 @@
 static void peer_session_forceshutdown(struct stream * stream)
 {
 	struct appctx *appctx = NULL;
-	struct peer_session *ps;
+	struct peer *ps;
+
 	int i;
 
 	for (i = 0; i <= 1; i++) {
@@ -1080,7 +1512,7 @@
 	if (!appctx)
 		return;
 
-	ps = (struct peer_session *)appctx->ctx.peers.ptr;
+	ps = (struct peer *)appctx->ctx.peers.ptr;
 	/* we're killing a connection, we must apply a random delay before
 	 * retrying otherwise the other end will do the same and we can loop
 	 * for a while.
@@ -1112,9 +1544,9 @@
 /*
  * Create a new peer session in assigned state (connect will start automatically)
  */
-static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps)
+static struct stream *peer_session_create(struct peers *peers, struct peer *peer)
 {
-	struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
+	struct listener *l = LIST_NEXT(&peers->peers_fe->conf.listeners, struct listener *, by_fe);
 	struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
 	struct appctx *appctx;
 	struct session *sess;
@@ -1122,8 +1554,8 @@
 	struct task *t;
 	struct connection *conn;
 
-	ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
-	ps->statuscode = PEER_SESS_SC_CONNECTCODE;
+	peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
+	peer->statuscode = PEER_SESS_SC_CONNECTCODE;
 	s = NULL;
 
 	appctx = appctx_new(&peer_applet);
@@ -1131,7 +1563,7 @@
 		goto out_close;
 
 	appctx->st0 = PEER_SESS_ST_CONNECT;
-	appctx->ctx.peers.ptr = (void *)ps;
+	appctx->ctx.peers.ptr = (void *)peer;
 
 	sess = session_new(p, l, &appctx->obj_type);
 	if (!sess) {
@@ -1185,8 +1617,8 @@
 		actconn++;
 	totalconn++;
 
-	ps->appctx = appctx;
-	ps->stream = s;
+	peer->appctx = appctx;
+	peer->stream = s;
 	return s;
 
 	/* Error unrolling */
@@ -1209,41 +1641,43 @@
  */
 static struct task *process_peer_sync(struct task * task)
 {
-	struct shared_table *st = (struct shared_table *)task->context;
-	struct peer_session *ps;
+	struct peers *peers = (struct peers *)task->context;
+	struct peer *ps;
+	struct shared_table *st;
 
 	task->expire = TICK_ETERNITY;
 
-	if (!st->sessions->peer->peers->peers_fe) {
+	if (!peers->peers_fe) {
 		/* this one was never started, kill it */
-		signal_unregister_handler(st->sighandler);
-		st->table->sync_task = NULL;
-		task_delete(st->sync_task);
-		task_free(st->sync_task);
+		signal_unregister_handler(peers->sighandler);
+		peers->sync_task = NULL;
+		task_delete(peers->sync_task);
+		task_free(peers->sync_task);
 		return NULL;
 	}
 
 	if (!stopping) {
 		/* Normal case (not soft stop)*/
-		if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
-		     (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
-		     !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
+
+		if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
+		     (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
+		     !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
 			/* Resync from local peer needed
 			   no peer was assigned for the lesson
 			   and no old local peer found
 			       or resync timeout expire */
 
 			/* flag no more resync from local, to try resync from remotes */
-			st->flags |= SHTABLE_F_RESYNC_LOCAL;
+			peers->flags |= PEERS_F_RESYNC_LOCAL;
 
 			/* reschedule a resync */
-			st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+			peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
 		}
 
 		/* For each session */
-		for (ps = st->sessions; ps; ps = ps->next) {
+		for (ps = peers->remote; ps; ps = ps->next) {
 			/* For each remote peers */
-			if (!ps->peer->local) {
+			if (!ps->local) {
 				if (!ps->stream) {
 					/* no active stream */
 					if (ps->statuscode == 0 ||
@@ -1257,7 +1691,7 @@
 						 * and reconnection timer is expired */
 
 						/* retry a connect */
-						ps->stream = peer_session_create(ps->peer, ps);
+						ps->stream = peer_session_create(peers, ps);
 					}
 					else if (!tick_is_expired(ps->reconnect, now_ms)) {
 						/* If previous session failed during connection
@@ -1270,8 +1704,8 @@
 				} /* !ps->stream */
 				else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
 					/* current stream is active and established */
-					if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
-					    !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
+					if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+					    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
 					    !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
 						/* Resync from a remote is needed
 						 * and no peer was assigned for lesson
@@ -1279,14 +1713,20 @@
 
 						/* assign peer for the lesson */
 						ps->flags |= PEER_F_LEARN_ASSIGN;
-						st->flags |= SHTABLE_F_RESYNC_ASSIGN;
+						peers->flags |= PEERS_F_RESYNC_ASSIGN;
 
 						/* awake peer stream task to handle a request of resync */
 						appctx_wakeup(ps->appctx);
 					}
-					else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
-						/* awake peer stream task to push local updates */
-						appctx_wakeup(ps->appctx);
+					else {
+						/* Awake session if there is data to push */
+						for (st = ps->tables; st ; st = st->next) {
+							if ((int)(st->last_pushed - st->table->localupdate) < 0) {
+								/* awake peer stream task to push local updates */
+								appctx_wakeup(ps->appctx);
+								break;
+							}
+						}
 					}
 					/* else do nothing */
 				} /* SUCCESSCODE */
@@ -1294,36 +1734,38 @@
 		} /* for */
 
 		/* Resync from remotes expired: consider resync is finished */
-		if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
-		    !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
-		    tick_is_expired(st->resync_timeout, now_ms)) {
+		if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+		    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+		    tick_is_expired(peers->resync_timeout, now_ms)) {
 			/* Resync from remote peer needed
 			 * no peer was assigned for the lesson
 			 * and resync timeout expire */
 
 			/* flag no more resync from remote, consider resync is finished */
-			st->flags |= SHTABLE_F_RESYNC_REMOTE;
+			peers->flags |= PEERS_F_RESYNC_REMOTE;
 		}
 
-		if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
+		if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
 			/* Resync not finished*/
 			/* reschedule task to resync timeout, to ended resync if needed */
-			task->expire = tick_first(task->expire, st->resync_timeout);
+			task->expire = tick_first(task->expire, peers->resync_timeout);
 		}
 	} /* !stopping */
 	else {
 		/* soft stop case */
 		if (task->state & TASK_WOKEN_SIGNAL) {
 			/* We've just recieved the signal */
-			if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
+			if (!(peers->flags & PEERS_F_DONOTSTOP)) {
 				/* add DO NOT STOP flag if not present */
 				jobs++;
-				st->flags |= SHTABLE_F_DONOTSTOP;
-				st->table->syncing++;
+				peers->flags |= PEERS_F_DONOTSTOP;
+				ps = peers->local;
+				for (st = ps->tables; st ; st = st->next)
+					st->table->syncing++;
 			}
 
 			/* disconnect all connected peers */
-			for (ps = st->sessions; ps; ps = ps->next) {
+			for (ps = peers->remote; ps; ps = ps->next) {
 				if (ps->stream) {
 					peer_session_forceshutdown(ps->stream);
 					ps->stream = NULL;
@@ -1331,14 +1773,15 @@
 				}
 			}
 		}
-		ps = st->local_session;
 
+		ps = peers->local;
 		if (ps->flags & PEER_F_TEACH_COMPLETE) {
-			if (st->flags & SHTABLE_F_DONOTSTOP) {
+			if (peers->flags & PEERS_F_DONOTSTOP) {
 				/* resync of new process was complete, current process can die now */
 				jobs--;
-				st->flags &= ~SHTABLE_F_DONOTSTOP;
-				st->table->syncing--;
+				peers->flags &= ~PEERS_F_DONOTSTOP;
+				for (st = ps->tables; st ; st = st->next)
+					st->table->syncing--;
 			}
 		}
 		else if (!ps->stream) {
@@ -1353,66 +1796,81 @@
 				 * or during previous connect, peer replies a try again statuscode */
 
 				/* connect to the peer */
-				peer_session_create(ps->peer, ps);
+				peer_session_create(peers, ps);
 			}
 			else {
 				/* Other error cases */
-				if (st->flags & SHTABLE_F_DONOTSTOP) {
+				if (peers->flags & PEERS_F_DONOTSTOP) {
 					/* unable to resync new process, current process can die now */
 					jobs--;
-					st->flags &= ~SHTABLE_F_DONOTSTOP;
-					st->table->syncing--;
+					peers->flags &= ~PEERS_F_DONOTSTOP;
+					for (st = ps->tables; st ; st = st->next)
+						st->table->syncing--;
 				}
 			}
 		}
-		else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
-		         (int)(ps->pushed - ps->table->table->localupdate) < 0) {
+		else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
 			/* current stream active and established
 			   awake stream to push remaining local updates */
-			appctx_wakeup(ps->appctx);
+			for (st = ps->tables; st ; st = st->next) {
+				if ((int)(st->last_pushed - st->table->localupdate) < 0) {
+					/* awake peer stream task to push local updates */
+					appctx_wakeup(ps->appctx);
+					break;
+				}
+			}
 		}
 	} /* stopping */
 	/* Wakeup for re-connect */
 	return task;
 }
 
+
 /*
- * Function used to register a table for sync on a group of peers
  *
  */
-void peers_register_table(struct peers *peers, struct stktable *table)
+void peers_init_sync(struct peers *peers)
 {
-	struct shared_table *st;
 	struct peer * curpeer;
-	struct peer_session *ps;
 	struct listener *listener;
 
-	st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
-	st->table = table;
-	st->next = peers->tables;
-	st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
-	peers->tables = st;
-
 	for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
-		ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
-		ps->table = st;
-		ps->peer = curpeer;
-		if (curpeer->local)
-			st->local_session = ps;
-		ps->next = st->sessions;
-		ps->reconnect = now_ms;
-		st->sessions = ps;
 		peers->peers_fe->maxconn += 3;
 	}
 
 	list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
 		listener->maxconn = peers->peers_fe->maxconn;
-	st->sync_task = task_new();
-	st->sync_task->process = process_peer_sync;
-	st->sync_task->expire = TICK_ETERNITY;
-	st->sync_task->context = (void *)st;
-	table->sync_task = st->sync_task;
-	st->sighandler = signal_register_task(0, table->sync_task, 0);
-	task_wakeup(st->sync_task, TASK_WOKEN_INIT);
+	peers->sync_task = task_new();
+	peers->sync_task->process = process_peer_sync;
+	peers->sync_task->expire = TICK_ETERNITY;
+	peers->sync_task->context = (void *)peers;
+	peers->sighandler = signal_register_task(0, peers->sync_task, 0);
+	task_wakeup(peers->sync_task, TASK_WOKEN_INIT);
+}
+
+
+
+/*
+ * Function used to register a table for sync on a group of peers
+ *
+ */
+void peers_register_table(struct peers *peers, struct stktable *table)
+{
+	struct shared_table *st;
+	struct peer * curpeer;
+	int id = 0;
+
+	for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
+		st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
+		st->table = table;
+		st->next = curpeer->tables;
+		if (curpeer->tables)
+			id = curpeer->tables->local_id;
+		st->local_id = id + 1;
+
+		curpeer->tables = st;
+	}
+
+	table->sync_task = peers->sync_task;
 }