Page MenuHomeFreeBSD

D37923.diff
No OneTemporary

D37923.diff

diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py
--- a/tests/atf_python/atf_pytest.py
+++ b/tests/atf_python/atf_pytest.py
@@ -3,6 +3,7 @@
from typing import Dict
from typing import List
from typing import NamedTuple
+from typing import Optional
from typing import Tuple
import pytest
@@ -51,10 +52,32 @@
return line
return obj.name
+ @staticmethod
+ def _convert_user_mark(mark, obj, ret: Dict):
+ username = mark.args[0]
+ if username == "unprivileged":
+ # Special unprivileged user requested.
+ # First, require the unprivileged-user config option presence
+ key = "require.config"
+ if key not in ret:
+ ret[key] = "unprivileged_user"
+ else:
+ ret[key] = "{} {}".format(ret[key], "unprivileged_user")
+ # Check if the framework requires root
+ test_cls = ATFHandler.get_test_class(obj)
+ if test_cls and getattr(test_cls, "NEED_ROOT", False):
+ # Yes, so we ask kyua to run us under root instead
+ # It is up to the implementation to switch back to the desired
+ # user
+ ret["require.user"] = "root"
+ else:
+ ret["require.user"] = username
+
+
def _convert_marks(self, obj) -> Dict[str, Any]:
wj_func = lambda x: " ".join(x) # noqa: E731
_map: Dict[str, Dict] = {
- "require_user": {"name": "require.user"},
+ "require_user": {"handler": self._convert_user_mark},
"require_arch": {"name": "require.arch", "fmt": wj_func},
"require_diskspace": {"name": "require.diskspace"},
"require_files": {"name": "require.files", "fmt": wj_func},
@@ -66,6 +89,9 @@
ret = {}
for mark in obj.iter_markers():
if mark.name in _map:
+ if "handler" in _map[mark.name]:
+ _map[mark.name]["handler"](mark, obj, ret)
+ continue
name = _map[mark.name].get("name", mark.name)
if "fmt" in _map[mark.name]:
val = _map[mark.name]["fmt"](mark.args[0])
@@ -91,8 +117,24 @@
state: str
reason: str
- def __init__(self):
+ def __init__(self, report_file_name: Optional[str]):
self._tests_state_map: Dict[str, ReportStatus] = {}
+ self._report_file_name = report_file_name
+ self._report_file_handle = None
+
+ def setup_configure(self):
+ fname = self._report_file_name
+ if fname:
+ self._report_file_handle = open(fname, mode="w")
+
+ def setup_method_pre(self, item):
+ """Called before actually running the test setup_method"""
+ # Check if we need to manually drop the privileges
+ for mark in item.iter_markers():
+ if mark.name == "require_user":
+ cls = self.get_test_class(item)
+ cls.TARGET_USER = mark.args[0]
+ break
def override_runtest(self, obj):
# Override basic runtest command
@@ -220,7 +262,9 @@
# global failure
self.set_report_state(test_name, state, reason)
- def write_report(self, path):
+ def write_report(self):
+ if self._report_file_handle is None:
+ return
if self._tests_state_map:
# If we're executing in ATF mode, there has to be just one test
# Anyway, deterministically pick the first one
@@ -230,8 +274,8 @@
line = test.state
else:
line = "{}: {}".format(test.state, test.reason)
- with open(path, mode="w") as f:
- print(line, file=f)
+ print(line, file=self._report_file_handle)
+ self._report_file_handle.close()
@staticmethod
def get_atf_vars() -> Dict[str, str]:
diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py
--- a/tests/atf_python/sys/net/vnet.py
+++ b/tests/atf_python/sys/net/vnet.py
@@ -329,6 +329,7 @@
class VnetTestTemplate(BaseTest):
+ NEED_ROOT: bool = True
TOPOLOGY = {}
def _get_vnet_handler(self, vnet_alias: str):
@@ -374,6 +375,7 @@
# Do unbuffered stdout for children
# so the logs are present if the child hangs
sys.stdout.reconfigure(line_buffering=True)
+ self.drop_privileges()
handler(vnet)
def setup_topology(self, topo: Dict, topology_id: str):
@@ -465,6 +467,7 @@
# Save state for the main handler
self.iface_map = obj_map.iface_map
self.vnet_map = obj_map.vnet_map
+ self.drop_privileges()
def cleanup(self, test_id: str):
# pytest test id: file::class::test_name
diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py
--- a/tests/atf_python/utils.py
+++ b/tests/atf_python/utils.py
@@ -1,8 +1,10 @@
#!/usr/bin/env python3
import os
+import pwd
from ctypes import CDLL
from ctypes import get_errno
from ctypes.util import find_library
+from typing import Dict
from typing import List
from typing import Optional
@@ -31,6 +33,8 @@
class BaseTest(object):
+ NEED_ROOT: bool = False # True if the class needs root privileges for the setup
+ TARGET_USER = None # Set to the target user by the framework
REQUIRED_MODULES: List[str] = []
def _check_modules(self):
@@ -41,9 +45,26 @@
pytest.skip(
"kernel module '{}' not available: {}".format(mod_name, err_str)
)
+ @property
+ def atf_vars(self) -> Dict[str, str]:
+ px = "_ATF_VAR_"
+ return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
+
+ def drop_privileges_user(self, user: str):
+ uid = pwd.getpwnam(user)[2]
+ print("Dropping privs to {}/{}".format(user, uid))
+ os.setuid(uid)
+
+ def drop_privileges(self):
+ if self.TARGET_USER:
+ if self.TARGET_USER == "unprivileged":
+ user = self.atf_vars["unprivileged-user"]
+ else:
+ user = self.TARGET_USER
+ self.drop_privileges_user(user)
@property
- def test_id(self):
+ def test_id(self) -> str:
# 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
diff --git a/tests/conftest.py b/tests/conftest.py
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,10 +7,14 @@
DEFAULT_HANDLER = None
+def set_handler(config):
+ global DEFAULT_HANDLER, PLUGIN_ENABLED
+ DEFAULT_HANDLER = ATFHandler(report_file_name=config.option.atf_file)
+ PLUGIN_ENABLED = True
+ return DEFAULT_HANDLER
+
+
def get_handler():
- global DEFAULT_HANDLER
- if DEFAULT_HANDLER is None:
- DEFAULT_HANDLER = ATFHandler()
return DEFAULT_HANDLER
@@ -81,11 +85,9 @@
"markers", "timeout(dur): int/float with max duration in sec"
)
- global PLUGIN_ENABLED
- PLUGIN_ENABLED = config.option.atf
- if not PLUGIN_ENABLED:
+ if not config.option.atf:
return
- get_handler()
+ handler = set_handler(config)
if config.option.collectonly:
# Need to output list of tests to stdout, hence override
@@ -93,6 +95,8 @@
reporter = config.pluginmanager.getplugin("terminalreporter")
if reporter:
config.pluginmanager.unregister(reporter)
+ else:
+ handler.setup_configure()
def pytest_collection_modifyitems(session, config, items):
@@ -114,6 +118,12 @@
handler.list_tests(session.items)
+def pytest_runtest_setup(item):
+ if PLUGIN_ENABLED:
+ handler = get_handler()
+ handler.setup_method_pre(item)
+
+
def pytest_runtest_logreport(report):
if PLUGIN_ENABLED:
handler = get_handler()
@@ -123,4 +133,4 @@
def pytest_unconfigure(config):
if PLUGIN_ENABLED and config.option.atf_file:
handler = get_handler()
- handler.write_report(config.option.atf_file)
+ handler.write_report()

File Metadata

Mime Type
text/plain
Expires
Thu, May 1, 9:54 PM (13 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
17891433
Default Alt Text
D37923.diff (7 KB)

Event Timeline