blob: a4eb06142f0db46ecd392c4de4386919aeb2730f [file] [log] [blame]
From c43241d046e8a6ae75549c23d470b94f16c74ca7 Mon Sep 17 00:00:00 2001
From: Aditya Kumar Singh <quic_adisi@quicinc.com>
Date: Thu, 28 Mar 2024 23:46:51 +0530
Subject: [PATCH 021/104] tests: MLO: add basic cohosted MLDs functionality
testing
Add test case to test basic cohosted MLDs functionality. Add helper
functions to create the configuration file, start hostapd instance.
Client connectivty test case will be added via a subsequent change.
eht_mld_cohosted_discovery: 2 co-hosted MLDs without non-MLD RNR. Basic
bring up and beacon, MLD RNR, scan validation.
eht_mld_cohosted_discovery_with_rnr: Same like eht_mld_cohosted_discovery
but additionally non-MLD RNR (rnr=1) is also enabled. Validate the non-MLD
RNR as well.
Signed-off-by: Aditya Kumar Singh <quic_adisi@quicinc.com>
---
tests/hwsim/test_eht.py | 230 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 230 insertions(+)
diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py
index a012fe4e7..732406219 100644
--- a/tests/hwsim/test_eht.py
+++ b/tests/hwsim/test_eht.py
@@ -15,6 +15,7 @@ from tshark import run_tshark
from test_gas import hs20_ap_params
from test_dpp import check_dpp_capab, wait_auth_success
from test_rrm import build_beacon_request, run_req_beacon, BeaconReport
+import os, subprocess, time, tempfile
def eht_verify_wifi_version(dev):
status = dev.get_status()
@@ -1823,3 +1824,232 @@ def test_eht_mlo_csa(dev, apdev):
traffic_test(wpas, hapd0)
#TODO: CSA on non-first link
+
+def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g',
+ op_class=None):
+ # Create configuration file and add phy characteristics
+ fd, fname = tempfile.mkstemp(dir='/tmp',
+ prefix=prefix + iface + "-chan-" + str(channel) + "-")
+ f = os.fdopen(fd, 'w')
+
+ f.write("driver=nl80211\n")
+ f.write("hw_mode=" + str(hw_mode) + "\n")
+ f.write("ieee80211n=1\n")
+ if hw_mode == 'a' and \
+ (op_class is None or \
+ op_class not in [131, 132, 133, 134, 135, 136, 137]):
+ f.write("ieee80211ac=1\n")
+ f.write("ieee80211ax=1\n")
+ f.write("ieee80211be=1\n")
+ f.write("channel=" + str(channel) + "\n")
+
+ return f, fname
+
+def append_bss_conf_to_file(f, ifname, params, first=False):
+ # Add BSS specific characteristics
+ config = "bss"
+
+ if first:
+ config = "interface"
+
+ f.write("\n" + config + "=%s\n" % ifname)
+
+ for k, v in list(params.items()):
+ f.write("{}={}\n".format(k,v))
+
+ f.write("mld_ap=1\n")
+
+def dump_config(fname):
+ with open(fname, 'r') as f:
+ cfg = f.read()
+ logger.debug("hostapd config: " + str(fname) + "\n" + cfg)
+
+def get_config(iface, count, ssid, passphrase, channel, bssid_regex,
+ rnr=False, debug=False):
+ f, fname = create_base_conf_file(iface, channel=channel)
+ hapds = []
+
+ for i in range(count):
+ if i == 0:
+ ifname = iface
+ else:
+ ifname = iface + "-" + str(i)
+
+ set_ssid = ssid + str(i)
+ set_passphrase = passphrase + str(i)
+ params = hostapd.wpa2_params(ssid=set_ssid, passphrase=set_passphrase,
+ wpa_key_mgmt="SAE", ieee80211w="2")
+ params['sae_pwe'] = "2"
+ params['group_mgmt_cipher'] = "AES-128-CMAC"
+ params['beacon_prot'] = "1"
+ params["ctrl_interface"] = "/var/run/hostapd/chan_" + str(channel)
+ params["bssid"] = bssid_regex % (i + 1)
+
+ if rnr:
+ params["rnr"]="1"
+
+ append_bss_conf_to_file(f, ifname, params, first=(i == 0))
+
+ hapds.append([ifname, params["ctrl_interface"], i])
+
+ f.close()
+
+ if debug:
+ dump_config(fname)
+
+ return fname, hapds
+
+def start_ap(prefix, configs):
+ pid = prefix + ".hostapd.pid"
+ configs = configs.split()
+
+ cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f',
+ prefix + ".hostapd-log"]
+
+ cmd = cmd + configs
+
+ logger.info("Starting APs")
+ res = subprocess.check_call(cmd)
+ if res != 0:
+ raise Exception("Could not start hostapd: %s" % str(res))
+
+ # Wait for hostapd to complete initialization and daemonize.
+ time.sleep(2)
+
+ if not os.path.exists(pid):
+ raise Exception("hostapd did not create PID file.")
+
+def get_mld_devs(hapd_iface, count, prefix, rnr=False):
+ fname1, hapds1 = get_config(hapd_iface, count=count, ssid="mld-",
+ passphrase="qwertyuiop-", channel=1,
+ bssid_regex="02:00:00:00:07:%02x",
+ rnr=rnr, debug=True)
+ fname2, hapds2 = get_config(hapd_iface, count=count, ssid="mld-",
+ passphrase="qwertyuiop-", channel=6,
+ bssid_regex="02:00:00:00:08:%02x",
+ rnr=rnr, debug=True)
+
+ start_ap(prefix, fname1 + " " + fname2)
+
+ hapd_mld1_link0 = hostapd.Hostapd(ifname=hapds1[0][0], ctrl=hapds1[0][1],
+ bssidx=hapds1[0][2])
+ hapd_mld1_link1 = hostapd.Hostapd(ifname=hapds2[0][0], ctrl=hapds2[0][1],
+ bssidx=hapds2[0][2])
+
+ hapd_mld2_link0 = hostapd.Hostapd(ifname=hapds1[1][0], ctrl=hapds1[1][1],
+ bssidx=hapds1[1][2])
+ hapd_mld2_link1 = hostapd.Hostapd(ifname=hapds2[1][0], ctrl=hapds2[1][1],
+ bssidx=hapds2[1][2])
+
+ if not hapd_mld1_link0.ping():
+ raise Exception("Could not ping hostapd")
+
+ if not hapd_mld1_link1.ping():
+ raise Exception("Could not ping hostapd")
+
+ if not hapd_mld2_link0.ping():
+ raise Exception("Could not ping hostapd")
+
+ if not hapd_mld2_link1.ping():
+ raise Exception("Could not ping hostapd")
+
+ os.remove(fname1)
+ os.remove(fname2)
+
+ return [hapd_mld1_link0, hapd_mld1_link1, hapd_mld2_link0, hapd_mld2_link1]
+
+def stop_mld_devs(hapds, pid):
+ pid = pid + ".hostapd.pid"
+
+ if "OK" not in hapds[0].request("TERMINATE"):
+ raise Exception("Failed to terminate hostapd process")
+
+ ev = hapds[0].wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
+ if ev is None:
+ raise Exception("CTRL-EVENT-TERMINATING not seen")
+
+ time.sleep(0.5)
+
+ if os.path.exists(pid):
+ raise Exception("PID file exits after process termination")
+
+def eht_parse_rnr(bss, rnr=False, exp_bssid=None):
+ partner_rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
+ re.MULTILINE)
+ ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)
+
+ if partner_rnr_pattern.search(bss) is None:
+ raise Exception("RNR element not found for first link of first MLD")
+
+ if ml_pattern.search(bss) is None:
+ raise Exception("ML element not found for first link of first MLD")
+
+ if not rnr:
+ return
+
+ coloc_rnr_pattern = re.compile(".*ap_info.*, mld ID=255, link ID=..",
+ re.MULTILINE)
+
+ if coloc_rnr_pattern.search(bss) is None:
+ raise Exception("RNR element not found for co-located BSS")
+
+ line = coloc_rnr_pattern.search(bss).group()
+ if line.count('bssid') > 1:
+ raise Exception("More than one BSS found for co-located RNR")
+
+ # Get the BSSID carried in the RNR
+ index = line.rindex('bssid')
+ bssid = line[index+len('bssid')+1:].split(',')[0]
+
+ # Get the MLD ID carried in the RNR
+ index = line.rindex('link ID')
+ link_id = line[index+len('link ID')+1:].split(',')[0]
+
+ if link_id != "15":
+ raise Exception("Unexpected link ID for co-located BSS which is not own partner")
+
+ if bssid != exp_bssid:
+ raise Exception("Unexpected BSSID for co-located BSS")
+
+def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False):
+ with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
+ HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
+
+ wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
+ wpas.interface_add(wpas_iface)
+
+ hapds = get_mld_devs(hapd_iface=hapd_iface, count=2, prefix=params['prefix'],
+ rnr=rnr)
+
+ # Only scan link 0
+ res = wpas.request("SCAN freq=2412")
+ if "FAIL" in res:
+ raise Exception("Failed to start scan")
+
+ ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
+ if ev is None:
+ raise Exception("Scan did not start")
+
+ ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
+ if ev is None:
+ raise Exception("Scan did not complete")
+
+ logger.info("Scan done")
+
+ bss = wpas.request("BSS " + hapds[0].own_addr())
+ logger.info("BSS 0_0: " + str(bss))
+ eht_parse_rnr(bss, rnr, hapds[2].own_addr())
+
+ bss = wpas.request("BSS " + hapds[2].own_addr())
+ logger.info("BSS 1_0: " + str(bss))
+ eht_parse_rnr(bss, rnr, hapds[0].own_addr())
+
+ stop_mld_devs(hapds, params['prefix'])
+
+def test_eht_mld_cohosted_discovery(dev, apdev, params):
+ """EHT 2 AP MLDs discovery"""
+ eht_mld_cohosted_discovery(dev, apdev, params)
+
+def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params):
+ """EHT 2 AP MLDs discovery (with co-location RNR)"""
+ eht_mld_cohosted_discovery(dev, apdev, params, rnr=True)
--
2.39.2