Revert "MINOR: queue: factor out the proxy/server queuing code"
This reverts commit 3eecdb65c5a6b933399ebb0ac4ef86a7b97cf85d.
The recent changes since 5304669e1 MEDIUM: queue: make
pendconn_process_next_strm() only return the pendconn opened a tiny race
condition between stream_free() and process_srv_queue(), as the pendconn
is accessed outside of the lock, possibly while it's being freed. A
different approach is required.
diff --git a/src/queue.c b/src/queue.c
index 9208911..618825e 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -391,9 +391,6 @@
struct pendconn *p;
struct proxy *px;
struct server *srv;
- struct queue *q;
- unsigned int *max_ptr;
- unsigned int old_max, new_max;
p = pool_alloc(pool_head_pendconn);
if (!p)
@@ -414,26 +411,37 @@
strm->pend_pos = p;
if (srv) {
- q = &srv->queue;
- max_ptr = &srv->counters.nbpend_max;
+ unsigned int old_max, new_max;
+
+ new_max = _HA_ATOMIC_ADD_FETCH(&srv->queue.length, 1);
+ old_max = srv->counters.nbpend_max;
+ while (new_max > old_max) {
+ if (likely(_HA_ATOMIC_CAS(&srv->counters.nbpend_max, &old_max, new_max)))
+ break;
+ }
+ __ha_barrier_atomic_store();
+
+ HA_SPIN_LOCK(QUEUE_LOCK, &p->srv->queue.lock);
+ p->queue_idx = srv->queue.idx - 1; // for increment
+ eb32_insert(&srv->queue.head, &p->node);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &p->srv->queue.lock);
}
else {
- q = &px->queue;
- max_ptr = &px->be_counters.nbpend_max;
- }
+ unsigned int old_max, new_max;
- new_max = _HA_ATOMIC_ADD_FETCH(&q->length, 1);
- old_max = _HA_ATOMIC_LOAD(max_ptr);
- while (new_max > old_max) {
- if (likely(_HA_ATOMIC_CAS(max_ptr, &old_max, new_max)))
- break;
- }
- __ha_barrier_atomic_store();
+ new_max = _HA_ATOMIC_ADD_FETCH(&px->queue.length, 1);
+ old_max = px->be_counters.nbpend_max;
+ while (new_max > old_max) {
+ if (likely(_HA_ATOMIC_CAS(&px->be_counters.nbpend_max, &old_max, new_max)))
+ break;
+ }
+ __ha_barrier_atomic_store();
- HA_SPIN_LOCK(QUEUE_LOCK, &q->lock);
- p->queue_idx = q->idx - 1; // for increment
- eb32_insert(&q->head, &p->node);
- HA_SPIN_UNLOCK(QUEUE_LOCK, &q->lock);
+ HA_SPIN_LOCK(QUEUE_LOCK, &p->px->queue.lock);
+ p->queue_idx = px->queue.idx - 1; // for increment
+ eb32_insert(&px->queue.head, &p->node);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &p->px->queue.lock);
+ }
_HA_ATOMIC_INC(&px->totpend);
return p;