MEDIUM: filters/lua: Support declaration of some filter callback functions in lua
It is now possible to write some filter callback functions in lua. All
filter callbacks are not supported yet but the mechanism to call them is now
in place. Following method may be defined in the Lua filter class to be
bound on filter callbacks:
* Filter:start_analyse(txn, chn)
* Filter:end_analyse(txn, chn)
* Filter:tcp_payload(txn, chn, offset, length)
hlua_filter_callback() function is responsible to call the good lua function
depending on the filter callback function. Using some flags it is possible
to allow a lua call to yield or not, to retrieve a return value or not, and
to specify if a channel or an http message must be passed as second
argument. For now, the HTTP part has not been added yet. It is also possible
to add extra argument adding them on the stack before the call.
3 new functions are exposed by the global object "filter". The first one,
filter.wake_time(ms_delay), to set the wake_time when a Lua callback
function yields (if allowed). The two others,
filter.register_data_filter(filter, chn) and
filter.unregister_data_filter(filter, chn), to enable or disable the data
filtering on a channel for a specific lua filter instance.
start_analyse() and end_analyse() may return one of the constant
filter.CONTINUE, filter.WAIT or filter.ERROR. If nothing is returned,
filter.CONTINUE is used as the default value. On its side, tcp_payload() may
return the amount of data to forward. If nothing is returned, all incoming
data are forwarded.
For now, these functions are not allowed to yield because this interferes
with the filter workflow.
Here is a simple example :
MyFilter = {}
MyFilter.id = "My Lua filter"
MyFilter.flags = filter.FLT_CFG_FL_HTX
MyFilter.__index = MyFilter
function MyFilter:new()
flt = {}
setmetatable(flt, MyFilter)
flt.req_len = 0
flt.res_len = 0
return flt
end
function MyFilter:start_analyze(txn, chn)
filter.register_data_filter(self, chn)
end
function MyFilter:end_analyze(txn, chn)
print("<Total> request: "..self.req_len.." - response: "..self.res_len)
end
function MyFilter:tcp_payload(txn, chn)
offset = chn:ouput()
len = chn:input()
if chn:is_resp() then
self.res_len = self.res_len + len
print("<TCP:Response> offset: "..offset.." - length: "..len)
else
self.req_len = self.req_len + len
print("<TCP:Request> offset: "..offset.." - length: "..len)
end
end
diff --git a/src/hlua.c b/src/hlua.c
index f3c0221..25a3e51 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -196,6 +196,11 @@
*/
static lua_State *hlua_states[MAX_THREADS + 1];
+#define HLUA_FLT_CB_FINAL 0x00000001
+#define HLUA_FLT_CB_RETVAL 0x00000002
+#define HLUA_FLT_CB_ARG_CHN 0x00000004
+#define HLUA_FLT_CB_ARG_HTTP_MSG 0x00000008
+
#define HLUA_FLT_CTX_FL_PAYLOAD 0x00000001
struct hlua_reg_filter {
@@ -9052,6 +9057,197 @@
return (flt_ctx && !!(flt_ctx->flags & HLUA_FLT_CTX_FL_PAYLOAD));
}
+static int hlua_filter_callback(struct stream *s, struct filter *filter, const char *fun,
+ int dir, unsigned int flags)
+{
+ struct hlua *flt_hlua;
+ struct hlua_flt_config *conf = FLT_CONF(filter);
+ struct hlua_flt_ctx *flt_ctx = filter->ctx;
+ unsigned int hflags = HLUA_TXN_FLT_CTX;
+ int ret = 1;
+
+ flt_hlua = flt_ctx->hlua[(dir == SMP_OPT_DIR_REQ ? 0 : 1)];
+ if (!flt_hlua)
+ goto end;
+
+ if (!HLUA_IS_RUNNING(flt_hlua)) {
+ int extra_idx = lua_gettop(flt_hlua->T);
+
+ /* The following Lua calls can fail. */
+ if (!SET_SAFE_LJMP(flt_hlua)) {
+ const char *error;
+
+ if (lua_type(flt_hlua->T, -1) == LUA_TSTRING)
+ error = lua_tostring(flt_hlua->T, -1);
+ else
+ error = "critical error";
+ SEND_ERR(s->be, "Lua filter '%s': %s.\n", conf->reg->name, error);
+ goto end;
+ }
+
+ /* Check stack size. */
+ if (!lua_checkstack(flt_hlua->T, 3)) {
+ SEND_ERR(s->be, "Lua filter '%s': full stack.\n", conf->reg->name);
+ RESET_SAFE_LJMP(flt_hlua);
+ goto end;
+ }
+
+ lua_rawgeti(flt_hlua->T, LUA_REGISTRYINDEX, flt_ctx->ref);
+ if (lua_getfield(flt_hlua->T, -1, fun) != LUA_TFUNCTION) {
+ RESET_SAFE_LJMP(flt_hlua);
+ goto end;
+ }
+ lua_insert(flt_hlua->T, -2);
+
+ if (!hlua_txn_new(flt_hlua->T, s, s->be, dir, hflags)) {
+ SEND_ERR(s->be, "Lua filter '%s': full stack.\n", conf->reg->name);
+ RESET_SAFE_LJMP(flt_hlua);
+ goto end;
+ }
+ flt_hlua->nargs = 2;
+
+ if (flags & HLUA_FLT_CB_ARG_CHN) {
+ if (dir == SMP_OPT_DIR_REQ)
+ lua_getfield(flt_hlua->T, -1, "req");
+ else
+ lua_getfield(flt_hlua->T, -1, "res");
+ if (lua_type(flt_hlua->T, -1) == LUA_TTABLE) {
+ lua_pushstring(flt_hlua->T, "__filter");
+ lua_pushlightuserdata(flt_hlua->T, filter);
+ lua_settable(flt_hlua->T, -3);
+ }
+ flt_hlua->nargs++;
+ }
+ else if (flags & HLUA_FLT_CB_ARG_HTTP_MSG) {
+ /* XXX: Not implemented yey */
+ }
+
+ /* Check stack size. */
+ if (!lua_checkstack(flt_hlua->T, 1)) {
+ SEND_ERR(s->be, "Lua filter '%s': full stack.\n", conf->reg->name);
+ RESET_SAFE_LJMP(flt_hlua);
+ goto end;
+ }
+
+ while (extra_idx--) {
+ lua_pushvalue(flt_hlua->T, 1);
+ lua_remove(flt_hlua->T, 1);
+ flt_hlua->nargs++;
+ }
+
+ /* We must initialize the execution timeouts. */
+ flt_hlua->max_time = hlua_timeout_session;
+
+ /* At this point the execution is safe. */
+ RESET_SAFE_LJMP(flt_hlua);
+ }
+
+ switch (hlua_ctx_resume(flt_hlua, !(flags & HLUA_FLT_CB_FINAL))) {
+ case HLUA_E_OK:
+ /* Catch the return value if it required */
+ if ((flags & HLUA_FLT_CB_RETVAL) && lua_gettop(flt_hlua->T) > 0) {
+ ret = lua_tointeger(flt_hlua->T, -1);
+ lua_settop(flt_hlua->T, 0); /* Empty the stack. */
+ }
+
+ /* Set timeout in the required channel. */
+ if (flt_hlua->wake_time != TICK_ETERNITY) {
+ if (dir == SMP_OPT_DIR_REQ)
+ s->req.analyse_exp = flt_hlua->wake_time;
+ else
+ s->res.analyse_exp = flt_hlua->wake_time;
+ }
+ break;
+ case HLUA_E_AGAIN:
+ /* Set timeout in the required channel. */
+ if (flt_hlua->wake_time != TICK_ETERNITY) {
+ if (dir == SMP_OPT_DIR_REQ)
+ s->req.analyse_exp = flt_hlua->wake_time;
+ else
+ s->res.analyse_exp = flt_hlua->wake_time;
+ }
+ /* Some actions can be wake up when a "write" event
+ * is detected on a response channel. This is useful
+ * only for actions targeted on the requests.
+ */
+ if (HLUA_IS_WAKERESWR(flt_hlua))
+ s->res.flags |= CF_WAKE_WRITE;
+ if (HLUA_IS_WAKEREQWR(flt_hlua))
+ s->req.flags |= CF_WAKE_WRITE;
+ ret = 0;
+ goto end;
+ case HLUA_E_ERRMSG:
+ SEND_ERR(s->be, "Lua filter '%s' : %s.\n", conf->reg->name, lua_tostring(flt_hlua->T, -1));
+ ret = -1;
+ goto end;
+ case HLUA_E_ETMOUT:
+ SEND_ERR(s->be, "Lua filter '%s' : '%s' callback execution timeout.\n", conf->reg->name, fun);
+ goto end;
+ case HLUA_E_NOMEM:
+ SEND_ERR(s->be, "Lua filter '%s' : out of memory error.\n", conf->reg->name);
+ goto end;
+ case HLUA_E_YIELD:
+ SEND_ERR(s->be, "Lua filter '%s': yield functions like core.tcp() or core.sleep()"
+ " are not allowed from '%s' callback.\n", conf->reg->name, fun);
+ goto end;
+ case HLUA_E_ERR:
+ SEND_ERR(s->be, "Lua filter '%s': '%s' returns an unknown error.\n", conf->reg->name, fun);
+ goto end;
+ default:
+ goto end;
+ }
+
+
+ end:
+ return ret;
+}
+
+static int hlua_filter_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
+{
+ struct hlua_flt_ctx *flt_ctx = filter->ctx;
+
+ flt_ctx->flags = 0;
+ return hlua_filter_callback(s, filter, "start_analyze",
+ (!(chn->flags & CF_ISRESP) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES),
+ (HLUA_FLT_CB_FINAL | HLUA_FLT_CB_RETVAL | HLUA_FLT_CB_ARG_CHN));
+}
+
+static int hlua_filter_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
+{
+ struct hlua_flt_ctx *flt_ctx = filter->ctx;
+
+ flt_ctx->flags &= ~HLUA_FLT_CTX_FL_PAYLOAD;
+ return hlua_filter_callback(s, filter, "end_analyze",
+ (!(chn->flags & CF_ISRESP) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES),
+ (HLUA_FLT_CB_FINAL | HLUA_FLT_CB_RETVAL | HLUA_FLT_CB_ARG_CHN));
+}
+
+static int hlua_filter_tcp_payload(struct stream *s, struct filter *filter, struct channel *chn,
+ unsigned int offset, unsigned int len)
+{
+ struct hlua_flt_ctx *flt_ctx = filter->ctx;
+ struct hlua *flt_hlua;
+ int dir = (!(chn->flags & CF_ISRESP) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
+ int idx = (dir == SMP_OPT_DIR_REQ ? 0 : 1);
+ int ret;
+
+ flt_hlua = flt_ctx->hlua[idx];
+ flt_ctx->cur_off[idx] = offset;
+ flt_ctx->cur_len[idx] = len;
+ flt_ctx->flags |= HLUA_FLT_CTX_FL_PAYLOAD;
+ ret = hlua_filter_callback(s, filter, "tcp_payload", dir, (HLUA_FLT_CB_FINAL | HLUA_FLT_CB_ARG_CHN));
+ if (ret != -1) {
+ ret = flt_ctx->cur_len[idx];
+ if (lua_gettop(flt_hlua->T) > 0) {
+ ret = lua_tointeger(flt_hlua->T, -1);
+ if (ret > flt_ctx->cur_len[idx])
+ ret = flt_ctx->cur_len[idx];
+ lua_settop(flt_hlua->T, 0); /* Empty the stack. */
+ }
+ }
+ return ret;
+}
+
static int hlua_filter_parse_fct(char **args, int *cur_arg, struct proxy *px,
struct flt_conf *fconf, char **err, void *private)
{
@@ -9084,9 +9280,15 @@
/* Push the filter class on the stack and resolve all callbacks */
lua_rawgeti(L, LUA_REGISTRYINDEX, reg_flt->flt_ref[state_id]);
- /*
- * XXX: no callback supported for now
- */
+ if (lua_getfield(L, -1, "start_analyze") == LUA_TFUNCTION)
+ hlua_flt_ops->channel_start_analyze = hlua_filter_start_analyze;
+ lua_pop(L, 1);
+ if (lua_getfield(L, -1, "end_analyze") == LUA_TFUNCTION)
+ hlua_flt_ops->channel_end_analyze = hlua_filter_end_analyze;
+ lua_pop(L, 1);
+ if (lua_getfield(L, -1, "tcp_payload") == LUA_TFUNCTION)
+ hlua_flt_ops->tcp_payload = hlua_filter_tcp_payload;
+ lua_pop(L, 1);
/* Get id and flags of the filter class */
if (lua_getfield(L, -1, "id") == LUA_TSTRING)
@@ -9127,6 +9329,42 @@
return -1;
}
+__LJMP static int hlua_register_data_filter(lua_State *L)
+{
+ struct filter *filter;
+ struct channel *chn;
+
+ MAY_LJMP(check_args(L, 2, "register_data_filter"));
+ MAY_LJMP(luaL_checktype(L, 1, LUA_TTABLE));
+ chn = MAY_LJMP(hlua_checkchannel(L, 2));
+
+ lua_getfield(L, 1, "__filter");
+ MAY_LJMP(luaL_checktype(L, -1, LUA_TLIGHTUSERDATA));
+ filter = lua_touserdata (L, -1);
+ lua_pop(L, 1);
+
+ register_data_filter(chn_strm(chn), chn, filter);
+ return 1;
+}
+
+__LJMP static int hlua_unregister_data_filter(lua_State *L)
+{
+ struct filter *filter;
+ struct channel *chn;
+
+ MAY_LJMP(check_args(L, 2, "unregister_data_filter"));
+ MAY_LJMP(luaL_checktype(L, 1, LUA_TTABLE));
+ chn = MAY_LJMP(hlua_checkchannel(L, 2));
+
+ lua_getfield(L, 1, "__filter");
+ MAY_LJMP(luaL_checktype(L, -1, LUA_TLIGHTUSERDATA));
+ filter = lua_touserdata (L, -1);
+ lua_pop(L, 1);
+
+ unregister_data_filter(chn_strm(chn), chn, filter);
+ return 1;
+}
+
/* This function is an LUA binding used for registering a filter. It expects a
* fileter name used in the haproxy configuration file and a LUA function to
* parse configuration arguments.
@@ -9896,6 +10134,10 @@
hlua_class_const_int(L, "FLT_CFG_FL_HTX", FLT_CFG_FL_HTX);
+ hlua_class_function(L, "wake_time", hlua_set_wake_time);
+ hlua_class_function(L, "register_data_filter", hlua_register_data_filter);
+ hlua_class_function(L, "unregister_data_filter", hlua_unregister_data_filter);
+
lua_setglobal(L, "filter");
/*