Re: Bug#1091394: nproc: add new option to reduce emitted processors by system memory
Hi Ángel,
On Fri, Jan 17, 2025 at 01:14:04AM +0100, Ángel wrote:
> The script looks good, and easy to read. It wouldn't be hard to
> translate it to another language if needed to drop the python
> dependency (but that would increase the nloc)
Thank you for the detailed review. I also picked up Julien's request for
detecting cores rather than threads and attach an updated version. I
suppose moving this to a git repository would be good:
https://salsa.debian.org/helmutg/guess_concurrency
To get things going, I created it in my personal namespace, but I'm
happy to move it to debian or elsewhere.
> I find this behavior a bit surprising:
>
> $ python3 guess_concurrency.py --min 10 --max 2
> 10
>
> If there is a minimum limit, it is returned, even if that violates the
> max. It makes some sense to pick something but I was actually expecting
> an error for the above.
How could one disagree with that? Fixed.
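
As an illustration of the check, here is a minimal sketch. The helper name
parse_limits is hypothetical and only exists for this example; the real
script wires the same comparison into its main() function.

```python
import argparse


def parse_limits(argv: list[str]) -> argparse.Namespace:
    """Parse --min/--max and reject a minimum that exceeds the maximum."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--min", type=int, default=None, metavar="N")
    parser.add_argument("--max", type=int, default=None, metavar="N")
    args = parser.parse_args(argv)
    if args.min is not None and args.max is not None and args.min > args.max:
        # parser.error() prints a usage message and exits with status 2.
        parser.error("--min value larger than --max value")
    return args
```

With this, "--min 10 --max 2" terminates with a usage error instead of
printing 10.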
> The order of processing the cpus is a bit awkward as well.
>
> The order it uses is CMAKE_BUILD_PARALLEL_LEVEL, DEB_BUILD_OPTIONS,
> RPM_BUILD_NCPUS, --detect <n>, nproc/os.cpu_count()
>
> But the order in the code is 4, 5, 3, 2, 1
> Not straightforward.
Again, how could I disagree? I have changed the order of invocation to
match the order of preference and, in the process, also changed
--detect=N to take precedence over all environment variables while
still preferring environment variables over the other detectors. I hope
that makes sense.
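
The idea can be sketched as a lazily evaluated chain, where each source is
only consulted if all earlier ones yielded nothing. The names first_guess
and record below are hypothetical helpers for this illustration and do not
appear in the attached script, and the values are made up.

```python
import typing

Guesser = typing.Callable[[], typing.Optional[int]]


def first_guess(guessers: typing.Iterable[Guesser]) -> typing.Optional[int]:
    """Return the first non-None result, calling guessers lazily in order."""
    for guesser in guessers:
        value = guesser()
        if value is not None:
            return value
    return None


calls: list[str] = []


def record(name: str, value: typing.Optional[int]) -> Guesser:
    """Build a fake guesser that logs its invocation (illustration only)."""

    def guesser() -> typing.Optional[int]:
        calls.append(name)
        return value

    return guesser


result = first_guess(
    [
        record("--detect=N", None),  # no explicit count given
        record("CMAKE_BUILD_PARALLEL_LEVEL", None),  # variable unset
        record("DEB_BUILD_OPTIONS", 3),  # parallel=3 wins
        record("RPM_BUILD_NCPUS", 8),  # never consulted
        record("nproc", 16),  # never run
    ]
)
print(result)  # 3
print(calls)  # ['--detect=N', 'CMAKE_BUILD_PARALLEL_LEVEL', 'DEB_BUILD_OPTIONS']
```

Since evaluation stops at the first hit, an expensive detector such as an
external nproc call is never started when a higher-priority source already
supplied a value.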
> Also, it is doing actions such as running the external program nproc
> even if it's going to be discarded later. (nproc is in an essential
> package, I know, but still)
This is fixed as a result of your earlier remark.
> Also, why would the user want to manually choose between nproc and
> os.cpu_count()?
While nproc is universally available on Debian, I anticipate that the
tool could be useful on non-Linux platforms and there os.cpu_count() may
be more portable. I mainly added it due to the low effort and to
demonstrate the ability to use different detection methods. The values I
see users pass to --detect are actual numbers or "cores".
> I would unconditionally call nproc, with a fallback to os.cpu_count()
> if that fails (I'm assuming nproc may be smarter than os.cpu_count(),
> otherwise one could use cpu_count() always)
Indeed nproc kinda is smarter as it honours some environment variables
such as OMP_NUM_THREADS and OMP_THREAD_LIMIT. Not sure we need that
fallback.
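
For reference, should the fallback turn out to be wanted after all, a
minimal sketch could look like the following. The function name
guess_nproc_or_python and its command parameter are hypothetical and not
part of the attached script.

```python
import os
import subprocess
import typing


def guess_nproc_or_python(
    command: typing.Sequence[str] = ("nproc",),
) -> typing.Optional[int]:
    """Ask nproc first and fall back to os.cpu_count() if that fails."""
    try:
        output = subprocess.check_output(list(command), encoding="ascii")
        value = int(output)  # int() tolerates the trailing newline
        if value >= 1:
            return value
    except (OSError, subprocess.CalledProcessError, ValueError):
        # nproc is missing, exited non-zero or printed something unexpected.
        pass
    return os.cpu_count()
```

The command parameter only exists so that the failure path can be exercised
without removing nproc from the system.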
> I suggest doing:
>
> def main() -> None:
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--count",
>         action="store",
>         default=None,
>         metavar="NUMBER",
>         help="supply a processor count",
>     )
Is there a reason to rename it from --detect to --count? The advantage
of reusing --detect I see is that you cannot reasonably combine those
two options and --detect=9 intuitively makes sufficient sense to me.
>     args = parser.parse_args()
>     guess = None
>     try:
>         if args.count:
>             guess = positive_integer(args.count)
>     except ValueError:
>         parser.error("invalid argument to --count")
>     guess = guess or guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
>     guess = guess or guess_deb_build_parallel()
>     guess = guess or guess_from_environment("RPM_BUILD_NCPUS")
>     if not guess:
>         try:
>             guess = guess_nproc()
>         finally:
>             guess = guess or guess_python()
I see three main proposals in this code:
* Order of preference reflects order of code. (implemented)
* A given count overrides all other detection mechanisms. (implemented)
* Fallback from nproc to os.cpu_count(). (not convinced)
Did I miss anything?
I'm more inclined to drop --detect=python entirely as you say its use is
fairly limited.
> Additionally, the --ignore argument of nproc(1) might be of use for
> this script as well.
Can you give a practical use case for this?
Helmut
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
# SPDX-License-Identifier: GPL-3

import argparse
import itertools
import os
import pathlib
import re
import subprocess
import sys
import typing


def positive_integer(decstr: str) -> int:
    """Parse a positive, integral number from a string and raise a ValueError
    otherwise.

    >>> positive_integer(-1)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(0)
    Traceback (most recent call last):
        ...
    ValueError: integer must be positive
    >>> positive_integer(1)
    1
    """
    value = int(decstr)  # raises ValueError
    if value < 1:
        raise ValueError("integer must be positive")
    return value
def parse_size(expression: str) -> int:
    """Parse an expression representing a data size with an optional unit
    suffix into an integer. Raises a ValueError on failure.

    >>> parse_size("5")
    5
    >>> parse_size("4KB")
    4096
    >>> parse_size("0.9g")
    966367641
    >>> parse_size("-1")
    Traceback (most recent call last):
        ...
    ValueError: number must be positive
    """
    expression = expression.lower()
    if expression.endswith("b"):
        expression = expression[:-1]
    # Binary prefixes: 2**50 is peta ("p") and 2**60 is exa ("e").
    suffixes = {
        "k": 2**10,
        "m": 2**20,
        "g": 2**30,
        "t": 2**40,
        "p": 2**50,
        "e": 2**60,
    }
    factor = 1
    if expression[-1:] in suffixes:
        factor = suffixes[expression[-1]]
        expression = expression[:-1]
    fval = float(expression)  # propagate ValueError
    value = int(fval * factor)  # propagate ValueError
    if value < 1:
        raise ValueError("number must be positive")
    return value
def guess_python() -> int | None:
    """Estimate the number of processors using Python's os.cpu_count().

    >>> guess_python() > 0
    True
    """
    return os.cpu_count()


def guess_nproc() -> int:
    """Estimate number of processors using coreutils' nproc.

    >>> guess_nproc() > 0
    True
    """
    return positive_integer(
        subprocess.check_output(["nproc"], encoding="ascii")
    )


def guess_cores() -> int | None:
    """Estimate the number of cores (not SMT threads) using /proc/cpuinfo.
    This is done by counting the number of distinct "core id" values.
    """
    cpus = set()
    try:
        with open("/proc/cpuinfo", encoding="utf8") as cf:
            for line in cf:
                if m := re.match(r"^core id\s+:\s+(\d+)$", line, re.ASCII):
                    cpus.add(int(m.group(1)))
    except FileNotFoundError:
        return None
    if not cpus:
        return None
    return len(cpus)
def guess_deb_build_parallel(
    environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Parse a possible parallel= assignment in a DEB_BUILD_OPTIONS environment
    variable.

    >>> guess_deb_build_parallel({})
    >>> guess_deb_build_parallel({"DEB_BUILD_OPTIONS": "nocheck parallel=3"})
    3
    """
    try:
        options = environ["DEB_BUILD_OPTIONS"]
    except KeyError:
        return None
    for option in options.split():
        if option.startswith("parallel="):
            option = option.removeprefix("parallel=")
            try:
                return positive_integer(option)
            except ValueError:
                pass
    return None


def guess_from_environment(
    variable: str, environ: typing.Mapping[str, str] = os.environ
) -> int | None:
    """Read a number from an environment variable.

    >>> guess_from_environment("CPUS", {"CPUS": 4})
    4
    >>> guess_from_environment("CPUS", {"other": 3})
    """
    try:
        return positive_integer(environ[variable])
    except (KeyError, ValueError):
        return None
def guess_memavailable() -> int:
    """Estimate the available memory from /proc/meminfo in bytes."""
    with open("/proc/meminfo", encoding="ascii") as fh:
        for line in fh:
            if line.startswith("MemAvailable:"):
                line = line.removeprefix("MemAvailable:").strip()
                return 1024 * positive_integer(line.removesuffix("kB"))
    raise RuntimeError("no MemAvailable line found in /proc/meminfo")


def guess_cgroup_memory() -> int | None:
    """Return the smallest "memory.high" or "memory.max" limit of the current
    cgroup or any parent of it if any.
    """
    guess: int | None = None
    mygroup = pathlib.PurePath(
        pathlib.Path("/proc/self/cgroup")
        .read_text(encoding=sys.getfilesystemencoding())
        .strip()
        .split(":", 2)[2]
    ).relative_to("/")
    sfc = pathlib.Path("/sys/fs/cgroup")
    for group in itertools.chain((mygroup,), mygroup.parents):
        for entry in ("memory.max", "memory.high"):
            try:
                value = positive_integer(
                    (sfc / group / entry).read_text(encoding="ascii")
                )
            except (FileNotFoundError, ValueError):
                pass
            else:
                if guess is None:
                    guess = value
                else:
                    guess = min(guess, value)
    return guess
def parse_required_memory(expression: str) -> list[int]:
    """Parse comma-separated list of memory expressions. Empty expressions copy
    the previous entry.

    >>> parse_required_memory("1k,9,,1")
    [1024, 9, 9, 1]
    """
    values: list[int] = []
    for memexpr in expression.split(","):
        if not memexpr:
            if values:
                values.append(values[-1])
            else:
                raise ValueError("initial memory expression cannot be empty")
        else:
            values.append(parse_size(memexpr))
    return values


def guess_memory_concurrency(memory: int, usage: list[int]) -> int:
    """Estimate the maximum number of cores that can be used given the
    available memory and a sequence of per-core memory consumption.

    >>> guess_memory_concurrency(4, [1])
    4
    >>> guess_memory_concurrency(10, [5, 4, 3])
    2
    >>> guess_memory_concurrency(2, [3])
    1
    """
    concurrency = 0
    for use in usage[:-1]:
        if use > memory:
            break
        memory -= use
        concurrency += 1
    else:
        concurrency += memory // usage[-1]
    return max(1, concurrency)


def clamp(
    value: int, lower: int | None = None, upper: int | None = None
) -> int:
    """Return an adjusted value that does not exceed the lower or upper limits
    if any.

    >>> clamp(5, upper=4)
    4
    >>> clamp(9, 2)
    9
    """
    if upper is not None and upper < value:
        value = upper
    if lower is not None and lower > value:
        value = lower
    return value
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--detect",
        action="store",
        default="nproc",
        metavar="METHOD",
        help="supply a processor count or select a detection method "
        "(nproc, python or cores)",
    )
    parser.add_argument(
        "--max",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given maximum",
    )
    parser.add_argument(
        "--min",
        action="store",
        type=positive_integer,
        default=None,
        metavar="N",
        help="limit the number of detected cores to a given minimum",
    )
    parser.add_argument(
        "--require-mem",
        action="store",
        type=parse_required_memory,
        default=[],
        metavar="MEMLIST",
        help="specify per-core required memory as a comma separated list",
    )
    args = parser.parse_args()
    if args.min is not None and args.max is not None and args.min > args.max:
        parser.error("--min value larger than --max value")
    guess = None
    detectfunc = None
    for detector in args.detect.split(","):
        try:
            guess = positive_integer(detector)
        except ValueError:
            try:
                detectfunc = {
                    "nproc": guess_nproc,
                    "python": guess_python,
                    "cores": guess_cores,
                }[detector]
            except KeyError:
                parser.error("invalid argument to --detect")
    if guess is None:
        assert detectfunc is not None
        guess = (
            guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL")
            or guess_deb_build_parallel()
            or guess_from_environment("RPM_BUILD_NCPUS")
            or detectfunc()
        )
    if guess is None:
        print("failed to guess processor count", file=sys.stderr)
        sys.exit(1)
    if args.require_mem and guess > 1:
        memory = clamp(guess_memavailable(), upper=guess_cgroup_memory())
        guess = clamp(
            guess, upper=guess_memory_concurrency(memory, args.require_mem)
        )
    guess = clamp(guess, args.min, args.max)
    print(guess)


if __name__ == "__main__":
    main()