Coverage for C:\Repos\ekr-pylint\pylint\lint\parallel.py: 29%
70 statements
« prev ^ index » next coverage.py v6.4, created at 2022-05-24 10:21 -0500
1# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
2# For details: https://github.com/PyCQA/pylint/blob/main/LICENSE
3# Copyright (c) https://github.com/PyCQA/pylint/blob/main/CONTRIBUTORS.txt
5from __future__ import annotations
7import functools
8import warnings
9from collections import defaultdict
10from collections.abc import Iterable, Sequence
11from typing import TYPE_CHECKING, Any
13import dill
15from pylint import reporters
16from pylint.lint.utils import _patch_sys_path
17from pylint.message import Message
18from pylint.typing import FileItem
19from pylint.utils import LinterStats, merge_stats
21try:
22 import multiprocessing
23except ImportError:
24 multiprocessing = None # type: ignore[assignment]
26if TYPE_CHECKING:
27 from pylint.lint import PyLinter
29# PyLinter object used by worker processes when checking files using multiprocessing
30# should only be used by the worker processes
31_worker_linter: PyLinter | None = None
def _worker_initialize(
    linter: bytes, arguments: None | str | Sequence[str] = None
) -> None:
    """Set up the module-level linter used by one multiprocessing Pool worker.

    :param linter: A linter-class (PyLinter) instance pickled with dill
    :param arguments: File or module name(s) to lint and to be added to sys.path
    """
    global _worker_linter  # pylint: disable=global-statement
    _worker_linter = dill.loads(linter)
    assert _worker_linter

    # Messages are only collected in the worker; the parent process receives
    # them as part of _worker_check_single_file's return value.
    _worker_linter.set_reporter(reporters.CollectingReporter())
    _worker_linter.open()

    # Mirror single-job behaviour: make each argument importable via sys.path.
    _patch_sys_path(() if not arguments else arguments)
def _worker_check_single_file(
    file_item: FileItem,
) -> tuple[
    int,
    # TODO: 3.0: Make this only str after deprecation has been removed
    str | None,
    str,
    str | None,
    list[Message],
    LinterStats,
    int,
    defaultdict[str, list[Any]],
]:
    """Lint one file item inside a worker process.

    Returns a tuple of (worker process id, current module name, file path,
    base name, collected messages, linter stats, message status, and
    map/reduce data keyed by checker name) for the parent to merge.
    """
    if not _worker_linter:
        raise Exception("Worker linter not yet initialised")
    _worker_linter.open()
    _worker_linter.check_single_file_item(file_item)

    # Gather per-checker map data so the parent process can run the reduce step.
    mapreduce_data: defaultdict[str, list[Any]] = defaultdict(list)
    for checker in _worker_linter.get_checkers():
        map_data = checker.get_map_data()
        if map_data is not None:
            mapreduce_data[checker.name].append(map_data)

    # Capture the collected messages before resetting the reporter for the
    # next file handled by this worker.
    assert isinstance(_worker_linter.reporter, reporters.CollectingReporter)
    collected_messages = _worker_linter.reporter.messages
    _worker_linter.reporter.reset()

    if _worker_linter.current_name is None:
        warnings.warn(
            (
                "In pylint 3.0 the current_name attribute of the linter object should be a string. "
                "If unknown it should be initialized as an empty string."
            ),
            DeprecationWarning,
        )
    return (
        id(multiprocessing.current_process()),
        _worker_linter.current_name,
        file_item.filepath,
        _worker_linter.file_state.base_name,
        collected_messages,
        _worker_linter.stats,
        _worker_linter.msg_status,
        mapreduce_data,
    )
100def _merge_mapreduce_data(
101 linter: PyLinter,
102 all_mapreduce_data: defaultdict[int, list[defaultdict[str, list[Any]]]],
103) -> None:
104 """Merges map/reduce data across workers, invoking relevant APIs on checkers."""
105 # First collate the data and prepare it, so we can send it to the checkers for
106 # validation. The intent here is to collect all the mapreduce data for all checker-
107 # runs across processes - that will then be passed to a static method on the
108 # checkers to be reduced and further processed.
109 collated_map_reduce_data: defaultdict[str, list[Any]] = defaultdict(list)
110 for linter_data in all_mapreduce_data.values():
111 for run_data in linter_data:
112 for checker_name, data in run_data.items():
113 collated_map_reduce_data[checker_name].extend(data)
115 # Send the data to checkers that support/require consolidated data
116 original_checkers = linter.get_checkers()
117 for checker in original_checkers:
118 if checker.name in collated_map_reduce_data:
119 # Assume that if the check has returned map/reduce data that it has the
120 # reducer function
121 checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
def check_parallel(
    linter: PyLinter,
    jobs: int,
    files: Iterable[FileItem],
    arguments: None | str | Sequence[str] = None,
) -> None:
    """Use the given linter to lint the files with given amount of workers (jobs).

    This splits the work filestream-by-filestream. If you need to do work across
    multiple files, as in the similarity-checker, then implement the map/reduce mixin functionality.

    :param linter: Linter whose pickled copy configures each worker process.
    :param jobs: Number of worker processes in the pool.
    :param files: Items to lint; each is dispatched to one worker call.
    :param arguments: File or module name(s) added to sys.path in each worker.
    """
    # The linter is inherited by all the pool's workers, i.e. the linter
    # is identical to the linter object here. This is required so that
    # a custom PyLinter object can be used.
    initializer = functools.partial(_worker_initialize, arguments=arguments)
    with multiprocessing.Pool(
        jobs, initializer=initializer, initargs=[dill.dumps(linter)]
    ) as pool:
        linter.open()
        all_stats = []
        all_mapreduce_data: defaultdict[
            int, list[defaultdict[str, list[Any]]]
        ] = defaultdict(list)

        # Maps each file to be worked on by a single _worker_check_single_file() call,
        # collecting any map/reduce data by checker module so that we can 'reduce' it
        # later.
        for (
            worker_idx,  # used to merge map/reduce data across workers
            module,
            file_path,
            base_name,
            messages,
            stats,
            msg_status,
            mapreduce_data,
        ) in pool.imap_unordered(_worker_check_single_file, files):
            # Re-create the per-file state on the parent linter, then replay
            # the worker's collected messages through the parent's reporter.
            linter.file_state.base_name = base_name
            linter.file_state._is_base_filestate = False
            linter.set_current_module(module, file_path)
            for msg in messages:
                linter.reporter.handle_message(msg)
            all_stats.append(stats)
            all_mapreduce_data[worker_idx].append(mapreduce_data)
            # Accumulate exit-status bits contributed by every worker run.
            linter.msg_status |= msg_status

        # Shut down gracefully (close + join) so workers finish before the
        # context manager's __exit__ would otherwise terminate() the pool.
        pool.close()
        pool.join()

    _merge_mapreduce_data(linter, all_mapreduce_data)
    linter.stats = merge_stats([linter.stats] + all_stats)