MEDIUM: lua: socket: add "socket" class for TCP I/O
This patch adds the TCP I/O functionnality. The class implemented
provides the same functions than the "lua socket" project. This
make network compatibility with another LUA project. The documentation
is located here:
http://w3.impa.br/~diego/software/luasocket/tcp.html
diff --git a/include/types/hlua.h b/include/types/hlua.h
index f21f4e1..4925155 100644
--- a/include/types/hlua.h
+++ b/include/types/hlua.h
@@ -2,9 +2,14 @@
#define _TYPES_HLUA_H
#include <lua.h>
+#include <lauxlib.h>
+
+#include <types/proxy.h>
+#include <types/server.h>
#define CLASS_CORE "Core"
#define CLASS_TXN "TXN"
+#define CLASS_SOCKET "Socket"
struct session;
@@ -100,4 +105,12 @@
unsigned int wakeup_ms; /* hour to wakeup. */
};
+/* This struct is used to create coprocess doing TCP or
+ * SSL I/O. It uses a fake session.
+ */
+struct hlua_socket {
+ struct session *s; /* Session used for socket I/O. */
+ luaL_Buffer b; /* buffer used to prepare strings. */
+};
+
#endif /* _TYPES_HLUA_H */
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index e060634..ab0e750 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -27,6 +27,9 @@
#include <types/channel.h>
#include <types/connection.h>
+#ifdef USE_LUA
+#include <types/hlua.h>
+#endif
#include <types/obj_type.h>
#include <common/config.h>
@@ -150,6 +153,14 @@
struct pattern_expr *expr;
struct chunk chunk;
} map;
+#ifdef USE_LUA
+ struct {
+ int connected;
+ struct hlua_socket *socket;
+ struct list wake_on_read;
+ struct list wake_on_write;
+ } hlua;
+#endif
} ctx; /* used by stats I/O handlers to dump the stats */
};
diff --git a/src/hlua.c b/src/hlua.c
index 6f85d99..0f06188 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -1,3 +1,5 @@
+#include <sys/socket.h>
+
#include <lauxlib.h>
#include <lua.h>
#include <lualib.h>
@@ -6,16 +8,24 @@
#include <common/cfgparse.h>
+#include <types/connection.h>
#include <types/hlua.h>
#include <types/proto_tcp.h>
#include <types/proxy.h>
#include <proto/arg.h>
+#include <proto/channel.h>
#include <proto/hdr_idx.h>
+#include <proto/obj_type.h>
#include <proto/payload.h>
#include <proto/proto_http.h>
#include <proto/proto_tcp.h>
+#include <proto/raw_sock.h>
#include <proto/sample.h>
+#include <proto/server.h>
+#include <proto/session.h>
+#include <proto/ssl_sock.h>
+#include <proto/stream_interface.h>
#include <proto/task.h>
/* Lua uses longjmp to perform yield or throwing errors. This
@@ -38,6 +48,13 @@
struct pool_head *pool2_hlua_com;
struct pool_head *pool2_hlua_sleep;
+/* Used for Socket connection. */
+static struct proxy socket_proxy;
+static struct server socket_tcp;
+#ifdef USE_OPENSSL
+static struct server socket_ssl;
+#endif
+
/* List head of the function called at the initialisation time. */
struct list hlua_init_functions = LIST_HEAD_INIT(hlua_init_functions);
@@ -53,6 +70,7 @@
*/
static int class_core_ref;
static int class_txn_ref;
+static int class_socket_ref;
/* These functions converts types between HAProxy internal args or
* sample and LUA types. Another function permits to check if the
@@ -773,6 +791,937 @@
* __lt "<"
* __le "<="
*/
+
+/*
+ *
+ *
+ * Class Socket
+ *
+ *
+ */
+
+__LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud)
+{
+ return (struct hlua_socket *)MAY_LJMP(hlua_checkudata(L, ud, class_socket_ref));
+}
+
+/* This function is the handler called for each I/O on the established
+ * connection. It is used for notify space avalaible to send or data
+ * received.
+ */
+static void hlua_socket_handler(struct stream_interface *si)
+{
+ struct appctx *appctx = objt_appctx(si->end);
+ struct connection *c = objt_conn(si->ib->cons->end);
+
+ /* Wakeup the main session if the client connection is closed. */
+ if (!c || channel_output_closed(si->ib) || channel_input_closed(si->ob)) {
+ if (appctx->ctx.hlua.socket) {
+ appctx->ctx.hlua.socket->s = NULL;
+ appctx->ctx.hlua.socket = NULL;
+ }
+ si_shutw(si);
+ si_shutr(si);
+ si->ib->flags |= CF_READ_NULL;
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_read);
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_write);
+ return;
+ }
+
+ if (!(c->flags & CO_FL_CONNECTED))
+ return;
+
+ /* This function is called after the connect. */
+ appctx->ctx.hlua.connected = 1;
+
+ /* Wake the tasks which wants to write if the buffer have avalaible space. */
+ if (channel_may_recv(si->ob))
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_write);
+
+ /* Wake the tasks which wants to read if the buffer contains data. */
+ if (channel_is_empty(si->ib))
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_read);
+}
+
+/* This function is called when the "struct session" is destroyed.
+ * Remove the link from the object to this session.
+ * Wake all the pending signals.
+ */
+static void hlua_socket_release(struct stream_interface *si)
+{
+ struct appctx *appctx = objt_appctx(si->end);
+
+ /* Remove my link in the original object. */
+ if (appctx->ctx.hlua.socket)
+ appctx->ctx.hlua.socket->s = NULL;
+
+ /* Wake all the task waiting for me. */
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_read);
+ hlua_com_wake(&appctx->ctx.hlua.wake_on_write);
+}
+
+/* If the garbage collectio of the object is launch, nobody
+ * uses this object. If the session does not exists, just quit.
+ * Send the shutdown signal to the session. In some cases,
+ * pending signal can rest in the read and write lists. destroy
+ * it.
+ */
+__LJMP static int hlua_socket_gc(lua_State *L)
+{
+ MAY_LJMP(check_args(L, 1, "__gc"));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+ struct appctx *appctx;
+
+ if (!socket->s)
+ return 0;
+
+ /* Remove all reference between the Lua stack and the coroutine session. */
+ appctx = objt_appctx(socket->s->si[0].end);
+ session_shutdown(socket->s, SN_ERR_KILLED);
+ socket->s = NULL;
+ appctx->ctx.hlua.socket = NULL;
+
+ return 0;
+}
+
+/* The close function send shutdown signal and break the
+ * links between the session and the object.
+ */
+__LJMP static int hlua_socket_close(lua_State *L)
+{
+ MAY_LJMP(check_args(L, 1, "close"));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+ struct appctx *appctx;
+
+ if (!socket->s)
+ return 0;
+
+ /* Close the session and remove the associated stop task. */
+ session_shutdown(socket->s, SN_ERR_KILLED);
+ appctx = objt_appctx(socket->s->si[0].end);
+ appctx->ctx.hlua.socket = NULL;
+ socket->s = NULL;
+
+ return 0;
+}
+
+/* This Lua function assumes that the stack contain three parameters.
+ * 1 - USERDATA containing a struct socket
+ * 2 - INTEGER with values of the macro defined below
+ * If the integer is -1, we must read at most one line.
+ * If the integer is -2, we ust read all the data until the
+ * end of the stream.
+ * If the integer is positive value, we must read a number of
+ * bytes corresponding to this value.
+ */
+#define HLSR_READ_LINE (-1)
+#define HLSR_READ_ALL (-2)
+__LJMP static int hlua_socket_receive_yield(struct lua_State *L)
+{
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+ int wanted = lua_tointeger(L, 2);
+ struct hlua *hlua = hlua_gethlua(L);
+ struct appctx *appctx;
+ int len;
+ int nblk;
+ char *blk1;
+ int len1;
+ char *blk2;
+ int len2;
+
+ /* Check if this lua stack is schedulable. */
+ if (!hlua || !hlua->task)
+ WILL_LJMP(luaL_error(L, "The 'receive' function is only allowed in "
+ "'frontend', 'backend' or 'task'"));
+
+ /* check for connection closed. If some data where read, return it. */
+ if (!socket->s)
+ goto connection_closed;
+
+ if (wanted == HLSR_READ_LINE) {
+
+ /* Read line. */
+ nblk = bo_getline_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2);
+ if (nblk < 0) /* Connection close. */
+ goto connection_closed;
+ if (nblk == 0) /* No data avalaible. */
+ goto connection_empty;
+ }
+
+ else if (wanted == HLSR_READ_ALL) {
+
+ /* Read all the available data. */
+ nblk = bo_getblk_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2);
+ if (nblk < 0) /* Connection close. */
+ goto connection_closed;
+ if (nblk == 0) /* No data avalaible. */
+ goto connection_empty;
+ }
+
+ else {
+
+ /* Read a block of data. */
+ nblk = bo_getblk_nc(socket->s->si[0].ob, &blk1, &len1, &blk2, &len2);
+ if (nblk < 0) /* Connection close. */
+ goto connection_closed;
+ if (nblk == 0) /* No data avalaible. */
+ goto connection_empty;
+
+ if (len1 > wanted) {
+ nblk = 1;
+ len1 = wanted;
+ } if (nblk == 2 && len1 + len2 > wanted)
+ len2 = wanted - len1;
+ }
+
+ len = len1;
+
+ luaL_addlstring(&socket->b, blk1, len1);
+ if (nblk == 2) {
+ len += len2;
+ luaL_addlstring(&socket->b, blk2, len2);
+ }
+
+ /* Consume data. */
+ bo_skip(socket->s->si[0].ob, len);
+
+ /* Don't wait anything. */
+ si_update(&socket->s->si[0]);
+
+ /* If the pattern reclaim to read all the data
+ * in the connection, got out.
+ */
+ if (wanted == HLSR_READ_ALL)
+ goto connection_empty;
+ else if (wanted >= 0 && len < wanted)
+ goto connection_empty;
+
+ /* Return result. */
+ luaL_pushresult(&socket->b);
+ return 1;
+
+connection_closed:
+
+ /* If the buffer containds data. */
+ if (socket->b.n > 0) {
+ luaL_pushresult(&socket->b);
+ return 1;
+ }
+ lua_pushnil(L);
+ lua_pushstring(L, "connection closed.");
+ return 2;
+
+connection_empty:
+
+ appctx = objt_appctx(socket->s->si[0].end);
+ if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_read))
+ WILL_LJMP(luaL_error(L, "out of memory"));
+ WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_receive_yield));
+ return 0;
+}
+
+/* This Lus function gets two parameters. The first one can be string
+ * or a number. If the string is "*l", the user require one line. If
+ * the string is "*a", the user require all the content of the stream.
+ * If the value is a number, the user require a number of bytes equal
+ * to the value. The default value is "*l" (a line).
+ *
+ * This paraeter with a variable type is converted in integer. This
+ * integer takes this values:
+ * -1 : read a line
+ * -2 : read all the stream
+ * >0 : amount if bytes.
+ *
+ * The second parameter is optinal. It contains a string that must be
+ * concatenated with the read data.
+ */
+__LJMP static int hlua_socket_receive(struct lua_State *L)
+{
+ int wanted = HLSR_READ_LINE;
+ const char *pattern;
+ int type;
+ char *error;
+ size_t len;
+
+ if (lua_gettop(L) < 1 || lua_gettop(L) > 3)
+ WILL_LJMP(luaL_error(L, "The 'receive' function requires between 1 and 3 arguments."));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+
+ /* check for pattern. */
+ if (lua_gettop(L) >= 2) {
+ type = lua_type(L, 2);
+ if (type == LUA_TSTRING) {
+ pattern = lua_tostring(L, 2);
+ if (strcmp(pattern, "*a") == 0)
+ wanted = HLSR_READ_ALL;
+ else if (strcmp(pattern, "*l") == 0)
+ wanted = HLSR_READ_LINE;
+ else {
+ wanted = strtoll(pattern, &error, 10);
+ if (*error != '\0')
+ WILL_LJMP(luaL_error(L, "Unsupported pattern."));
+ }
+ }
+ else if (type == LUA_TNUMBER) {
+ wanted = lua_tointeger(L, 2);
+ if (wanted < 0)
+ WILL_LJMP(luaL_error(L, "Unsupported size."));
+ }
+ }
+
+ /* Set pattern. */
+ lua_pushinteger(L, wanted);
+ lua_replace(L, 2);
+
+ /* init bufffer, and fiil it wih prefix. */
+ luaL_buffinit(L, &socket->b);
+
+ /* Check prefix. */
+ if (lua_gettop(L) >= 3) {
+ if (lua_type(L, 3) != LUA_TSTRING)
+ WILL_LJMP(luaL_error(L, "Expect a 'string' for the prefix"));
+ pattern = lua_tolstring(L, 3, &len);
+ luaL_addlstring(&socket->b, pattern, len);
+ }
+
+ return __LJMP(hlua_socket_receive_yield(L));
+}
+
+/* Write the Lua input string in the output buffer.
+ * This fucntion returns a yield if no space are available.
+ */
+static int hlua_socket_write_yield(struct lua_State *L)
+{
+ struct hlua_socket *socket;
+ struct hlua *hlua = hlua_gethlua(L);
+ struct appctx *appctx;
+ size_t buf_len;
+ const char *buf;
+ int len;
+ int send_len;
+ int sent;
+
+ /* Check if this lua stack is schedulable. */
+ if (!hlua || !hlua->task)
+ WILL_LJMP(luaL_error(L, "The 'write' function is only allowed in "
+ "'frontend', 'backend' or 'task'"));
+
+ /* Get object */
+ socket = MAY_LJMP(hlua_checksocket(L, 1));
+ buf = MAY_LJMP(luaL_checklstring(L, 2, &buf_len));
+ sent = MAY_LJMP(luaL_checkunsigned(L, 3));
+
+ /* Check for connection close. */
+ if (!socket->s || channel_output_closed(socket->s->req)) {
+ lua_pushinteger(L, -1);
+ return 1;
+ }
+
+ /* Update the input buffer data. */
+ buf += sent;
+ send_len = buf_len - sent;
+
+ /* All the data are sent. */
+ if (sent >= buf_len)
+ return 1; /* Implicitly return the length sent. */
+
+ /* Check for avalaible space. */
+ len = buffer_total_space(socket->s->si[0].ib->buf);
+ if (len <= 0)
+ goto hlua_socket_write_yield_return;
+
+ /* send data */
+ if (len < send_len)
+ send_len = len;
+ len = bi_putblk(socket->s->si[0].ib, buf+sent, send_len);
+
+ /* "Not enough space" (-1), "Buffer too little to contain
+ * the data" (-2) are not expected because the available length
+ * is tested.
+ * Other unknown error are also not expected.
+ */
+ if (len <= 0) {
+ MAY_LJMP(hlua_socket_close(L));
+ lua_pop(L, 1);
+ lua_pushunsigned(L, -1);
+ return 1;
+ }
+
+ /* update buffers. */
+ si_update(&socket->s->si[0]);
+ socket->s->si[0].ib->rex = TICK_ETERNITY;
+ socket->s->si[0].ob->wex = TICK_ETERNITY;
+
+ /* Update length sent. */
+ lua_pop(L, 1);
+ lua_pushunsigned(L, sent + len);
+
+ /* All the data buffer is sent ? */
+ if (sent + len >= buf_len)
+ return 1;
+
+hlua_socket_write_yield_return:
+ appctx = objt_appctx(socket->s->si[0].end);
+ if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_write))
+ WILL_LJMP(luaL_error(L, "out of memory"));
+ WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_write_yield));
+ return 0;
+}
+
+/* This function initiate the send of data. It just check the input
+ * parameters and push an integer in the Lua stack that contain the
+ * amount of data writed in the buffer. This is used by the function
+ * "hlua_socket_write_yield" that can yield.
+ *
+ * The Lua function gets between 3 and 4 parameters. The first one is
+ * the associated object. The second is a string buffer. The third is
+ * a facultative integer that represents where is the buffer position
+ * of the start of the data that can send. The first byte is the
+ * position "1". The default value is "1". The fourth argument is a
+ * facultative integer that represents where is the buffer position
+ * of the end of the data that can send. The default is the last byte.
+ */
+static int hlua_socket_send(struct lua_State *L)
+{
+ int i;
+ int j;
+ const char *buf;
+ size_t buf_len;
+
+ /* Check number of arguments. */
+ if (lua_gettop(L) < 2 || lua_gettop(L) > 4)
+ WILL_LJMP(luaL_error(L, "'send' needs between 2 and 4 arguments"));
+
+ /* Get the string. */
+ buf = MAY_LJMP(luaL_checklstring(L, 2, &buf_len));
+
+ /* Get and check j. */
+ if (lua_gettop(L) == 4) {
+ j = MAY_LJMP(luaL_checkinteger(L, 4));
+ if (j < 0)
+ j = buf_len + j + 1;
+ if (j > buf_len)
+ j = buf_len + 1;
+ lua_pop(L, 1);
+ }
+ else
+ j = buf_len;
+
+ /* Get and check i. */
+ if (lua_gettop(L) == 3) {
+ i = MAY_LJMP(luaL_checkinteger(L, 3));
+ if (i < 0)
+ i = buf_len + i + 1;
+ if (i > buf_len)
+ i = buf_len + 1;
+ lua_pop(L, 1);
+ } else
+ i = 1;
+
+ /* Check bth i and j. */
+ if (i > j) {
+ lua_pushunsigned(L, 0);
+ return 1;
+ }
+ if (i == 0 && j == 0) {
+ lua_pushunsigned(L, 0);
+ return 1;
+ }
+ if (i == 0)
+ i = 1;
+ if (j == 0)
+ j = 1;
+
+ /* Pop the string. */
+ lua_pop(L, 1);
+
+ /* Update the buffer length. */
+ buf += i - 1;
+ buf_len = j - i + 1;
+ lua_pushlstring(L, buf, buf_len);
+
+ /* This unsigned is used to remember the amount of sent data. */
+ lua_pushunsigned(L, 0);
+
+ return MAY_LJMP(hlua_socket_write_yield(L));
+}
+
+#define SOCKET_INFO_EXPANDED_FORM "[0000:0000:0000:0000:0000:0000:0000:0000]:12345"
+static char _socket_info_expanded_form[] = SOCKET_INFO_EXPANDED_FORM;
+#define SOCKET_INFO_MAX_LEN (sizeof(_socket_info_expanded_form))
+__LJMP static inline int hlua_socket_info(struct lua_State *L, struct sockaddr_storage *addr)
+{
+ static char buffer[SOCKET_INFO_MAX_LEN];
+ int ret;
+ int len;
+ char *p;
+
+ ret = addr_to_str(addr, buffer+1, SOCKET_INFO_MAX_LEN-1);
+ if (ret <= 0) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ if (ret == AF_UNIX) {
+ lua_pushstring(L, buffer+1);
+ return 1;
+ }
+ else if (ret == AF_INET6) {
+ buffer[0] = '[';
+ len = strlen(buffer);
+ buffer[len] = ']';
+ len++;
+ buffer[len] = ':';
+ len++;
+ p = buffer;
+ }
+ else if (ret == AF_INET) {
+ p = buffer + 1;
+ len = strlen(p);
+ p[len] = ':';
+ len++;
+ }
+ else {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ if (port_to_str(addr, p + len, SOCKET_INFO_MAX_LEN-1 - len) <= 0) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ lua_pushstring(L, p);
+ return 1;
+}
+
+/* Returns information about the peer of the connection. */
+__LJMP static int hlua_socket_getpeername(struct lua_State *L)
+{
+ MAY_LJMP(check_args(L, 1, "getpeername"));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+
+ /* Check if the tcp object is avalaible. */
+ if (!socket->s) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ struct connection *conn = objt_conn(socket->s->si[1].end);
+ if (!conn) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ if (!(conn->flags & CO_FL_ADDR_TO_SET)) {
+ unsigned int salen = sizeof(conn->addr.to);
+ if (getpeername(conn->t.sock.fd, (struct sockaddr *)&conn->addr.to, &salen) == -1) {
+ lua_pushnil(L);
+ return 1;
+ }
+ conn->flags |= CO_FL_ADDR_TO_SET;
+ }
+
+ return MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+}
+
+/* Returns information about my connection side. */
+static int hlua_socket_getsockname(struct lua_State *L)
+{
+ MAY_LJMP(check_args(L, 1, "getsockname"));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+
+ /* Check if the tcp object is avalaible. */
+ if (!socket->s) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ struct connection *conn = objt_conn(socket->s->si[1].end);
+ if (!conn) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ if (!(conn->flags & CO_FL_ADDR_FROM_SET)) {
+ unsigned int salen = sizeof(conn->addr.from);
+ if (getsockname(conn->t.sock.fd, (struct sockaddr *)&conn->addr.from, &salen) == -1) {
+ lua_pushnil(L);
+ return 1;
+ }
+ conn->flags |= CO_FL_ADDR_FROM_SET;
+ }
+
+ return hlua_socket_info(L, &conn->addr.from);
+}
+
+/* This struct define the applet. */
+static struct si_applet update_applet = {
+ .obj_type = OBJ_TYPE_APPLET,
+ .name = "<LUA_TCP>",
+ .fct = hlua_socket_handler,
+ .release = hlua_socket_release,
+};
+
+__LJMP static int hlua_socket_connect_yield(struct lua_State *L)
+{
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+ struct hlua *hlua = hlua_gethlua(L);
+ struct appctx *appctx;
+
+ /* Check for connection close. */
+ if (!hlua || !socket->s || channel_output_closed(socket->s->req)) {
+ lua_pushnil(L);
+ lua_pushstring(L, "Can't connect");
+ return 2;
+ }
+
+ appctx = objt_appctx(socket->s->si[0].end);
+
+ /* Check for connection established. */
+ if (appctx->ctx.hlua.connected) {
+ lua_pushinteger(L, 1);
+ return 1;
+ }
+
+ if (!hlua_com_new(hlua, &appctx->ctx.hlua.wake_on_write))
+ WILL_LJMP(luaL_error(L, "out of memory error"));
+ WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_connect_yield));
+ return 0;
+}
+
+/* This function fail or initite the connection. */
+__LJMP static int hlua_socket_connect(struct lua_State *L)
+{
+ struct hlua_socket *socket;
+ unsigned int port;
+ const char *ip;
+ struct connection *conn;
+
+ MAY_LJMP(check_args(L, 3, "connect"));
+
+ /* Get args. */
+ socket = MAY_LJMP(hlua_checksocket(L, 1));
+ ip = MAY_LJMP(luaL_checkstring(L, 2));
+ port = MAY_LJMP(luaL_checkunsigned(L, 3));
+
+ conn = si_alloc_conn(socket->s->req->cons, 0);
+ if (!conn)
+ WILL_LJMP(luaL_error(L, "connect: internal error"));
+
+ /* Parse ip address. */
+ conn->addr.to.ss_family = AF_UNSPEC;
+ if (!str2ip2(ip, &conn->addr.to, 0))
+ WILL_LJMP(luaL_error(L, "connect: cannot parse ip address '%s'", ip));
+
+ /* Set port. */
+ if (conn->addr.to.ss_family == AF_INET)
+ ((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port);
+ else if (conn->addr.to.ss_family == AF_INET6)
+ ((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port);
+
+ /* it is important not to call the wakeup function directly but to
+ * pass through task_wakeup(), because this one knows how to apply
+ * priorities to tasks.
+ */
+ task_wakeup(socket->s->task, TASK_WOKEN_INIT);
+
+ WILL_LJMP(lua_yieldk(L, 0, 0, hlua_socket_connect_yield));
+
+ return 0;
+}
+
+__LJMP static int hlua_socket_connect_ssl(struct lua_State *L)
+{
+ struct hlua_socket *socket;
+
+ MAY_LJMP(check_args(L, 3, "connect_ssl"));
+ socket = MAY_LJMP(hlua_checksocket(L, 1));
+ socket->s->target = &socket_ssl.obj_type;
+ return MAY_LJMP(hlua_socket_connect(L));
+}
+
+__LJMP static int hlua_socket_setoption(struct lua_State *L)
+{
+ return 0;
+}
+
+__LJMP static int hlua_socket_settimeout(struct lua_State *L)
+{
+ MAY_LJMP(check_args(L, 2, "settimeout"));
+
+ struct hlua_socket *socket = MAY_LJMP(hlua_checksocket(L, 1));
+ unsigned int tmout = MAY_LJMP(luaL_checkunsigned(L, 2)) * 1000;
+
+ socket->s->req->rto = tmout;
+ socket->s->req->wto = tmout;
+ socket->s->rep->rto = tmout;
+ socket->s->rep->wto = tmout;
+
+ return 0;
+}
+
+__LJMP static int hlua_socket_new(lua_State *L)
+{
+ struct hlua_socket *socket;
+ struct appctx *appctx;
+
+ /* Check stack size. */
+ if (!lua_checkstack(L, 2)) {
+ hlua_pusherror(L, "socket: full stack");
+ goto out_fail_conf;
+ }
+
+ socket = MAY_LJMP(lua_newuserdata(L, sizeof(*socket)));
+ memset(socket, 0, sizeof(*socket));
+
+ /* Pop a class session metatable and affect it to the userdata. */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
+ lua_setmetatable(L, -2);
+
+ /*
+ *
+ * Get memory for the request.
+ *
+ */
+
+ socket->s = pool_alloc2(pool2_session);
+ if (!socket->s) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_fail_conf;
+ }
+
+ socket->s->task = task_new();
+ if (!socket->s->task) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_free_session;
+ }
+
+ socket->s->req = pool_alloc2(pool2_channel);
+ if (!socket->s->req) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_fail_req;
+ }
+
+ socket->s->req->buf = pool_alloc2(pool2_buffer);
+ if (!socket->s->req->buf) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_fail_req_buf;
+ }
+
+ socket->s->rep = pool_alloc2(pool2_channel);
+ if (!socket->s->rep) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_fail_rep;
+ }
+
+ socket->s->rep->buf = pool_alloc2(pool2_buffer);
+ if (!socket->s->rep->buf) {
+ hlua_pusherror(L, "socket: out of memory");
+ goto out_fail_rep_buf;
+ }
+
+ /* Configura empty Lua for the session. */
+ socket->s->hlua.T = NULL;
+ socket->s->hlua.Tref = LUA_REFNIL;
+ socket->s->hlua.Mref = LUA_REFNIL;
+ socket->s->hlua.nargs = 0;
+ socket->s->hlua.state = HLUA_STOP;
+ LIST_INIT(&socket->s->hlua.com);
+
+ /* session initialisation. */
+ session_init_srv_conn(socket->s);
+
+ /*
+ *
+ * Configure the associated task.
+ *
+ */
+
+ /* This is the dedicated function to process the session. This function
+ * is able to establish the conection, process the timeouts, etc ...
+ */
+ socket->s->task->process = process_session;
+
+ /* Back reference to session. This is used by process_session(). */
+ socket->s->task->context = socket->s;
+
+ /* The priority of the task is normal. */
+ socket->s->task->nice = 0;
+
+ /* Init the next run to eternity. Later in this function, this task is
+ * waked.
+ */
+ socket->s->task->expire = TICK_ETERNITY;
+
+ /*
+ *
+ * Initialize the attached buffers
+ *
+ */
+ socket->s->req->buf->size = global.tune.bufsize;
+ socket->s->rep->buf->size = global.tune.bufsize;
+
+ /*
+ *
+ * Initialize channels.
+ *
+ */
+
+ /* This function reset the struct. It must be called
+ * before the configuration.
+ */
+ channel_init(socket->s->req);
+ channel_init(socket->s->rep);
+
+ socket->s->req->prod = &socket->s->si[0];
+ socket->s->req->cons = &socket->s->si[1];
+
+ socket->s->rep->prod = &socket->s->si[1];
+ socket->s->rep->cons = &socket->s->si[0];
+
+ socket->s->si[0].ib = socket->s->req;
+ socket->s->si[0].ob = socket->s->rep;
+
+ socket->s->si[1].ib = socket->s->rep;
+ socket->s->si[1].ob = socket->s->req;
+
+ socket->s->req->analysers = 0;
+ socket->s->req->rto = socket_proxy.timeout.client;
+ socket->s->req->wto = socket_proxy.timeout.server;
+ socket->s->req->rex = TICK_ETERNITY;
+ socket->s->req->wex = TICK_ETERNITY;
+ socket->s->req->analyse_exp = TICK_ETERNITY;
+
+ socket->s->rep->analysers = 0;
+ socket->s->rep->rto = socket_proxy.timeout.server;
+ socket->s->rep->wto = socket_proxy.timeout.client;
+ socket->s->rep->rex = TICK_ETERNITY;
+ socket->s->rep->wex = TICK_ETERNITY;
+ socket->s->rep->analyse_exp = TICK_ETERNITY;
+
+ /*
+ *
+ * Configure the session.
+ *
+ */
+
+ /* The session dont have listener. The listener is used with real
+ * proxies.
+ */
+ socket->s->listener = NULL;
+
+ /* The flags are initialized to 0. Values are setted later. */
+ socket->s->flags = 0;
+
+ /* Assign the configured proxy to the new session. */
+ socket->s->be = &socket_proxy;
+ socket->s->fe = &socket_proxy;
+
+ /* XXX: Set namy variables */
+ socket->s->store_count = 0;
+ memset(socket->s->stkctr, 0, sizeof(socket->s->stkctr));
+
+ /* Configure logs. */
+ socket->s->logs.logwait = 0;
+ socket->s->logs.level = 0;
+ socket->s->logs.accept_date = date; /* user-visible date for logging */
+ socket->s->logs.tv_accept = now; /* corrected date for internal use */
+ socket->s->do_log = NULL;
+
+ /* Function used if an error is occured. */
+ socket->s->srv_error = default_srv_error;
+
+ /* Init the list of buffers. */
+ LIST_INIT(&socket->s->buffer_wait);
+
+ /* Dont configure the unique ID. */
+ socket->s->uniq_id = 0;
+ socket->s->unique_id = NULL;
+
+ /* XXX: ? */
+ socket->s->pend_pos = NULL;
+
+ /* XXX: See later. */
+ socket->s->txn.sessid = NULL;
+ socket->s->txn.srv_cookie = NULL;
+ socket->s->txn.cli_cookie = NULL;
+ socket->s->txn.uri = NULL;
+ socket->s->txn.req.cap = NULL;
+ socket->s->txn.rsp.cap = NULL;
+ socket->s->txn.hdr_idx.v = NULL;
+ socket->s->txn.hdr_idx.size = 0;
+ socket->s->txn.hdr_idx.used = 0;
+
+ /* Configure "left" stream interface as applet. This "si" produce
+ * and use the data received from the server. The applet is initialized
+ * and is attached to the stream interface.
+ */
+
+ /* The data producer is already connected. It is the applet. */
+ socket->s->req->flags = CF_READ_ATTACHED;
+
+ channel_auto_connect(socket->s->req); /* don't wait to establish connection */
+ channel_auto_close(socket->s->req); /* let the producer forward close requests */
+
+ si_reset(&socket->s->si[0], socket->s->task);
+ si_set_state(&socket->s->si[0], SI_ST_EST); /* connection established (resource exists) */
+
+ appctx = stream_int_register_handler(&socket->s->si[0], &update_applet);
+ if (!appctx)
+ goto out_fail_conn1;
+ appctx->ctx.hlua.socket = socket;
+ appctx->ctx.hlua.connected = 0;
+ LIST_INIT(&appctx->ctx.hlua.wake_on_write);
+ LIST_INIT(&appctx->ctx.hlua.wake_on_read);
+
+ /* Configure "right" stream interface. this "si" is used to connect
+ * and retrieve data from the server. The connection is initialized
+ * with the "struct server".
+ */
+ si_reset(&socket->s->si[1], socket->s->task);
+ si_set_state(&socket->s->si[1], SI_ST_INI);
+ socket->s->si[1].conn_retries = socket_proxy.conn_retries;
+
+ /* Force destination server. */
+ socket->s->flags |= SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET | SN_BE_ASSIGNED;
+ socket->s->target = &socket_tcp.obj_type;
+
+ /* This session is added to te lists of alive sessions. */
+ LIST_ADDQ(&sessions, &socket->s->list);
+
+ /* XXX: I think that this list is used by stats. */
+ LIST_INIT(&socket->s->back_refs);
+
+ /* Update statistics counters. */
+ socket_proxy.feconn++; /* beconn will be increased later */
+ jobs++;
+ totalconn++;
+
+ /* Return yield waiting for connection. */
+ return 1;
+
+out_fail_conn1:
+ pool_free2(pool2_buffer, socket->s->rep->buf);
+out_fail_rep_buf:
+ pool_free2(pool2_channel, socket->s->rep);
+out_fail_rep:
+ pool_free2(pool2_buffer, socket->s->req->buf);
+out_fail_req_buf:
+ pool_free2(pool2_channel, socket->s->req);
+out_fail_req:
+ task_free(socket->s->task);
+out_free_session:
+ pool_free2(pool2_session, socket->s);
+out_fail_conf:
+ WILL_LJMP(lua_error(L));
+ return 0;
+}
/*
*
@@ -1873,6 +2822,7 @@
hlua_class_function(gL.T, "register_converters", hlua_register_converters);
hlua_class_function(gL.T, "sleep", hlua_sleep);
hlua_class_function(gL.T, "msleep", hlua_msleep);
+ hlua_class_function(gL.T, "tcp", hlua_socket_new);
/* Store the table __index in the metable. */
lua_settable(gL.T, -3);
@@ -1944,4 +2894,176 @@
lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */
lua_setfield(gL.T, LUA_REGISTRYINDEX, CLASS_TXN); /* register class session. */
class_txn_ref = luaL_ref(gL.T, LUA_REGISTRYINDEX); /* reference class session. */
+
+ /*
+ *
+ * Register class Socket
+ *
+ */
+
+ /* Create and fill the metatable. */
+ lua_newtable(gL.T);
+
+ /* Create and fille the __index entry. */
+ lua_pushstring(gL.T, "__index");
+ lua_newtable(gL.T);
+
+ hlua_class_function(gL.T, "connect_ssl", hlua_socket_connect_ssl);
+ hlua_class_function(gL.T, "connect", hlua_socket_connect);
+ hlua_class_function(gL.T, "send", hlua_socket_send);
+ hlua_class_function(gL.T, "receive", hlua_socket_receive);
+ hlua_class_function(gL.T, "close", hlua_socket_close);
+ hlua_class_function(gL.T, "getpeername", hlua_socket_getpeername);
+ hlua_class_function(gL.T, "getsockname", hlua_socket_getsockname);
+ hlua_class_function(gL.T, "setoption", hlua_socket_setoption);
+ hlua_class_function(gL.T, "settimeout", hlua_socket_settimeout);
+
+ lua_settable(gL.T, -3); /* Push the last 2 entries in the table at index -3 */
+
+ /* Register the garbage collector entry. */
+ lua_pushstring(gL.T, "__gc");
+ lua_pushcclosure(gL.T, hlua_socket_gc, 0);
+ lua_settable(gL.T, -3); /* Push the last 2 entries in the table at index -3 */
+
+ /* Register previous table in the registry with reference and named entry. */
+ lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */
+ lua_pushvalue(gL.T, -1); /* Copy the -1 entry and push it on the stack. */
+ lua_setfield(gL.T, LUA_REGISTRYINDEX, CLASS_SOCKET); /* register class socket. */
+ class_socket_ref = luaL_ref(gL.T, LUA_REGISTRYINDEX); /* reference class socket. */
+
+ /* Proxy and server configuration initialisation. */
+ memset(&socket_proxy, 0, sizeof(socket_proxy));
+ init_new_proxy(&socket_proxy);
+ socket_proxy.parent = NULL;
+ socket_proxy.last_change = now.tv_sec;
+ socket_proxy.id = "LUA-SOCKET";
+ socket_proxy.cap = PR_CAP_FE | PR_CAP_BE;
+ socket_proxy.maxconn = 0;
+ socket_proxy.accept = NULL;
+ socket_proxy.options2 |= PR_O2_INDEPSTR;
+ socket_proxy.srv = NULL;
+ socket_proxy.conn_retries = 0;
+ socket_proxy.timeout.connect = 5000; /* By default the timeout connection is 5s. */
+
+ /* Init TCP server: unchanged parameters */
+ memset(&socket_tcp, 0, sizeof(socket_tcp));
+ socket_tcp.next = NULL;
+ socket_tcp.proxy = &socket_proxy;
+ socket_tcp.obj_type = OBJ_TYPE_SERVER;
+ LIST_INIT(&socket_tcp.actconns);
+ LIST_INIT(&socket_tcp.pendconns);
+ socket_tcp.state = SRV_ST_RUNNING; /* early server setup */
+ socket_tcp.last_change = 0;
+ socket_tcp.id = "LUA-TCP-CONN";
+ socket_tcp.check.state &= ~CHK_ST_ENABLED; /* Disable health checks. */
+ socket_tcp.agent.state &= ~CHK_ST_ENABLED; /* Disable health checks. */
+ socket_tcp.pp_opts = 0; /* Remove proxy protocol. */
+
+ /* XXX: Copy default parameter from default server,
+ * but the default server is not initialized.
+ */
+ socket_tcp.maxqueue = socket_proxy.defsrv.maxqueue;
+ socket_tcp.minconn = socket_proxy.defsrv.minconn;
+ socket_tcp.maxconn = socket_proxy.defsrv.maxconn;
+ socket_tcp.slowstart = socket_proxy.defsrv.slowstart;
+ socket_tcp.onerror = socket_proxy.defsrv.onerror;
+ socket_tcp.onmarkeddown = socket_proxy.defsrv.onmarkeddown;
+ socket_tcp.onmarkedup = socket_proxy.defsrv.onmarkedup;
+ socket_tcp.consecutive_errors_limit = socket_proxy.defsrv.consecutive_errors_limit;
+ socket_tcp.uweight = socket_proxy.defsrv.iweight;
+ socket_tcp.iweight = socket_proxy.defsrv.iweight;
+
+ socket_tcp.check.status = HCHK_STATUS_INI;
+ socket_tcp.check.rise = socket_proxy.defsrv.check.rise;
+ socket_tcp.check.fall = socket_proxy.defsrv.check.fall;
+ socket_tcp.check.health = socket_tcp.check.rise; /* socket, but will fall down at first failure */
+ socket_tcp.check.server = &socket_tcp;
+
+ socket_tcp.agent.status = HCHK_STATUS_INI;
+ socket_tcp.agent.rise = socket_proxy.defsrv.agent.rise;
+ socket_tcp.agent.fall = socket_proxy.defsrv.agent.fall;
+ socket_tcp.agent.health = socket_tcp.agent.rise; /* socket, but will fall down at first failure */
+ socket_tcp.agent.server = &socket_tcp;
+
+ socket_tcp.xprt = &raw_sock;
+
+#ifdef USE_OPENSSL
+
+ char *args[4];
+ struct srv_kw *kw;
+ int tmp_error;
+ char *error;
+
+ /* Init TCP server: unchanged parameters */
+ memset(&socket_ssl, 0, sizeof(socket_ssl));
+ socket_ssl.next = NULL;
+ socket_ssl.proxy = &socket_proxy;
+ socket_ssl.obj_type = OBJ_TYPE_SERVER;
+ LIST_INIT(&socket_ssl.actconns);
+ LIST_INIT(&socket_ssl.pendconns);
+ socket_ssl.state = SRV_ST_RUNNING; /* early server setup */
+ socket_ssl.last_change = 0;
+ socket_ssl.id = "LUA-SSL-CONN";
+ socket_ssl.check.state &= ~CHK_ST_ENABLED; /* Disable health checks. */
+ socket_ssl.agent.state &= ~CHK_ST_ENABLED; /* Disable health checks. */
+ socket_ssl.pp_opts = 0; /* Remove proxy protocol. */
+
+ /* XXX: Copy default parameter from default server,
+ * but the default server is not initialized.
+ */
+ socket_ssl.maxqueue = socket_proxy.defsrv.maxqueue;
+ socket_ssl.minconn = socket_proxy.defsrv.minconn;
+ socket_ssl.maxconn = socket_proxy.defsrv.maxconn;
+ socket_ssl.slowstart = socket_proxy.defsrv.slowstart;
+ socket_ssl.onerror = socket_proxy.defsrv.onerror;
+ socket_ssl.onmarkeddown = socket_proxy.defsrv.onmarkeddown;
+ socket_ssl.onmarkedup = socket_proxy.defsrv.onmarkedup;
+ socket_ssl.consecutive_errors_limit = socket_proxy.defsrv.consecutive_errors_limit;
+ socket_ssl.uweight = socket_proxy.defsrv.iweight;
+ socket_ssl.iweight = socket_proxy.defsrv.iweight;
+
+ socket_ssl.check.status = HCHK_STATUS_INI;
+ socket_ssl.check.rise = socket_proxy.defsrv.check.rise;
+ socket_ssl.check.fall = socket_proxy.defsrv.check.fall;
+ socket_ssl.check.health = socket_ssl.check.rise; /* socket, but will fall down at first failure */
+ socket_ssl.check.server = &socket_ssl;
+
+ socket_ssl.agent.status = HCHK_STATUS_INI;
+ socket_ssl.agent.rise = socket_proxy.defsrv.agent.rise;
+ socket_ssl.agent.fall = socket_proxy.defsrv.agent.fall;
+ socket_ssl.agent.health = socket_ssl.agent.rise; /* socket, but will fall down at first failure */
+ socket_ssl.agent.server = &socket_ssl;
+
+ socket_ssl.xprt = &raw_sock;
+
+ args[0] = "ssl";
+ args[1] = "verify";
+ args[2] = "none";
+ args[3] = NULL;
+
+ for (idx=0; idx<3; idx++) {
+ if ((kw = srv_find_kw(args[idx])) != NULL) { /* Maybe it's registered server keyword */
+ /*
+ *
+ * If the keyword is not known, we can search in the registered
+ * server keywords. This is usefull to configure special SSL
+ * features like client certificates and ssl_verify.
+ *
+ */
+ tmp_error = kw->parse(args, &idx, &socket_proxy, &socket_ssl, &error);
+ if (tmp_error != 0) {
+ fprintf(stderr, "INTERNAL ERROR: %s\n", error);
+ abort(); /* This must be never arrives because the command line
+ not editable by the user. */
+ }
+ idx += kw->skip;
+ }
+ }
+
+ /* Initialize SSL server. */
+ if (socket_ssl.xprt == &ssl_sock) {
+ socket_ssl.use_ssl = 1;
+ ssl_sock_prepare_srv_ctx(&socket_ssl, &socket_proxy);
+ }
+#endif
}