پردازش کلان داده در پایتون — راهنمای جامع

۸۲۳ بازدید
آخرین به‌روزرسانی: ۲۸ خرداد ۱۴۰۱
زمان مطالعه: ۳۰ دقیقه
پردازش کلان داده در پایتون — راهنمای جامع

اگر کاربران با دنیای علم و فناوری آشنایی داشته باشند، بدون شک با واژگان و اصطلاحاتی نظیر «علم داده» (Data Science)، «علم تجزیه و تحلیل» (Analytics Science)، «یادگیری ماشین» (Machine Learning)، «کلان داده» (Big Data) مواردی مشابه برخورد داشته‌اند. در دنیای فناوری امروز، از این اصطلاحات برای تعریف مجموعه‌ای از فناوری‌های مهم و پیشرفته حوزه در «علوم کامپیوتر» (Computer Science) استفاده می‌شود که پایه و اساس پیشرفت‌های فناوری در آینده نیز به شمار می‌آیند. در این مطلب، با مهم‌ترین ابزارهای علم داده جهت پردازش کلان داده آشنا خواهید. همچنین، مثال‌های ساده و عملی جهت پردازش کلان داده نمایش داده خواهند شد. علاوه بر این، استفاده از «زبان برنامه‌نویسی پایتون» (Python Programming language) برای تحلیل داده آموزش داده خواهد شد (از این دسته از تکنیک‌ها در مقیاس وسیع‌تر، جهت پردازش کلان داده نیز استفاده می‌شود).

علم داده یکی از حوزه‌های گسترده علوم کامپیوتر محسوب می‌شود و زیر شاخه‌های گسترده‌ای نظیر «جمع‌آوری داده» (Data Collection)، «پاک‌سازی داده» (Data Cleaning)، «استاندارد سازی داده» (Data Standardization)، «تحلیل داده» (Data Analysis) و «گزارش» (Reporting) را شامل می‌شود. بزرگ‌ترین شرکت‌های حوزه فناوری، از تکنیک‌های علم داده و پردازش کلان داده برای استخراج دانش از «داده‌های غیر ساخت یافته» (Unstructured Data) و «داده‎‌های ساخت یافته» (Structured Data) سازمانی استفاده می‌کنند.

پردازش کلان داده

پردازش کلان داده

تحلیل و پردازش کلان داده هم اکنون به یکی از ترندهای تحقیقاتی در دنیا تبدیل شده است. به همین خاطر است که فرصت‌های شغلی بسیار خوبی، برای برنامه‌نویسان و توسعه‌دهندگانی که با تکنیک‌های پردازش کلان داده آشنایی دارند، فراهم شده است. یک دانشمند علم داده و یا یک برنامه‌نویس مسلط به پردازش کلان داده و تحلیل آن می‌تواند از مجموعه ابزارهای تولید شده برای پردازش کلان داده، جهت «تحلیل زبان» (Language Analysis)، «پیشنهاد فایل‌های ویدئویی» (Recommending Videos) و یا مشخص کردن محصولات جدید با توجه به داده‌های بازاریابی یا داده‌های جمع‌آوری شده از مشتریان استفاده کند.

به طور کلی، زمانی که تکنیک‌های موجود در حوزه پردازش کلان داده در علم داده مورد بررسی قرار می‌گیرند، منظور تکنیک‌هایی هستند که در حوزه خاصی به نام «علم داده در مقیاس وسیع» (large Scale Data Science) استفاده می‌شوند. تعریف کلمه کلان در اصطلاح پردازش کلان داده، به نوع پروژه‌های برنامه‌نویسی پردازش داده یا حجم داده‌هایی که قرار است پردازش و تحلیل شوند، بستگی دارد.

بسیاری از مسائل تحلیل داده، که برای حل آن‌ها پروژه‌های برنامه‌نویسی تعریف می‌شود، توسط راهکارهای «مرسوم» (Conventional) و متدهای یادگیری ماشین به راحتی قابل حل هستند. به عبارت دیگر، زمانی که داده‌های استفاده شده برای مدل‌سازی یا آموزش مدل یادگیری، به اندازه‌ای باشند که بتوان از سیستم‌های محاسباتی معمول نظیر کامپیوترهای شخصی برای پردازش آن‌ها استفاده کرد، معمولا نیازی به استفاده از روش‌های پردازش کلان داده وجود نخواهد داشت. با این حال، این امکان وجود دارد که بتوان از روش‌های تحلیل و پردازش کلان داده برای حل مسائل تحلیل داده استفاده کرد. به طور کلی، زمانی از تکنیک‌های پردازش کلان داده برای حل مسئله استفاده می‌شود که نیاز به پردازش حجم عظیمی از داده‌ها وجود دارد و سیستم‌های محاسباتی مرسوم، توان پردازشی لازم برای تحلیل و پردازش کلان داده را نداشته باشند.

پردازش کلان داده

مهم‌ترین ابزارهای پردازش کلان داده

یکی از مهم‌ترین مدل‌های ذخیره‌سازی توزیع شده داده‌ها و پردازش کلان داده، که به وفور توسط دانشمندان علم داده مورد استفاده قرار می‌گیرد، مدل برنامه‌نویسی به نام «نگاشت-کاهش» (MapReduce) است. مدل MapReduce، روشی بهینه برای مدیریت و پردازش کلان داده محسوب می‌شود که به دانشمندان علم داده و توسعه‌دهنده برنامه کاربردی اجازه می‌‌دهد تا ابتدا داده را با استفاده از یک «صفت» (Attribute)، فیلتر یا دسته‌بندی خاص نگاشت کنند و سپس، با استفاده از یک مکانیزم «تبدیل» (Transformation) یا «تجمیع» (Aggregation)، داده‌های نگاشت شده را کاهش دهند.

به عنوان نمونه، در صورتی که داده‌های جمع‌اوری شده مرتبط با مجموعه‌ای از گربه‌ها باشند، ابتدا گربه‌ها با استفاده از صفتی نظیر رنگ آن‌ها نگاشت می‌شوند. در مرحله بعد، داده‌ها، از طریق جمع کردن گروه‌بندی‌های انجام شده، کاهش پیدا می‌کنند. در پایان فرایند MapReduce، یک لیست یا مجموعه‌ای مرتبط با رنگ گربه‌ها و تعداد گربه‌های موجود در هر کدام از گروه‌بندی‌های رنگی در اختیار برنامه‌نویس یا دانشمند داده قرار می‌دهد.

تقریبا تمامی کتابخانه‌های برنامه‌نویسی توسعه داده شده برای مقاصد پردازش کلان داده و علم داده، از مدل برنامه‌نویسی MapReduce پشتیبانی می‌کنند. همچنین، تعداد زیادی کتابخانه برنامه‌نویسی وجود دارند که امکان مدیریت، پردازش و انجام عملیات «نگاشت-کاهش» (MapReduce) داده‌ها را به شکل توزیع شده (خوشه یا مجموعه‌ای از کامپیوترهای ) در اختیار برنامه‌نویسان و فعالان حوزه علم داده قرار می‌دهند. افرادی که به دنبال استفاده از مدل‌های MapReduce، جهت مدیریت و پردازش کلان‌ داده هستند، به راحتی می‌توانند از ابزارها، بسته‌ها و کتابخانه‌های توسعه داده شده در زبان پایتون، جهت پیاده‌سازی کاربردهای مرتبط با پردازش کلان داده استفاده کنند.

پردازش کلان داده

کتابخانه Hadoop

یکی از محبوب‌ترین کتابخانه‌ها برای انجام عملیات «نگاشت-کاهش» (MapReduce) روی داده‌های حجیم، کتابخانه هدوپ است که توسط بنیاد نرم‌افزاری Apache طراحی شده است. کتابخانه هدوپ با استفاده از «محاسبات خوشه‌ای» (Cluster Computing)، به فعالان حوزه علم داده اجازه می‌دهد تا بتوانند تا با سرعت و اثرگذاری به مراتب بیشتری، وظایف مرتبط با مدیریت و پردازش کلان داده را انجام دهند. کتابخانه‌ها و بسته‌های زیادی  در زبان پایتون توسعه داده شده‌اند که از طریق آن‌ها می‌توان داده‌ها (که به آن‌ها Jobs نیز گفته می‌شود) را به هدوپ ارسال کرد. انتخاب یکی از این کتابخانه‌ها و بسته‌های برنامه‌نویسی جهت مقاصد پردازش کلان داده و علم داده، به عواملی نظیر سادگی استفاده، زیرساخت‌های لازم برای پیاده‌سازی و کاربرد خاصی که قرار است در آن مورد استفاده قرار بگیرند، بستگی دارد.

پردازش کلان داده

کتابخانه Spark

در صورتی که در کاربرد پردازش کلان داده مورد نظر برنامه‌نویس و یا دانشمند علم داده، استفاده از داده‌هایی که در قالب «داده‌های جریانی» (Streaming Data) هستند (نظیر داده‌های «بلادرنگ» (Real-Time)، داده‌های Log و داده‌های جمع‌آوری شده از «واسط‌های برنامه‌نویسی کاربردی» (Application Programming Interface))، عملکرد به مراتب بهتری را برای سیستم به ارمغان می‌آورند، استفاده از کتابخانه Spark بهترین گزینه خواهد بود. کتابخانه Spark، نظیر کتابخانه هدوپ، توسط بنیاد نرم‌افزاری Apache توسعه داده شده است.

پردازش کلان داده

پایتون و پردازش کلان داده

سؤالی که ممکن در اینجا ذهن خوانندگان و مخاطبان این مطلب را به خود مشغول کند این است که زبان پایتون، جهت پردازش کلان داده و تحلیل آن، چه امکاناتی را در اختیار برنامه‌نویس یا «دانشمند علم داده» (Data Scientist) قرار می‌دهد. در چند سال اخیر، پایتون به عنوان یکی از مهم‌ترین ابزارهای دانشمندان علم داده برای برنامه‌نویسی پروژه‌های علم داده مطرح شده است؛ تحلیل و پردازش کلان داده هم از این قاعده مستثنی نیست.

اگرچه، بسیاری از دانشمندان علم داده و برنامه‌نویسان فعال در این حوزه، از زبان‌های برنامه‌نویسی نظیر جاوا، R و Julia و ابزارهایی مانند SPSS و SAS برای تحلیل داده و استخراج «بینش» (Insight) از آن‌ها استفاده می‌کنند، با این حال، محبوبیت روزافزون زبان پایتون، «کتابخانه‌های برنامه‌نویسی» (Libraries) متنوع و پرکاربرد و جامعه برنامه‌نویسی پویا و فعال آن، سبب شده است تا بسیاری از دانشمندان علم داده، از زبان پایتون برای تحلیل داده‌ها و پردازش کلان داده استفاده کنند. افزایش روزافزون محبوبیت پایتون برای مقاصد علم داده را می‌توان در رشد چشمگیر کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی در حوزه علم داده مشاهده کرد.

زبان پایتون، در چند سال اخیر، همواره به عنوان یکی از محبوب‌ترین زبان‌های برنامه‌نویسی شناخته شده است. بسیاری از فرصت‌های شغلی لیست شده برای زبان پایتون نیز مرتبط با حوزه علم داده، تحلیل داده، پردازش کلان داده و «هوش مصنوعی» (Artificial Intelligence) هستند. به همین دلیل است که بسیاری از «مهندسان داده» (Data Engineers) به استفاده از پایتون و تسلط بر این زبان برنامه‌نویسی روی می‌آورند.

پردازش کلان داده

یکی از مهم‌ترین ابزارهای پردازش کلان داده برای دانشمندان داده، کتابخانه‌ای به نام «هدوپ» (Hadoop) است. ابزارهای نرم‌افزاری منبع باز هدوپ، در ابتدا توسط «بنیاد نرم‌افزاری آپاچی» (Apache Software Foundation) و با هدف ارائه بستر نرم‌افزاری لازم برای محاسبات توزیع شده و پردازش کلان داده، در زبان برنامه‌نویسی جاوا تهیه شده‌اند. این دسته از ابزارهای نرم‌افزاری به برنامه‌نویسان و توسعه‌دهندگان اجازه می‌دهند تا با استفاده از شبکه‌ای متشکل از چندین کامپیوتر (به شکل «توزیع شده» (Distributed)) اقدام به حل مسائلی کنند که حجم عظیمی از داده و محاسبات را شامل می‌شوند. جهت پردازش کلان داده و تحلیل آن‌ها، هدوپ، چارچوبی نرم‌افزاری برای ذخیره‌سازی توزیع شده و پردازش کلان داده معرفی می‌کند؛ این چارچوب نرم‌افزاری بر پایه مدل برنامه‌نویسی «نگاشت-کاهش» (MapReduce) پیاده‌سازی شده است.

دو دلیل عمده برای محبوبیت بیش از پیش زبان پایتون، جهت برنامه‌نویسی علم داده و پردازش کلان داده، وجود دارد:

  • کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیره‌سازی، نمایش، شکل دادن و مصورسازی داده‌ها در زبان برنامه‌نویسی پایتون ارائه شده است (در ادامه، این ابزارها شرح داده خواهند شد).
  • پس از افزایش روزافزون محبوبیت پایتون و روی آوردن تعداد زیادی از برنامه‌نویسان و توسعه‌دهندگان حوزه علم داده به این زبان برنامه‌نویسی، بنیاد نرم‌افزاری آپاچی به سرعت امکان استفاده از «اکوسیستم» (Ecosystem) هدوپ را برای دانشمندان علم داده و برنامه‌نویسان این حوزه فراهم آورد. هم‌اکنون برنامه‌نویسان زبان پایتون و دانشمندان علم داده مسلط به این زبان می‌توانند از ابزارهایی نظیر « استریمینگ هدوپ» (Hadoop Streaming)، افزونه Hadoopy، کتابخانه Pydoop و بسته Pydoop اشاره کرد.

پردازش کلان داده

استفاده از هدوپ (Hadoop) در پایتون به جای جاوا

یکی از سؤالاتی که ممکن است هنگام استفاده از هدوپ در پایتون (جهت پردازش کلان داده و تحلیل آن‌ها) برای فعالان حوزه علم داده پدید بیاید این است که چرا لازم است از پایتون به جای جاوا، برای پردازش کلان داده و تحلیل آن استفاده شود. در پاسخ به این سؤال باید گفت که اگرچه زبان جاوا هنوز هم یکی از محبوب‌ترین و پرکاربردترین زبان‌های برنامه‌نویسی در جهان و زبان اصلی توسعه چارچوب هدوپ (و Spark) محسوب می‌شود، با این حال، میان برخی از ویژگی‌های ساختاری این دو زبان تفاوت‌هایی وجود دارد که ممکن است سبب روی آوردن فعالان این حوزه به استفاده از پایتون شود. از جمله این تفاوت‌ها می‌توان به موارد زیر اشاره کرد:

  • نصب، پیاده‌سازی و اجرای محیط برنامه‌نویسی جاوا (Java Development Environment) نسبت به زبان پایتون سخت‌تر است (تنظیم کردن تنظیمات مرتبط با «متغیرهای محیطی» (Environment Variables)، تنظیمات فایل‌های XML مرتبط با وابستگی‌‎های نرم‌افزاری و سایر موارد). در نقطه مقابل، برای نصب، پیاده‌سازی و اجرای محیط برنامه‌نویسی پایتون، تنها کافی است پایتون را روی سیستم نصب و از واسط خط دستوری آن جهت کد نویسی استفاده کرد.
  • در محیط برنامه‌نویسی جاوا، ابتدا باید کدهای نوشته شده «کامپایل» (Compile) و سپس اجرا شوند. در حالی که در زبان پایتون، تنها کافی است کدها توسط برنامه‌نویس نوشته و سپس خط به خط در واسط خط دستوری اجرا شوند.
  • تعداد خط کدهای لازم برای نوشتن یک برنامه خاص در زبان جاوا، به مراتب بیشتر از تعداد خط کدهای لازم برای توسعه همان برنامه در زبان پایتون خواهد بود. به عنوان نمونه، به مثال زیر دقت کنید. در ادامه، تعداد خط کدهای لازم برای شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا و پایتون نمایش داده شده است. اولین قدم جهت پردازش کلان داده در فرم داده‌های متنی غیر ساخت یافته، شمارش تعداد دفعات تکرار کلمات ظاهر شده در این داده‌ها است:

شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا:

1package org.myorg;
2	
3	import java.io.IOException;
4	import java.util.*;
5	
6	import org.apache.hadoop.fs.Path;
7	import org.apache.hadoop.conf.*;
8	import org.apache.hadoop.io.*;
9	import org.apache.hadoop.mapred.*;
10	import org.apache.hadoop.util.*;
11	
12	public class WordCount {
13	
14	   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15	     private final static IntWritable one = new IntWritable(1);
16	     private Text word = new Text();
17	
18	     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19	       String line = value.toString();
20	       StringTokenizer tokenizer = new StringTokenizer(line);
21	       while (tokenizer.hasMoreTokens()) {
22	         word.set(tokenizer.nextToken());
23	         output.collect(word, one);
24	       }
25	     }
26	   }
27	
28	   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29	     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30	       int sum = 0;
31	       while (values.hasNext()) {
32	         sum += values.next().get();
33	       }
34	       output.collect(key, new IntWritable(sum));
35	     }
36	   }
37	
38	   public static void main(String[] args) throws Exception {
39	     JobConf conf = new JobConf(WordCount.class);
40	     conf.setJobName("wordcount");
41	
42	     conf.setOutputKeyClass(Text.class);
43	     conf.setOutputValueClass(IntWritable.class);
44	
45	     conf.setMapperClass(Map.class);
46	     conf.setCombinerClass(Reduce.class);
47	     conf.setReducerClass(Reduce.class);
48	
49	     conf.setInputFormat(TextInputFormat.class);
50	     conf.setOutputFormat(TextOutputFormat.class);
51	
52	     FileInputFormat.setInputPaths(conf, new Path(args[0]));
53	     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54	
55	     JobClient.runJob(conf);
56	   }
57	}

شمارش تعداد دفعات تکرار کلمات با استفاده از زبان پایتون:

1import re
2
3WORD_RE = re.compile(r"[\w']+")
4
5class MRWordFreqCount(MRJob):
6
7    def mapper(self, _, line):
8        for word in WORD_RE.findall(line):
9            yield (word.lower(), 1)
10
11    def combiner(self, word, counts):
12        yield (word, sum(counts))
13
14    def reducer(self, word, counts):
15        yield (word, sum(counts))
16
17if __name__ == '__main__':
18    MRWordFreqCount.run()
19
20#Source
21#https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py

همانطور که مشاهده می‌شود، برنامه‌ای که نوشتن آن در زبان جاوا به 57 خط کد احتیاج است، در زبان پایتون، تنها با 15 خط کد قابل نوشتن است. این ویژگی سبب می‌شود تا «نگه‌داری» (Maintain) کدها در زبان پایتون به مراتب راحت‌تر از زبان جاوا باشد.

کتابخانه‌های مدیریت و پردازش داده در پایتون

همانطور که پیش از این نیز اشاره شد، کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیره‌سازی، نمایش، شکل دادن و مصورسازی داده‌ها در زبان برنامه‌نویسی پایتون ارائه شده است. از جمله مهم‌ترین کتابخانه‌های مدیریت و پردازش داده در پایتون می‌توان به موارد زیر اشاره کرد:

کتابخانه Pandas

یکی از محبوب‌ترین کتابخانه‌های ارائه شده برای مقاصد علم داده، کتابخانه Pandas است. کتابخانه Pandas توسط دانشمندان علم داده‌ای که به زبان‌های برنامه‌نویسی R و پایتون مسلط هستند، توسعه داده شده است. هم اکنون جامعه بسیار بزرگی از برنامه‌نویسان و توسعه‌دهندگان (متشکل از دانشمندان علم داده و تحلیل‌گران داده) از کتابخانه Pandas و فرایند توسعه آن (برطرف کردن مشکلات احتمالی و اضافه کردن قابلیت‌های برنامه‌نویسی جدید) پشتیبانی می‌کنند.

کتابخانه Pandas ویژگی‌های تعبیه شده بسیار مهم و پرکاربردی نظیر قابلیت خواندن داده‌ها از منابع مختلف، ساختن «دیتا فریم» (Dataframe) یا ماتریس/جدول متناظر با داده‌های خوانده شده و محاسبه «تجزیه و تحلیل تجمیعی» (Aggregate Analytics) بر اساس نوع برنامه کاربردی در حال توسعه را برای برنامه‌نویسان و توسعه‌دهندگان فراهم می‌آورد.

پردازش کلان داده

علاوه بر امکانات تعبیه شده برای مدیریت و پردازش داده، کتابخانه Pandas از امکانات «مصور‌سازی داده» (Data Visualization) بسیار مناسبی نیز بهره می‌برد. با استفاده از قابلیت‌های مصورسازی تعبیه شده در این کتابخانه، امکان تولید گراف و نمودار از نتایج تولید شده برای برنامه‌نویسان و دانشمندان علم داده فراهم شده است. همچنین، مجموعه‌ای از توابع داخلی در کتابخانه Pandas تعریف شده‌اند که امکان «صادر کردن» (Export) تجزیه و تحلیل‌های انجام شده روی داده‌ها را در قالب فایل «صفحات گسترده اکسل» (Excel Spreadsheet) فراهم می‌آورند.

کتابخانه Agate

یکی از کتابخانه‌های برنامه‌نویسی نوظهور برای مقاصد علم داده، کتابخانه Agate نام دارد. کتابخانه Agate با هدف استفاده در کاربردهای «روزنامه‌نگاری» (Journalism) پیاده‌سازی شده است و ویژگی‌های بسیار مناسبی جهت تحلیل داده‌ها در اختیار برنامه‌نویسان و توسعه‌دهندگان قرار می‌دهد. از این کتابخانه می‌توان جهت تحلیل و مقایسه فایل‌های صفحه گسترده (Spreadsheet)، انجام محاسبات آماری رو داده‌ها و سایر موارد استفاده کرد.

یادگیری کار کردن با کتابخانه Agate بسیار ساده است و وابستگی‌های به مراتب کمتری نسبت به Pandas دارد. همچنین، ویژگی‌های جالبی نظیر مصورسازی داده‌ها و تولید نمودار، این کتابخانه را به ابزاری محبوب برای مقاصد تحلیل داده تبدیل کرده است.

کتابخانه Bokeh

در صورتی که تمایل به مصورسازی داده‌های موجود در یک مجموعه داده ساخت یافته دارید، Bokeh ابزار بسیار مناسبی محسوب می‌شود. این ابزار را می‌توان با کتابخانه‌های تحلیل داده نظیر Pandas ،Agate و سایر موارد مورد استفاده قرار داد. این ابزار، به برنامه‌نویسان و توسعه‌دهندگان اجازه می‌دهد تا با کمترین کد نویسی، نمودارهای گرافیکی و مصورسازی‌های خیره کننده‌ای از داده‌ها تولید کنند.

پردازش کلان داده

کاوش، پردازش و تحلیل داده‌ها در پایتون با استفاده از کتابخانه Pandas

پیش از این که با ابزارهای پردازش کلان داده در پایتون آشنا شویم، با یک مثال ساده، نحوه تحلیل داده‌ها در پایتون مورد بررسی قرار می‌گیرد. پس از جستجو در سایت Kaggle، مجموعه داده حقوق کارکنان دولتی شهر سان‌فرانسیسکو برای این کار انتخاب شد. این مجموعه داده از طریق لینک [+] قابل دسترسی است.

ابتدا با استفاده از قطعه کد زیر داده‌ها وارد (Import) و در یک دیتا فریم Pandas خوانده می‌شوند.

1import pandas as pd
2salaries = pd.read_csv('Salaries.csv')
3salaries.head()

پردازش کلان داده

سپس، با استفاده از تابع describe، توزیع داده‌های موجود در مجموعه داده نمایش داده می‌شود.

1salaries.head()

پردازش کلان داده

داده‌های جمع‌آوری شده، مربوط به سال‌های متعددی است. برای محدودتر کردن حوزه تحلیلی انجام شده از داده‌ها، تنها داده‌های مربوط به سال 2014 مورد بررسی قرار گرفته می‌شوند.

1latest_salaries = salaries[salaries['Year'] == 2014]
2latest_salaries.describe()

پردازش کلان داده

سپس، در این مرحله از فرایند تحلیل داده، مقدار درصد متوسطی از حقوق سالیانه که کارمندان دولتی برای پرداخت اجاره خانه استفاده می‌کنند، بررسی می‌شود.

1average_yearly_rent = 3880 * 12
2average_city_pay = latest_salaries['TotalPay'].mean()
3average_yearly_rent / average_city_pay
0.61698360153365217

بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، کارمندان دولتی در سان‌فرانسیسکو، به طور متوسط چیزی حدود 60 درصد از حقوق خود را صرف پرداخت اجاره بها می‌کنند. در مرحله بعد، تعداد کارمندانی که حقوق آن‌ها از متوسط اجاره بهای منزل در این شهر کمتر است مشخص می‌شود.

1latest_salaries[latest_salaries['TotalPay'] < average_yearly_rent].shape[0]
11360

همانطور که در خروجی نمایش داده شده مشخص است، بیش از 11 هزار کارمند دولتی در سان‌فرانسیسکو، درآمد کمتر از متوسط اجاره بهای منزل دارند. در مرحله بعدی، تعداد کارمندانی که باید اضافه کار داشته باشند تا بتوانند از حقوق لازم برای پرداخت اجاره بها برخوردار شوند، نمایش داده می‌شود. برای اینکار، تمامی کسانی که بیش از 70 درصد حقوق خود را برای اجاره بها پرداخت می‌کنند، به عنوان افرادی در نظر گرفته می‌شوند که بدون اضافه کاری قادر به پرداخت اجاره بها نیستند.

1pd.to_numeric(latest_salaries['BasePay'], errors='coerce').describe()
count     38119.000000
mean      66564.421924
std       44053.783972
min           0.000000
25%                NaN
50%                NaN
75%                NaN
max      318835.490000
Name: BasePay, dtype: float64
1base_pay_series = pd.to_numeric(latest_salaries['BasePay'], errors='coerce')
2base_pay_series[base_pay_series * .7 < average_yearly_rent].shape
20074

بنابراین، بیش از 20 هزار کارمند دولتی بدون اضافه کاری قادر به پرداخت اجاره بها و هزینه زندگی در شهر سان‌فرانسیکو نخواهند بود. در مرحله بعد، با استفاده از تحلیل داده‌های موجود، تعداد کارمندانی مشخص می‌شود که باید بیش از 1000 دلار در سال اضافه کاری داشته باشند تا بتوانند به زندگی خود در این شهر ادامه دهند.

1overtime_series = pd.to_numeric(latest_salaries['OvertimePay'], errors='coerce')
2overtime_series.describe()
count     38119.000000
mean       5401.993737
std       11769.656257
min           0.000000
25%                NaN
50%                NaN
75%                NaN
max      173547.730000
Name: OvertimePay, dtype: float64
1overtime_series[overtime_series > 1000].shape
15361

بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، بیش از 15 هزار کارمند دولتی، با اضافه کاری بیش از 1000 دلار در سال، قادر به زندگی در این شهر پرخرج هستند. این دسته از تحلیل‌های انجام شده روی مجموعه داده انتخابی، تنها بخشی از کارهای قابل انجام توسط کتابخانه‌های پردازش و تحلیل داده در پایتون هستند. بنابراین، همانطور که مشاهده می‌شود، پایتون یکی از بهترین و ساده‌ترین زبان‌ها برای تحلیل داده به شمار می‌آید و امکانات کاملی برای کاوش، پردازش و تحلیل داده در اختیار برنامه‌نویسان و دانشمندان علم داده قرار می‌دهد.

کتابخانه‌های پردازش کلان داده در پایتون

زبان برنامه‌نویسی پایتون و جامعه برنامه‌نویسی فعال آن، ابزارها و کتابخانه‌های متنوعی برای پردازش کلان داده در اختیار برنامه‌نویسان و دانشمندان علم داده قرار داده است. این ابزارها، مبتنی بر کتابخانه هدوپ و Spark هستند. از جمله، مهم‌ترین ابزارهای پردازش کلان داده در پایتون می‌تواند به موارد زیر اشاره کرد:

کتابخانه PySpark

کتابخانه PySpark که واسط برنامه‌نویسی کاربردی Spark در زبان پایتون محسوب می‌شود، به دانشمندان علم داده و برنامه‌نویسان فعال در این حوزه اجازه می‌دهد تا به سرعت، زیرساخت‌های لازم برای نگاشت و کاهش مجموعه‌های داده را پیاده‌سازی کنند. همچنین، از آنجایی که در کتابخانه Spark مجموعه‌ای از الگوریتم‌های یادگیری ماشین تعبیه شده است، علاوه بر پردازش کلان داده و مدیریت آن‌ها، جهت حل مسائل یادگیری ماشین نیز مورد استفاده قرار می‌گیرد.

پردازش کلان داده

ابزار جریان‌سازی یا Streaming

ابزار جریان‌سازی هدوپ یا Hadoop Streaming، یکی از مجبوب‌ترین روش‌های استفاده از Hadoop در پایتون محسوب می‌شود. جریان‌سازی یا Streaming، یکی از ویژگی‌های تعبیه شده در کتابخانه Hadoop است. این ویژگی به برنامه‌نویس اجازه می‌دهد تا کدهای نوشته به زبان پایتون (و یا دیگر زبان‌های برنامه‌نویسی) را جهت انجام عملیات نگاشت (Mapping) به تابع stdin پاس بدهند (به عنوان آرگومان، به این تابع ارسال کنند). به عبارت دیگر، توسعه‌دهندگان از جاوا به عنوان Wrapper استفاده می‌کنند تا بتوانند کدهای پایتون را به تابع stdin پاس بدهند. سپس، در زمان اجرا، عملیات نگاشت (Mapping) در زبان جاوا و توسط کتابخانه Hadoop انجام می‌شود.

پردازش کلان داده

افزونه Hadoopy

این ابزار، افزونه‌ای برای جریان‌سازی هدوپ یا Hadoop Streaming محسوب می‌شود و از کتابخانه Cython برای انجام عملیات نگاشت-کاهش (MapReduce) در پایتون استفاده می‌کند. مستندسازی خوبی برای این ابزار انجام شده است و کار کردن با آن برای توسعه‌دهندگان راحت است. با این حال، این پروژه از پایان سال 2012 تا به امروز غیر فعال باقی مانده و به‌روز‌رسانی جدیدی دریافت نکرده است.

بسته Pydoop

این بسته برنامه‌نویسی به توسعه‌دهندگان اجازه می‌دهد تا بتوانند برنامه مرتبط با پردازش کلان داده را در پایتون کد نویسی کنند. سپس، با استفاده از کدهای پیاده‌سازی شده، به طور مستقیم با داده‌های ذخیره شده در خوشه هدوپ (Hadoop Cluster) ارتباط برقرار کرده و عملیات نگاشت-کاهش (MapReduce) انجام دهند. چنین کاری از طریق واسط برنامه‌نویسی کاربردی HDFS (یا HDFS API) تعبیه شده در بسته Pydoop امکان‌پذیر شده است. واسط برنامه‌نویسی کاربردی HDFS به برنامه‌نویسان اجازه می‌دهد تا در محیط پایتون، عملیات خواندن و نوشتن داده‌ها، از معماری فایل سیستم HDFS را انجام دهند.

پردازش کلان داده

کتابخانه ‏MRJob

یک کتابخانه برنامه‌نویسی در زبان پایتون است که امکان انجام عملیات نگاشت-کاهش (MapReduce) و پردازش کلان داده را در اختیار برنامه‌نویسان و دانشمندان علم داده قرار می‌دهد. این کتابخانه نرم‌افزاری، یکی از پراستفاده‌ترین بسته‌های برنامه‌نویسی جهت پردازش کلان داده در زبان پایتون محسوب می‌شود و توسط شرکت Yelp در اختیار برنامه‌نویسان زبان پایتون قرار گرفته است.

کتابخانه ‏MRJob، از هدوپ، سرویس Cloud Dataproc گوگل و سرویس Elastic MapReduce یا EMR شرکت آمازون نیز پشتیبانی می‌کند. سرویس EMR، یک وب سرویس ارائه شده توسط شرکت آمازون است که امکان تحلیل و پردازش کلان داده با استفاده از هدوپ و Spark را در اختیار فعالان حوزه علم داده قرار می‌دهد. مستندسازی خوبی برای این کتابخانه انجام شده است و کار کردن با آن برای توسعه‌دهندگان و برنامه‌نویسان راحت است. همچنین، این پروژه کاملا فعال است و از پیاده‌سازی‌های مختلف آن پشتیبانی می‌شود.

پردازش کلان داده

اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob

در این مثال، یک برنامه ساده برای محاسبه دفعات ظاهر شدن تمامی کلمات در متن، در زبان پایتون کد نویسی خواهد شد. سپس، با استفاده از هدوپ (Hadoop)، این قابلیت پدید می‌آید تا هر نوع داده متنی (با هر اندازه ممکن و در هر قالبی)، مورد تجزیه و تحلیل قرار بگیرد و تعداد دفعات ظاهر شدن تمامی کلمات موجود در آن محاسبه شود. برای چنین کاری، باید یک خوشه هدوپ در سیستم اجرایی باشد. برای اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob، لازم است مراحل زیر دنبال شود:

  • ابتدا با استفاده از Pip، کتابخانه MRJob در پایتون نصب می‌شود.
1pip install mrjob
  • برنامه شمارش کلمات در متن: برای چنین کاری، در یک برنامه دلخواه ویرایشگر کد، از کدهای زیر استفاده کنید (یک فایل خالی و کدهای زی را در آن کپی کنید). سپس، فایل مورد نظر را با نام word-count.py ذخیره کنید.

کدهای لازم محاسبه دفعات ظاهر شدن تمامی کلمات در متن با استفاده از MRJob (ابزار Hadoop):

1# Copyright 2009-2010 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""The classic MapReduce job: count the frequency of words.
15"""
16from mrjob.job import MRJob
17import re
18
19WORD_RE = re.compile(r"[\w']+")
20
21
22class MRWordFreqCount(MRJob):
23
24    def mapper(self, _, line):
25        for word in WORD_RE.findall(line):
26            yield (word.lower(), 1)
27
28    def combiner(self, word, counts):
29        yield (word, sum(counts))
30
31    def reducer(self, word, counts):
32        yield (word, sum(counts))
33
34
35if __name__ == '__main__':
36    MRWordFreqCount.run()
  • اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob: پس از اینکه کد نویسی برنامه شمارش تعداد دفعات ظاهر شدن کلمات در متن به پایان رسید، در مرحله بعد لازم است تا کدهای پایتون برای هدوپ (hadoop) ارسال و اجرا شوند. برای این کار از کد دستوری زیر استفاده می‌شود:
1$ python word_count.py [file location] -r hadoop > counts

کدهای کامل پیاده‌سازی سیستم پردازش داده‌های متنی با استفاده از کتابخانه MRJob، در لینک [+] موجود است (در این پروژه، از تکنیک‌های پردازش کلان داده، مدل برنامه‌نویسی MapReduce و کتابخانه‌های Spark و Hadoop استفاده شده است). در ادامه، تعدادی مثال مرتبط با پردازش داده‌های متنی با استفاده از کتابخانه MRJob نمایش داده شده است. ذکر این نکته ضروری است که برای اطمینان از صحت عملکرد کدهای ارائه شده، تمامی ماژول‌های تعریف شده در در لینک [+]، باید توسط برنامه کاربردی قابل دسترس باشند.

محاسبه دفعات ظاهر شدن تمامی کلماتی که حاوی حرف u هستند، با استفاده از MRJob (ابزار Hadoop):

1# Copyright 2016 Yelp
2# Copyright 2017 Yelp and Contributors
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""The classic MapReduce job, but only for words containing "u"
16"""
17from mrjob.job import MRJob
18import re
19
20WORD_RE = re.compile(r"[\w']*u[\w']*", re.I)
21
22
23class MRVWordFreqCount(MRJob):
24
25    def mapper_pre_filter(self):
26        return 'grep -i u'
27
28    def mapper(self, _, line):
29        for word in WORD_RE.findall(line):
30            yield (word.lower(), 1)
31
32    def combiner(self, word, counts):
33        yield (word, sum(counts))
34
35    def reducer(self, word, counts):
36        yield (word, sum(counts))
37
38
39if __name__ == '__main__':
40    MRVWordFreqCount.run()

محاسبه تعداد کلمات موجود در یک متن با استفاده از Pyspark:

1# Copyright 2016 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import re
15import sys
16from operator import add
17
18from pyspark import SparkContext
19
20
21WORD_RE = re.compile(r"[\w']+")
22
23
24def main():
25    # read in input, output path
26    args = sys.argv[1:]
27
28    if len(args) != 2:
29        raise ValueError
30
31    inputPath, outputPath = args
32
33    sc = SparkContext(appName='mrjob Spark wordcount script')
34
35    lines = sc.textFile(inputPath)
36
37    # lines.flatMap(WORD_RE.findall) doesn't work on Spark 1.6.2; apparently
38    # it can't serialize instance methods?
39    counts = (
40        lines.flatMap(lambda line: WORD_RE.findall(line))
41        .map(lambda word: (word, 1))
42        .reduceByKey(add))
43
44    counts.saveAsTextFile(outputPath)
45
46    sc.stop()
47
48
49if __name__ == '__main__':
50    main()

مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار Hadoop):

1# Copyright 2009-2010 Yelp
2# Copyright 2013 David Marin
3# Copyright 2018 Yelp
4# Copyright 2019 Yelp
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Determine the most used word in the input, ignoring common "stop" words.
18Shows how to do a multi-step job, and how to load a support file
19from the same directory.
20"""
21import re
22
23from mrjob.job import MRJob
24from mrjob.protocol import JSONValueProtocol
25from mrjob.step import MRStep
26
27WORD_RE = re.compile(r"[\w']+")
28
29
30class MRMostUsedWord(MRJob):
31    FILES = ['stop_words.txt']
32
33    OUTPUT_PROTOCOL = JSONValueProtocol
34
35    def configure_args(self):
36        super(MRMostUsedWord, self).configure_args()
37
38        # allow for alternate stop words file
39        self.add_file_arg(
40            '--stop-words-file',
41            dest='stop_words_file',
42            default=None,
43            help='alternate stop words file. lowercase words, one per line',
44        )
45
46    def mapper_init(self):
47        stop_words_path = self.options.stop_words_file or 'stop_words.txt'
48
49        with open(stop_words_path) as f:
50            self.stop_words = set(line.strip() for line in f)
51
52    def mapper_get_words(self, _, line):
53        # yield each word in the line
54        for word in WORD_RE.findall(line):
55            word = word.lower()
56            if word not in self.stop_words:
57                yield (word, 1)
58
59    def combiner_count_words(self, word, counts):
60        # sum the words we've seen so far
61        yield (word, sum(counts))
62
63    def reducer_count_words(self, word, counts):
64        # send all (num_occurrences, word) pairs to the same reducer.
65        # num_occurrences is so we can easily use Python's max() function.
66        yield None, (sum(counts), word)
67
68    # discard the key; it is just None
69    def reducer_find_max_word(self, _, word_count_pairs):
70        # each item of word_count_pairs is (count, word),
71        # so yielding one results in key=counts, value=word
72        try:
73            yield max(word_count_pairs)
74        except ValueError:
75            pass
76
77    def steps(self):
78        return [
79            MRStep(mapper_init=self.mapper_init,
80                   mapper=self.mapper_get_words,
81                   combiner=self.combiner_count_words,
82                   reducer=self.reducer_count_words),
83            MRStep(reducer=self.reducer_find_max_word)
84        ]
85
86
87if __name__ == '__main__':
88    MRMostUsedWord.run()

مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار spark):

1# Copyright 2019 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Like mr_most_used_word.py, but on Spark.
15To make this work on the local[*] master, pass in your own --stop-words-file
16(it won't be able to see stop_words.txt because --files doesn't work
17on local[*] master)
18"""
19import json
20import re
21from operator import add
22
23from mrjob.job import MRJob
24
25WORD_RE = re.compile(r"[\w']+")
26
27
28class MRSparkMostUsedWord(MRJob):
29
30    FILES = ['stop_words.txt']
31
32    def configure_args(self):
33        super(MRSparkMostUsedWord, self).configure_args()
34
35        # allow for alternate stop words file
36        self.add_file_arg(
37            '--stop-words-file',
38            dest='stop_words_file',
39            default=None,
40            help='alternate stop words file. lowercase words, one per line',
41        )
42
43    def spark(self, input_path, output_path):
44        from pyspark import SparkContext
45
46        sc = SparkContext()
47
48        lines = sc.textFile(input_path)
49
50        # do a word frequency count
51        words_and_ones = lines.mapPartitions(self.get_words)
52        word_counts = words_and_ones.reduceByKey(add)
53
54        # pick pair with highest count (now in count, word format)
55        max_count, word = word_counts.map(lambda w_c: (w_c[1], w_c[0])).max()
56
57        # output our word
58        output = sc.parallelize([json.dumps(word)])
59        output.saveAsTextFile(output_path)
60
61        sc.stop()
62
63    def get_words(self, line_iterator):
64        # this only happens once per partition
65        stop_words = self.load_stop_words()
66
67        for line in line_iterator:
68            for word in WORD_RE.findall(line):
69                word = word.lower()
70                if word not in stop_words:
71                    yield (word, 1)
72
73    def load_stop_words(self):
74        # this should only be called inside executors (i.e. inside functions
75        # passed to RDDs)
76        stop_words_path = self.options.stop_words_file or 'stop_words.txt'
77
78        with open(stop_words_path) as f:
79            return set(line.strip() for line in f)
80
81
82if __name__ == '__main__':
83    MRSparkMostUsedWord.run()

دسته‌بندی داده‌های متنی با استفاده از MRJob (ابزار Hadoop):

1# Copyright 2009-2010 Yelp
2# Copyright 2013 David Marin
3# Copyright 2017 Yelp
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""A text classifier that uses a modified version of Naive Bayes that is not
18sensitive to document length.
19This is a somewhat contrived example in that it does everything in one job;
20generally you'd run one job to generate n-gram scores, put them in a sqlite
21database, and run a second job to score documents. But this is simple, and
22it works!
23This takes as its input documents encoded by encode_document() below. For each
24document, you specify its text, and whether it belongs or does not belong
25to one or more categories. You can also specify a unique ID for each document.
26This job outputs the documents, with the field 'cat_to_score' filled in.
27Generally, positive scores indicate the document is in the category, and
28negative scores indicate it is not, but it's up to you to determine a
29threshold for each category. This job also outputs scores for each ngram,
30so that you can classify other documents.
31About half of the documents are placed in a test set (based on SHA1 hash of
32their text), which means they will not be used to train the classifier. The
33'in_test_set' of each document will be filled accordingly. You can turn
34this off with the --no-test-set flag. (You can also effectively put docs
35in the training set by specifying no category information.)
36Some terminology:
37An "ngram" is a word or phrase. "foo" is a 1-gram; "foo bar baz" is a 3-gram.
38"tf" refers to term frequency, that is, the number of times an ngram appears.
39"df" referse to document frequency, that is, the number of documents an ngram
40appears in at least once.
41"""
42from collections import defaultdict
43import hashlib
44import math
45import re
46
47from mrjob.job import MRJob
48from mrjob.protocol import JSONValueProtocol
49from mrjob.step import MRStep
50
51
52def encode_document(text, cats=None, id=None):
53    """Encode a document as a JSON so that MRTextClassifier can read it.
54    Args:
55    text -- the text of the document (as a unicode)
56    cats -- a dictionary mapping a category name (e.g. 'sports') to True if
57        the document is in the category, and False if it's not. None indicates
58        that we have no information about this documents' categories
59    id -- a unique ID for the document (any kind of JSON-able value should
60        work). If not specified, we'll auto-generate one.
61    """
62    text = unicode(text)
63    cats = dict((unicode(cat), bool(is_in_cat))
64                for cat, is_in_cat
65                in (cats or {}).iteritems())
66
67    return JSONValueProtocol.write(
68        None, {'text': text, 'cats': cats, 'id': id}) + '\n'
69
70
71def count_ngrams(text, max_ngram_size, stop_words):
72    """Break text down into ngrams, and return a dictionary mapping
73    (n, ngram) to number of times that ngram occurs.
74    n: ngram size ("foo" is a 1-gram, "foo bar baz" is a 3-gram)
75    ngram: the ngram, as a space-separated string or None to indicate the
76        ANY ngram (basically the number of words in the document).
77    Args:
78    text -- text, as a unicode
79    max_ngram_size -- maximum size of ngrams to consider
80    stop_words -- a collection of words (in lowercase) to remove before
81        parsing out ngrams (e.g. "the", "and")
82    """
83    if not isinstance(stop_words, set):
84        stop_words = set(stop_words)
85
86    words = [word.lower() for word in WORD_RE.findall(text)
87             if word.lower() not in stop_words]
88
89    ngram_counts = defaultdict(int)
90
91    for i in range(len(words)):
92        for n in range(1, max_ngram_size + 1):
93            if i + n <= len(words):
94                ngram = ' '.join(words[i:i + n])
95                ngram_counts[(n, ngram)] += 1
96
97    # add counts for ANY ngram
98    for n in range(1, max_ngram_size + 1):
99        ngram_counts[(n, None)] = len(words) - n + 1
100
101    return ngram_counts
102
103
104WORD_RE = re.compile(r"[\w']+", re.UNICODE)
105
106DEFAULT_MAX_NGRAM_SIZE = 4
107
108DEFAULT_STOP_WORDS = [
109    'a', 'about', 'also', 'am', 'an', 'and', 'any', 'are', 'as', 'at', 'be',
110    'but', 'by', 'can', 'com', 'did', 'do', 'does', 'for', 'from', 'had',
111    'has', 'have', 'he', "he'd", "he'll", "he's", 'her', 'here', 'hers',
112    'him', 'his', 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is',
113    'it', "it's", 'its', 'just', 'me', 'mine', 'my', 'of', 'on', 'or', 'org',
114    'our', 'ours', 'she', "she'd", "she'll", "she's", 'some', 'than', 'that',
115    'the', 'their', 'them', 'then', 'there', 'these', 'they', "they'd",
116    "they'll", "they're", 'this', 'those', 'to', 'us', 'was', 'we', "we'd",
117    "we'll", "we're", 'were', 'what', 'where', 'which', 'who', 'will', 'with',
118    'would', 'you', 'your', 'yours',
119]
120
121
122class MRTextClassifier(MRJob):
123    INPUT_PROTOCOL = JSONValueProtocol
124
125    def steps(self):
126        """Conceptually, the steps are:
127        1. Parse documents into ngrams
128        2. Group by ngram to get a frequency count for each ngram, and to
129           exclude very rare ngrams
130        3. Send all ngram information to one "global" reducer so we can
131           assign scores for each category and ngram
132        4. Group scores and documents by ngram and compute score for that
133           ngram for that document. Exclude very common ngrams to save memory.
134        5. Average together scores for each document to get its score for
135           each category.
136        The documents themselves are passed through from step 1 to step 5.
137        Ngram scoring information is passed through from step 4 to step 5.
138        """
139        return [MRStep(mapper=self.parse_doc,
140                       reducer=self.count_ngram_freq),
141                MRStep(reducer=self.score_ngrams),
142                MRStep(reducer=self.score_documents_by_ngram),
143                MRStep(reducer=self.score_documents)]
144
145    def configure_args(self):
146        """Add command-line options specific to this script."""
147        super(MRTextClassifier, self).configure_args()
148
149        self.add_passthru_arg(
150            '--min-df', dest='min_df', default=2, type=int,
151            help=('min number of documents an n-gram must appear in for us to'
152                  ' count it. Default: %(default)s'))
153        self.add_passthru_arg(
154            '--max-df', dest='max_df', default=10000000, type=int,
155            help=('max number of documents an n-gram may appear in for us to'
156                  ' count it (this keeps reducers from running out of memory).'
157                  ' Default: %(default)s'))
158        self.add_passthru_arg(
159            '--max-ngram-size', dest='max_ngram_size',
160            default=DEFAULT_MAX_NGRAM_SIZE, type=int,
161            help='maximum phrase length to consider')
162        self.add_passthru_arg(
163            '--stop-words', dest='stop_words',
164            default=', '.join(DEFAULT_STOP_WORDS),
165            help=("comma-separated list of words to ignore. For example, "
166                  "--stop-words 'in, the' would cause 'hole in the wall' to be"
167                  " parsed as ['hole', 'wall']. Default: %(default)s"))
168        self.add_passthru_arg(
169            '--short-doc-threshold', dest='short_doc_threshold',
170            type=int, default=None,
171            help=('Normally, for each n-gram size, we take the average score'
172                  ' over all n-grams that appear. This allows us to penalize'
173                  ' short documents by using this threshold as the denominator'
174                  ' rather than the actual number of n-grams.'))
175        self.add_passthru_arg(
176            '--no-test-set', dest='no_test_set',
177            action='store_true', default=False,
178            help=("Choose about half of the documents to be the testing set"
179                  " (don't use them to train the classifier) based on a SHA1"
180                  " hash of their text"))
181
182    def load_options(self, args):
183        """Parse stop_words option."""
184        super(MRTextClassifier, self).load_options(args)
185
186        self.stop_words = set()
187        if self.options.stop_words:
188            self.stop_words.update(
189                s.strip().lower() for s in self.options.stop_words.split(','))
190
191    def parse_doc(self, _, doc):
192        """Mapper: parse documents and emit ngram information.
193        Input: JSON-encoded documents (see :py:func:`encode_document`)
194        Output:
195        ``('ngram', (n, ngram)), (count, cats)`` OR
196        ``('doc', doc_id), doc``
197        n: ngram length
198        ngram: ngram encoded encoded as a string (e.g. "pad thai")
199            or None to indicate ANY ngram.
200        count:  # of times an ngram appears in the document
201        cats: a map from category name to a boolean indicating whether it's
202            this document is in the category
203        doc_id: (hopefully) unique document ID
204        doc: the encoded document. We'll fill these fields:
205            ngram_counts: map from (n, ngram) to  # of times ngram appears
206                in the document, using (n, None) to represent the total
207                number of times ANY ngram of that size appears (essentially
208                number of words)
209            in_test_set: boolean indicating if this doc is in the test set
210            id: SHA1 hash of doc text (if not already filled)
211        """
212        # only compute doc hash if we need it
213        if doc.get('id') is not None and self.options.no_test_set:
214            doc_hash = '0'  # don't need doc hash
215        else:
216            doc_hash = hashlib.sha1(doc['text'].encode('utf-8')).hexdigest()
217
218        # fill in ID if missing
219        if doc.get('id') is None:
220            doc['id'] = doc_hash
221
222        # pick test/training docs
223        if self.options.no_test_set:
224            doc['in_test_set'] = False
225        else:
226            doc['in_test_set'] = bool(int(doc_hash[-1], 16) % 2)
227
228        # map from (n, ngram) to number of times it appears
229        ngram_counts = count_ngrams(
230            doc['text'], self.options.max_ngram_size, self.stop_words)
231
232        # yield the number of times the ngram appears in this doc
233        # and the categories for this document, so we can train the classifier
234        if not doc['in_test_set']:
235            for (n, ngram), count in ngram_counts.iteritems():
236                yield ('ngram', (n, ngram)), (count, doc['cats'])
237
238        # yield the document itself, for safekeeping
239        doc['ngram_counts'] = ngram_counts.items()
240        yield ('doc', doc['id']), doc
241
242    def count_ngram_freq(self, type_and_key, values):
243        """Reducer: Combine information about how many times each ngram
244        appears for docs in/not in each category. Dump ngrams that appear
245        in very few documents (according to --min-df switch). If two documents
246        have the same ID, increment a counter and only keep one; otherwise
247        pass docs through unchanged.
248        Input (see parse_doc() for details):
249        ('ngram', (n, ngram)), (count, cats) OR
250        ('doc', doc_id), doc
251        Output:
252        ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
253        ('doc', doc_id), doc
254        n: ngram length
255        ngram: ngram encoded encoded as a string (e.g. "pad thai")
256            or None to indicate ANY ngram.
257        cat_to_df: list of tuples of ((cat_name, is_in_category), df); df
258            is  # of documents of this type that the ngram appears in
259        cat_to_tf: list of tuples of ((cat_name, is_in_category), df); tf
260            is  # of time the ngram appears in docs of this type
261        doc_id: unique document ID
262        doc: the encoded document
263        """
264        key_type, key = type_and_key
265
266        # pass documents through
267        if key_type == 'doc':
268            doc_id = key
269            docs = list(values)
270            # if two documents end up with the same key, only keep one
271            if len(docs) > 1:
272                self.increment_counter(
273                    'Document key collision', str(doc_id))
274            yield ('doc', doc_id), docs[0]
275            return
276
277        assert key_type == 'ngram'
278        n, ngram = key
279
280        # total # of docs this ngram appears in
281        total_df = 0
282        # map from (cat, is_in_cat) to
283        # number of documents in this cat it appears in (df), or
284        # number of times it appears in documents of this type (tf)
285        cat_to_df = defaultdict(int)
286        cat_to_tf = defaultdict(int)
287
288        for count, cats in values:
289            total_df += 1
290            for cat in cats.iteritems():
291                cat_to_df[cat] += 1
292                cat_to_tf[cat] += count
293
294        # don't bother with very rare ngrams
295        if total_df < self.options.min_df:
296            return
297
298        yield (('global', None),
299               ((n, ngram), (cat_to_df.items(), cat_to_tf.items())))
300
301    def score_ngrams(self, type_and_key, values):
302        """Reducer: Look at all ngrams together, and assign scores by
303        ngram and category. Also farm out documents to the reducer for
304        any ngram they contain, and pass documents through to the next
305        step.
306        To score an ngram for a category, we compare the probability of any
307        given ngram being our ngram for documents in the category against
308        documents not in the category. The score is just the log of the
309        ratio of probabilities (the "log difference")
310        Input (see count_ngram_freq() for details):
311        ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
312        ('doc', doc_id), doc
313        Output:
314        ('doc', doc_id), document OR
315        ('ngram', (n, ngram)), ('doc_id', doc_id) OR
316        ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
317        n: ngram length
318        ngram: ngram encoded encoded as a string (e.g. "pad thai")
319            or None to indicate ANY ngram.
320        cat_to_score: map from (cat_name, is_in_category) to score for
321            this ngram
322        doc_id: unique document ID
323        doc: the encoded document
324        """
325        key_type, key = type_and_key
326        if key_type == 'doc':
327            doc_id = key
328            doc = list(values)[0]
329            # pass document through
330            yield ('doc', doc_id), doc
331
332            # send document to reducer for every ngram it contains
333            for (n, ngram), count in doc['ngram_counts']:
334                # don't bother even creating a reducer for the ANY ngram
335                # because we'd have to send all documents to it.
336                if ngram is None:
337                    continue
338                yield (('ngram', (n, ngram)),
339                       ('doc_id', doc_id))
340
341            return
342
343        assert key_type == 'global'
344        ngram_to_info = dict(
345            ((n, ngram),
346             (dict((tuple(cat), df) for cat, df in cat_to_df),
347              dict((tuple(cat), tf) for cat, tf in cat_to_tf)))
348            for (n, ngram), (cat_to_df, cat_to_tf)
349            in values)
350
351        # m = # of possible ngrams of any given type. This is not a very
352        # rigorous estimate, but it's good enough
353        m = len(ngram_to_info)
354
355        for (n, ngram), info in ngram_to_info.iteritems():
356            # do this even for the special ANY ngram; it's useful
357            # as a normalization factor.
358            cat_to_df, cat_to_tf = info
359
360            # get the total # of documents and terms for ngrams of this size
361            cat_to_d, cat_to_t = ngram_to_info[(n, None)]
362
363            # calculate the probability of any given term being
364            # this term for documents of each type
365            cat_to_p = {}
366            for cat, t in cat_to_t.iteritems():
367                tf = cat_to_tf.get(cat) or 0
368                # use Laplace's rule of succession to estimate p. See:
369                # http://en.wikipedia.org/wiki/Rule_of_succession#Generalization_to_any_number_of_possibilities
370                cat_to_p[cat] = (tf + (2.0 / m)) / (t + 2)
371
372            cats = set(cat for cat, in_cat in cat_to_t)
373            cat_to_score = {}
374            for cat in cats:
375                p_if_in = cat_to_p.get((cat, True), 1.0 / m)
376                p_if_out = cat_to_p.get((cat, False), 1.0 / m)
377                # take the log difference of probabilities
378                score = math.log(p_if_in) - math.log(p_if_out)
379                cat_to_score[cat] = score
380
381            yield (('ngram', (n, ngram)),
382                   ('cat_to_score', cat_to_score))
383
384    def score_documents_by_ngram(self, type_and_key, types_and_values):
385        """Reducer: For all documents that contain a given ngram, send
386        scoring info to that document. Also pass documents and scoring
387        info through as-is
388        Input (see score_ngrams() for details):
389        ('doc', doc_id), doc OR
390        ('ngram', (n, ngram)), ('doc_id', doc_id) OR
391        ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
392        Output:
393        ('doc', doc_id), ('doc', doc)
394        ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
395        ('cat_to_score', (n, ngram)), cat_to_score
396        n: ngram length
397        ngram: ngram encoded encoded as a string (e.g. "pad thai")
398            or None to indicate ANY ngram.
399        cat_to_score: map from (cat_name, is_in_category) to score for
400            this ngram
401        doc_id: unique document ID
402        doc: the encoded document
403        """
404        key_type, key = type_and_key
405
406        # pass documents through
407        if key_type == 'doc':
408            doc_id = key
409            doc = list(types_and_values)[0]
410            yield ('doc', doc_id), ('doc', doc)
411            return
412
413        assert key_type == 'ngram'
414        n, ngram = key
415
416        doc_ids = []
417        cat_to_score = None
418
419        for value_type, value in types_and_values:
420            if value_type == 'cat_to_score':
421                cat_to_score = value
422                continue
423
424            assert value_type == 'doc_id'
425            doc_ids.append(value)
426
427            if len(doc_ids) > self.options.max_df:
428                self.increment_counter('Exceeded max df', repr((n, ngram)))
429                return
430
431        # skip ngrams that are too rare to score
432        if cat_to_score is None:
433            return
434
435        # send score info for this ngram to this document
436        for doc_id in doc_ids:
437            yield ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
438
439        # keep scoring info
440        yield ('cat_to_score', (n, ngram)), cat_to_score
441
442    def score_documents(self, type_and_key, types_and_values):
443        """Reducer: combine all scoring information for each document, and
444        add it to the document. Also pass ngram scores through as-is.
445        To score a document, we essentially take a weighted average of all
446        the scores for ngrams of each size, and then sum together those
447        averages. ngrams that aren't scored (because they're very rare or
448        very common) are considered to have a score of zero. Using averages
449        allows us to be insensitive to document size. There is a penalty
450        for very small documents.
451        Input (see score_ngrams() for details):
452        ('doc', doc_id), ('doc', doc)
453        ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
454        ('cat_to_score', (n, ngram)), cat_to_score
455        Output:
456        ('doc', doc_id), doc
457        ('cat_to_score', (n, ngram)), cat_to_score
458        n: ngram length
459        ngram: ngram encoded encoded as a string (e.g. "pad thai")
460            or None to indicate ANY ngram.
461        cat_to_score: map from (cat_name, is_in_category) to score for
462            this ngram
463        doc_id: unique document ID
464        doc: the encoded document. this will contain an extra field
465            'cat_to_score', and will no longer have the 'ngram_counts' field.
466        """
467        key_type, key = type_and_key
468
469        # pass through cat_to_score
470        if key_type == 'cat_to_score':
471            cat_to_score = list(types_and_values)[0]
472            yield ('cat_to_score', key), cat_to_score
473            return
474
475        assert key_type == 'doc'
476        doc_id = key
477
478        # store the document and scoring info
479        doc = None
480        ngrams_and_scores = []
481
482        for value_type, value in types_and_values:
483            if value_type == 'doc':
484                doc = value
485                continue
486
487            assert value_type == 'scores'
488            ((n, ngram), cat_to_score) = value
489            ngrams_and_scores.append(((n, ngram), cat_to_score))
490
491        # total scores for each ngram size
492        ngram_counts = dict(((n, ngram), count)
493                            for (n, ngram), count in doc['ngram_counts'])
494
495        cat_to_n_to_total_score = defaultdict(lambda: defaultdict(float))
496
497        for (n, ngram), cat_to_score in ngrams_and_scores:
498            tf = ngram_counts[(n, ngram)]
499            for cat, score in cat_to_score.iteritems():
500                cat_to_n_to_total_score[cat][n] += score * tf
501
502        # average scores for each ngram size
503        cat_to_score = {}
504        for cat, n_to_total_score in cat_to_n_to_total_score.iteritems():
505            total_score_for_cat = 0
506            for n, total_score in n_to_total_score.iteritems():
507                total_t = ngram_counts[(n, None)]
508                total_score_for_cat += (
509                    total_score /
510                    max(total_t, self.options.short_doc_threshold, 1))
511            cat_to_score[cat] = total_score_for_cat
512
513        # add scores to the document, and get rid of ngram_counts
514        doc['cat_to_score'] = cat_to_score
515        del doc['ngram_counts']
516
517        yield ('doc', doc_id), doc
518
519if __name__ == '__main__':
520    MRTextClassifier.run()

برنامه نگاشت شماره‌های تلفن به آدرس‌های URL با استفاده از از MRJob (ابزار Hadoop):

1"""An example of how to parse non-line-based data.
2Map +1 (U.S. and Canadian) phone numbers to the most plausible
3URL for their webpage.
4This is similar to the article "Analyzing the Web for the Price of a Sandwich"
5(https://engineeringblog.yelp.com/2015/03/analyzing-the-web-for-the-price-of-a-sandwich.html)
6except that it doesn't include Yelp biz IDs, and it doesn't need to access
7S3 because it can read the input files directly.
8Sample command line:
9.. code-block:: sh
10   python mr_phone_to_url.py -r emr --bootstrap 'sudo pip install warcio' --output-dir s3://your-bucket/path/ --no-output s3://commoncrawl/crawl-data/CC-MAIN-2018-09/segments/*/wet/*.wet.gz
11To find the latest crawl:
12``aws s3 ls s3://commoncrawl/crawl-data/ | grep CC-MAIN``
13WET data is often added after a release; usually the second-most recent
14release is a safe bet.
15"""
16import re
17from itertools import islice
18
19from mrjob.job import MRJob
20from mrjob.py2 import urlparse
21from mrjob.step import MRStep
22
23PHONE_RE = re.compile(
24    br'(?:[\D\b]|^)(1?[2-9]\d{2}[\-. ()+]+\d{3}[\-. ()+]+\d{4})(?:[\D\b]|$)')
25PHONE_SEP_RE = re.compile(br'[\-. ()+]')
26
27# hosts with more than this many phone numbers are assumed to be directories
28MAX_PHONES_PER_HOST = 1000
29
30
31def standardize_phone_number(number):
32    """put *number* in a standard format, and convert it to a :py:class:`str`.
33    """
34    number_sep = PHONE_SEP_RE.split(number)
35    number = b''.join(number_sep).decode('ascii')
36    if len(number) > 7:
37        if number[-1] not in '0123456789':
38            number = number[:-1]
39        if number[0] not in '0123456789':
40            number = number[1:]
41    if len(number) <= 10:
42        return "+1" + number
43    else:
44        return "+" + number
45
46
47class MRPhoneToURL(MRJob):
48    """Use Common Crawl .wet files to map from phone number to the most
49    likely URL."""
50
51    def steps(self):
52        return [
53            MRStep(mapper_raw=self.extract_phone_and_url_mapper,
54                   reducer=self.count_by_host_reducer),
55            MRStep(reducer=self.pick_best_url_reducer),
56        ]
57
58    def extract_phone_and_url_mapper(self, wet_path, wet_uri):
59        """Read in .wet file, and extract phone ant URL
60        """
61        from warcio.archiveiterator import ArchiveIterator
62
63        with open(wet_path, 'rb') as f:
64            for record in ArchiveIterator(f):
65                if record.rec_type != 'conversion':
66                    continue
67
68                headers = record.rec_headers
69                if headers.get_header('Content-Type') != 'text/plain':
70                    continue
71
72                url = headers.get_header('WARC-Target-URI')
73                if not url:
74                    continue
75
76                host = urlparse(url).netloc
77
78                payload = record.content_stream().read()
79                for phone in PHONE_RE.findall(payload):
80                    phone = standardize_phone_number(phone)
81                    yield host, (phone, url)
82
83    def count_by_host_reducer(self, host, phone_urls):
84        phone_urls = list(islice(phone_urls, MAX_PHONES_PER_HOST + 1))
85
86        # don't bother with directories, etc.
87        host_phone_count = len(phone_urls)
88        if host_phone_count > MAX_PHONES_PER_HOST:
89            return
90
91        for phone, url in phone_urls:
92            yield phone, (url, host_phone_count)
93
94    def pick_best_url_reducer(self, phone, urls_with_count):
95        # pick the url that appears on a host with the least number of
96        # phone numbers, breaking ties by choosing the shortest URL
97        # and the one that comes first alphabetically
98        urls_with_count = sorted(
99            urls_with_count, key=lambda uc: (uc[1], -len(uc[0]), uc[0]))
100
101        yield phone, urls_with_count[0][0]
102
103
104if __name__ == '__main__':
105    MRPhoneToURL.run()

مثال‌های ارائه شده تنها بخش کوچکی از قابلیت‌های ابزارهایی نظیر هدوپ (Hadoop) و Spark هستند. این دسته از مدل‌های برنامه‌نویسی، علاوه بر اینکه امکان ذخیره‌سازی و پردازش توزیع شده داده‌های مختلف را به برنامه‌نویسان و دانشمندان داده می‌دهند، قابلیت مدیریت و پردازش کلان داده را برای سیستم فراهم می‌کنند.

اگر نوشته بالا برای شما مفید بوده است، آموزش‌های زیر نیز به شما پیشنهاد می‌شوند:

^^

بر اساس رای ۷ نفر
آیا این مطلب برای شما مفید بود؟
اگر بازخوردی درباره این مطلب دارید یا پرسشی دارید که بدون پاسخ مانده است، آن را از طریق بخش نظرات مطرح کنید.
منابع:
DataconomyThomas Henson Website
نظر شما چیست؟

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *