MEDIUM: stream: make stream_new() allocate its own task
Currently a task is allocated in session_new() and serves two purposes :
- either the handshake is complete and it is offered to the stream via
the second arg of stream_new()
- or the handshake is not complete and it's diverted to be used as a
timeout handler for the embryonic session and repurposed once we land
into conn_complete_session()
Furthermore, the task's process() function was taken from the listener's
handler in conn_complete_session() prior to being replaced by a call to
stream_new(). This will become a serious mess with the mux.
Since it's impossible to have a stream without a task, this patch removes
the second arg from stream_new() and make this function allocate its own
task. In session_accept_fd(), we now only allocate the task if needed for
the embryonic session and delete it later.
diff --git a/include/proto/stream.h b/include/proto/stream.h
index 5ff2291..44fc8be 100644
--- a/include/proto/stream.h
+++ b/include/proto/stream.h
@@ -35,7 +35,7 @@
extern struct data_cb sess_conn_cb;
-struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin);
+struct stream *stream_new(struct session *sess, enum obj_type *origin);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_stream();
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 1a8bd2c..47aef57 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1901,7 +1901,6 @@
{
struct appctx *appctx;
struct session *sess;
- struct task *task;
struct stream *strm;
if ((appctx = appctx_new(&spoe_applet)) == NULL)
@@ -1937,12 +1936,9 @@
if (!sess)
goto out_free_spoe;
- if ((task = task_new()) == NULL)
+ if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
goto out_free_sess;
- if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
- goto out_free_task;
-
stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */
@@ -1960,12 +1956,10 @@
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
conf->agent->applets_act++;
- task_wakeup(task, TASK_WOKEN_INIT);
+ task_wakeup(strm->task, TASK_WOKEN_INIT);
return appctx;
/* Error unrolling */
- out_free_task:
- task_free(task);
out_free_sess:
session_free(sess);
out_free_spoe:
diff --git a/src/hlua.c b/src/hlua.c
index 0f82425..594d880 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -2297,7 +2297,6 @@
struct appctx *appctx;
struct session *sess;
struct stream *strm;
- struct task *task;
/* Check stack size. */
if (!lua_checkstack(L, 3)) {
@@ -2341,14 +2340,7 @@
goto out_fail_sess;
}
- task = task_new();
- if (!task) {
- hlua_pusherror(L, "socket: out of memory");
- goto out_fail_task;
- }
- task->nice = 0;
-
- strm = stream_new(sess, task, &appctx->obj_type);
+ strm = stream_new(sess, &appctx->obj_type);
if (!strm) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_stream;
@@ -2372,13 +2364,11 @@
jobs++;
totalconn++;
- task_wakeup(task, TASK_WOKEN_INIT);
+ task_wakeup(strm->task, TASK_WOKEN_INIT);
/* Return yield waiting for connection. */
return 1;
out_fail_stream:
- task_free(task);
- out_fail_task:
session_free(sess);
out_fail_sess:
appctx_free(appctx);
diff --git a/src/peers.c b/src/peers.c
index 249edf7..d03e72f 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1784,7 +1784,6 @@
struct appctx *appctx;
struct session *sess;
struct stream *s;
- struct task *t;
struct connection *conn;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
@@ -1804,15 +1803,9 @@
goto out_free_appctx;
}
- if ((t = task_new()) == NULL) {
- Alert("out of memory in peer_session_create().\n");
- goto out_free_sess;
- }
- t->nice = l->nice;
-
- if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
+ if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
Alert("Failed to initialize stream in peer_session_create().\n");
- goto out_free_task;
+ goto out_free_sess;
}
/* The tasks below are normally what is supposed to be done by
@@ -1851,7 +1844,7 @@
totalconn++;
peer->appctx = appctx;
- task_wakeup(t, TASK_WOKEN_INIT);
+ task_wakeup(s->task, TASK_WOKEN_INIT);
return appctx;
/* Error unrolling */
@@ -1859,8 +1852,6 @@
LIST_DEL(&s->by_sess);
LIST_DEL(&s->list);
pool_free2(pool2_stream, s);
- out_free_task:
- task_free(t);
out_free_sess:
session_free(sess);
out_free_appctx:
diff --git a/src/session.c b/src/session.c
index cc2a0b8..ea4e020 100644
--- a/src/session.c
+++ b/src/session.c
@@ -109,7 +109,7 @@
struct connection *cli_conn;
struct proxy *p = l->bind_conf->frontend;
struct session *sess;
- struct task *t;
+ struct stream *strm;
int ret;
@@ -222,12 +222,6 @@
if (global.tune.client_rcvbuf)
setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));
- if (unlikely((t = task_new()) == NULL))
- goto out_free_sess;
-
- t->context = sess;
- t->nice = l->nice;
-
/* OK, now either we have a pending handshake to execute with and
* then we must return to the I/O layer, or we can proceed with the
* end of the stream initialization. In case of handshake, we also
@@ -241,10 +235,18 @@
* conn -- owner ---> task
*/
if (cli_conn->flags & CO_FL_HANDSHAKE) {
+ struct task *t;
+
+ if (unlikely((t = task_new()) == NULL))
+ goto out_free_sess;
+
conn_set_owner(cli_conn, t);
conn_set_xprt_done_cb(cli_conn, conn_complete_session);
+
+ t->context = sess;
+ t->nice = l->nice;
t->process = session_expire_embryonic;
- t->expire = tick_add_ifset(now_ms, p->timeout.client);
+ t->expire = tick_add_ifset(now_ms, p->timeout.client);
task_queue(t);
return 1;
}
@@ -261,14 +263,12 @@
goto out_free_sess;
session_count_new(sess);
- if (!stream_new(sess, t, &cli_conn->obj_type))
- goto out_free_task;
+ if ((strm = stream_new(sess, &cli_conn->obj_type)) == NULL)
+ goto out_free_sess;
- task_wakeup(t, TASK_WOKEN_INIT);
+ task_wakeup(strm->task, TASK_WOKEN_INIT);
return 1;
- out_free_task:
- task_free(t);
out_free_sess:
p->feconn--;
session_free(sess);
@@ -412,6 +412,7 @@
{
struct task *task = conn->owner;
struct session *sess = task->context;
+ struct stream *strm;
conn_clear_xprt_done_cb(conn);
@@ -430,11 +431,14 @@
goto fail;
session_count_new(sess);
- task->process = sess->listener->handler;
- if (!stream_new(sess, task, &conn->obj_type))
+ if ((strm = stream_new(sess, &conn->obj_type)) == NULL)
goto fail;
- task_wakeup(task, TASK_WOKEN_INIT);
+ task_wakeup(strm->task, TASK_WOKEN_INIT);
+
+ /* the embryonic session's task is not needed anymore */
+ task_delete(task);
+ task_free(task);
return 0;
fail:
diff --git a/src/stream.c b/src/stream.c
index 6f7a1be..8527c29 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -67,20 +67,22 @@
/* This function is called from the session handler which detects the end of
* handshake, in order to complete initialization of a valid stream. It must be
- * called with a session (which may be embryonic). It returns the pointer to
+ * called with a completley initialized session. It returns the pointer to
* the newly created stream, or NULL in case of fatal error. The client-facing
- * end point is assigned to <origin>, which must be valid. The task's context
- * is set to the new stream, and its function is set to process_stream().
- * Target and analysers are null.
+ * end point is assigned to <origin>, which must be valid. The stream's task
+ * is configured with a nice value inherited from the listener's nice if any.
+ * The task's context is set to the new stream, and its function is set to
+ * process_stream(). Target and analysers are null.
*/
-struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin)
+struct stream *stream_new(struct session *sess, enum obj_type *origin)
{
struct stream *s;
+ struct task *t;
struct connection *conn = objt_conn(origin);
struct appctx *appctx = objt_appctx(origin);
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
- return s;
+ goto out_fail_alloc;
/* minimum stream initialization required for an embryonic stream is
* fairly low. We need very little to execute L4 ACLs, then we need a
@@ -145,11 +147,16 @@
s->flags |= SF_INITIALIZED;
s->unique_id = NULL;
+ if ((t = task_new()) == NULL)
+ goto out_fail_alloc;
+
s->task = t;
s->pending_events = 0;
t->process = process_stream;
t->context = s;
t->expire = TICK_ETERNITY;
+ if (sess->listener)
+ t->nice = sess->listener->nice;
/* Note: initially, the stream's backend points to the frontend.
* This changes later when switching rules are executed or
@@ -250,6 +257,8 @@
/* Error unrolling */
out_fail_accept:
flt_stream_release(s, 0);
+ task_free(t);
+ out_fail_alloc:
LIST_DEL(&s->by_sess);
LIST_DEL(&s->list);
pool_free2(pool2_stream, s);