MEDIUM: sink: Refactor sink forwarder appctx creation
A .init callback function is defined for the sink_forward_applet applet.
This function finishes the appctx startup by calling
appctx_finalize_startup() and its handles the stream customization.
diff --git a/src/sink.c b/src/sink.c
index 681f363..21b590e 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -594,6 +594,43 @@
task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
}
+static int sink_forward_session_init(struct appctx *appctx)
+{
+ struct sink_forward_target *sft = appctx->svcctx;
+ struct stream *s;
+ struct sockaddr_storage *addr = NULL;
+
+ if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
+ goto out_error;
+
+ if (appctx_finalize_startup(appctx, sft->sink->forward_px, &BUF_NULL) == -1)
+ goto out_free_addr;
+
+ s = appctx_strm(appctx);
+ s->csb->dst = addr;
+ s->csb->flags |= CS_FL_NOLINGER;
+
+ s->target = &sft->srv->obj_type;
+ s->flags = SF_ASSIGNED;
+
+ s->do_log = NULL;
+ s->uniq_id = 0;
+
+ s->res.flags |= CF_READ_DONTWAIT;
+ /* for rto and rex to eternity to not expire on idle recv:
+ * We are using a syslog server.
+ */
+ s->res.rto = TICK_ETERNITY;
+ s->res.rex = TICK_ETERNITY;
+ sft->appctx = appctx;
+
+ return 0;
+
+ out_free_addr:
+ sockaddr_free(&addr);
+ out_error:
+ return -1;
+}
static void sink_forward_session_release(struct appctx *appctx)
{
@@ -612,6 +649,7 @@
.obj_type = OBJ_TYPE_APPLET,
.name = "<SINKFWD>", /* used for logging */
.fct = sink_forward_io_handler,
+ .init = sink_forward_session_init,
.release = sink_forward_session_release,
};
@@ -619,6 +657,7 @@
.obj_type = OBJ_TYPE_APPLET,
.name = "<SINKFWDOC>", /* used for logging */
.fct = sink_forward_oc_io_handler,
+ .init = sink_forward_session_init,
.release = sink_forward_session_release,
};
@@ -628,13 +667,8 @@
*/
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
{
- struct proxy *p = sink->forward_px;
struct appctx *appctx;
- struct session *sess;
- struct conn_stream *cs;
- struct stream *s;
struct applet *applet = &sink_forward_applet;
- struct sockaddr_storage *addr = NULL;
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
applet = &sink_forward_oc_applet;
@@ -642,49 +676,16 @@
appctx = appctx_new(applet, NULL);
if (!appctx)
goto out_close;
-
appctx->svcctx = (void *)sft;
- sess = session_new(p, NULL, &appctx->obj_type);
- if (!sess) {
- ha_alert("out of memory in sink_forward_session_create().\n");
- goto out_free_appctx;
- }
- appctx->sess = sess;
-
- if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
- goto out_free_appctx;
-
- cs = cs_new_from_endp(appctx->endp, sess, &BUF_NULL);
- if (!cs) {
- ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+ if (appctx_init(appctx) == -1)
goto out_free_appctx;
- }
- s = DISGUISE(cs_strm(cs));
- s->csb->dst = addr;
- s->csb->flags |= CS_FL_NOLINGER;
-
- s->target = &sft->srv->obj_type;
- s->flags = SF_ASSIGNED;
-
- s->do_log = NULL;
- s->uniq_id = 0;
-
- s->res.flags |= CF_READ_DONTWAIT;
- /* for rto and rex to eternity to not expire on idle recv:
- * We are using a syslog server.
- */
- s->res.rto = TICK_ETERNITY;
- s->res.rex = TICK_ETERNITY;
- sft->appctx = appctx;
return appctx;
/* Error unrolling */
- out_free_addr:
- sockaddr_free(&addr);
out_free_appctx:
- appctx_free(appctx);
+ appctx_free_on_early_error(appctx);
out_close:
return NULL;
}