MEDIUM: connections/mux: Revamp the send direction.
Totally nuke the "send" method; instead, the upper layer decides when it's
time to send data and, if that's not possible right away, uses the new
subscribe() method to be called back once it can send again.
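
To illustrate the new contract, here is a minimal, hypothetical sketch of how
an upper layer is expected to drive the send path after this change. The
helpers try_to_send(), retry_send_cb() and my_output_buffer() are made up for
the example; the wait_list/tasklet fields, snd_buf() and
subscribe(cs, SUB_CAN_SEND, ...) are the ones used in the patch below:

    static struct task *retry_send_cb(struct task *t, void *ctx, unsigned short state);

    /* Upper layer: push pending data now; if some remains, ask the mux to
     * wake our tasklet once sending is possible again.
     */
    static void try_to_send(struct conn_stream *cs)
    {
        struct buffer *out = my_output_buffer(cs); /* hypothetical accessor */

        /* send as much as the mux accepts right now */
        b_del(out, cs->conn->mux->snd_buf(cs, out, b_data(out), 0));

        if (b_data(out)) {
            /* not everything went out: arm the tasklet and subscribe */
            cs->wait_list.task->process = retry_send_cb;
            cs->wait_list.task->context = cs;
            cs->conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
        }
    }

    /* Tasklet callback invoked by the mux when sending is possible again */
    static struct task *retry_send_cb(struct task *t, void *ctx, unsigned short state)
    {
        struct conn_stream *cs = ctx;

        try_to_send(cs);
        return NULL;
    }
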
diff --git a/include/proto/connection.h b/include/proto/connection.h
index 16103b7..a2580f1 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -598,6 +598,7 @@
{
cs->obj_type = OBJ_TYPE_CS;
cs->flags = CS_FL_NONE;
+ LIST_INIT(&cs->wait_list.list);
LIST_INIT(&cs->send_wait_list);
cs->conn = conn;
}
@@ -663,6 +664,8 @@
/* Releases a conn_stream previously allocated by cs_new() */
static inline void cs_free(struct conn_stream *cs)
{
+ if (cs->wait_list.task)
+ tasklet_free(cs->wait_list.task);
pool_free(pool_head_connstream, cs);
}
@@ -681,6 +684,11 @@
if (!likely(cs))
return NULL;
+ cs->wait_list.task = tasklet_new();
+ if (!likely(cs->wait_list.task)) {
+ cs_free(cs);
+ return NULL;
+ }
if (!conn) {
conn = conn_new();
if (!likely(conn)) {
diff --git a/include/types/connection.h b/include/types/connection.h
index de0c32a..9a57fce 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -307,7 +307,6 @@
struct mux_ops {
int (*init)(struct connection *conn); /* early initialization */
void (*recv)(struct connection *conn); /* mux-layer recv callback */
- void (*send)(struct connection *conn); /* mux-layer send callback */
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
@@ -334,7 +333,6 @@
*/
struct data_cb {
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
- void (*send)(struct conn_stream *cs); /* data-layer send callback */
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
char name[8]; /* data layer name, zero-terminated */
@@ -370,6 +368,7 @@
enum obj_type obj_type; /* differentiates connection from applet context */
unsigned int flags; /* CS_FL_* */
struct connection *conn; /* xprt-level connection */
+	struct wait_list wait_list;           /* entry in a wait list when waiting to send */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
diff --git a/src/checks.c b/src/checks.c
index c7b5c30..624a065 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -69,6 +69,7 @@
static int tcpcheck_get_step_id(struct check *);
static char * tcpcheck_get_step_comment(struct check *, int);
static int tcpcheck_main(struct check *);
+static void __event_srv_chk_w(struct conn_stream *cs);
static struct pool_head *pool_head_email_alert = NULL;
static struct pool_head *pool_head_tcpcheck_rule = NULL;
@@ -709,23 +710,42 @@
* the connection acknowledgement. If the proxy requires L7 health-checks,
* it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result.
+ */
+static struct task *event_srv_chk_w(struct task *task, void *ctx, unsigned short state)
+{
+ struct conn_stream *cs = ctx;
+ struct check __maybe_unused *check = cs->data;
+
+ HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
+ __event_srv_chk_w(cs);
+ HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+ return NULL;
+}
+
+/* Same as above, but performed without taking the server lock.
*
* Please do NOT place any return statement in this function and only leave
- * via the out_unlock label.
+ * via the out label. NOTE THAT THIS FUNCTION DOESN'T LOCK, YOU PROBABLY WANT
+ * TO USE event_srv_chk_w() instead.
*/
-static void event_srv_chk_w(struct conn_stream *cs)
+static void __event_srv_chk_w(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
- HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
if (unlikely(check->result == CHK_RES_FAILED))
goto out_wakeup;
- if (conn->flags & CO_FL_HANDSHAKE)
- goto out_unlock;
+ if (conn->flags & CO_FL_HANDSHAKE) {
+ if (cs->wait_list.task->process != event_srv_chk_w) {
+ cs->wait_list.task->process = event_srv_chk_w;
+ cs->wait_list.task->context = cs;
+ }
+ LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
+ goto out;
+ }
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
@@ -748,19 +768,24 @@
/* wake() will take care of calling tcpcheck_main() */
if (check->type == PR_O2_TCPCHK_CHK)
- goto out_unlock;
+ goto out;
if (b_data(&check->bo)) {
b_del(&check->bo, conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0));
b_realign_if_empty(&check->bo);
-
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
__cs_stop_both(cs);
goto out_wakeup;
}
+ if (b_data(&check->bo)) {
+ if (!cs->wait_list.task->process) {
+ cs->wait_list.task->process = event_srv_chk_w;
+ cs->wait_list.task->context = cs;
+ }
+ conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
+ goto out;
+ }
- if (b_data(&check->bo))
- goto out_unlock;
}
/* full request sent, we allow up to <timeout.check> if nonzero for a response */
@@ -774,8 +799,8 @@
task_wakeup(t, TASK_WOKEN_IO);
out_nowake:
__cs_stop_send(cs); /* nothing more to write */
- out_unlock:
- HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+ out:
+ return;
}
/*
@@ -1390,7 +1415,8 @@
ret = tcpcheck_main(check);
cs = check->cs;
conn = cs_conn(cs);
- }
+ } else if (LIST_ISEMPTY(&cs->wait_list.list))
+ __event_srv_chk_w(cs);
if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
/* We may get error reports bypassing the I/O handlers, typically
@@ -1433,7 +1459,6 @@
struct data_cb check_conn_cb = {
.recv = event_srv_chk_r,
- .send = event_srv_chk_w,
.wake = wake_srv_chk,
.name = "CHCK",
};
diff --git a/src/connection.c b/src/connection.c
index 94e7209..e021290 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -64,6 +64,7 @@
{
struct connection *conn = fdtab[fd].owner;
unsigned int flags;
+ int can_send = 0;
if (unlikely(!conn)) {
activity[tid].conn_dead++;
@@ -127,7 +128,7 @@
* both of which will be detected below.
*/
flags = 0;
- conn->mux->send(conn);
+ can_send = LIST_ISEMPTY(&conn->send_wait_list);
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
struct wait_list *, list);
@@ -195,9 +196,9 @@
* Note that the wake callback is allowed to release the connection and
* the fd (and return < 0 in this case).
*/
- if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
+ if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
- (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) &&
+ (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) &&
conn->mux->wake(conn) < 0)
return;
diff --git a/src/mux_h2.c b/src/mux_h2.c
index ba6bd8d..3dfb396 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -121,6 +121,7 @@
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
+	struct wait_list wait_list; /* entry in a wait list when waiting to send */
};
/* H2 stream state, in h2s->st */
@@ -217,6 +218,7 @@
};
static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
+static struct task *h2_send(struct task *t, void *ctx, unsigned short state);
/*****************************************************/
/* functions below are for dynamic buffer management */
@@ -347,6 +349,12 @@
t->expire = tick_add(now_ms, h2c->timeout);
}
+ h2c->wait_list.task = tasklet_new();
+ if (!h2c->wait_list.task)
+ goto fail;
+ h2c->wait_list.task->process = h2_send;
+ h2c->wait_list.task->context = conn;
+
h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size);
if (!h2c->ddht)
goto fail;
@@ -381,12 +389,15 @@
task_queue(t);
conn_xprt_want_recv(conn);
LIST_INIT(&h2c->send_wait_list);
+ LIST_INIT(&h2c->wait_list.list);
/* mux->wake will be called soon to complete the operation */
return 0;
fail:
if (t)
task_free(t);
+ if (h2c->wait_list.task)
+ tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
return -1;
}
@@ -445,6 +456,8 @@
task_wakeup(h2c->task, TASK_WOKEN_OTHER);
h2c->task = NULL;
}
+ if (h2c->wait_list.task)
+ tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
}
@@ -2049,7 +2062,6 @@
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
- h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
@@ -2091,7 +2103,6 @@
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
- h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
@@ -2167,18 +2178,19 @@
return;
}
-/* callback called on send event by the connection handler */
-static void h2_send(struct connection *conn)
+/* Try to send data if possible */
+static struct task *h2_send(struct task *t, void *ctx, unsigned short state)
{
+ struct connection *conn = ctx;
struct h2c *h2c = conn->mux_ctx;
int done;
if (conn->flags & CO_FL_ERROR)
- return;
+ return NULL;
if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
/* a handshake was requested */
- return;
+ return NULL;
}
/* This loop is quite simple : it tries to fill as much as it can from
@@ -2243,6 +2255,13 @@
}
}
+ /* We're done, no more to send */
+ if (!b_data(&h2c->mbuf))
+ return NULL;
+schedule:
+ if (LIST_ISEMPTY(&h2c->wait_list.list))
+ conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list);
+ return NULL;
}
/* callback called on any event by the connection handler.
@@ -2349,6 +2368,8 @@
else
h2c->task->expire = TICK_ETERNITY;
}
+
+ h2_send(NULL, conn, 0);
return 0;
}
@@ -3474,8 +3495,6 @@
else if (LIST_ISEMPTY(&h2s->list)) {
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
- else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
- LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
}
return total;
@@ -3575,7 +3594,6 @@
const struct mux_ops h2_ops = {
.init = h2_init,
.recv = h2_recv,
- .send = h2_send,
.wake = h2_wake,
.update_poll = h2_update_poll,
.rcv_buf = h2_rcv_buf,
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 059e499..7fb3779 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -97,19 +97,6 @@
cs_update_mux_polling(cs);
}
-/* callback to be used by default for the pass-through mux. It simply calls the
- * data layer send() callback which must be set.
- */
-static void mux_pt_send(struct connection *conn)
-{
- struct conn_stream *cs = conn->mux_ctx;
-
- if (conn->flags & CO_FL_ERROR)
- cs->flags |= CS_FL_ERROR;
- cs->data_cb->send(cs);
- cs_update_mux_polling(cs);
-}
-
/*
* Attach a new stream to a connection
* (Used for outgoing connections)
@@ -207,7 +194,6 @@
const struct mux_ops mux_pt_ops = {
.init = mux_pt_init,
.recv = mux_pt_recv,
- .send = mux_pt_send,
.wake = mux_pt_wake,
.update_poll = mux_pt_update_poll,
.rcv_buf = mux_pt_rcv_buf,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 2fecb94..6fb7b53 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -52,10 +52,10 @@
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_cs_recv_cb(struct conn_stream *cs);
-static void si_cs_send_cb(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static void si_idle_conn_null_cb(struct conn_stream *cs);
+static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
@@ -85,14 +85,12 @@
struct data_cb si_conn_cb = {
.recv = si_cs_recv_cb,
- .send = si_cs_send_cb,
.wake = si_cs_wake_cb,
.name = "STRM",
};
struct data_cb si_idle_conn_cb = {
.recv = si_idle_conn_null_cb,
- .send = si_idle_conn_null_cb,
.wake = si_idle_conn_wake_cb,
.name = "IDLE",
};
@@ -462,6 +460,10 @@
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
+ /* If we have data to send, try it now */
+ if (!channel_is_empty(oc) && objt_cs(si->end))
+ si_cs_send(NULL, objt_cs(si->end), 0);
+
/* process consumer side */
if (channel_is_empty(oc)) {
struct connection *conn = objt_cs(si->end) ? objt_cs(si->end)->conn : NULL;
@@ -632,20 +634,42 @@
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
-static void si_cs_send(struct conn_stream *cs)
+static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state)
{
+ struct conn_stream *cs = ctx;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
+ int did_send = 0;
+
+ /* We're already waiting to be able to send, give up */
+ if (!LIST_ISEMPTY(&cs->wait_list.list))
+ return NULL;
+
+ if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
+ return NULL;
+
+ if (conn->flags & CO_FL_HANDSHAKE) {
+ /* a handshake was requested */
+		/* Schedule ourselves to be woken up once the handshake is done */
+ LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
+ return NULL;
+ }
+
+ /* we might have been called just after an asynchronous shutw */
+ if (si_oc(si)->flags & CF_SHUTW)
+ return NULL;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(cs, oc->pipe);
- if (ret > 0)
+ if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
+ did_send = 1;
+ }
if (!oc->pipe->data) {
put_pipe(oc->pipe);
@@ -653,14 +677,14 @@
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
+ return NULL;
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
if (!co_data(oc))
- return;
+ goto wake_others;
/* when we're here, we already know that there is no spliced
* data left, and that there are sendable buffered data.
@@ -691,6 +715,7 @@
ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
if (ret > 0) {
+ did_send = 1;
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
co_set_data(oc, co_data(oc) - ret);
@@ -706,6 +731,26 @@
*/
}
}
+ /* We couldn't send all of our data, let the mux know we'd like to send more */
+ if (co_data(oc)) {
+ if (!cs->wait_list.task->process) {
+ cs->wait_list.task->process = si_cs_send;
+ cs->wait_list.task->context = ctx;
+ }
+ conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
+ }
+wake_others:
+ /* Maybe somebody was waiting for this conn_stream, wake them */
+ if (did_send) {
+ while (!LIST_ISEMPTY(&cs->send_wait_list)) {
+ struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+ struct wait_list *, list);
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ tasklet_wakeup(sw->task);
+ }
+ }
+ return NULL;
}
/* This function is designed to be called from within the stream handler to
@@ -995,7 +1040,7 @@
__cs_want_send(cs);
- si_cs_send(cs);
+ si_cs_send(NULL, cs, 0);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
__cs_stop_both(cs);
@@ -1313,34 +1358,6 @@
}
/*
- * This is the callback which is called by the connection layer to send data
- * from the buffer to the connection. It iterates over the transport layer's
- * snd_buf function.
- */
-static void si_cs_send_cb(struct conn_stream *cs)
-{
- struct connection *conn = cs->conn;
- struct stream_interface *si = cs->data;
-
- if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
-
- if (conn->flags & CO_FL_HANDSHAKE)
- /* a handshake was requested */
- return;
-
- /* we might have been called just after an asynchronous shutw */
- if (si_oc(si)->flags & CF_SHUTW)
- return;
-
- /* OK there are data waiting to be sent */
- si_cs_send(cs);
-
- /* OK all done */
- return;
-}
-
-/*
* This function propagates a null read received on a socket-based connection.
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,
* the close is also forwarded to the write side as an abort.