| From c43241d046e8a6ae75549c23d470b94f16c74ca7 Mon Sep 17 00:00:00 2001 |
| From: Aditya Kumar Singh <quic_adisi@quicinc.com> |
| Date: Thu, 28 Mar 2024 23:46:51 +0530 |
| Subject: [PATCH 021/104] tests: MLO: add basic cohosted MLDs functionality |
| testing |
| |
| Add test case to test basic cohosted MLDs functionality. Add helper |
| functions to create the configuration file, start hostapd instance. |
| |
| Client connectivty test case will be added via a subsequent change. |
| |
| eht_mld_cohosted_discovery: 2 co-hosted MLDs without non-MLD RNR. Basic |
| bring up and beacon, MLD RNR, scan validation. |
| |
| eht_mld_cohosted_discovery_with_rnr: Same like eht_mld_cohosted_discovery |
| but additionally non-MLD RNR (rnr=1) is also enabled. Validate the non-MLD |
| RNR as well. |
| |
| Signed-off-by: Aditya Kumar Singh <quic_adisi@quicinc.com> |
| --- |
| tests/hwsim/test_eht.py | 230 ++++++++++++++++++++++++++++++++++++++++ |
| 1 file changed, 230 insertions(+) |
| |
| diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py |
| index a012fe4e7..732406219 100644 |
| --- a/tests/hwsim/test_eht.py |
| +++ b/tests/hwsim/test_eht.py |
| @@ -15,6 +15,7 @@ from tshark import run_tshark |
| from test_gas import hs20_ap_params |
| from test_dpp import check_dpp_capab, wait_auth_success |
| from test_rrm import build_beacon_request, run_req_beacon, BeaconReport |
| +import os, subprocess, time, tempfile |
| |
| def eht_verify_wifi_version(dev): |
| status = dev.get_status() |
| @@ -1823,3 +1824,232 @@ def test_eht_mlo_csa(dev, apdev): |
| traffic_test(wpas, hapd0) |
| |
| #TODO: CSA on non-first link |
| + |
| +def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g', |
| + op_class=None): |
| + # Create configuration file and add phy characteristics |
| + fd, fname = tempfile.mkstemp(dir='/tmp', |
| + prefix=prefix + iface + "-chan-" + str(channel) + "-") |
| + f = os.fdopen(fd, 'w') |
| + |
| + f.write("driver=nl80211\n") |
| + f.write("hw_mode=" + str(hw_mode) + "\n") |
| + f.write("ieee80211n=1\n") |
| + if hw_mode == 'a' and \ |
| + (op_class is None or \ |
| + op_class not in [131, 132, 133, 134, 135, 136, 137]): |
| + f.write("ieee80211ac=1\n") |
| + f.write("ieee80211ax=1\n") |
| + f.write("ieee80211be=1\n") |
| + f.write("channel=" + str(channel) + "\n") |
| + |
| + return f, fname |
| + |
| +def append_bss_conf_to_file(f, ifname, params, first=False): |
| + # Add BSS specific characteristics |
| + config = "bss" |
| + |
| + if first: |
| + config = "interface" |
| + |
| + f.write("\n" + config + "=%s\n" % ifname) |
| + |
| + for k, v in list(params.items()): |
| + f.write("{}={}\n".format(k,v)) |
| + |
| + f.write("mld_ap=1\n") |
| + |
| +def dump_config(fname): |
| + with open(fname, 'r') as f: |
| + cfg = f.read() |
| + logger.debug("hostapd config: " + str(fname) + "\n" + cfg) |
| + |
| +def get_config(iface, count, ssid, passphrase, channel, bssid_regex, |
| + rnr=False, debug=False): |
| + f, fname = create_base_conf_file(iface, channel=channel) |
| + hapds = [] |
| + |
| + for i in range(count): |
| + if i == 0: |
| + ifname = iface |
| + else: |
| + ifname = iface + "-" + str(i) |
| + |
| + set_ssid = ssid + str(i) |
| + set_passphrase = passphrase + str(i) |
| + params = hostapd.wpa2_params(ssid=set_ssid, passphrase=set_passphrase, |
| + wpa_key_mgmt="SAE", ieee80211w="2") |
| + params['sae_pwe'] = "2" |
| + params['group_mgmt_cipher'] = "AES-128-CMAC" |
| + params['beacon_prot'] = "1" |
| + params["ctrl_interface"] = "/var/run/hostapd/chan_" + str(channel) |
| + params["bssid"] = bssid_regex % (i + 1) |
| + |
| + if rnr: |
| + params["rnr"]="1" |
| + |
| + append_bss_conf_to_file(f, ifname, params, first=(i == 0)) |
| + |
| + hapds.append([ifname, params["ctrl_interface"], i]) |
| + |
| + f.close() |
| + |
| + if debug: |
| + dump_config(fname) |
| + |
| + return fname, hapds |
| + |
| +def start_ap(prefix, configs): |
| + pid = prefix + ".hostapd.pid" |
| + configs = configs.split() |
| + |
| + cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f', |
| + prefix + ".hostapd-log"] |
| + |
| + cmd = cmd + configs |
| + |
| + logger.info("Starting APs") |
| + res = subprocess.check_call(cmd) |
| + if res != 0: |
| + raise Exception("Could not start hostapd: %s" % str(res)) |
| + |
| + # Wait for hostapd to complete initialization and daemonize. |
| + time.sleep(2) |
| + |
| + if not os.path.exists(pid): |
| + raise Exception("hostapd did not create PID file.") |
| + |
| +def get_mld_devs(hapd_iface, count, prefix, rnr=False): |
| + fname1, hapds1 = get_config(hapd_iface, count=count, ssid="mld-", |
| + passphrase="qwertyuiop-", channel=1, |
| + bssid_regex="02:00:00:00:07:%02x", |
| + rnr=rnr, debug=True) |
| + fname2, hapds2 = get_config(hapd_iface, count=count, ssid="mld-", |
| + passphrase="qwertyuiop-", channel=6, |
| + bssid_regex="02:00:00:00:08:%02x", |
| + rnr=rnr, debug=True) |
| + |
| + start_ap(prefix, fname1 + " " + fname2) |
| + |
| + hapd_mld1_link0 = hostapd.Hostapd(ifname=hapds1[0][0], ctrl=hapds1[0][1], |
| + bssidx=hapds1[0][2]) |
| + hapd_mld1_link1 = hostapd.Hostapd(ifname=hapds2[0][0], ctrl=hapds2[0][1], |
| + bssidx=hapds2[0][2]) |
| + |
| + hapd_mld2_link0 = hostapd.Hostapd(ifname=hapds1[1][0], ctrl=hapds1[1][1], |
| + bssidx=hapds1[1][2]) |
| + hapd_mld2_link1 = hostapd.Hostapd(ifname=hapds2[1][0], ctrl=hapds2[1][1], |
| + bssidx=hapds2[1][2]) |
| + |
| + if not hapd_mld1_link0.ping(): |
| + raise Exception("Could not ping hostapd") |
| + |
| + if not hapd_mld1_link1.ping(): |
| + raise Exception("Could not ping hostapd") |
| + |
| + if not hapd_mld2_link0.ping(): |
| + raise Exception("Could not ping hostapd") |
| + |
| + if not hapd_mld2_link1.ping(): |
| + raise Exception("Could not ping hostapd") |
| + |
| + os.remove(fname1) |
| + os.remove(fname2) |
| + |
| + return [hapd_mld1_link0, hapd_mld1_link1, hapd_mld2_link0, hapd_mld2_link1] |
| + |
| +def stop_mld_devs(hapds, pid): |
| + pid = pid + ".hostapd.pid" |
| + |
| + if "OK" not in hapds[0].request("TERMINATE"): |
| + raise Exception("Failed to terminate hostapd process") |
| + |
| + ev = hapds[0].wait_event(["CTRL-EVENT-TERMINATING"], timeout=15) |
| + if ev is None: |
| + raise Exception("CTRL-EVENT-TERMINATING not seen") |
| + |
| + time.sleep(0.5) |
| + |
| + if os.path.exists(pid): |
| + raise Exception("PID file exits after process termination") |
| + |
| +def eht_parse_rnr(bss, rnr=False, exp_bssid=None): |
| + partner_rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=", |
| + re.MULTILINE) |
| + ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE) |
| + |
| + if partner_rnr_pattern.search(bss) is None: |
| + raise Exception("RNR element not found for first link of first MLD") |
| + |
| + if ml_pattern.search(bss) is None: |
| + raise Exception("ML element not found for first link of first MLD") |
| + |
| + if not rnr: |
| + return |
| + |
| + coloc_rnr_pattern = re.compile(".*ap_info.*, mld ID=255, link ID=..", |
| + re.MULTILINE) |
| + |
| + if coloc_rnr_pattern.search(bss) is None: |
| + raise Exception("RNR element not found for co-located BSS") |
| + |
| + line = coloc_rnr_pattern.search(bss).group() |
| + if line.count('bssid') > 1: |
| + raise Exception("More than one BSS found for co-located RNR") |
| + |
| + # Get the BSSID carried in the RNR |
| + index = line.rindex('bssid') |
| + bssid = line[index+len('bssid')+1:].split(',')[0] |
| + |
| + # Get the MLD ID carried in the RNR |
| + index = line.rindex('link ID') |
| + link_id = line[index+len('link ID')+1:].split(',')[0] |
| + |
| + if link_id != "15": |
| + raise Exception("Unexpected link ID for co-located BSS which is not own partner") |
| + |
| + if bssid != exp_bssid: |
| + raise Exception("Unexpected BSSID for co-located BSS") |
| + |
| +def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False): |
| + with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \ |
| + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): |
| + |
| + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') |
| + wpas.interface_add(wpas_iface) |
| + |
| + hapds = get_mld_devs(hapd_iface=hapd_iface, count=2, prefix=params['prefix'], |
| + rnr=rnr) |
| + |
| + # Only scan link 0 |
| + res = wpas.request("SCAN freq=2412") |
| + if "FAIL" in res: |
| + raise Exception("Failed to start scan") |
| + |
| + ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"]) |
| + if ev is None: |
| + raise Exception("Scan did not start") |
| + |
| + ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) |
| + if ev is None: |
| + raise Exception("Scan did not complete") |
| + |
| + logger.info("Scan done") |
| + |
| + bss = wpas.request("BSS " + hapds[0].own_addr()) |
| + logger.info("BSS 0_0: " + str(bss)) |
| + eht_parse_rnr(bss, rnr, hapds[2].own_addr()) |
| + |
| + bss = wpas.request("BSS " + hapds[2].own_addr()) |
| + logger.info("BSS 1_0: " + str(bss)) |
| + eht_parse_rnr(bss, rnr, hapds[0].own_addr()) |
| + |
| + stop_mld_devs(hapds, params['prefix']) |
| + |
| +def test_eht_mld_cohosted_discovery(dev, apdev, params): |
| + """EHT 2 AP MLDs discovery""" |
| + eht_mld_cohosted_discovery(dev, apdev, params) |
| + |
| +def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params): |
| + """EHT 2 AP MLDs discovery (with co-location RNR)""" |
| + eht_mld_cohosted_discovery(dev, apdev, params, rnr=True) |
| -- |
| 2.39.2 |
| |