پیاده سازی نگاشت کاهش (MapReduce) در پایتون — به زبان ساده

آخرین به‌روزرسانی: ۱۲ تیر ۱۴۰۲
«نگاشت کاهش» (MapReduce)، یک مدل بی‌نظیر است که پردازش مجموعه داده‌های دارای نمونه‌های زیاد (مجموعه داده‌های بزرگ) را آسان‌تر کرده است. این مدل، کاربردهای متعددی در مسائل تحلیل داده دارد و در مسائل جهان واقعی متعددی مورد استفاده قرار می‌گیرد. از این رو، در این مطلب روش پیاده سازی نگاشت کاهش در زبان برنامه‌نویسی پایتون شرح داده شده است. همچنین، مدل در یک پروژه عملی مورد بررسی قرار گرفته است. در این پروژه، تعداد کلمات کتاب رمان «Unveiling a Parallel» شمارش خواهد شد. در ادامه، برخی از پیش‌نیازهای لازم و موجود برای پیاده‌سازی این کد بیان شده است.

  • ماژول «چندپردازشی» (multiprocessing) برای پخش کردن پردازش‌ها، که با فراخوانی متد ()start روی شی Process ساخته شده مورد استفاده قرار می‌گیرد.
  • یک فایل خروجی متناظر با هر «رشته اجرایی» (thread) «کاهش» (Reduce) وجود دارد.
  • خروجی‌ها را می‌توان در پایان  در یک فایل یکتا ادغام کرد.
  • نتایج گام نگاشت (map)، با استفاده از «نشانه‌گذاری شی جاوا اسکریپت» (نشانه‌گذاری شی جاوا اسکریپت | JSON) ذخیره می‌شوند (مانند فایل‌های خروجی برای هر رشته اجرایی خروجی).

کاربر در پایان ممکن است تصمیم بگیرد که این فایل‌ها را حذف و یا رها کند.

پیاده سازی نگاشت کاهش (MapReduce)

ابتدا، اولین کاری که باید انجام شود نوشتن کلاس MapReduce است که نقش رابط را برای پیاده‌سازی شدن توسط کاربر، دارد. این کلاس دارای دو متد است: mapper و reducer که بعدا پیاده‌سازی خواهند شد (مثالی از پیاده‌سازی یک شمارنده لغات با استفاده از MapReduce در ادامه و در بخش مثال شمارنده کلمات، ارائه شده است).

بنابراین، کار نوشتن الگوریتم، با نوشتن کد زیر آغاز می‌شود:

1import settings
2class MapReduce(object):
3    """MapReduce class representing the mapreduce model
4    note: the 'mapper' and 'reducer' methods must be
5    implemented to use the mapreduce model.
6    """
7    def __init__(self, input_dir = settings.default_input_dir, output_dir = settings.default_output_dir,
8                 n_mappers = settings.default_n_mappers, n_reducers = settings.default_n_reducers,
9                 clean = True):
10        """
11        :param input_dir: directory of the input files,
12        taken from the default settings if not provided
13        :param output_dir: directory of the output files,
14        taken from the default settings if not provided
15        :param n_mappers: number of mapper threads to use,
16        taken from the default settings if not provided
17        :param n_reducers: number of reducer threads to use,
18        taken from the default settings if not provided
19        :param clean: optional, if True temporary files are
20        deleted, True by default.
21        """
22        self.input_dir = input_dir
23        self.output_dir = output_dir
24        self.n_mappers = n_mappers
25        self.n_reducers = n_reducers
26        self.clean = clean
27    def mapper(self, key, value):
28        """outputs a list of key-value pairs, where the key is
29        potentially new and the values are of a potentially different type.
30        Note: this function is to be implemented.
31        :param key:
32        :param value:
33        """
34        pass
35    def reducer(self, key, values_list):
36        """Outputs a single value together with the provided key.
37        Note: this function is to be implemented.
38        :param key:
39        :param value_list:
40        """
41        pass

برای بررسی تنظیمات گوناگون، باید ماژول «تنظیمات» (settings) را بررسی کرد. سپس، نیاز به افزودن متد ()run برای کلاس MapReduce است که عملیات map و reduce را اجرا می‌کند. بدین منظور، نیاز به تعریف یک متد (run_mapper(index است (که در آن، index به رشته اجرایی کنونی ارجاع دارد) که از mapper استفاده می‌کند و نتایج را روی دیسک ذخیره می‌کند و (run_reducer(index که reducer را روی نتایج map اعمال و نتایج رار وی دیسک ذخیره می‌کند.

متد ()run تعداد دلخواهی از نگاشت‌کننده‌ها (mappers) و سپس، تعداد لازم از کاهش‌دهنده‌ها (reducers) را پخش می‌کند. شی Process از ماژول multiprocessing به صورت زیر مورد استفاده قرار می‌گیرد.

1def run_mapper(self, index):
2    """Runs the implemented mapper
3    :param index: the index of the thread to run on
4    """
5    # read a key
6    # read a value
7    # get the result of the mapper
8    # store the result to be used by the reducer
9    pass
10def run_reducer(self, index):
11    """Runs the implemented reducer
12    :param index: the index of the thread to run on
13    """
14    # load the results of the map
15    # for each key reduce the values
16    # store the results for this reducer 
17    pass
18def run(self):
19    """Executes the map and reduce operations
20    """
21    # initialize mappers list
22    map_workers = []
23    # initialize reducers list
24    rdc_workers = []
25    # run the map step
26    for thread_id in range(self.n_mappers):
27        p = Process(target=self.run_mapper, args=(thread_id,))
28        p.start()
29        map_workers.append(p)
30    [t.join() for t in map_workers]
31    # run the reduce step
32    for thread_id in range(self.n_reducers):
33        p = Process(target=self.run_reducer, args=(thread_id,))
34        p.start()
35        map_workers.append(p)
36    [t.join() for t in rdc_workers]

اکنون، باید متدهای run_mapper و run_reducer را تکمیل کرد. اما، از آنجا که این متدها نیازمند خواندن و ذخیره داده‌ها از یک فایل ورودی هستند، ابتدا باید یک کلاس FileHandler ساخت. این کلاس، فایل ورودی را با استفاده از متد split_file (همان number_of_splits) (که در آن تعداد تقسیمات یا همان number of splits، تعداد کل بخش‌هایی است که به عنوان نتیجه تقسیم‌بندی مورد نیاز هستند) تقسیم‌بندی می‌کند. کلاس FileHandler نیز با استفاده از متد join_files (داریم number_of_files ,clean ,sort ,decreasing) به خروجی‌ها می‌پیوندد (که در آن number_of_files تعداد کل فایل‌ها برای join است.

clean ،sort و decreasing مجموعه همه آرگومان‌های بولی دلخواه هستند که در این مثال، همه به طور پیش‌فرض در حالت True قرار دارند). clean بیان می‌کند که کاربر کجا می‌خواهد فایل‌های موقتی را بعد از join حذف کند، sort نشان می‌دهد که آیا نتایج مرتب شده یا نه و decreasing نشان می‌دهد که کاربر می‌خواهد مرتب‌سازی را به ترتیب معکوس انجام دهد یا خیر. باید این موضوع را به خاطر داشت که کار با نوشتن شی FileHandler به صورت زیر، آغاز شده است:

1class FileHandler(object):
2    """FileHandler class
3    Manages splitting input files and joining outputs together.
4    """
5    def __init__(self, input_file_path, output_dir):
6        """
7        Note: the input file path should be given for splitting.
8        The output directory is needed for joining the outputs.
9        :param input_file_path: input file path
10        :param output_dir: output directory path
11        """
12        self.input_file_path = input_file_path
13        self.output_dir = output_dir
14    def split_file(self, number_of_splits):
15        """split a file into multiple files.
16        :param number_of_splits: the number of splits.
17        """
18        pass
19    def join_files(self, number_of_files, clean = None, sort = True, decreasing = True):
20        """join all the files in the output directory into a
21        single output file.
22        :param number_of_files: total number of files.
23        :param clean: if True the reduce outputs will be deleted,
24        by default takes the value of self.clean.
25        :param sort: sort the outputs.
26        :param decreasing: sort by decreasing order, high value
27        to low value.
28        :return output_join_list: a list of the outputs
29        """
30        pass

سپس، نوشتن متدهای split و join کامل می‌شود.

1import os
2import json
4class FileHandler(object):
5    """FileHandler class
6    Manages splitting input files and joining outputs together.
7    """
8    def __init__(self, input_file_path, output_dir):
9        """
10        Note: the input file path should be given for splitting.
11        The output directory is needed for joining the outputs.
12        :param input_file_path: input file path
13        :param output_dir: output directory path
14        """
15        self.input_file_path = input_file_path
16        self.output_dir = output_dir
17    def begin_file_split(self, split_index, index):
18        """initialize a split file by opening and adding an index.
19        :param split_index: the split index we are currently on, to be used for naming the file.
20        :param index: the index given to the file.
21        """
22        file_split = open(settings.get_input_split_file(split_index-1), "w+")
23        file_split.write(str(index) + "\n")
24        return file_split
25    def is_on_split_position(self, character, index, split_size, current_split):
26        """Check if it is the right time to split.
27        i.e: character is a space and the limit has been reached.
28        :param character: the character we are currently on.
29        :param index: the index we are currently on.
30        :param split_size: the size of each single split.
31        :param current_split: the split we are currently on.
32        """
33        return index>split_size*current_split+1 and character.isspace()
34    def split_file(self, number_of_splits):
35        """split a file into multiple files.
36        note: this has not been optimized to avoid overhead.
37        :param number_of_splits: the number of chunks to
38        split the file into.
39        """
40        file_size = os.path.getsize(self.input_file_path)
41        unit_size = file_size / number_of_splits + 1
42        original_file = open(self.input_file_path, "r")
43        file_content = original_file.read()
44        original_file.close()
45        (index, current_split_index) = (1, 1)
46        current_split_unit = self.begin_file_split(current_split_index, index)
47        for character in file_content:
48            current_split_unit.write(character)
49            if self.is_on_split_position(character, index, unit_size, current_split_index):
50                current_split_unit.close()
51                current_split_index += 1
52                current_split_unit = self.begin_file_split(current_split_index,index)
53            index += 1
54        current_split_unit.close()

اکنون، می‌توان متدهای run_mapper و  run_reducer را مانند زیر کامل کرد:

1def run_mapper(self, index):
2    """Runs the implemented mapper
3    :param index: the index of the thread to run on
4    """
5    input_split_file = open(settings.get_input_split_file(index), "r")
6    key = input_split_file.readline()
7    value = input_split_file.read()
8    input_split_file.close()
9    if(self.clean):
10        os.unlink(settings.get_input_split_file(index))
11    mapper_result = self.mapper(key, value)
12    for reducer_index in range(self.n_reducers):
13        temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+")
14        json.dump([(key, value) for (key, value) in mapper_result 
15                                    if self.check_position(key, reducer_index)]
16                    , temp_map_file)
17        temp_map_file.close()
19def run_reducer(self, index):
20    """Runs the implemented reducer
21    :param index: the index of the thread to run on
22    """
23    key_values_map = {}
24    for mapper_index in range(self.n_mappers):
25        temp_map_file = open(settings.get_temp_map_file(mapper_index, index), "r")
26        mapper_results = json.load(temp_map_file)
27        for (key, value) in mapper_results:
28            if not(key in key_values_map):
29                key_values_map[key] = []
30            try:
31                key_values_map[key].append(value)
32            except Exception, e:
33                print "Exception while inserting key: "+str(e)
34        temp_map_file.close()
35        if self.clean:
36            os.unlink(settings.get_temp_map_file(mapper_index, index))
37    key_value_list = []
38    for key in key_values_map:
39        key_value_list.append(self.reducer(key, key_values_map[key]))
40    output_file = open(settings.get_output_file(index), "w+")
41    json.dump(key_value_list, output_file)
42    output_file.close()

در نهایت، متد run اندکی ویرایش می‌شود تا کاربر قادر به تعیین این باشد که خروجی‌ها متصل شوند یا خیر. متد run به صورت زیر می‌شود:

1def run(self, join=False):
2    """Executes the map and reduce operations
3    :param join: True if we need to join the outputs, False by default.
4    """
5    # initialize mappers list
6    map_workers = []
7    # initialize reducers list
8    rdc_workers = []
9    # run the map step
10    for thread_id in range(self.n_mappers):
11        p = Process(target=self.run_mapper, args=(thread_id,))
12        p.start()
13        map_workers.append(p)
14    [t.join() for t in map_workers]
15    # run the reduce step
16    for thread_id in range(self.n_reducers):
17        p = Process(target=self.run_reducer, args=(thread_id,))
18        p.start()
19        map_workers.append(p)
20    [t.join() for t in rdc_workers]
21    if join:
22        self.join_outputs()

کد نهایی الگوریتم نگاشت کاهش (Map Reduce)، در زبان برنامه‌نویسی پایتون، به صورت زیر خواهد بود:

1import os
2import json
3import settings
4from multiprocessing import Process
7class FileHandler(object):
8    """FileHandler class
9    Manages splitting input files and joining outputs together.
11    """
12    def __init__(self, input_file_path, output_dir):
13        """
14        Note: the input file path should be given for splitting.
15        The output directory is needed for joining the outputs.
16        :param input_file_path: input file path
17        :param output_dir: output directory path
18        """
19        self.input_file_path = input_file_path
20        self.output_dir = output_dir
22    def initiate_file_split(self, split_index, index):
23        """initialize a split file by opening and adding an index.
24        :param split_index: the split index we are currently on, to be used for naming the file.
25        :param index: the index given to the file.
26        """
27        file_split = open(settings.get_input_split_file(split_index-1), "w+")
28        file_split.write(str(index) + "\n")
29        return file_split
31    def is_on_split_position(self, character, index, split_size, current_split):
32        """Check if it is the right time to split.
33        i.e: character is a space and the limit has been reached.
34        :param character: the character we are currently on.
35        :param index: the index we are currently on.
36        :param split_size: the size of each single split.
37        :param current_split: the split we are currently on.
38        """
39        return index>split_size*current_split+1 and character.isspace()
41    def split_file(self, number_of_splits):
42        """split a file into multiple files.
43        note: this has not been optimized to avoid overhead.
44        :param number_of_splits: the number of chunks to
45        split the file into.
46        """
47        file_size = os.path.getsize(self.input_file_path)
48        unit_size = file_size / number_of_splits + 1
49        original_file = open(self.input_file_path, "r")
50        file_content = original_file.read()
51        original_file.close()
52        (index, current_split_index) = (1, 1)
53        current_split_unit = self.initiate_file_split(current_split_index, index)
54        for character in file_content:
55            current_split_unit.write(character)
56            if self.is_on_split_position(character, index, unit_size, current_split_index):
57                current_split_unit.close()
58                current_split_index += 1
59                current_split_unit = self.initiate_file_split(current_split_index, index)
60            index += 1
61        current_split_unit.close()
63    def join_files(self, number_of_files, clean = False, sort = True, decreasing = True):
64        """join all the files in the output directory into a
65        single output file.
66        :param number_of_files: total number of files.
67        :param clean: if True the reduce outputs will be deleted,
68        by default takes the value of self.clean.
69        :param sort: sort the outputs.
70        :param decreasing: sort by decreasing order, high value
71        to low value.
72        :return output_join_list: a list of the outputs
73        """
74        output_join_list = []
75        for reducer_index in xrange(0, number_of_files):
76            f = open(settings.get_output_file(reducer_index), "r")
77            output_join_list += json.load(f)
78            f.close()
79            if clean:
80                os.unlink(settings.get_output_file(reducer_index))
81        if sort:
82            from operator import itemgetter as operator_ig
83            # sort using the key
84            output_join_list.sort(key=operator_ig(1), reverse=decreasing)
85        output_join_file = open(settings.get_output_join_file(self.output_dir), "w+")
86        json.dump(output_join_list, output_join_file)
87        output_join_file.close()
88        return output_join_list
90class MapReduce(object):
91    """MapReduce class
92    Note: mapper and reducer functions need to be implemented.
94    """
96    def __init__(self, input_dir = settings.default_input_dir, output_dir = settings.default_output_dir,
97                 n_mappers = settings.default_n_mappers, n_reducers = settings.default_n_reducers,
98                 clean = True):
99        """
100        :param input_dir: directory of the input files,
101        taken from the default settings if not provided
102        :param output_dir: directory of the output files,
103        taken from the default settings if not provided
104        :param n_mappers: number of mapper threads to use,
105        taken from the default settings if not provided
106        :param n_reducers: number of reducer threads to use,
107        taken from the default settings if not provided
108        :param clean: optional, if True temporary files are
109        deleted, True by default.
110        """
111        self.input_dir = input_dir
112        self.output_dir = output_dir
113        self.n_mappers = n_mappers
114        self.n_reducers = n_reducers
115        self.clean = clean
116        self.file_handler = FileHandler(settings.get_input_file(self.input_dir), self.output_dir)
117        self.file_handler.split_file(self.n_mappers)
119    def mapper(self, key, value):
120        """outputs a list of key-value pairs, where the key is
121        potentially new and the values are of a potentially different type.
122        Note: this function is to be implemented.
123        :param key: key of the current mapper
124        :param value: value for the corresponding key
125        note: this method should be implemented
126        """
127        pass
129    def reducer(self, key, values_list):
130        """Outputs a single value together with the provided key.
131        Note: this function is to be implemented.
132        :param key: key of the reducer
133        :param value_list: list of values for the key
134        note: this method should be implemented
135        """
136        pass
138    def check_position(self, key, position):
139        """Checks if we are on the right position
140        """
141        return position == (hash(key) % self.n_reducers)
143    def run_mapper(self, index):
144        """Runs the implemented mapper
145        :param index: the index of the thread to run on
146        """
147        input_split_file = open(settings.get_input_split_file(index), "r")
148        key = input_split_file.readline()
149        value = input_split_file.read()
150        input_split_file.close()
151        if(self.clean):
152            os.unlink(settings.get_input_split_file(index))
153        mapper_result = self.mapper(key, value)
154        for reducer_index in range(self.n_reducers):
155            temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+")
156            json.dump([(key, value) for (key, value) in mapper_result 
157                                        if self.check_position(key, reducer_index)]
158                        , temp_map_file)
159            temp_map_file.close()
161    def run_reducer(self, index):
162        """Runs the implemented reducer
163        :param index: the index of the thread to run on
164        """
165        key_values_map = {}
166        for mapper_index in range(self.n_mappers):
167            temp_map_file = open(settings.get_temp_map_file(mapper_index, index), "r")
168            mapper_results = json.load(temp_map_file)
169            for (key, value) in mapper_results:
170                if not(key in key_values_map):
171                    key_values_map[key] = []
172                try:
173                    key_values_map[key].append(value)
174                except Exception, e:
175                    print "Exception while inserting key: "+str(e)
176            temp_map_file.close()
177            if self.clean:
178                os.unlink(settings.get_temp_map_file(mapper_index, index))
179        key_value_list = []
180        for key in key_values_map:
181            key_value_list.append(self.reducer(key, key_values_map[key]))
182        output_file = open(settings.get_output_file(index), "w+")
183        json.dump(key_value_list, output_file)
184        output_file.close()
186    def run(self, join=False):
187        """Executes the map and reduce operations
188        :param join: True if we need to join the outputs, False by default.
189        """
190        # initialize mappers list
191        map_workers = []
192        # initialize reducers list
193        rdc_workers = []
194        # run the map step
195        for thread_id in range(self.n_mappers):
196            p = Process(target=self.run_mapper, args=(thread_id,))
197            p.start()
198            map_workers.append(p)
199        [t.join() for t in map_workers]
200        # run the reduce step
201        for thread_id in range(self.n_reducers):
202            p = Process(target=self.run_reducer, args=(thread_id,))
203            p.start()
204            map_workers.append(p)
205        [t.join() for t in rdc_workers]
206        if join:
207            self.join_outputs()
209    def join_outputs(self, clean = True, sort = True, decreasing = True):
210        """Join all the reduce output files into a single output file.
212        :param clean: if True the reduce outputs will be deleted, by default takes the value of self.clean
213        :param sort: sort the outputs
214        :param decreasing: sort by decreasing order, high value to low value
216        """
217        try:
218            return self.file_handler.join_files(self.n_reducers, clean, sort, decreasing)
219        except Exception, e:
220            print "Exception occured while joining: maybe the join has been performed already  -- "+str(e)
221            return []

ماژول Settings

این ماژول حاوی تنظیمات و تابع‌های مفید پیش‌فرض برای ساخت نام‌های مسیر برای ورودی، خروجی و فایل‌های موقت است.

این متدهای کارآمد در قسمت «توضیحات» (comment) قطعه کد زیر شرح داده شده‌اند.

1# set default directory for the input files
2default_input_dir = "input_files"
3# set default directory for the temporary map files
4default_map_dir = "temp_map_files"
5# set default directory for the output files
6default_output_dir = "output_files"
7# set default number for the map and reduce threads
8default_n_mappers = 4
9default_n_reducers = 4
10# return the name of the input file to be split into chunks
11def get_input_file(input_dir = None, extension = ".ext"):
12    if not(input_dir is None):
13        return input_dir+"/file" + extension
14    return default_input_dir + "/file" + extension
17# return the name of the current split file corresponding to the given index
18def get_input_split_file(index, input_dir = None, extension = ".ext"):
19    if not(input_dir is None):
20        return input_dir+"/file_"+ str(index) + extension
21    return default_input_dir + "/file_" + str(index) + extension
24# return the name of the temporary map file corresponding to the given index
25def get_temp_map_file(index, reducer, output_dir = None, extension = ".ext"):
26    if not(output_dir is None):
27        return output_dir + "/map_file_" + str(index)+"-" + str(reducer) + extension
28    return default_output_dir + "/map_file_" + str(index) + "-" + str(reducer) + extension
30# return the name of the output file given its corresponding index
31def get_output_file(index, output_dir = None, extension = ".out"):
32    if not(output_dir is None):
33        return output_dir+"/reduce_file_"+ str(index) + extension
34    return default_output_dir + "/reduce_file_" + str(index) + extension
36# return the name of the output file
37def get_output_join_file(output_dir = None, extension = ".out"):
38    if not(output_dir is None):
39        return output_dir +"/output" + extension
40    return default_output_dir + "/output" + extension

مثال شمارنده کلمات

در این مثال، فرض می‌شود که یک سند وجود دارد و هدف، شمردن تعداد وقوع هر کلمه در آن است. برای انجام این کار، نیاز به تعریف عملیات نگاشت و کاهش است تا بتوان متدهای mapper و reducer از کلاس MapReduce را پیاده‌سازی کرد.

راهکار برای شمارش کلمات، خیلی ساده است:

map: متن تقسیم می‌شود، سپس کلماتی که فقط حاوی کاراکترهای ascii و حروف کوچک هستند شمارش می‌شوند. پس از آن، هر کلمه به عنوان کلیدی با شمار ۱ ارسال می‌شود.

reduce: به سادگی می‌توان همه مقادیر پیشین برای هر کلمه را جمع کرد.

بنابراین، کلاس MapReduce به صورت زیر پیاده‌سازی می‌شود.

1from mapreduce import MapReduce
2import sys
3class WordCount(MapReduce):
4    def __init__(self, input_dir, output_dir, n_mappers, n_reducers):
5        MapReduce.__init__(self,  input_dir, output_dir, n_mappers, n_reducers)
6    def mapper(self, key, value):
7        """Map function for the word count example
8        Note: Each line needs to be separated into words, and each word
9        needs to be converted to lower case.
10        """
11        results = []
12        default_count = 1
13        # seperate line into words
14        for word in value.split():
15            if self.is_valid_word(word):
16                # lowercase words

اگر نوشته بالا برای شما مفید بوده است، آموزش‌های زیر نیز به شما پیشنهاد می‌شوند:


