aboutsummaryrefslogtreecommitdiff
blob: 0e5d2748f08d65efdf3e29e3bb4bc5b6c759bc35 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/python

"""Data collection module"""

import re
from portage import os
import glob
import stat

import portage
from portage import _encodings, _unicode_encode
from portage.output import blue, yellow
from .settings import parse_revdep_config


def parse_conf(conf_file, visited=None, logger=None):
    """Parses supplied conf_file (ld.so.conf style) for library paths.

    Every file is read line by line.  Text after a "#" is a comment and
    blank lines are ignored.  A line whose first word is "include" names
    further files to parse (glob patterns are expanded; relative patterns
    are resolved against the directory of the including file).  Any other
    line is taken verbatim as a library directory.

    conf_file is a single path or an iterable of paths to parse
    visited is the set of files already parsed; it is updated in place and
        keeps a file from being parsed twice (e.g. circular includes)
    logger is optional; when given, files that cannot be read are reported
        through logger.warning() and skipped

    Returns the set of library directories found in conf_file and in all
    files it includes, directly or indirectly.
    """
    if isinstance(conf_file, str):
        conf_file = [conf_file]
    else:
        # Materialise once: the paths are iterated below and then recorded
        # in visited, which a one-shot iterator would not survive.
        conf_file = list(conf_file)

    if visited is None:
        visited = set()

    lib_dirs = set()
    to_parse = set()

    for conf in conf_file:
        try:
            with open(
                _unicode_encode(conf, encoding=_encodings["fs"]),
                encoding=_encodings["content"],
            ) as _file:
                for raw_line in _file:
                    # ldconfig treats everything after '#' as a comment
                    line = raw_line.partition("#")[0].strip()
                    if not line:
                        # never report "" as a library directory
                        continue

                    fields = line.split()
                    if fields[0] == "include":
                        for included in fields[1:]:
                            if included.startswith("/"):
                                path = included
                            else:
                                path = os.path.join(os.path.dirname(conf), included)

                            to_parse.update(glob.glob(path))
                    else:
                        lib_dirs.add(line)
        except EnvironmentError:
            # logger is optional, an unreadable file must not crash the scan
            if logger is not None:
                logger.warning("\t" + yellow("Error when parsing file %s" % conf))

    visited.update(conf_file)
    to_parse -= visited
    if to_parse:
        lib_dirs.update(parse_conf(to_parse, visited, logger=logger))

    return lib_dirs


def prepare_search_dirs(logger, settings):
    """Lookup for search dirs. Returns tuple with two sets,
    (set_of_bin_dirs, set_of_lib_dirs)

    Binary directories are /bin and /usr/bin plus every entry of the
    PATH / ROOTPATH exports found in the environment file
    settings["DEFAULT_ENV_FILE"] (joined onto portage.root).
    Library directories are /lib and /usr/lib plus everything parse_conf()
    collects from settings["DEFAULT_LD_FILE"].

    logger is handed on to parse_conf() for its warnings.
    An error opening the environment file is not caught here and
    propagates to the caller.
    """
    bin_dirs = {"/bin", "/usr/bin"}
    lib_dirs = {"/lib", "/usr/lib"}

    env_file = os.path.join(portage.root, settings["DEFAULT_ENV_FILE"])
    path_export = re.compile(r"^export (ROOT)?PATH='([^']+)'")

    with open(
        _unicode_encode(env_file, encoding=_encodings["fs"]),
        mode="r",
        encoding=_encodings["content"],
    ) as _file:
        for line in _file:
            match = path_export.match(line.strip())
            if match is not None:
                # skip empty components such as the one in "a::b"
                bin_dirs.update(
                    entry for entry in match.group(2).split(":") if entry
                )

    # Merge instead of rebinding, so the default library dirs are kept.
    lib_dirs.update(parse_conf(settings["DEFAULT_LD_FILE"], logger=logger))
    return (bin_dirs, lib_dirs)


def collect_libraries_from_dir(dirs, mask, logger):
    """Collects all libraries from the given directories and their
    subdirectories.

    dirs is an iterable of directories to scan
    mask holds paths that are omitted from scanning; an entry is skipped
        when either its full path or its bare name is in mask
    logger receives a debug message when scanning a directory fails

    Returns a tuple of three sets: (libraries, la_libraries, library_links)
    - libraries: regular files ending in ".so" or ".a" or whose path
      contains ".so.", plus any other non-symlink file with an execute bit
    - la_libraries: files ending in ".la"
    - library_links: symlinks whose path matches the library patterns
    Symlinked directories are never descended into.
    """
    any_exec_bit = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    libraries = set()
    la_libraries = set()
    library_links = set()

    # worklist of directories still to be scanned
    pending = list(dirs)
    while pending:
        current = pending.pop()
        if current in mask:
            continue

        try:
            for name in os.listdir(current):
                path = os.path.join(current, name)
                if path in mask or name in mask:
                    continue

                if os.path.isdir(path):
                    # we do not want to scan symlinked directories
                    if not os.path.islink(path):
                        pending.append(path)
                    continue

                if not os.path.isfile(path):
                    continue

                # note: the ".so." test looks at the whole path
                if path.endswith((".so", ".a")) or ".so." in path:
                    if os.path.islink(path):
                        library_links.add(path)
                    else:
                        libraries.add(path)
                elif path.endswith(".la"):
                    la_libraries.add(path)
                elif not os.path.islink(path):
                    # sometimes there are binaries in libs' subdir,
                    # for example in nagios
                    if os.stat(path)[stat.ST_MODE] & any_exec_bit:
                        libraries.add(path)
        except Exception as ex:
            logger.debug(
                "\t" + yellow("Exception collecting libraries: " + blue("%s") % str(ex))
            )

    return (libraries, la_libraries, library_links)


def collect_binaries_from_dir(dirs, mask, logger):
    """Collects all binaries from the given directories and their
    subdirectories.

    dirs is an iterable of directories to scan
    mask holds paths that are omitted from scanning; an entry is skipped
        when either its full path or its bare name is in mask
    logger receives a debug message when scanning a directory fails

    Returns the set of regular, non-symlink files that have at least one
    execute bit set. Symlinked directories are never descended into.
    """
    any_exec_bit = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    binaries = set()

    # worklist of directories still to be scanned
    queue = list(dirs)
    while queue:
        folder = queue.pop()
        if folder in mask:
            continue

        try:
            for entry in os.listdir(folder):
                full_path = os.path.join(folder, entry)
                if full_path in mask or entry in mask:
                    continue

                if os.path.isdir(full_path):
                    # we do not want to scan symlinked directories
                    if not os.path.islink(full_path):
                        queue.append(full_path)
                elif os.path.isfile(full_path) and not os.path.islink(full_path):
                    # links are of no use when looking for binaries,
                    # so only real files are inspected
                    if os.stat(full_path)[stat.ST_MODE] & any_exec_bit:
                        binaries.add(full_path)
        except Exception as ex:
            logger.debug(
                "\t"
                + yellow(
                    "Exception during binaries collecting: " + blue("%s") % str(ex)
                )
            )

    return binaries


if __name__ == "__main__":
    # Manual smoke test: collect binaries and libraries and log the totals.
    import logging

    # The summary below is logged at debug level; without this it is dropped.
    logging.basicConfig(level=logging.DEBUG)

    # prepare_search_dirs() requires a settings mapping and reads exactly
    # these two keys from it.
    # NOTE(review): the paths are the conventional locations and are an
    # assumption here - confirm against the defaults defined in .settings.
    settings = {
        "DEFAULT_ENV_FILE": "etc/profile.env",
        "DEFAULT_LD_FILE": os.path.join(portage.root, "etc/ld.so.conf"),
    }
    bin_dirs, lib_dirs = prepare_search_dirs(logging, settings)

    # NOTE(review): still called without arguments, as before; verify this
    # matches the signature of parse_revdep_config in .settings.
    masked_dirs, masked_files, ld = parse_revdep_config()
    lib_dirs.update(ld)
    bin_dirs.update(ld)
    masked_dirs.update(
        {
            "/lib/modules",
            "/lib32/modules",
            "/lib64/modules",
        }
    )

    libraries, la_libraries, libraries_links = collect_libraries_from_dir(
        lib_dirs, masked_dirs, logging
    )
    binaries = collect_binaries_from_dir(bin_dirs, masked_dirs, logging)

    logging.debug(
        "Found: %i binaries and %i libraries.", len(binaries), len(libraries)
    )