Re: Bug#1091394: nproc: add new option to reduce emitted processors by system memory
Hi Julien,
On Mon, Jan 13, 2025 at 07:00:01PM +0100, Julien Plissonneau Duquène wrote:
> Let's start with this then. I implemented a PoC prototype [1] as a shell
> script that is currently fairly linux-specific and doesn't account for
> cgroup limits (yet?). Feedback is welcome (everything is open for discussion
> there, including the name) and if there is enough interest I may end up
> packaging it or submitting it to an existing collection (I am thinking about
> devscripts).
I'm sorry for not having come back earlier and thus caused duplicaton of
work. I had started a Python-based implementation last year and then
dropped the ball over other stuff. It also implements the --require-mem
flag in the way you suggested. It parses DEB_BUILD_OPTIONS,
RPM_BUILD_NCPUS and CMAKE_BUILD_PARALLEL_LEVEL and also considers cgroup
memory limits. I hope this captures all of the feedback I got during
discussions and research.
I'm attaching my proof of concept. Would you join forces and turn either
of these PoCs into a proper Debian package that could be used during
package builds? Once accepted, we may send patches to individual Debian
packages making use of it and call OOM FTBFS a packaging bug eventually.
Helmut
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
# SPDX-License-Identifier: GPL-3
import argparse
import itertools
import os
import pathlib
import subprocess
import sys
import typing
def positive_integer(decstr: str) -> int:
"""Parse a positive, integral number from a string and raise a ValueError
otherwise.
>>> positive_integer(-1)
Traceback (most recent call last):
...
ValueError: integer must be positive
>>> positive_integer(0)
Traceback (most recent call last):
...
ValueError: integer must be positive
>>> positive_integer(1)
1
"""
value = int(decstr) # raises ValueError
if value < 1:
raise ValueError("integer must be positive")
return value
def parse_size(expression: str) -> int:
"""Parse an expression representing a data size with an optional unit
suffix into an integer. Raises a ValueError on failure.
>>> parse_size("5")
5
>>> parse_size("4KB")
4096
>>> parse_size("0.9g")
966367641
>>> parse_size("-1")
Traceback (most recent call last):
...
ValueError: number must be positive
"""
expression = expression.lower()
if expression.endswith("b"):
expression = expression[:-1]
suffixes = {
"k": 2**10,
"m": 2**20,
"g": 2**30,
"t": 2**40,
"e": 2**50,
}
factor = 1
if expression[-1:] in suffixes:
factor = suffixes[expression[-1]]
expression = expression[:-1]
fval = float(expression) # propagate ValueError
value = int(fval * factor) # propagate ValueError
if value < 1:
raise ValueError("number must be positive")
return value
def guess_python() -> int | None:
"""Estimate the number of processors using Python's os.cpu_count().
>>> guess_python() > 0
True
"""
return os.cpu_count()
def guess_nproc() -> int:
"""Estimate number of processors using coreutils' nproc.
>>> guess_nproc() > 0
True
"""
return positive_integer(
subprocess.check_output(["nproc"], encoding="ascii")
)
def guess_deb_build_parallel(
environ: typing.Mapping[str, str] = os.environ
) -> int | None:
"""Parse a possible parallel= assignment in a DEB_BUILD_OPTIONS environment
variable.
>>> guess_deb_build_parallel({})
>>> guess_deb_build_parallel({"DEB_BUILD_OPTIONS": "nocheck parallel=3"})
3
"""
try:
options = environ["DEB_BUILD_OPTIONS"]
except KeyError:
return None
for option in options.split():
if option.startswith("parallel="):
option = option.removeprefix("parallel=")
try:
return positive_integer(option)
except ValueError:
pass
return None
def guess_from_environment(
variable: str, environ: typing.Mapping[str, str] = os.environ
) -> int | None:
"""Read a number from an environment variable.
>>> guess_from_environment("CPUS", {"CPUS": 4})
4
>>> guess_from_environment("CPUS", {"other": 3})
"""
try:
return positive_integer(environ[variable])
except (KeyError, ValueError):
return None
def guess_memavailable() -> int:
"""Estimate the available memory from /proc/meminfo in bytes."""
with open("/proc/meminfo", encoding="ascii") as fh:
for line in fh:
if line.startswith("MemAvailable:"):
line = line.removeprefix("MemAvailable:").strip()
return 1024 * positive_integer(line.removesuffix("kB"))
raise RuntimeError("no MemAvailable line found in /proc/meminfo")
def guess_cgroup_memory() -> int | None:
"""Return the smalles "memory.high" or "memory.max" limit of the current
cgroup or any parent of it if any.
"""
guess: int | None = None
mygroup = pathlib.PurePath(
pathlib.Path("/proc/self/cgroup")
.read_text(encoding=sys.getfilesystemencoding())
.strip()
.split(":", 2)[2]
).relative_to("/")
sfc = pathlib.Path("/sys/fs/cgroup")
for group in itertools.chain((mygroup,), mygroup.parents):
for entry in ("memory.max", "memory.high"):
try:
value = positive_integer(
(sfc / group / entry).read_text(encoding="ascii")
)
except (FileNotFoundError, ValueError):
pass
else:
if guess is None:
guess = value
else:
guess = min(guess, value)
return guess
def parse_required_memory(expression: str) -> list[int]:
"""Parse comma-separated list of memory expressions. Empty expressions copy
the previous entry.
>>> parse_required_memory("1k,9,,1")
[1024, 9, 9, 1]
"""
values: list[int] = []
for memexpr in expression.split(","):
if not memexpr:
if values:
values.append(values[-1])
else:
raise ValueError("initial memory expression cannot be empty")
else:
values.append(parse_size(memexpr))
return values
def guess_memory_concurrency(memory: int, usage: list[int]) -> int:
"""Estimate the maximum number of cores that can be used given the
available memory and a sequence of per-core memory consumption.
>>> guess_memory_concurrency(4, [1])
4
>>> guess_memory_concurrency(10, [5, 4, 3])
2
>>> guess_memory_concurrency(2, [3])
1
"""
concurrency = 0
for use in usage[:-1]:
if use > memory:
break
memory -= use
concurrency += 1
else:
concurrency += memory // usage[-1]
return max(1, concurrency)
def clamp(
value: int, lower: int | None = None, upper: int | None = None
) -> int:
"""Return an adjusted value that does not exceed the lower or upper limits
if any.
>>> clamp(5, upper=4)
4
>>> clamp(9, 2)
9
"""
if upper is not None and upper < value:
value = upper
if lower is not None and lower > value:
value = lower
return value
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
"--detect",
action="store",
default="nproc",
metavar="METHOD",
help="supply a processor count or select a detection method"
"(nproc or python)",
)
parser.add_argument(
"--max",
action="store",
type=positive_integer,
default=None,
metavar="N",
help="limit the number of detected cores to a given maximum",
)
parser.add_argument(
"--min",
action="store",
type=positive_integer,
default=None,
metavar="N",
help="limit the number of detected cores to a given minimum",
)
parser.add_argument(
"--require-mem",
action="store",
type=parse_required_memory,
default=[],
metavar="MEMLIST",
help="specify per-core required memory as a comma separated list",
)
args = parser.parse_args()
guess = None
try:
guess = positive_integer(args.detect)
except ValueError:
try:
detector = {
"nproc": guess_nproc,
"python": guess_python,
}[args.detect]
except KeyError:
parser.error("invalid argument to --detect")
guess = detector()
guess = guess_from_environment("RPM_BUILD_NCPUS") or guess
guess = guess_deb_build_parallel() or guess
guess = guess_from_environment("CMAKE_BUILD_PARALLEL_LEVEL") or guess
if guess is None:
print("failed to guess processor count", file=sys.stderr)
sys.exit(1)
if args.require_mem and guess > 1:
memory = clamp(guess_memavailable(), upper=guess_cgroup_memory())
guess = clamp(
guess, upper=guess_memory_concurrency(memory, args.require_mem)
)
guess = clamp(guess, args.min, args.max)
print(guess)
if __name__ == "__main__":
main()
Reply to: