1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# -*- coding:utf-8 -*-
import copy
import itertools
import json
import os
import stat
import yaml
class ConfigError(Exception):
"""Raised when a config file fails to load"""
pass
def merge_config(base, head):
"""
Merge two JSON or YAML documents into a single object. Arrays are
merged by extension. If dissimilar types are encountered, then the
head value overwrites the base value.
"""
if isinstance(head, dict):
if not isinstance(base, dict):
return copy.deepcopy(head)
result = {}
for k in itertools.chain(head, base):
try:
result[k] = merge_config(base[k], head[k])
except KeyError:
try:
result[k] = copy.deepcopy(head[k])
except KeyError:
result[k] = copy.deepcopy(base[k])
elif isinstance(head, list):
result = []
if not isinstance(base, list):
result.extend(copy.deepcopy(x) for x in head)
else:
if any(isinstance(x, (dict, list)) for x in itertools.chain(head, base)):
# merge items with identical indexes
for x, y in zip(base, head):
if isinstance(x, (dict, list)):
result.append(merge_config(x, y))
else:
# head overwrites base (preserving index)
result.append(copy.deepcopy(y))
# copy remaining items from the longer list
if len(base) != len(head):
if len(base) > len(head):
result.extend(copy.deepcopy(x) for x in base[len(head):])
else:
result.extend(copy.deepcopy(x) for x in head[len(base):])
else:
result.extend(copy.deepcopy(x) for x in base)
result.extend(copy.deepcopy(x) for x in head)
else:
result = copy.deepcopy(head)
return result
def _yaml_load(filename):
"""
Load filename as YAML and return a dict. Raise ConfigError if
it fails to load.
"""
with open(filename, 'rt') as f:
try:
return yaml.safe_load(f)
except yaml.parser.ParserError as e:
raise ConfigError("{}: {}".format(filename, e))
def _json_load(filename):
"""
Load filename as JSON and return a dict. Raise ConfigError if
it fails to load.
"""
with open(filename, 'rt') as f:
try:
return json.load(f) #nosec
except ValueError as e:
raise ConfigError("{}: {}".format(filename, e))
def iter_files(files_dirs):
"""
Iterate over nested file paths in lexical order.
"""
stack = list(reversed(files_dirs))
while stack:
location = stack.pop()
try:
st = os.stat(location)
except FileNotFoundError:
continue
if stat.S_ISDIR(st.st_mode):
stack.extend(os.path.join(location, x)
for x in sorted(os.listdir(location), reverse=True))
elif stat.S_ISREG(st.st_mode):
yield location
def load_config(conf_dirs, file_extensions=None, valid_versions=None):
"""
Load JSON and/or YAML files from a directories, and merge them together
into a single object.
@param conf_dirs: ordered iterable of directories to load the config from
@param file_extensions: Optional list of file extension types to load
@param valid_versions: list of compatible file versions allowed
@returns: the stacked config
"""
result = {}
for filename in iter_files(conf_dirs):
if file_extensions is not None and not filename.endswith(file_extensions):
continue
loaders = []
extension = filename.rsplit('.', 1)[1]
if extension in ['json']:
loaders.append(_json_load)
elif extension in ['yml', 'yaml']:
loaders.append(_yaml_load)
config = None
exception = None
for loader in loaders:
try:
config = loader(filename) or {}
except ConfigError as e:
exception = e
else:
break
if config is None:
print("Repoman.config.load_config(), Error loading file: %s" % filename)
print(" Aborting...")
raise exception
if config:
if config['version'] not in valid_versions:
raise ConfigError("Invalid file version: %s in: %s\nPlease upgrade repoman: current valid versions: %s"
% (config['version'], filename, valid_versions))
result = merge_config(result, config)
return result
|