Coverage for C:\Repos\ekr-pylint\pylint\lint\parallel.py: 29%

70 statements  

« prev     ^ index     » next       coverage.py v6.4, created at 2022-05-24 10:21 -0500

1# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html 

2# For details: https://github.com/PyCQA/pylint/blob/main/LICENSE 

3# Copyright (c) https://github.com/PyCQA/pylint/blob/main/CONTRIBUTORS.txt 

4 

5from __future__ import annotations 

6 

7import functools 

8import warnings 

9from collections import defaultdict 

10from collections.abc import Iterable, Sequence 

11from typing import TYPE_CHECKING, Any 

12 

13import dill 

14 

15from pylint import reporters 

16from pylint.lint.utils import _patch_sys_path 

17from pylint.message import Message 

18from pylint.typing import FileItem 

19from pylint.utils import LinterStats, merge_stats 

20 

try:
    import multiprocessing
except ImportError:
    multiprocessing = None  # type: ignore[assignment]

if TYPE_CHECKING:
    from pylint.lint import PyLinter

# PyLinter instance used by worker processes when checking files with
# multiprocessing; it must only ever be touched from inside a worker.
_worker_linter: PyLinter | None = None

32 

33 

def _worker_initialize(
    linter: bytes, arguments: None | str | Sequence[str] = None
) -> None:
    """Set up one worker process of a multiprocessing Pool.

    :param linter: A linter-class (PyLinter) instance pickled with dill
    :param arguments: File or module name(s) to lint and to be added to sys.path
    """
    global _worker_linter  # pylint: disable=global-statement
    _worker_linter = dill.loads(linter)
    assert _worker_linter

    # Inside a worker, messages are merely collected; they travel back to the
    # parent process as part of _worker_check_single_file's return value.
    _worker_linter.set_reporter(reporters.CollectingReporter())
    _worker_linter.open()

    # Mirror single-job mode: make every argument importable via sys.path.
    _patch_sys_path(arguments or ())

53 

54 

def _worker_check_single_file(
    file_item: FileItem,
) -> tuple[
    int,
    # TODO: 3.0: Make this only str after deprecation has been removed
    str | None,
    str,
    str | None,
    list[Message],
    LinterStats,
    int,
    defaultdict[str, list[Any]],
]:
    """Lint a single file in this worker and bundle everything the parent needs.

    Returns the worker's process id, the current module name, the file path,
    the base name, the collected messages, the linter stats, the message
    status, and any per-checker map data for the later reduce step.
    """
    if not _worker_linter:
        raise Exception("Worker linter not yet initialised")
    _worker_linter.open()
    _worker_linter.check_single_file_item(file_item)

    # Harvest map data from every checker that produced some, keyed by
    # checker name, so the parent can drive the reduce phase.
    mapreduce_data: defaultdict[str, list[Any]] = defaultdict(list)
    for checker in _worker_linter.get_checkers():
        map_data = checker.get_map_data()
        if map_data is not None:
            mapreduce_data[checker.name].append(map_data)

    # Grab the collected messages, then clear the reporter for the next file.
    msgs = _worker_linter.reporter.messages
    assert isinstance(_worker_linter.reporter, reporters.CollectingReporter)
    _worker_linter.reporter.reset()

    if _worker_linter.current_name is None:
        warnings.warn(
            (
                "In pylint 3.0 the current_name attribute of the linter object should be a string. "
                "If unknown it should be initialized as an empty string."
            ),
            DeprecationWarning,
        )

    return (
        id(multiprocessing.current_process()),
        _worker_linter.current_name,
        file_item.filepath,
        _worker_linter.file_state.base_name,
        msgs,
        _worker_linter.stats,
        _worker_linter.msg_status,
        mapreduce_data,
    )

98 

99 

100def _merge_mapreduce_data( 

101 linter: PyLinter, 

102 all_mapreduce_data: defaultdict[int, list[defaultdict[str, list[Any]]]], 

103) -> None: 

104 """Merges map/reduce data across workers, invoking relevant APIs on checkers.""" 

105 # First collate the data and prepare it, so we can send it to the checkers for 

106 # validation. The intent here is to collect all the mapreduce data for all checker- 

107 # runs across processes - that will then be passed to a static method on the 

108 # checkers to be reduced and further processed. 

109 collated_map_reduce_data: defaultdict[str, list[Any]] = defaultdict(list) 

110 for linter_data in all_mapreduce_data.values(): 

111 for run_data in linter_data: 

112 for checker_name, data in run_data.items(): 

113 collated_map_reduce_data[checker_name].extend(data) 

114 

115 # Send the data to checkers that support/require consolidated data 

116 original_checkers = linter.get_checkers() 

117 for checker in original_checkers: 

118 if checker.name in collated_map_reduce_data: 

119 # Assume that if the check has returned map/reduce data that it has the 

120 # reducer function 

121 checker.reduce_map_data(linter, collated_map_reduce_data[checker.name]) 

122 

123 

def check_parallel(
    linter: PyLinter,
    jobs: int,
    files: Iterable[FileItem],
    arguments: None | str | Sequence[str] = None,
) -> None:
    """Use the given linter to lint the files with given amount of workers (jobs).

    This splits the work filestream-by-filestream. If you need to do work across
    multiple files, as in the similarity-checker, then implement the map/reduce mixin functionality.
    """
    # Each pool worker receives a dill-serialized copy of this very linter, so
    # custom PyLinter subclasses keep their behaviour in parallel mode.
    initializer = functools.partial(_worker_initialize, arguments=arguments)
    with multiprocessing.Pool(
        jobs, initializer=initializer, initargs=[dill.dumps(linter)]
    ) as pool:
        linter.open()
        all_stats = []
        all_mapreduce_data: defaultdict[
            int, list[defaultdict[str, list[Any]]]
        ] = defaultdict(list)

        # Every file is handled by exactly one _worker_check_single_file()
        # call; results are folded into the parent linter as they arrive
        # (completion order, not submission order).
        for result in pool.imap_unordered(_worker_check_single_file, files):
            (
                worker_idx,  # used to merge map/reduce data across workers
                module,
                file_path,
                base_name,
                messages,
                stats,
                msg_status,
                mapreduce_data,
            ) = result
            linter.file_state.base_name = base_name
            linter.file_state._is_base_filestate = False
            linter.set_current_module(module, file_path)
            for msg in messages:
                linter.reporter.handle_message(msg)
            all_stats.append(stats)
            all_mapreduce_data[worker_idx].append(mapreduce_data)
            linter.msg_status |= msg_status

        pool.close()
        pool.join()

    _merge_mapreduce_data(linter, all_mapreduce_data)
    linter.stats = merge_stats([linter.stats] + all_stats)