Spaces:
Running
Running
# Copyright (C) 2012 Anaconda, Inc | |
# SPDX-License-Identifier: BSD-3-Clause | |
from itertools import chain | |
import os | |
import re | |
import json | |
from conda.base.context import context | |
from conda.exceptions import EnvironmentFileEmpty, EnvironmentFileNotFound | |
from conda.cli import common  # TODO: this should never have to import from conda.cli
from conda.common.serialize import yaml_safe_load, yaml_safe_dump | |
from conda.core.prefix_data import PrefixData | |
from conda.gateways.connection.download import download_text | |
from conda.gateways.connection.session import CONDA_SESSION_SCHEMES | |
from conda.models.enums import PackageType | |
from conda.models.match_spec import MatchSpec | |
from conda.models.prefix_graph import PrefixGraph | |
from conda.history import History | |
from conda.common.iterators import groupby_to_dict as groupby, unique | |
# Top-level sections of an environment file that are accepted; any other
# section is dropped by validate_keys() with a printed warning.
VALID_KEYS = ('name', 'dependencies', 'prefix', 'channels', 'variables')


def validate_keys(data, kwargs):
    """Check for unknown keys, remove them and print a warning.

    Args:
        data: Mapping parsed from an environment file. ``None`` or an empty
            mapping yields an empty result.
        kwargs: Mapping of extra options. Only ``filename`` is read, and only
            to label the warning printed for invalid sections.

    Returns:
        A shallow copy of ``data`` restricted to the keys in ``VALID_KEYS``.
        When ``dependencies`` holds a ``{'pip': ...}`` entry but no plain
        ``pip`` spec, the copy gets a new dependencies list with ``'pip'``
        prepended; the list owned by ``data`` is never modified.
    """
    if not data:
        return {}

    new_data = data.copy()
    invalid_keys = [key for key in data if key not in VALID_KEYS]
    for key in invalid_keys:
        new_data.pop(key)

    if invalid_keys:
        filename = (kwargs or {}).get('filename')
        verb = 'are' if len(invalid_keys) != 1 else 'is'
        plural = 's' if len(invalid_keys) != 1 else ''
        print(f"\nEnvironmentSectionNotValid: The following section{plural} on "
              f"'{filename}' {verb} invalid and will be ignored:")
        for key in invalid_keys:
            print(f" - {key}")
        print()

    # A "dependencies:" section with no entries parses to None; treat it as empty.
    deps = data.get('dependencies') or []
    depsplit = re.compile(r"[<>~\s=]")

    def is_pip(dep):
        # Strip any version constraint, then any "channel::" prefix.
        return 'pip' in depsplit.split(dep)[0].split('::')

    lists_pip = any(is_pip(dep) for dep in deps if not isinstance(dep, dict))
    has_pip_section = any(isinstance(dep, dict) and 'pip' in dep for dep in deps)
    if has_pip_section and not lists_pip:
        print("Warning: you have pip-installed dependencies in your environment file, "
              "but you do not list pip itself as one of your conda dependencies. Conda "
              "may not use the correct pip to install your packages, and they may end up "
              "in the wrong place. Please add an explicit pip dependency. I'm adding one"
              " for you, but still nagging you.")
        # Build a fresh list: new_data is only a shallow copy, so inserting in
        # place would also change the caller's dependencies list.
        new_data['dependencies'] = ['pip', *deps]
    return new_data
def load_from_directory(directory):
    """Load and return an ``Environment`` found in or above ``directory``.

    Each directory from ``directory`` upward is probed for
    ``environment.yml`` and then ``environment.yaml``. The walk ends when
    ``os.path.dirname`` no longer changes the path.

    Raises:
        EnvironmentFileNotFound: if no candidate file loads on the way up.
    """
    candidates = ['environment.yml', 'environment.yaml']
    current = directory
    reached_top = False
    while not reached_top:
        for basename in candidates:
            try:
                return from_file(os.path.join(current, basename))
            except EnvironmentFileNotFound:
                continue
        parent = os.path.dirname(current)
        # dirname() is a fixed point once there is nothing left to strip.
        reached_top = parent == current
        current = parent
    raise EnvironmentFileNotFound(candidates[0])
# TODO tests!!! | |
def from_environment(name, prefix, no_builds=False, ignore_channels=False, from_history=False):
    """
    Build an ``Environment`` object describing an existing prefix.

    Args:
        name: The name given to the resulting environment
        prefix: The path of the prefix to inspect
        no_builds: When true, conda dependencies are written as
            ``name=version``; otherwise as ``name=version=build``
        ignore_channels: When true, the channels of the installed records are
            not added to the channel list taken from ``context.channels``
        from_history: When true, dependencies are the specs returned by
            ``History(prefix).get_requested_specs_map()`` instead of the
            installed records

    Returns: Environment object
    """
    prefix_data = PrefixData(prefix, pip_interop_enabled=True)
    variables = prefix_data.get_environment_env_vars()

    if from_history:
        requested = History(prefix).get_requested_specs_map()
        return Environment(
            name=name,
            dependencies=[str(spec) for spec in requested.values()],
            channels=list(context.channels),
            prefix=prefix,
            variables=variables,
        )

    records = tuple(PrefixGraph(prefix_data.iter_records()).graph)
    by_type = groupby(lambda rec: rec.package_type, records)

    def collect(*package_types):
        # Concatenate the groups in the order given, then sort by name.
        gathered = []
        for package_type in package_types:
            gathered.extend(by_type.get(package_type, ()))
        gathered.sort(key=lambda rec: rec.name)
        return gathered

    conda_precs = collect(
        None,
        PackageType.NOARCH_GENERIC,
        PackageType.NOARCH_PYTHON,
    )
    pip_precs = collect(
        PackageType.VIRTUAL_PYTHON_WHEEL,
        PackageType.VIRTUAL_PYTHON_EGG_MANAGEABLE,
        PackageType.VIRTUAL_PYTHON_EGG_UNMANAGEABLE,
    )

    if no_builds:
        dependencies = [f"{rec.name}={rec.version}" for rec in conda_precs]
    else:
        dependencies = [f"{rec.name}={rec.version}={rec.build}" for rec in conda_precs]
    if pip_precs:
        pip_specs = [f"{rec.name}=={rec.version}" for rec in pip_precs]
        dependencies.append({"pip": pip_specs})

    channels = list(context.channels)
    if not ignore_channels:
        for rec in conda_precs:
            channel_name = rec.channel.canonical_name
            if channel_name in channels:
                continue
            # Channels not already listed go to the front of the list.
            channels.insert(0, channel_name)

    return Environment(
        name=name,
        dependencies=dependencies,
        channels=channels,
        prefix=prefix,
        variables=variables,
    )
def from_yaml(yamlstr, **kwargs):
    """Load and return an ``Environment`` from a given YAML string.

    Keyword arguments are passed to ``validate_keys`` and then written over
    the parsed data, so they take precedence over values from the YAML.

    Raises:
        EnvironmentFileEmpty: if the YAML parses to ``None``; the ``filename``
            keyword argument, if given, is passed to the exception.
    """
    parsed = yaml_safe_load(yamlstr)
    if parsed is None:
        raise EnvironmentFileEmpty(kwargs.get("filename"))

    env_data = validate_keys(parsed, kwargs)
    env_data.update(kwargs)
    _expand_channels(env_data)
    return Environment(**env_data)
def _expand_channels(data):
    """Expand environment variables in the channels of ``data``, in place.

    ``data["channels"]`` is always replaced by a new list. A missing key or a
    null value (an empty ``channels:`` section) both result in ``[]``.
    """
    # .get("channels", []) would still return None for a key that is present
    # with a null value, so fall back explicitly.
    channels = data.get("channels") or []
    data["channels"] = [os.path.expandvars(channel) for channel in channels]
def from_file(filename):
    """Load and return an ``Environment`` from a local path or a URL.

    When the text before the first ``://`` is one of
    ``CONDA_SESSION_SCHEMES`` the content is fetched with ``download_text``.
    Otherwise ``filename`` is read from disk as bytes and decoded as UTF-8,
    falling back to UTF-16 if that fails.

    Raises:
        EnvironmentFileNotFound: if ``filename`` is not a supported URL and
            does not exist on disk.
    """
    scheme, _, _ = filename.partition("://")
    if scheme in CONDA_SESSION_SCHEMES:
        return from_yaml(download_text(filename), filename=filename)

    if not os.path.exists(filename):
        raise EnvironmentFileNotFound(filename)

    with open(filename, 'rb') as handle:
        raw_bytes = handle.read()
    try:
        text = raw_bytes.decode('utf-8')
    except UnicodeDecodeError:
        text = raw_bytes.decode('utf-16')
    return from_yaml(text, filename=filename)
# TODO test explicitly | |
class Dependencies(dict):
    """Dictionary of dependencies grouped by installer, built from a raw list.

    ``raw`` is the dependency list as written in an environment file: plain
    entries plus optional dict entries such as ``{'pip': [...]}``. After
    parsing, ``self['conda']`` holds the plain entries passed through
    ``common.arg2spec`` and every dict entry is merged in under its own keys.
    """

    def __init__(self, raw, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Kept as given (may be None); Environment.to_dict() emits it verbatim.
        self.raw = raw
        self.parse()

    def parse(self):
        """Rebuild the dictionary contents from ``self.raw``.

        Does nothing when ``self.raw`` is empty or ``None``.
        """
        if not self.raw:
            return

        # Start the conda list afresh so repeated parses do not accumulate.
        self['conda'] = []

        for line in self.raw:
            if isinstance(line, dict):
                self.update(line)
            else:
                self['conda'].append(common.arg2spec(line))

        if 'pip' in self:
            if not self['pip']:
                del self['pip']
            # A 'pip' section was declared (even if empty), so make sure pip
            # itself is among the conda specs.
            if not any(MatchSpec(s).name == 'pip' for s in self['conda']):
                self['conda'].append('pip')

    # TODO only append when it's not already present
    def add(self, package_name):
        """Append ``package_name`` to the raw list and re-parse."""
        # An instance created with raw=None has no list to append to yet.
        if self.raw is None:
            self.raw = []
        self.raw.append(package_name)
        self.parse()
class Environment:
    """In-memory representation of an environment specification.

    Holds the name, source filename, prefix, channels, variables and a
    ``Dependencies`` object built from the given dependency list.
    """

    def __init__(self, name=None, filename=None, channels=None,
                 dependencies=None, prefix=None, variables=None):
        self.name = name
        self.filename = filename
        self.prefix = prefix
        self.dependencies = Dependencies(dependencies)
        self.variables = variables
        self.channels = [] if channels is None else channels

    def add_channels(self, channels):
        """Put ``channels`` ahead of the current channels, dropping duplicates."""
        combined = chain(channels, self.channels)
        self.channels = list(unique(combined))

    def remove_channels(self):
        """Reset the channel list to empty."""
        self.channels = []

    def to_dict(self, stream=None):
        """Return the environment as a dict, or write it as JSON to ``stream``.

        ``name`` is always present; every other section is included only
        when its value is truthy. Returns ``None`` when ``stream`` is given.
        """
        payload = {"name": self.name}
        if self.channels:
            payload['channels'] = self.channels
        if self.dependencies:
            payload['dependencies'] = self.dependencies.raw
        for section in ('variables', 'prefix'):
            value = getattr(self, section)
            if value:
                payload[section] = value

        if stream is not None:
            stream.write(json.dumps(payload))
            return None
        return payload

    def to_yaml(self, stream=None):
        """Dump the environment as YAML.

        Returns the result of ``yaml_safe_dump`` when ``stream`` is ``None``;
        otherwise passes ``stream`` to ``yaml_safe_dump`` and returns ``None``.
        """
        dumped = yaml_safe_dump(self.to_dict(), stream)
        return dumped if stream is None else None

    def save(self):
        """Write the YAML form of the environment to ``self.filename``."""
        with open(self.filename, "wb") as handle:
            self.to_yaml(stream=handle)