qq_lib.properties.resources
Structured representation of job resource requirements.
This module defines the Resources dataclass, which captures all CPU, GPU,
memory, storage, walltime, and property requirements associated with a qq job.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Structured representation of job resource requirements. 6 7This module defines the `Resources` dataclass, which captures all CPU, GPU, 8memory, storage, walltime, and property requirements associated with a qq job. 9""" 10 11import re 12from dataclasses import asdict, dataclass, fields 13 14from qq_lib.core.common import equals_normalized, wdhms_to_hhmmss 15from qq_lib.core.error import QQError 16from qq_lib.core.field_coupling import FieldCoupling, HasCouplingMethods, coupled_fields 17from qq_lib.core.logger import get_logger 18 19from .size import Size 20 21logger = get_logger(__name__) 22 23 24# dataclass decorator has to come before `@coupled_fields`! 25@dataclass(init=False) 26@coupled_fields( 27 # if mem is set, ignore other mem properties; if mem_per_node is set, ignore mem_per_cpu 28 FieldCoupling("mem", "mem_per_node", "mem_per_cpu"), 29 # if work_size is set, ignore other work_size properties; if work_size_per_node is set, ignore work_size_per_cpu 30 FieldCoupling("work_size", "work_size_per_node", "work_size_per_cpu"), 31 # if ncpus is set, ignore ncpus_per_node 32 FieldCoupling("ncpus", "ncpus_per_node"), 33 # if ngpus is set, ignore ngpus_per_node 34 FieldCoupling("ngpus", "ngpus_per_node"), 35) 36class Resources(HasCouplingMethods): 37 """ 38 Dataclass representing computational resources requested for a qq job. 39 """ 40 41 # Number of computing nodes to use 42 nnodes: int | None = None 43 44 # Number of CPU cores to use for the job 45 ncpus: int | None = None 46 47 # Number of CPU cores to use per node 48 ncpus_per_node: int | None = None 49 50 # Absolute amount of memory to allocate for the job (overrides mem_per_cpu) 51 mem: Size | None = None 52 53 # Amount of memory to allocate per node 54 mem_per_node: Size | None = None 55 56 # Amount of memory to allocate per CPU core 57 mem_per_cpu: Size | None = None 58 59 # Number of GPUs to use 60 ngpus: int | None = None 61 62 # Number of GPUs to use per node 63 ngpus_per_node: int | None = None 64 65 # Maximum allowed runtime for the job 66 walltime: str | None = None 67 68 # Type of working directory to use (e.g., scratch_local, scratch_shared, input_dir) 69 work_dir: str | None = None 70 71 # Absolute size of storage requested for the job (overrides work_size_per_cpu) 72 work_size: Size | None = None 73 74 # Storage size requested per node 75 work_size_per_node: Size | None = None 76 77 # Storage size requested per CPU core 78 work_size_per_cpu: Size | None = None 79 80 # Dictionary of other properties the nodes must include or exclude 81 props: dict[str, str] | None = None 82 83 def __init__( 84 self, 85 nnodes: int | str | None = None, 86 ncpus: int | str | None = None, 87 ncpus_per_node: int | str | None = None, 88 mem: Size | str | dict[str, object] | None = None, 89 mem_per_node: Size | str | dict[str, object] | None = None, 90 mem_per_cpu: Size | str | dict[str, object] | None = None, 91 ngpus: int | str | None = None, 92 ngpus_per_node: int | str | None = None, 93 walltime: str | None = None, 94 work_dir: str | None = None, 95 work_size: Size | str | dict[str, object] | None = None, 96 work_size_per_node: Size | str | dict[str, object] | None = None, 97 work_size_per_cpu: Size | str | dict[str, object] | None = None, 98 props: dict[str, str] | str | None = None, 99 ): 100 # convert sizes 101 mem = Resources._parse_size(mem) 102 mem_per_node = Resources._parse_size(mem_per_node) 103 mem_per_cpu = Resources._parse_size(mem_per_cpu) 104 work_size = Resources._parse_size(work_size) 105 work_size_per_node = Resources._parse_size(work_size_per_node) 106 work_size_per_cpu = Resources._parse_size(work_size_per_cpu) 107 108 # convert walltime 109 if isinstance(walltime, str) and ":" not in walltime: 110 walltime = wdhms_to_hhmmss(walltime) 111 112 # convert properties to dictionary 113 if isinstance(props, str): 114 props = Resources._parse_props(props) 115 116 # convert nnodes, ncpus, and ngpus to integers 117 if isinstance(nnodes, str): 118 nnodes = int(nnodes) 119 if isinstance(ncpus, str): 120 ncpus = int(ncpus) 121 if isinstance(ncpus_per_node, str): 122 ncpus_per_node = int(ncpus_per_node) 123 if isinstance(ngpus, str): 124 ngpus = int(ngpus) 125 if isinstance(ngpus_per_node, str): 126 ngpus_per_node = int(ngpus_per_node) 127 128 # set attributes 129 self.nnodes = nnodes 130 self.ncpus = ncpus 131 self.ncpus_per_node = ncpus_per_node 132 self.mem = mem 133 self.mem_per_node = mem_per_node 134 self.mem_per_cpu = mem_per_cpu 135 self.ngpus = ngpus 136 self.ngpus_per_node = ngpus_per_node 137 self.walltime = walltime 138 self.work_dir = work_dir 139 self.work_size = work_size 140 self.work_size_per_node = work_size_per_node 141 self.work_size_per_cpu = work_size_per_cpu 142 self.props = props 143 144 # enforce coupling rules 145 self.__post_init__() # ty: ignore[unresolved-attribute] 146 147 logger.debug(f"Resources: {self}") 148 149 def to_dict(self) -> dict[str, object]: 150 """Return all fields as a dict, excluding fields set to None.""" 151 return {k: v for k, v in asdict(self).items() if v is not None} 152 153 def uses_scratch(self) -> bool: 154 """ 155 Determine if the job uses a scratch directory. 156 157 Returns: 158 bool: True if a work_dir is not 'job_dir' or 'input_dir', otherwise False. 159 """ 160 return not equals_normalized( 161 str(self.work_dir), "job_dir" 162 ) and not equals_normalized(str(self.work_dir), "input_dir") 163 164 @staticmethod 165 def merge_resources(*resources: "Resources") -> "Resources": 166 """ 167 Merge multiple Resources objects. 168 169 Earlier resources take precedence over later ones. Properties are merged. 170 171 If either field in a coupling is set in an earlier resource, both fields of 172 that coupling are taken from that resource and ignore later resources. 173 (This means that if e.g. a `mem-per-cpu` is set by the user, 174 it will not be overwritten by a default absolute `mem` value set by a queue, 175 even though `mem` is a dominant attribute and `mem-per-cpu` is recessive.) 176 177 Args: 178 *resources (Resources): One or more Resources objects, in order of precedence. 179 180 Returns: 181 Resources: A new Resources object with merged fields. 182 """ 183 merged_data = {} 184 processed_couplings: set[FieldCoupling] = set() 185 186 for f in fields(Resources): 187 # check if this field is part of a coupling 188 if coupling := Resources.get_coupling_for_field(f.name): 189 # skip if coupling already processed 190 if coupling in processed_couplings: 191 continue 192 processed_couplings.add(coupling) 193 194 # find first resource where either field in the coupling is set 195 source_resource = next( 196 (r for r in resources if coupling.has_value(r)), None 197 ) 198 199 # set all fields of the coupling 200 if source_resource: 201 for field in coupling.fields: 202 merged_data[field] = getattr(source_resource, field) 203 # if no resource has any attribute set for this coupling 204 else: 205 for field in coupling.fields: 206 merged_data[field] = None 207 continue 208 209 # default: pick the first non-None value for this field 210 merged_data[f.name] = next( 211 ( 212 getattr(r, f.name) 213 for r in resources 214 if getattr(r, f.name) is not None 215 ), 216 None, 217 ) 218 219 return Resources(**merged_data) 220 221 @staticmethod 222 def _parse_size(value: object) -> Size | None: 223 """ 224 Convert a raw value into a `Size` instance if possible. 225 226 Args: 227 value (object): A Size object or a raw size value (a string or a dictionary). 228 229 Returns: 230 Size | None: A `Size` object if the input could be parsed, 231 otherwise `None`. 232 """ 233 if isinstance(value, str): 234 return Size.from_string(value) 235 if isinstance(value, dict): 236 return Size(**value) # ty: ignore[invalid-argument-type] 237 if isinstance(value, Size): 238 return value 239 return None 240 241 @staticmethod 242 def _parse_props(props: str) -> dict[str, str]: 243 """ 244 Parse a properties string into a dictionary of key/value pairs. 245 246 The input may contain multiple properties separated by commas, 247 whitespace, or colons. Each property can be one of the following forms: 248 - "key=value" - stored as {"key": "value"} 249 - "key" - stored as {"key": "true"} 250 - "^key" - stored as {"key": "false"} 251 252 Args: 253 props (str): A string containing job properties. 254 255 Returns: 256 dict[str, str]: Parsed properties as key/value pairs. 257 258 Raises: 259 QQError: If a property key is defined multiple times. 260 """ 261 # split into parts by commas, whitespace, or colons 262 parts = filter(None, re.split(r"[,\s:]+", props)) 263 264 result = {} 265 for part in parts: 266 if "=" in part: 267 # explicit key=value pair 268 key, value = part.split("=", 1) 269 elif part.startswith("^"): 270 # ^key means false 271 key, value = part.lstrip("^"), "false" 272 else: 273 # bare key means true 274 key, value = part, "true" 275 276 if key in result: 277 raise QQError(f"Property '{key}' is defined multiple times.") 278 result[key] = value 279 280 return result 281 282 def _props_to_value(self) -> str | None: 283 """ 284 Convert a properties dictionary into a command-line raw value string. 285 286 Args: 287 props (dict[str, str]): Mapping of property names to their string values. 288 289 Returns: 290 str | None: A comma-separated command-line representation of the property definitions 291 or None if the dictionary is empty. 292 """ 293 if not self.props: 294 return None 295 296 properties = [] 297 for key, value in self.props.items(): 298 if value == "true": 299 properties.append(key) 300 elif value == "false": 301 properties.append(f"^{key}") 302 else: 303 properties.append(f"{key}={value}") 304 305 return ",".join(properties)
26@dataclass(init=False) 27@coupled_fields( 28 # if mem is set, ignore other mem properties; if mem_per_node is set, ignore mem_per_cpu 29 FieldCoupling("mem", "mem_per_node", "mem_per_cpu"), 30 # if work_size is set, ignore other work_size properties; if work_size_per_node is set, ignore work_size_per_cpu 31 FieldCoupling("work_size", "work_size_per_node", "work_size_per_cpu"), 32 # if ncpus is set, ignore ncpus_per_node 33 FieldCoupling("ncpus", "ncpus_per_node"), 34 # if ngpus is set, ignore ngpus_per_node 35 FieldCoupling("ngpus", "ngpus_per_node"), 36) 37class Resources(HasCouplingMethods): 38 """ 39 Dataclass representing computational resources requested for a qq job. 40 """ 41 42 # Number of computing nodes to use 43 nnodes: int | None = None 44 45 # Number of CPU cores to use for the job 46 ncpus: int | None = None 47 48 # Number of CPU cores to use per node 49 ncpus_per_node: int | None = None 50 51 # Absolute amount of memory to allocate for the job (overrides mem_per_cpu) 52 mem: Size | None = None 53 54 # Amount of memory to allocate per node 55 mem_per_node: Size | None = None 56 57 # Amount of memory to allocate per CPU core 58 mem_per_cpu: Size | None = None 59 60 # Number of GPUs to use 61 ngpus: int | None = None 62 63 # Number of GPUs to use per node 64 ngpus_per_node: int | None = None 65 66 # Maximum allowed runtime for the job 67 walltime: str | None = None 68 69 # Type of working directory to use (e.g., scratch_local, scratch_shared, input_dir) 70 work_dir: str | None = None 71 72 # Absolute size of storage requested for the job (overrides work_size_per_cpu) 73 work_size: Size | None = None 74 75 # Storage size requested per node 76 work_size_per_node: Size | None = None 77 78 # Storage size requested per CPU core 79 work_size_per_cpu: Size | None = None 80 81 # Dictionary of other properties the nodes must include or exclude 82 props: dict[str, str] | None = None 83 84 def __init__( 85 self, 86 nnodes: int | str | None = None, 87 ncpus: int | str | None = None, 88 ncpus_per_node: int | str | None = None, 89 mem: Size | str | dict[str, object] | None = None, 90 mem_per_node: Size | str | dict[str, object] | None = None, 91 mem_per_cpu: Size | str | dict[str, object] | None = None, 92 ngpus: int | str | None = None, 93 ngpus_per_node: int | str | None = None, 94 walltime: str | None = None, 95 work_dir: str | None = None, 96 work_size: Size | str | dict[str, object] | None = None, 97 work_size_per_node: Size | str | dict[str, object] | None = None, 98 work_size_per_cpu: Size | str | dict[str, object] | None = None, 99 props: dict[str, str] | str | None = None, 100 ): 101 # convert sizes 102 mem = Resources._parse_size(mem) 103 mem_per_node = Resources._parse_size(mem_per_node) 104 mem_per_cpu = Resources._parse_size(mem_per_cpu) 105 work_size = Resources._parse_size(work_size) 106 work_size_per_node = Resources._parse_size(work_size_per_node) 107 work_size_per_cpu = Resources._parse_size(work_size_per_cpu) 108 109 # convert walltime 110 if isinstance(walltime, str) and ":" not in walltime: 111 walltime = wdhms_to_hhmmss(walltime) 112 113 # convert properties to dictionary 114 if isinstance(props, str): 115 props = Resources._parse_props(props) 116 117 # convert nnodes, ncpus, and ngpus to integers 118 if isinstance(nnodes, str): 119 nnodes = int(nnodes) 120 if isinstance(ncpus, str): 121 ncpus = int(ncpus) 122 if isinstance(ncpus_per_node, str): 123 ncpus_per_node = int(ncpus_per_node) 124 if isinstance(ngpus, str): 125 ngpus = int(ngpus) 126 if isinstance(ngpus_per_node, str): 127 ngpus_per_node = int(ngpus_per_node) 128 129 # set attributes 130 self.nnodes = nnodes 131 self.ncpus = ncpus 132 self.ncpus_per_node = ncpus_per_node 133 self.mem = mem 134 self.mem_per_node = mem_per_node 135 self.mem_per_cpu = mem_per_cpu 136 self.ngpus = ngpus 137 self.ngpus_per_node = ngpus_per_node 138 self.walltime = walltime 139 self.work_dir = work_dir 140 self.work_size = work_size 141 self.work_size_per_node = work_size_per_node 142 self.work_size_per_cpu = work_size_per_cpu 143 self.props = props 144 145 # enforce coupling rules 146 self.__post_init__() # ty: ignore[unresolved-attribute] 147 148 logger.debug(f"Resources: {self}") 149 150 def to_dict(self) -> dict[str, object]: 151 """Return all fields as a dict, excluding fields set to None.""" 152 return {k: v for k, v in asdict(self).items() if v is not None} 153 154 def uses_scratch(self) -> bool: 155 """ 156 Determine if the job uses a scratch directory. 157 158 Returns: 159 bool: True if a work_dir is not 'job_dir' or 'input_dir', otherwise False. 160 """ 161 return not equals_normalized( 162 str(self.work_dir), "job_dir" 163 ) and not equals_normalized(str(self.work_dir), "input_dir") 164 165 @staticmethod 166 def merge_resources(*resources: "Resources") -> "Resources": 167 """ 168 Merge multiple Resources objects. 169 170 Earlier resources take precedence over later ones. Properties are merged. 171 172 If either field in a coupling is set in an earlier resource, both fields of 173 that coupling are taken from that resource and ignore later resources. 174 (This means that if e.g. a `mem-per-cpu` is set by the user, 175 it will not be overwritten by a default absolute `mem` value set by a queue, 176 even though `mem` is a dominant attribute and `mem-per-cpu` is recessive.) 177 178 Args: 179 *resources (Resources): One or more Resources objects, in order of precedence. 180 181 Returns: 182 Resources: A new Resources object with merged fields. 183 """ 184 merged_data = {} 185 processed_couplings: set[FieldCoupling] = set() 186 187 for f in fields(Resources): 188 # check if this field is part of a coupling 189 if coupling := Resources.get_coupling_for_field(f.name): 190 # skip if coupling already processed 191 if coupling in processed_couplings: 192 continue 193 processed_couplings.add(coupling) 194 195 # find first resource where either field in the coupling is set 196 source_resource = next( 197 (r for r in resources if coupling.has_value(r)), None 198 ) 199 200 # set all fields of the coupling 201 if source_resource: 202 for field in coupling.fields: 203 merged_data[field] = getattr(source_resource, field) 204 # if no resource has any attribute set for this coupling 205 else: 206 for field in coupling.fields: 207 merged_data[field] = None 208 continue 209 210 # default: pick the first non-None value for this field 211 merged_data[f.name] = next( 212 ( 213 getattr(r, f.name) 214 for r in resources 215 if getattr(r, f.name) is not None 216 ), 217 None, 218 ) 219 220 return Resources(**merged_data) 221 222 @staticmethod 223 def _parse_size(value: object) -> Size | None: 224 """ 225 Convert a raw value into a `Size` instance if possible. 226 227 Args: 228 value (object): A Size object or a raw size value (a string or a dictionary). 229 230 Returns: 231 Size | None: A `Size` object if the input could be parsed, 232 otherwise `None`. 233 """ 234 if isinstance(value, str): 235 return Size.from_string(value) 236 if isinstance(value, dict): 237 return Size(**value) # ty: ignore[invalid-argument-type] 238 if isinstance(value, Size): 239 return value 240 return None 241 242 @staticmethod 243 def _parse_props(props: str) -> dict[str, str]: 244 """ 245 Parse a properties string into a dictionary of key/value pairs. 246 247 The input may contain multiple properties separated by commas, 248 whitespace, or colons. Each property can be one of the following forms: 249 - "key=value" - stored as {"key": "value"} 250 - "key" - stored as {"key": "true"} 251 - "^key" - stored as {"key": "false"} 252 253 Args: 254 props (str): A string containing job properties. 255 256 Returns: 257 dict[str, str]: Parsed properties as key/value pairs. 258 259 Raises: 260 QQError: If a property key is defined multiple times. 261 """ 262 # split into parts by commas, whitespace, or colons 263 parts = filter(None, re.split(r"[,\s:]+", props)) 264 265 result = {} 266 for part in parts: 267 if "=" in part: 268 # explicit key=value pair 269 key, value = part.split("=", 1) 270 elif part.startswith("^"): 271 # ^key means false 272 key, value = part.lstrip("^"), "false" 273 else: 274 # bare key means true 275 key, value = part, "true" 276 277 if key in result: 278 raise QQError(f"Property '{key}' is defined multiple times.") 279 result[key] = value 280 281 return result 282 283 def _props_to_value(self) -> str | None: 284 """ 285 Convert a properties dictionary into a command-line raw value string. 286 287 Args: 288 props (dict[str, str]): Mapping of property names to their string values. 289 290 Returns: 291 str | None: A comma-separated command-line representation of the property definitions 292 or None if the dictionary is empty. 293 """ 294 if not self.props: 295 return None 296 297 properties = [] 298 for key, value in self.props.items(): 299 if value == "true": 300 properties.append(key) 301 elif value == "false": 302 properties.append(f"^{key}") 303 else: 304 properties.append(f"{key}={value}") 305 306 return ",".join(properties)
Dataclass representing computational resources requested for a qq job.
84 def __init__( 85 self, 86 nnodes: int | str | None = None, 87 ncpus: int | str | None = None, 88 ncpus_per_node: int | str | None = None, 89 mem: Size | str | dict[str, object] | None = None, 90 mem_per_node: Size | str | dict[str, object] | None = None, 91 mem_per_cpu: Size | str | dict[str, object] | None = None, 92 ngpus: int | str | None = None, 93 ngpus_per_node: int | str | None = None, 94 walltime: str | None = None, 95 work_dir: str | None = None, 96 work_size: Size | str | dict[str, object] | None = None, 97 work_size_per_node: Size | str | dict[str, object] | None = None, 98 work_size_per_cpu: Size | str | dict[str, object] | None = None, 99 props: dict[str, str] | str | None = None, 100 ): 101 # convert sizes 102 mem = Resources._parse_size(mem) 103 mem_per_node = Resources._parse_size(mem_per_node) 104 mem_per_cpu = Resources._parse_size(mem_per_cpu) 105 work_size = Resources._parse_size(work_size) 106 work_size_per_node = Resources._parse_size(work_size_per_node) 107 work_size_per_cpu = Resources._parse_size(work_size_per_cpu) 108 109 # convert walltime 110 if isinstance(walltime, str) and ":" not in walltime: 111 walltime = wdhms_to_hhmmss(walltime) 112 113 # convert properties to dictionary 114 if isinstance(props, str): 115 props = Resources._parse_props(props) 116 117 # convert nnodes, ncpus, and ngpus to integers 118 if isinstance(nnodes, str): 119 nnodes = int(nnodes) 120 if isinstance(ncpus, str): 121 ncpus = int(ncpus) 122 if isinstance(ncpus_per_node, str): 123 ncpus_per_node = int(ncpus_per_node) 124 if isinstance(ngpus, str): 125 ngpus = int(ngpus) 126 if isinstance(ngpus_per_node, str): 127 ngpus_per_node = int(ngpus_per_node) 128 129 # set attributes 130 self.nnodes = nnodes 131 self.ncpus = ncpus 132 self.ncpus_per_node = ncpus_per_node 133 self.mem = mem 134 self.mem_per_node = mem_per_node 135 self.mem_per_cpu = mem_per_cpu 136 self.ngpus = ngpus 137 self.ngpus_per_node = ngpus_per_node 138 self.walltime = walltime 139 self.work_dir = work_dir 140 self.work_size = work_size 141 self.work_size_per_node = work_size_per_node 142 self.work_size_per_cpu = work_size_per_cpu 143 self.props = props 144 145 # enforce coupling rules 146 self.__post_init__() # ty: ignore[unresolved-attribute] 147 148 logger.debug(f"Resources: {self}")
150 def to_dict(self) -> dict[str, object]: 151 """Return all fields as a dict, excluding fields set to None.""" 152 return {k: v for k, v in asdict(self).items() if v is not None}
Return all fields as a dict, excluding fields set to None.
154 def uses_scratch(self) -> bool: 155 """ 156 Determine if the job uses a scratch directory. 157 158 Returns: 159 bool: True if a work_dir is not 'job_dir' or 'input_dir', otherwise False. 160 """ 161 return not equals_normalized( 162 str(self.work_dir), "job_dir" 163 ) and not equals_normalized(str(self.work_dir), "input_dir")
Determine if the job uses a scratch directory.
Returns:
bool: True if a work_dir is not 'job_dir' or 'input_dir', otherwise False.
165 @staticmethod 166 def merge_resources(*resources: "Resources") -> "Resources": 167 """ 168 Merge multiple Resources objects. 169 170 Earlier resources take precedence over later ones. Properties are merged. 171 172 If either field in a coupling is set in an earlier resource, both fields of 173 that coupling are taken from that resource and ignore later resources. 174 (This means that if e.g. a `mem-per-cpu` is set by the user, 175 it will not be overwritten by a default absolute `mem` value set by a queue, 176 even though `mem` is a dominant attribute and `mem-per-cpu` is recessive.) 177 178 Args: 179 *resources (Resources): One or more Resources objects, in order of precedence. 180 181 Returns: 182 Resources: A new Resources object with merged fields. 183 """ 184 merged_data = {} 185 processed_couplings: set[FieldCoupling] = set() 186 187 for f in fields(Resources): 188 # check if this field is part of a coupling 189 if coupling := Resources.get_coupling_for_field(f.name): 190 # skip if coupling already processed 191 if coupling in processed_couplings: 192 continue 193 processed_couplings.add(coupling) 194 195 # find first resource where either field in the coupling is set 196 source_resource = next( 197 (r for r in resources if coupling.has_value(r)), None 198 ) 199 200 # set all fields of the coupling 201 if source_resource: 202 for field in coupling.fields: 203 merged_data[field] = getattr(source_resource, field) 204 # if no resource has any attribute set for this coupling 205 else: 206 for field in coupling.fields: 207 merged_data[field] = None 208 continue 209 210 # default: pick the first non-None value for this field 211 merged_data[f.name] = next( 212 ( 213 getattr(r, f.name) 214 for r in resources 215 if getattr(r, f.name) is not None 216 ), 217 None, 218 ) 219 220 return Resources(**merged_data)
Merge multiple Resources objects.
Earlier resources take precedence over later ones. Properties are merged.
If either field in a coupling is set in an earlier resource, both fields of
that coupling are taken from that resource and ignore later resources.
(This means that if e.g. a mem-per-cpu is set by the user,
it will not be overwritten by a default absolute mem value set by a queue,
even though mem is a dominant attribute and mem-per-cpu is recessive.)
Arguments:
- *resources (Resources): One or more Resources objects, in order of precedence.
Returns:
Resources: A new Resources object with merged fields.
83 @staticmethod 84 def get_coupling_for_field(field_name: str) -> FieldCoupling | None: 85 for coupling in cls._field_couplings: 86 if coupling.contains(field_name): 87 return coupling 88 89 return None
The type of the None singleton.