| /* |
| * QUIC socket management. |
| * |
| * Copyright 2020 HAProxy Technologies, Frédéric Lécaille <flecaille@haproxy.com> |
| * |
| * This program is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU General Public License |
| * as published by the Free Software Foundation; either version |
| * 2 of the License, or (at your option) any later version. |
| * |
| */ |
| |
| #include <errno.h> |
| |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| |
| #include <haproxy/connection.h> |
| #include <haproxy/listener.h> |
| #include <haproxy/quic_sock.h> |
| #include <haproxy/session.h> |
| #include <haproxy/xprt_quic.h> |
| |
| /* This function is called from the protocol layer accept() in order to |
| * instantiate a new session on behalf of a given listener and frontend. It |
| * returns a positive value upon success, 0 if the connection can be ignored, |
| * or a negative value upon critical failure. The accepted connection is |
| * closed if we return <= 0. If no handshake is needed, it immediately tries |
| * to instantiate a new stream. The connection must already have been filled |
| * with the incoming connection handle (a fd), a target (the listener) and a |
| * source address. |
| */ |
| int quic_session_accept(struct connection *cli_conn) |
| { |
| struct listener *l = __objt_listener(cli_conn->target); |
| struct proxy *p = l->bind_conf->frontend; |
| struct session *sess; |
| |
| cli_conn->proxy_netns = l->rx.settings->netns; |
| /* This flag is ordinarily set by conn_ctrl_init() which cannot |
| * be called for now. |
| */ |
| cli_conn->flags |= CO_FL_CTRL_READY; |
| |
| /* wait for a PROXY protocol header */ |
| if (l->options & LI_O_ACC_PROXY) |
| cli_conn->flags |= CO_FL_ACCEPT_PROXY; |
| |
| /* wait for a NetScaler client IP insertion protocol header */ |
| if (l->options & LI_O_ACC_CIP) |
| cli_conn->flags |= CO_FL_ACCEPT_CIP; |
| |
| /* Add the handshake pseudo-XPRT */ |
| if (cli_conn->flags & (CO_FL_ACCEPT_PROXY | CO_FL_ACCEPT_CIP)) { |
| if (xprt_add_hs(cli_conn) != 0) |
| goto out_free_conn; |
| } |
| |
| sess = session_new(p, l, &cli_conn->obj_type); |
| if (!sess) |
| goto out_free_conn; |
| |
| conn_set_owner(cli_conn, sess, NULL); |
| |
| if (conn_complete_session(cli_conn) < 0) |
| goto out_free_sess; |
| |
| if (conn_xprt_start(cli_conn) >= 0) |
| return 1; |
| |
| out_free_sess: |
| /* prevent call to listener_release during session_free. It will be |
| * done below, for all errors. */ |
| sess->listener = NULL; |
| session_free(sess); |
| out_free_conn: |
| cli_conn->qc->conn = NULL; |
| conn_stop_tracking(cli_conn); |
| conn_xprt_close(cli_conn); |
| conn_free(cli_conn); |
| out: |
| |
| return -1; |
| } |
| |
| /* |
| * Inspired from session_accept_fd(). |
| * Instantiate a new connection (connection struct) to be attached to <qc> |
| * QUIC connection of <l> listener. |
| * Returns 1 if succeeded, 0 if not. |
| */ |
| static int new_quic_cli_conn(struct quic_conn *qc, struct listener *l, |
| struct sockaddr_storage *saddr) |
| { |
| struct connection *cli_conn; |
| |
| if (unlikely((cli_conn = conn_new(&l->obj_type)) == NULL)) |
| goto out; |
| |
| if (!sockaddr_alloc(&cli_conn->dst, saddr, sizeof *saddr)) |
| goto out_free_conn; |
| |
| cli_conn->flags |= CO_FL_ADDR_TO_SET; |
| qc->conn = cli_conn; |
| cli_conn->qc = qc; |
| |
| cli_conn->handle.fd = l->rx.fd; |
| cli_conn->target = &l->obj_type; |
| |
| /* We need the xprt context before accepting (->accept()) the connection: |
| * we may receive packet before this connection acception. |
| */ |
| if (conn_prepare(cli_conn, l->rx.proto, l->bind_conf->xprt) < 0) |
| goto out_free_conn; |
| |
| return 1; |
| |
| out_free_conn: |
| qc->conn = NULL; |
| conn_stop_tracking(cli_conn); |
| conn_xprt_close(cli_conn); |
| conn_free(cli_conn); |
| out: |
| |
| return 0; |
| } |
| |
| /* Tests if the receiver supports accepting connections. Returns positive on |
| * success, 0 if not possible |
| */ |
| int quic_sock_accepting_conn(const struct receiver *rx) |
| { |
| return 1; |
| } |
| |
| /* Accept an incoming connection from listener <l>, and return it, as well as |
| * a CO_AC_* status code into <status> if not null. Null is returned on error. |
| * <l> must be a valid listener with a valid frontend. |
| */ |
| struct connection *quic_sock_accept_conn(struct listener *l, int *status) |
| { |
| struct quic_conn *qc; |
| struct li_per_thread *lthr = &l->per_thr[tid]; |
| |
| qc = MT_LIST_POP(<hr->quic_accept.conns, struct quic_conn *, accept_list); |
| if (!qc) |
| goto done; |
| |
| if (!new_quic_cli_conn(qc, l, &qc->peer_addr)) |
| goto err; |
| |
| done: |
| *status = CO_AC_DONE; |
| return qc ? qc->conn : NULL; |
| |
| err: |
| /* in case of error reinsert the element to process it later. */ |
| MT_LIST_INSERT(<hr->quic_accept.conns, &qc->accept_list); |
| |
| *status = CO_AC_PAUSE; |
| return NULL; |
| } |
| |
| /* Function called on a read event from a listening socket. It tries |
| * to handle as many connections as possible. |
| */ |
| void quic_sock_fd_iocb(int fd) |
| { |
| ssize_t ret; |
| struct rxbuf *rxbuf; |
| struct buffer *buf; |
| struct listener *l = objt_listener(fdtab[fd].owner); |
| struct quic_transport_params *params; |
| /* Source address */ |
| struct sockaddr_storage saddr = {0}; |
| size_t max_sz, cspace; |
| socklen_t saddrlen; |
| struct quic_dgram *dgram, *dgramp, *new_dgram; |
| unsigned char *dgram_buf; |
| |
| BUG_ON(!l); |
| |
| if (!l) |
| return; |
| |
| if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd)) |
| return; |
| |
| rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), mt_list); |
| if (!rxbuf) |
| goto out; |
| |
| buf = &rxbuf->buf; |
| |
| new_dgram = NULL; |
| /* Remove all consumed datagrams of this buffer */ |
| list_for_each_entry_safe(dgram, dgramp, &rxbuf->dgrams, list) { |
| if (HA_ATOMIC_LOAD(&dgram->buf)) |
| break; |
| |
| LIST_DELETE(&dgram->list); |
| b_del(buf, dgram->len); |
| if (!new_dgram) |
| new_dgram = dgram; |
| else |
| pool_free(pool_head_quic_dgram, dgram); |
| } |
| |
| params = &l->bind_conf->quic_params; |
| max_sz = params->max_udp_payload_size; |
| cspace = b_contig_space(buf); |
| if (cspace < max_sz) { |
| struct quic_dgram *dgram; |
| |
| /* Allocate a fake datagram, without data to locate |
| * the end of the RX buffer (required during purging). |
| */ |
| dgram = pool_zalloc(pool_head_quic_dgram); |
| if (!dgram) |
| goto out; |
| |
| dgram->len = cspace; |
| LIST_APPEND(&rxbuf->dgrams, &dgram->list); |
| /* Consume the remaining space */ |
| b_add(buf, cspace); |
| if (b_contig_space(buf) < max_sz) |
| goto out; |
| |
| } |
| |
| dgram_buf = (unsigned char *)b_tail(buf); |
| saddrlen = sizeof saddr; |
| do { |
| ret = recvfrom(fd, dgram_buf, max_sz, 0, |
| (struct sockaddr *)&saddr, &saddrlen); |
| if (ret < 0 && errno == EAGAIN) { |
| fd_cant_recv(fd); |
| goto out; |
| } |
| } while (ret < 0 && errno == EINTR); |
| |
| b_add(buf, ret); |
| if (!quic_lstnr_dgram_dispatch(dgram_buf, ret, l, &saddr, |
| new_dgram, &rxbuf->dgrams)) { |
| /* If wrong, consume this datagram */ |
| b_del(buf, ret); |
| } |
| out: |
| MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->mt_list); |
| } |
| |
| /* TODO standardize this function for a generic UDP sendto wrapper. This can be |
| * done by removing the <qc> arg and replace it with address/port. |
| */ |
| size_t qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t count, |
| int flags) |
| { |
| ssize_t ret; |
| size_t try, done; |
| int send_flag; |
| |
| done = 0; |
| /* send the largest possible block. For this we perform only one call |
| * to send() unless the buffer wraps and we exactly fill the first hunk, |
| * in which case we accept to do it once again. |
| */ |
| while (count) { |
| try = b_contig_data(buf, done); |
| if (try > count) |
| try = count; |
| |
| send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; |
| if (try < count || flags & CO_SFL_MSG_MORE) |
| send_flag |= MSG_MORE; |
| |
| ret = sendto(qc->li->rx.fd, b_peek(buf, done), try, send_flag, |
| (struct sockaddr *)&qc->peer_addr, get_addr_len(&qc->peer_addr)); |
| if (ret > 0) { |
| /* TODO remove partial sending support for UDP */ |
| count -= ret; |
| done += ret; |
| |
| if (ret < try) |
| break; |
| } |
| else if (ret == 0 || errno == EAGAIN || errno == ENOTCONN || errno == EINPROGRESS) { |
| /* TODO must be handle properly. It is justified for UDP ? */ |
| ABORT_NOW(); |
| } |
| else if (errno != EINTR) { |
| /* TODO must be handle properly. It is justified for UDP ? */ |
| ABORT_NOW(); |
| } |
| } |
| |
| if (done > 0) { |
| /* we count the total bytes sent, and the send rate for 32-byte |
| * blocks. The reason for the latter is that freq_ctr are |
| * limited to 4GB and that it's not enough per second. |
| */ |
| _HA_ATOMIC_ADD(&global.out_bytes, done); |
| update_freq_ctr(&global.out_32bps, (done + 16) / 32); |
| } |
| return done; |
| } |
| |
| |
| /*********************** QUIC accept queue management ***********************/ |
| /* per-thread accept queues */ |
| struct quic_accept_queue *quic_accept_queues; |
| |
| /* Install <qc> on the queue ready to be accepted. The queue task is then woken |
| * up. If <qc> accept is already scheduled or done, nothing is done. |
| */ |
| void quic_accept_push_qc(struct quic_conn *qc) |
| { |
| struct quic_accept_queue *queue = &quic_accept_queues[qc->tid]; |
| struct li_per_thread *lthr = &qc->li->per_thr[qc->tid]; |
| |
| /* early return if accept is already in progress/done for this |
| * connection |
| */ |
| if (HA_ATOMIC_BTS(&qc->flags, QUIC_FL_ACCEPT_REGISTERED_BIT)) |
| return; |
| |
| BUG_ON(MT_LIST_INLIST(&qc->accept_list)); |
| |
| /* 1. insert the listener in the accept queue |
| * |
| * Use TRY_APPEND as there is a possible race even with INLIST if |
| * multiple threads try to add the same listener instance from several |
| * quic_conn. |
| */ |
| if (!MT_LIST_INLIST(&(lthr->quic_accept.list))) |
| MT_LIST_TRY_APPEND(&queue->listeners, &(lthr->quic_accept.list)); |
| |
| /* 2. insert the quic_conn in the listener per-thread queue. */ |
| MT_LIST_APPEND(<hr->quic_accept.conns, &qc->accept_list); |
| |
| /* 3. wake up the queue tasklet */ |
| tasklet_wakeup(quic_accept_queues[qc->tid].tasklet); |
| } |
| |
| /* Tasklet handler to accept QUIC connections. Call listener_accept on every |
| * listener instances registered in the accept queue. |
| */ |
| static struct task *quic_accept_run(struct task *t, void *ctx, unsigned int i) |
| { |
| struct li_per_thread *lthr; |
| struct mt_list *elt1, elt2; |
| struct quic_accept_queue *queue = &quic_accept_queues[tid]; |
| |
| mt_list_for_each_entry_safe(lthr, &queue->listeners, quic_accept.list, elt1, elt2) { |
| listener_accept(lthr->li); |
| MT_LIST_DELETE_SAFE(elt1); |
| } |
| |
| return NULL; |
| } |
| |
| static int quic_alloc_accept_queues(void) |
| { |
| int i; |
| |
| quic_accept_queues = calloc(global.nbthread, sizeof(struct quic_accept_queue)); |
| if (!quic_accept_queues) { |
| ha_alert("Failed to allocate the quic accept queues.\n"); |
| return 0; |
| } |
| |
| for (i = 0; i < global.nbthread; ++i) { |
| struct tasklet *task; |
| if (!(task = tasklet_new())) { |
| ha_alert("Failed to allocate the quic accept queue on thread %d.\n", i); |
| return 0; |
| } |
| |
| tasklet_set_tid(task, i); |
| task->process = quic_accept_run; |
| quic_accept_queues[i].tasklet = task; |
| |
| MT_LIST_INIT(&quic_accept_queues[i].listeners); |
| } |
| |
| return 1; |
| } |
| REGISTER_POST_CHECK(quic_alloc_accept_queues); |
| |
| static int quic_deallocate_accept_queues(void) |
| { |
| int i; |
| |
| if (quic_accept_queues) { |
| for (i = 0; i < global.nbthread; ++i) |
| tasklet_free(quic_accept_queues[i].tasklet); |
| free(quic_accept_queues); |
| } |
| |
| return 1; |
| } |
| REGISTER_POST_DEINIT(quic_deallocate_accept_queues); |