REORG/MEDIUM: stream-int: introduce si_ic/si_oc to access channels
We'll soon remove the direct references to the channels from the stream
interface, since everything belongs to the same session. As a first step,
stop dereferencing si->ib / si->ob directly and go through the si_ic() /
si_oc() macros instead.
diff --git a/src/peers.c b/src/peers.c
index c87c8a5..48f5fc4 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -226,7 +226,7 @@
appctx->st0 = PEER_SESS_ST_GETVERSION;
/* fall through */
case PEER_SESS_ST_GETVERSION:
- reql = bo_getline(si->ob, trash.str, trash.size);
+ reql = bo_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
if (reql == 0)
goto out;
@@ -242,7 +242,7 @@
else
trash.str[reql-1] = 0;
- bo_skip(si->ob, reql);
+ bo_skip(si_oc(si), reql);
/* test version */
if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
@@ -257,7 +257,7 @@
appctx->st0 = PEER_SESS_ST_GETHOST;
/* fall through */
case PEER_SESS_ST_GETHOST:
- reql = bo_getline(si->ob, trash.str, trash.size);
+ reql = bo_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
if (reql == 0)
goto out;
@@ -273,7 +273,7 @@
else
trash.str[reql-1] = 0;
- bo_skip(si->ob, reql);
+ bo_skip(si_oc(si), reql);
/* test hostname match */
if (strcmp(localpeer, trash.str) != 0) {
@@ -287,7 +287,7 @@
case PEER_SESS_ST_GETPEER: {
struct peer *curpeer;
char *p;
- reql = bo_getline(si->ob, trash.str, trash.size);
+ reql = bo_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
if (reql == 0)
goto out;
@@ -304,7 +304,7 @@
else
trash.str[reql-1] = 0;
- bo_skip(si->ob, reql);
+ bo_skip(si_oc(si), reql);
/* parse line "<peer name> <pid>" */
p = strchr(trash.str, ' ');
@@ -340,7 +340,7 @@
size_t key_size;
char *p;
- reql = bo_getline(si->ob, trash.str, trash.size);
+ reql = bo_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
if (reql == 0)
goto out;
@@ -361,7 +361,7 @@
else
trash.str[reql-1] = 0;
- bo_skip(si->ob, reql);
+ bo_skip(si_oc(si), reql);
/* Parse line "<table name> <type> <size>" */
p = strchr(trash.str, ' ');
@@ -447,7 +447,7 @@
struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
- repl = bi_putblk(si->ib, trash.str, repl);
+ repl = bi_putblk(si_ic(si), trash.str, repl);
if (repl <= 0) {
if (repl == -1)
goto out;
@@ -511,7 +511,7 @@
goto switchstate;
}
- repl = bi_putblk(si->ib, trash.str, repl);
+ repl = bi_putblk(si_ic(si), trash.str, repl);
if (repl <= 0) {
if (repl == -1)
goto out;
@@ -526,10 +526,10 @@
case PEER_SESS_ST_GETSTATUS: {
struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
- if (si->ib->flags & CF_WRITE_PARTIAL)
+ if (si_ic(si)->flags & CF_WRITE_PARTIAL)
ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
- reql = bo_getline(si->ob, trash.str, trash.size);
+ reql = bo_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
if (reql == 0)
goto out;
@@ -546,7 +546,7 @@
else
trash.str[reql-1] = 0;
- bo_skip(si->ob, reql);
+ bo_skip(si_oc(si), reql);
/* Register status code */
ps->statuscode = atoi(trash.str);
@@ -600,7 +600,7 @@
char c;
int totl = 0;
- reql = bo_getblk(si->ob, (char *)&c, sizeof(c), totl);
+ reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@@ -617,7 +617,7 @@
pushack = ps->pushack + (unsigned int)(c & 0x7F);
}
else {
- reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
+ reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@@ -638,7 +638,7 @@
unsigned int to_read, to_store;
/* read size first */
- reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
+ reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@@ -647,7 +647,7 @@
to_store = 0;
to_read = ntohl(netinteger);
- if (to_read + totl > si->ob->buf->size) {
+ if (to_read + totl > si_oc(si)->buf->size) {
/* impossible to read a key this large, abort */
reql = -1;
goto incomplete;
@@ -661,7 +661,7 @@
* the rest is drained into the trash.
*/
if (to_store) {
- reql = bo_getblk(si->ob, (char *)newts->key.key, to_store, totl);
+ reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
if (reql <= 0) /* closed or incomplete */
goto incomplete;
newts->key.key[reql] = 0;
@@ -669,14 +669,14 @@
to_read -= reql;
}
if (to_read) {
- reql = bo_getblk(si->ob, trash.str, to_read, totl);
+ reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
if (reql <= 0) /* closed or incomplete */
goto incomplete;
totl += reql;
}
}
else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
- reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
+ reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
newts = stksess_new(ps->table->table, NULL);
@@ -689,14 +689,14 @@
else {
/* type ip or binary */
newts = stksess_new(ps->table->table, NULL);
- reql = bo_getblk(si->ob, newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
+ reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
totl += reql;
}
/* read server id */
- reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
+ reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@@ -803,7 +803,7 @@
/* ack message */
uint32_t netinteger;
- reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
+ reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
@@ -819,7 +819,7 @@
}
/* skip consumed message */
- bo_skip(si->ob, totl);
+ bo_skip(si_oc(si), totl);
/* loop on that state to peek next message */
goto switchstate;
@@ -844,7 +844,7 @@
/* Confirm finished or partial messages */
while (ps->confirm) {
/* There is a confirm messages to send */
- repl = bi_putchr(si->ib, 'c');
+ repl = bi_putchr(si_ic(si), 'c');
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -861,7 +861,7 @@
!(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
/* Current peer was elected to request a resync */
- repl = bi_putchr(si->ib, 'R');
+ repl = bi_putchr(si_ic(si), 'R');
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -880,7 +880,7 @@
netinteger = htonl(ps->pushack);
memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
- repl = bi_putblk(si->ib, trash.str, 1+sizeof(netinteger));
+ repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -916,7 +916,7 @@
msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
if (msglen) {
/* message to buffer */
- repl = bi_putblk(si->ib, trash.str, msglen);
+ repl = bi_putblk(si_ic(si), trash.str, msglen);
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -950,7 +950,7 @@
msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
if (msglen) {
/* message to buffer */
- repl = bi_putblk(si->ib, trash.str, msglen);
+ repl = bi_putblk(si_ic(si), trash.str, msglen);
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -966,7 +966,7 @@
if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
/* process final lesson message */
- repl = bi_putchr(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
+ repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -1008,7 +1008,7 @@
msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
if (msglen) {
/* message to buffer */
- repl = bi_putblk(si->ib, trash.str, msglen);
+ repl = bi_putblk(si_ic(si), trash.str, msglen);
if (repl <= 0) {
/* no more write possible */
if (repl == -1)
@@ -1027,24 +1027,24 @@
case PEER_SESS_ST_EXIT:
repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
- if (bi_putblk(si->ib, trash.str, repl) == -1)
+ if (bi_putblk(si_ic(si), trash.str, repl) == -1)
goto out;
appctx->st0 = PEER_SESS_ST_END;
/* fall through */
case PEER_SESS_ST_END: {
si_shutw(si);
si_shutr(si);
- si->ib->flags |= CF_READ_NULL;
+ si_ic(si)->flags |= CF_READ_NULL;
goto quit;
}
}
}
out:
si_update(si);
- si->ob->flags |= CF_READ_DONTWAIT;
+ si_oc(si)->flags |= CF_READ_DONTWAIT;
/* we don't want to expire timeouts while we're processing requests */
- si->ib->rex = TICK_ETERNITY;
- si->ob->wex = TICK_ETERNITY;
+ si_ic(si)->rex = TICK_ETERNITY;
+ si_oc(si)->wex = TICK_ETERNITY;
quit:
return;
}