پردازش کلان داده در پایتون — راهنمای جامع
اگر کاربران با دنیای علم و فناوری آشنایی داشته باشند، بدون شک با واژگان و اصطلاحاتی نظیر «علم داده» (Data Science)، «علم تجزیه و تحلیل» (Analytics Science)، «یادگیری ماشین» (Machine Learning)، «کلان داده» (Big Data) مواردی مشابه برخورد داشتهاند. در دنیای فناوری امروز، از این اصطلاحات برای تعریف مجموعهای از فناوریهای مهم و پیشرفته حوزه در «علوم کامپیوتر» (Computer Science) استفاده میشود که پایه و اساس پیشرفتهای فناوری در آینده نیز به شمار میآیند. در این مطلب، با مهمترین ابزارهای علم داده جهت پردازش کلان داده آشنا خواهید. همچنین، مثالهای ساده و عملی جهت پردازش کلان داده نمایش داده خواهند شد. علاوه بر این، استفاده از «زبان برنامهنویسی پایتون» (Python Programming language) برای تحلیل داده آموزش داده خواهد شد (از این دسته از تکنیکها در مقیاس وسیعتر، جهت پردازش کلان داده نیز استفاده میشود).
علم داده یکی از حوزههای گسترده علوم کامپیوتر محسوب میشود و زیر شاخههای گستردهای نظیر «جمعآوری داده» (Data Collection)، «پاکسازی داده» (Data Cleaning)، «استاندارد سازی داده» (Data Standardization)، «تحلیل داده» (Data Analysis) و «گزارش» (Reporting) را شامل میشود. بزرگترین شرکتهای حوزه فناوری، از تکنیکهای علم داده و پردازش کلان داده برای استخراج دانش از «دادههای غیر ساخت یافته» (Unstructured Data) و «دادههای ساخت یافته» (Structured Data) سازمانی استفاده میکنند.
پردازش کلان داده
تحلیل و پردازش کلان داده هم اکنون به یکی از ترندهای تحقیقاتی در دنیا تبدیل شده است. به همین خاطر است که فرصتهای شغلی بسیار خوبی، برای برنامهنویسان و توسعهدهندگانی که با تکنیکهای پردازش کلان داده آشنایی دارند، فراهم شده است. یک دانشمند علم داده و یا یک برنامهنویس مسلط به پردازش کلان داده و تحلیل آن میتواند از مجموعه ابزارهای تولید شده برای پردازش کلان داده، جهت «تحلیل زبان» (Language Analysis)، «پیشنهاد فایلهای ویدئویی» (Recommending Videos) و یا مشخص کردن محصولات جدید با توجه به دادههای بازاریابی یا دادههای جمعآوری شده از مشتریان استفاده کند.
به طور کلی، زمانی که تکنیکهای موجود در حوزه پردازش کلان داده در علم داده مورد بررسی قرار میگیرند، منظور تکنیکهایی هستند که در حوزه خاصی به نام «علم داده در مقیاس وسیع» (large Scale Data Science) استفاده میشوند. تعریف کلمه کلان در اصطلاح پردازش کلان داده، به نوع پروژههای برنامهنویسی پردازش داده یا حجم دادههایی که قرار است پردازش و تحلیل شوند، بستگی دارد.
بسیاری از مسائل تحلیل داده، که برای حل آنها پروژههای برنامهنویسی تعریف میشود، توسط راهکارهای «مرسوم» (Conventional) و متدهای یادگیری ماشین به راحتی قابل حل هستند. به عبارت دیگر، زمانی که دادههای استفاده شده برای مدلسازی یا آموزش مدل یادگیری، به اندازهای باشند که بتوان از سیستمهای محاسباتی معمول نظیر کامپیوترهای شخصی برای پردازش آنها استفاده کرد، معمولا نیازی به استفاده از روشهای پردازش کلان داده وجود نخواهد داشت. با این حال، این امکان وجود دارد که بتوان از روشهای تحلیل و پردازش کلان داده برای حل مسائل تحلیل داده استفاده کرد. به طور کلی، زمانی از تکنیکهای پردازش کلان داده برای حل مسئله استفاده میشود که نیاز به پردازش حجم عظیمی از دادهها وجود دارد و سیستمهای محاسباتی مرسوم، توان پردازشی لازم برای تحلیل و پردازش کلان داده را نداشته باشند.
مهمترین ابزارهای پردازش کلان داده
یکی از مهمترین مدلهای ذخیرهسازی توزیع شده دادهها و پردازش کلان داده، که به وفور توسط دانشمندان علم داده مورد استفاده قرار میگیرد، مدل برنامهنویسی به نام «نگاشت-کاهش» (MapReduce) است. مدل MapReduce، روشی بهینه برای مدیریت و پردازش کلان داده محسوب میشود که به دانشمندان علم داده و توسعهدهنده برنامه کاربردی اجازه میدهد تا ابتدا داده را با استفاده از یک «صفت» (Attribute)، فیلتر یا دستهبندی خاص نگاشت کنند و سپس، با استفاده از یک مکانیزم «تبدیل» (Transformation) یا «تجمیع» (Aggregation)، دادههای نگاشت شده را کاهش دهند.
به عنوان نمونه، در صورتی که دادههای جمعاوری شده مرتبط با مجموعهای از گربهها باشند، ابتدا گربهها با استفاده از صفتی نظیر رنگ آنها نگاشت میشوند. در مرحله بعد، دادهها، از طریق جمع کردن گروهبندیهای انجام شده، کاهش پیدا میکنند. در پایان فرایند MapReduce، یک لیست یا مجموعهای مرتبط با رنگ گربهها و تعداد گربههای موجود در هر کدام از گروهبندیهای رنگی در اختیار برنامهنویس یا دانشمند داده قرار میدهد.
تقریبا تمامی کتابخانههای برنامهنویسی توسعه داده شده برای مقاصد پردازش کلان داده و علم داده، از مدل برنامهنویسی MapReduce پشتیبانی میکنند. همچنین، تعداد زیادی کتابخانه برنامهنویسی وجود دارند که امکان مدیریت، پردازش و انجام عملیات «نگاشت-کاهش» (MapReduce) دادهها را به شکل توزیع شده (خوشه یا مجموعهای از کامپیوترهای ) در اختیار برنامهنویسان و فعالان حوزه علم داده قرار میدهند. افرادی که به دنبال استفاده از مدلهای MapReduce، جهت مدیریت و پردازش کلان داده هستند، به راحتی میتوانند از ابزارها، بستهها و کتابخانههای توسعه داده شده در زبان پایتون، جهت پیادهسازی کاربردهای مرتبط با پردازش کلان داده استفاده کنند.
کتابخانه Hadoop
یکی از محبوبترین کتابخانهها برای انجام عملیات «نگاشت-کاهش» (MapReduce) روی دادههای حجیم، کتابخانه هدوپ است که توسط بنیاد نرمافزاری Apache طراحی شده است. کتابخانه هدوپ با استفاده از «محاسبات خوشهای» (Cluster Computing)، به فعالان حوزه علم داده اجازه میدهد تا بتوانند تا با سرعت و اثرگذاری به مراتب بیشتری، وظایف مرتبط با مدیریت و پردازش کلان داده را انجام دهند. کتابخانهها و بستههای زیادی در زبان پایتون توسعه داده شدهاند که از طریق آنها میتوان دادهها (که به آنها Jobs نیز گفته میشود) را به هدوپ ارسال کرد. انتخاب یکی از این کتابخانهها و بستههای برنامهنویسی جهت مقاصد پردازش کلان داده و علم داده، به عواملی نظیر سادگی استفاده، زیرساختهای لازم برای پیادهسازی و کاربرد خاصی که قرار است در آن مورد استفاده قرار بگیرند، بستگی دارد.
کتابخانه Spark
در صورتی که در کاربرد پردازش کلان داده مورد نظر برنامهنویس و یا دانشمند علم داده، استفاده از دادههایی که در قالب «دادههای جریانی» (Streaming Data) هستند (نظیر دادههای «بلادرنگ» (Real-Time)، دادههای Log و دادههای جمعآوری شده از «واسطهای برنامهنویسی کاربردی» (Application Programming Interface))، عملکرد به مراتب بهتری را برای سیستم به ارمغان میآورند، استفاده از کتابخانه Spark بهترین گزینه خواهد بود. کتابخانه Spark، نظیر کتابخانه هدوپ، توسط بنیاد نرمافزاری Apache توسعه داده شده است.
پایتون و پردازش کلان داده
سؤالی که ممکن در اینجا ذهن خوانندگان و مخاطبان این مطلب را به خود مشغول کند این است که زبان پایتون، جهت پردازش کلان داده و تحلیل آن، چه امکاناتی را در اختیار برنامهنویس یا «دانشمند علم داده» (Data Scientist) قرار میدهد. در چند سال اخیر، پایتون به عنوان یکی از مهمترین ابزارهای دانشمندان علم داده برای برنامهنویسی پروژههای علم داده مطرح شده است؛ تحلیل و پردازش کلان داده هم از این قاعده مستثنی نیست.
اگرچه، بسیاری از دانشمندان علم داده و برنامهنویسان فعال در این حوزه، از زبانهای برنامهنویسی نظیر جاوا، R و Julia و ابزارهایی مانند SPSS و SAS برای تحلیل داده و استخراج «بینش» (Insight) از آنها استفاده میکنند، با این حال، محبوبیت روزافزون زبان پایتون، «کتابخانههای برنامهنویسی» (Libraries) متنوع و پرکاربرد و جامعه برنامهنویسی پویا و فعال آن، سبب شده است تا بسیاری از دانشمندان علم داده، از زبان پایتون برای تحلیل دادهها و پردازش کلان داده استفاده کنند. افزایش روزافزون محبوبیت پایتون برای مقاصد علم داده را میتوان در رشد چشمگیر کتابخانهها، ابزارها و بستههای برنامهنویسی در حوزه علم داده مشاهده کرد.
زبان پایتون، در چند سال اخیر، همواره به عنوان یکی از محبوبترین زبانهای برنامهنویسی شناخته شده است. بسیاری از فرصتهای شغلی لیست شده برای زبان پایتون نیز مرتبط با حوزه علم داده، تحلیل داده، پردازش کلان داده و «هوش مصنوعی» (Artificial Intelligence) هستند. به همین دلیل است که بسیاری از «مهندسان داده» (Data Engineers) به استفاده از پایتون و تسلط بر این زبان برنامهنویسی روی میآورند.
یکی از مهمترین ابزارهای پردازش کلان داده برای دانشمندان داده، کتابخانهای به نام «هدوپ» (Hadoop) است. ابزارهای نرمافزاری منبع باز هدوپ، در ابتدا توسط «بنیاد نرمافزاری آپاچی» (Apache Software Foundation) و با هدف ارائه بستر نرمافزاری لازم برای محاسبات توزیع شده و پردازش کلان داده، در زبان برنامهنویسی جاوا تهیه شدهاند. این دسته از ابزارهای نرمافزاری به برنامهنویسان و توسعهدهندگان اجازه میدهند تا با استفاده از شبکهای متشکل از چندین کامپیوتر (به شکل «توزیع شده» (Distributed)) اقدام به حل مسائلی کنند که حجم عظیمی از داده و محاسبات را شامل میشوند. جهت پردازش کلان داده و تحلیل آنها، هدوپ، چارچوبی نرمافزاری برای ذخیرهسازی توزیع شده و پردازش کلان داده معرفی میکند؛ این چارچوب نرمافزاری بر پایه مدل برنامهنویسی «نگاشت-کاهش» (MapReduce) پیادهسازی شده است.
دو دلیل عمده برای محبوبیت بیش از پیش زبان پایتون، جهت برنامهنویسی علم داده و پردازش کلان داده، وجود دارد:
- کتابخانهها، ابزارها و بستههای برنامهنویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیرهسازی، نمایش، شکل دادن و مصورسازی دادهها در زبان برنامهنویسی پایتون ارائه شده است (در ادامه، این ابزارها شرح داده خواهند شد).
- پس از افزایش روزافزون محبوبیت پایتون و روی آوردن تعداد زیادی از برنامهنویسان و توسعهدهندگان حوزه علم داده به این زبان برنامهنویسی، بنیاد نرمافزاری آپاچی به سرعت امکان استفاده از «اکوسیستم» (Ecosystem) هدوپ را برای دانشمندان علم داده و برنامهنویسان این حوزه فراهم آورد. هماکنون برنامهنویسان زبان پایتون و دانشمندان علم داده مسلط به این زبان میتوانند از ابزارهایی نظیر « استریمینگ هدوپ» (Hadoop Streaming)، افزونه Hadoopy، کتابخانه Pydoop و بسته Pydoop اشاره کرد.
استفاده از هدوپ (Hadoop) در پایتون به جای جاوا
یکی از سؤالاتی که ممکن است هنگام استفاده از هدوپ در پایتون (جهت پردازش کلان داده و تحلیل آنها) برای فعالان حوزه علم داده پدید بیاید این است که چرا لازم است از پایتون به جای جاوا، برای پردازش کلان داده و تحلیل آن استفاده شود. در پاسخ به این سؤال باید گفت که اگرچه زبان جاوا هنوز هم یکی از محبوبترین و پرکاربردترین زبانهای برنامهنویسی در جهان و زبان اصلی توسعه چارچوب هدوپ (و Spark) محسوب میشود، با این حال، میان برخی از ویژگیهای ساختاری این دو زبان تفاوتهایی وجود دارد که ممکن است سبب روی آوردن فعالان این حوزه به استفاده از پایتون شود. از جمله این تفاوتها میتوان به موارد زیر اشاره کرد:
- نصب، پیادهسازی و اجرای محیط برنامهنویسی جاوا (Java Development Environment) نسبت به زبان پایتون سختتر است (تنظیم کردن تنظیمات مرتبط با «متغیرهای محیطی» (Environment Variables)، تنظیمات فایلهای XML مرتبط با وابستگیهای نرمافزاری و سایر موارد). در نقطه مقابل، برای نصب، پیادهسازی و اجرای محیط برنامهنویسی پایتون، تنها کافی است پایتون را روی سیستم نصب و از واسط خط دستوری آن جهت کد نویسی استفاده کرد.
- در محیط برنامهنویسی جاوا، ابتدا باید کدهای نوشته شده «کامپایل» (Compile) و سپس اجرا شوند. در حالی که در زبان پایتون، تنها کافی است کدها توسط برنامهنویس نوشته و سپس خط به خط در واسط خط دستوری اجرا شوند.
- تعداد خط کدهای لازم برای نوشتن یک برنامه خاص در زبان جاوا، به مراتب بیشتر از تعداد خط کدهای لازم برای توسعه همان برنامه در زبان پایتون خواهد بود. به عنوان نمونه، به مثال زیر دقت کنید. در ادامه، تعداد خط کدهای لازم برای شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا و پایتون نمایش داده شده است. اولین قدم جهت پردازش کلان داده در فرم دادههای متنی غیر ساخت یافته، شمارش تعداد دفعات تکرار کلمات ظاهر شده در این دادهها است:
شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا:
1package org.myorg;
2
3 import java.io.IOException;
4 import java.util.*;
5
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.conf.*;
8 import org.apache.hadoop.io.*;
9 import org.apache.hadoop.mapred.*;
10 import org.apache.hadoop.util.*;
11
12 public class WordCount {
13
14 public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15 private final static IntWritable one = new IntWritable(1);
16 private Text word = new Text();
17
18 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19 String line = value.toString();
20 StringTokenizer tokenizer = new StringTokenizer(line);
21 while (tokenizer.hasMoreTokens()) {
22 word.set(tokenizer.nextToken());
23 output.collect(word, one);
24 }
25 }
26 }
27
28 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29 public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30 int sum = 0;
31 while (values.hasNext()) {
32 sum += values.next().get();
33 }
34 output.collect(key, new IntWritable(sum));
35 }
36 }
37
38 public static void main(String[] args) throws Exception {
39 JobConf conf = new JobConf(WordCount.class);
40 conf.setJobName("wordcount");
41
42 conf.setOutputKeyClass(Text.class);
43 conf.setOutputValueClass(IntWritable.class);
44
45 conf.setMapperClass(Map.class);
46 conf.setCombinerClass(Reduce.class);
47 conf.setReducerClass(Reduce.class);
48
49 conf.setInputFormat(TextInputFormat.class);
50 conf.setOutputFormat(TextOutputFormat.class);
51
52 FileInputFormat.setInputPaths(conf, new Path(args[0]));
53 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54
55 JobClient.runJob(conf);
56 }
57 }
شمارش تعداد دفعات تکرار کلمات با استفاده از زبان پایتون:
1import re
2
3WORD_RE = re.compile(r"[\w']+")
4
5class MRWordFreqCount(MRJob):
6
7 def mapper(self, _, line):
8 for word in WORD_RE.findall(line):
9 yield (word.lower(), 1)
10
11 def combiner(self, word, counts):
12 yield (word, sum(counts))
13
14 def reducer(self, word, counts):
15 yield (word, sum(counts))
16
17if __name__ == '__main__':
18 MRWordFreqCount.run()
19
20#Source
21#https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py
همانطور که مشاهده میشود، برنامهای که نوشتن آن در زبان جاوا به 57 خط کد احتیاج است، در زبان پایتون، تنها با 15 خط کد قابل نوشتن است. این ویژگی سبب میشود تا «نگهداری» (Maintain) کدها در زبان پایتون به مراتب راحتتر از زبان جاوا باشد.
کتابخانههای مدیریت و پردازش داده در پایتون
همانطور که پیش از این نیز اشاره شد، کتابخانهها، ابزارها و بستههای برنامهنویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیرهسازی، نمایش، شکل دادن و مصورسازی دادهها در زبان برنامهنویسی پایتون ارائه شده است. از جمله مهمترین کتابخانههای مدیریت و پردازش داده در پایتون میتوان به موارد زیر اشاره کرد:
کتابخانه Pandas
یکی از محبوبترین کتابخانههای ارائه شده برای مقاصد علم داده، کتابخانه Pandas است. کتابخانه Pandas توسط دانشمندان علم دادهای که به زبانهای برنامهنویسی R و پایتون مسلط هستند، توسعه داده شده است. هم اکنون جامعه بسیار بزرگی از برنامهنویسان و توسعهدهندگان (متشکل از دانشمندان علم داده و تحلیلگران داده) از کتابخانه Pandas و فرایند توسعه آن (برطرف کردن مشکلات احتمالی و اضافه کردن قابلیتهای برنامهنویسی جدید) پشتیبانی میکنند.
کتابخانه Pandas ویژگیهای تعبیه شده بسیار مهم و پرکاربردی نظیر قابلیت خواندن دادهها از منابع مختلف، ساختن «دیتا فریم» (Dataframe) یا ماتریس/جدول متناظر با دادههای خوانده شده و محاسبه «تجزیه و تحلیل تجمیعی» (Aggregate Analytics) بر اساس نوع برنامه کاربردی در حال توسعه را برای برنامهنویسان و توسعهدهندگان فراهم میآورد.
علاوه بر امکانات تعبیه شده برای مدیریت و پردازش داده، کتابخانه Pandas از امکانات «مصورسازی داده» (Data Visualization) بسیار مناسبی نیز بهره میبرد. با استفاده از قابلیتهای مصورسازی تعبیه شده در این کتابخانه، امکان تولید گراف و نمودار از نتایج تولید شده برای برنامهنویسان و دانشمندان علم داده فراهم شده است. همچنین، مجموعهای از توابع داخلی در کتابخانه Pandas تعریف شدهاند که امکان «صادر کردن» (Export) تجزیه و تحلیلهای انجام شده روی دادهها را در قالب فایل «صفحات گسترده اکسل» (Excel Spreadsheet) فراهم میآورند.
کتابخانه Agate
یکی از کتابخانههای برنامهنویسی نوظهور برای مقاصد علم داده، کتابخانه Agate نام دارد. کتابخانه Agate با هدف استفاده در کاربردهای «روزنامهنگاری» (Journalism) پیادهسازی شده است و ویژگیهای بسیار مناسبی جهت تحلیل دادهها در اختیار برنامهنویسان و توسعهدهندگان قرار میدهد. از این کتابخانه میتوان جهت تحلیل و مقایسه فایلهای صفحه گسترده (Spreadsheet)، انجام محاسبات آماری رو دادهها و سایر موارد استفاده کرد.
یادگیری کار کردن با کتابخانه Agate بسیار ساده است و وابستگیهای به مراتب کمتری نسبت به Pandas دارد. همچنین، ویژگیهای جالبی نظیر مصورسازی دادهها و تولید نمودار، این کتابخانه را به ابزاری محبوب برای مقاصد تحلیل داده تبدیل کرده است.
کتابخانه Bokeh
در صورتی که تمایل به مصورسازی دادههای موجود در یک مجموعه داده ساخت یافته دارید، Bokeh ابزار بسیار مناسبی محسوب میشود. این ابزار را میتوان با کتابخانههای تحلیل داده نظیر Pandas ،Agate و سایر موارد مورد استفاده قرار داد. این ابزار، به برنامهنویسان و توسعهدهندگان اجازه میدهد تا با کمترین کد نویسی، نمودارهای گرافیکی و مصورسازیهای خیره کنندهای از دادهها تولید کنند.
کاوش، پردازش و تحلیل دادهها در پایتون با استفاده از کتابخانه Pandas
پیش از این که با ابزارهای پردازش کلان داده در پایتون آشنا شویم، با یک مثال ساده، نحوه تحلیل دادهها در پایتون مورد بررسی قرار میگیرد. پس از جستجو در سایت Kaggle، مجموعه داده حقوق کارکنان دولتی شهر سانفرانسیسکو برای این کار انتخاب شد. این مجموعه داده از طریق لینک [+] قابل دسترسی است.
ابتدا با استفاده از قطعه کد زیر دادهها وارد (Import) و در یک دیتا فریم Pandas خوانده میشوند.
1import pandas as pd
2salaries = pd.read_csv('Salaries.csv')
3salaries.head()
سپس، با استفاده از تابع describe، توزیع دادههای موجود در مجموعه داده نمایش داده میشود.
1salaries.head()
دادههای جمعآوری شده، مربوط به سالهای متعددی است. برای محدودتر کردن حوزه تحلیلی انجام شده از دادهها، تنها دادههای مربوط به سال 2014 مورد بررسی قرار گرفته میشوند.
1latest_salaries = salaries[salaries['Year'] == 2014]
2latest_salaries.describe()
سپس، در این مرحله از فرایند تحلیل داده، مقدار درصد متوسطی از حقوق سالیانه که کارمندان دولتی برای پرداخت اجاره خانه استفاده میکنند، بررسی میشود.
1average_yearly_rent = 3880 * 12
2average_city_pay = latest_salaries['TotalPay'].mean()
3average_yearly_rent / average_city_pay
0.61698360153365217
بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، کارمندان دولتی در سانفرانسیسکو، به طور متوسط چیزی حدود 60 درصد از حقوق خود را صرف پرداخت اجاره بها میکنند. در مرحله بعد، تعداد کارمندانی که حقوق آنها از متوسط اجاره بهای منزل در این شهر کمتر است مشخص میشود.
1latest_salaries[latest_salaries['TotalPay'] < average_yearly_rent].shape[0]
11360
همانطور که در خروجی نمایش داده شده مشخص است، بیش از 11 هزار کارمند دولتی در سانفرانسیسکو، درآمد کمتر از متوسط اجاره بهای منزل دارند. در مرحله بعدی، تعداد کارمندانی که باید اضافه کار داشته باشند تا بتوانند از حقوق لازم برای پرداخت اجاره بها برخوردار شوند، نمایش داده میشود. برای اینکار، تمامی کسانی که بیش از 70 درصد حقوق خود را برای اجاره بها پرداخت میکنند، به عنوان افرادی در نظر گرفته میشوند که بدون اضافه کاری قادر به پرداخت اجاره بها نیستند.
1pd.to_numeric(latest_salaries['BasePay'], errors='coerce').describe()
count 38119.000000 mean 66564.421924 std 44053.783972 min 0.000000 25% NaN 50% NaN 75% NaN max 318835.490000 Name: BasePay, dtype: float64
1base_pay_series = pd.to_numeric(latest_salaries['BasePay'], errors='coerce')
2base_pay_series[base_pay_series * .7 < average_yearly_rent].shape
20074
بنابراین، بیش از 20 هزار کارمند دولتی بدون اضافه کاری قادر به پرداخت اجاره بها و هزینه زندگی در شهر سانفرانسیکو نخواهند بود. در مرحله بعد، با استفاده از تحلیل دادههای موجود، تعداد کارمندانی مشخص میشود که باید بیش از 1000 دلار در سال اضافه کاری داشته باشند تا بتوانند به زندگی خود در این شهر ادامه دهند.
1overtime_series = pd.to_numeric(latest_salaries['OvertimePay'], errors='coerce')
2overtime_series.describe()
count 38119.000000 mean 5401.993737 std 11769.656257 min 0.000000 25% NaN 50% NaN 75% NaN max 173547.730000 Name: OvertimePay, dtype: float64
1overtime_series[overtime_series > 1000].shape
15361
بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، بیش از 15 هزار کارمند دولتی، با اضافه کاری بیش از 1000 دلار در سال، قادر به زندگی در این شهر پرخرج هستند. این دسته از تحلیلهای انجام شده روی مجموعه داده انتخابی، تنها بخشی از کارهای قابل انجام توسط کتابخانههای پردازش و تحلیل داده در پایتون هستند. بنابراین، همانطور که مشاهده میشود، پایتون یکی از بهترین و سادهترین زبانها برای تحلیل داده به شمار میآید و امکانات کاملی برای کاوش، پردازش و تحلیل داده در اختیار برنامهنویسان و دانشمندان علم داده قرار میدهد.
کتابخانههای پردازش کلان داده در پایتون
زبان برنامهنویسی پایتون و جامعه برنامهنویسی فعال آن، ابزارها و کتابخانههای متنوعی برای پردازش کلان داده در اختیار برنامهنویسان و دانشمندان علم داده قرار داده است. این ابزارها، مبتنی بر کتابخانه هدوپ و Spark هستند. از جمله، مهمترین ابزارهای پردازش کلان داده در پایتون میتواند به موارد زیر اشاره کرد:
کتابخانه PySpark
کتابخانه PySpark که واسط برنامهنویسی کاربردی Spark در زبان پایتون محسوب میشود، به دانشمندان علم داده و برنامهنویسان فعال در این حوزه اجازه میدهد تا به سرعت، زیرساختهای لازم برای نگاشت و کاهش مجموعههای داده را پیادهسازی کنند. همچنین، از آنجایی که در کتابخانه Spark مجموعهای از الگوریتمهای یادگیری ماشین تعبیه شده است، علاوه بر پردازش کلان داده و مدیریت آنها، جهت حل مسائل یادگیری ماشین نیز مورد استفاده قرار میگیرد.
ابزار جریانسازی یا Streaming
ابزار جریانسازی هدوپ یا Hadoop Streaming، یکی از مجبوبترین روشهای استفاده از Hadoop در پایتون محسوب میشود. جریانسازی یا Streaming، یکی از ویژگیهای تعبیه شده در کتابخانه Hadoop است. این ویژگی به برنامهنویس اجازه میدهد تا کدهای نوشته به زبان پایتون (و یا دیگر زبانهای برنامهنویسی) را جهت انجام عملیات نگاشت (Mapping) به تابع stdin پاس بدهند (به عنوان آرگومان، به این تابع ارسال کنند). به عبارت دیگر، توسعهدهندگان از جاوا به عنوان Wrapper استفاده میکنند تا بتوانند کدهای پایتون را به تابع stdin پاس بدهند. سپس، در زمان اجرا، عملیات نگاشت (Mapping) در زبان جاوا و توسط کتابخانه Hadoop انجام میشود.
افزونه Hadoopy
این ابزار، افزونهای برای جریانسازی هدوپ یا Hadoop Streaming محسوب میشود و از کتابخانه Cython برای انجام عملیات نگاشت-کاهش (MapReduce) در پایتون استفاده میکند. مستندسازی خوبی برای این ابزار انجام شده است و کار کردن با آن برای توسعهدهندگان راحت است. با این حال، این پروژه از پایان سال 2012 تا به امروز غیر فعال باقی مانده و بهروزرسانی جدیدی دریافت نکرده است.
بسته Pydoop
این بسته برنامهنویسی به توسعهدهندگان اجازه میدهد تا بتوانند برنامه مرتبط با پردازش کلان داده را در پایتون کد نویسی کنند. سپس، با استفاده از کدهای پیادهسازی شده، به طور مستقیم با دادههای ذخیره شده در خوشه هدوپ (Hadoop Cluster) ارتباط برقرار کرده و عملیات نگاشت-کاهش (MapReduce) انجام دهند. چنین کاری از طریق واسط برنامهنویسی کاربردی HDFS (یا HDFS API) تعبیه شده در بسته Pydoop امکانپذیر شده است. واسط برنامهنویسی کاربردی HDFS به برنامهنویسان اجازه میدهد تا در محیط پایتون، عملیات خواندن و نوشتن دادهها، از معماری فایل سیستم HDFS را انجام دهند.
کتابخانه MRJob
یک کتابخانه برنامهنویسی در زبان پایتون است که امکان انجام عملیات نگاشت-کاهش (MapReduce) و پردازش کلان داده را در اختیار برنامهنویسان و دانشمندان علم داده قرار میدهد. این کتابخانه نرمافزاری، یکی از پراستفادهترین بستههای برنامهنویسی جهت پردازش کلان داده در زبان پایتون محسوب میشود و توسط شرکت Yelp در اختیار برنامهنویسان زبان پایتون قرار گرفته است.
کتابخانه MRJob، از هدوپ، سرویس Cloud Dataproc گوگل و سرویس Elastic MapReduce یا EMR شرکت آمازون نیز پشتیبانی میکند. سرویس EMR، یک وب سرویس ارائه شده توسط شرکت آمازون است که امکان تحلیل و پردازش کلان داده با استفاده از هدوپ و Spark را در اختیار فعالان حوزه علم داده قرار میدهد. مستندسازی خوبی برای این کتابخانه انجام شده است و کار کردن با آن برای توسعهدهندگان و برنامهنویسان راحت است. همچنین، این پروژه کاملا فعال است و از پیادهسازیهای مختلف آن پشتیبانی میشود.
اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob
در این مثال، یک برنامه ساده برای محاسبه دفعات ظاهر شدن تمامی کلمات در متن، در زبان پایتون کد نویسی خواهد شد. سپس، با استفاده از هدوپ (Hadoop)، این قابلیت پدید میآید تا هر نوع داده متنی (با هر اندازه ممکن و در هر قالبی)، مورد تجزیه و تحلیل قرار بگیرد و تعداد دفعات ظاهر شدن تمامی کلمات موجود در آن محاسبه شود. برای چنین کاری، باید یک خوشه هدوپ در سیستم اجرایی باشد. برای اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob، لازم است مراحل زیر دنبال شود:
- ابتدا با استفاده از Pip، کتابخانه MRJob در پایتون نصب میشود.
1pip install mrjob
- برنامه شمارش کلمات در متن: برای چنین کاری، در یک برنامه دلخواه ویرایشگر کد، از کدهای زیر استفاده کنید (یک فایل خالی و کدهای زی را در آن کپی کنید). سپس، فایل مورد نظر را با نام word-count.py ذخیره کنید.
کدهای لازم محاسبه دفعات ظاهر شدن تمامی کلمات در متن با استفاده از MRJob (ابزار Hadoop):
1# Copyright 2009-2010 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""The classic MapReduce job: count the frequency of words.
15"""
16from mrjob.job import MRJob
17import re
18
19WORD_RE = re.compile(r"[\w']+")
20
21
22class MRWordFreqCount(MRJob):
23
24 def mapper(self, _, line):
25 for word in WORD_RE.findall(line):
26 yield (word.lower(), 1)
27
28 def combiner(self, word, counts):
29 yield (word, sum(counts))
30
31 def reducer(self, word, counts):
32 yield (word, sum(counts))
33
34
35if __name__ == '__main__':
36 MRWordFreqCount.run()
- اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob: پس از اینکه کد نویسی برنامه شمارش تعداد دفعات ظاهر شدن کلمات در متن به پایان رسید، در مرحله بعد لازم است تا کدهای پایتون برای هدوپ (hadoop) ارسال و اجرا شوند. برای این کار از کد دستوری زیر استفاده میشود:
1$ python word_count.py [file location] -r hadoop > counts
کدهای کامل پیادهسازی سیستم پردازش دادههای متنی با استفاده از کتابخانه MRJob، در لینک [+] موجود است (در این پروژه، از تکنیکهای پردازش کلان داده، مدل برنامهنویسی MapReduce و کتابخانههای Spark و Hadoop استفاده شده است). در ادامه، تعدادی مثال مرتبط با پردازش دادههای متنی با استفاده از کتابخانه MRJob نمایش داده شده است. ذکر این نکته ضروری است که برای اطمینان از صحت عملکرد کدهای ارائه شده، تمامی ماژولهای تعریف شده در در لینک [+]، باید توسط برنامه کاربردی قابل دسترس باشند.
محاسبه دفعات ظاهر شدن تمامی کلماتی که حاوی حرف u هستند، با استفاده از MRJob (ابزار Hadoop):
1# Copyright 2016 Yelp
2# Copyright 2017 Yelp and Contributors
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""The classic MapReduce job, but only for words containing "u"
16"""
17from mrjob.job import MRJob
18import re
19
20WORD_RE = re.compile(r"[\w']*u[\w']*", re.I)
21
22
23class MRVWordFreqCount(MRJob):
24
25 def mapper_pre_filter(self):
26 return 'grep -i u'
27
28 def mapper(self, _, line):
29 for word in WORD_RE.findall(line):
30 yield (word.lower(), 1)
31
32 def combiner(self, word, counts):
33 yield (word, sum(counts))
34
35 def reducer(self, word, counts):
36 yield (word, sum(counts))
37
38
39if __name__ == '__main__':
40 MRVWordFreqCount.run()
محاسبه تعداد کلمات موجود در یک متن با استفاده از Pyspark:
1# Copyright 2016 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import re
15import sys
16from operator import add
17
18from pyspark import SparkContext
19
20
21WORD_RE = re.compile(r"[\w']+")
22
23
24def main():
25 # read in input, output path
26 args = sys.argv[1:]
27
28 if len(args) != 2:
29 raise ValueError
30
31 inputPath, outputPath = args
32
33 sc = SparkContext(appName='mrjob Spark wordcount script')
34
35 lines = sc.textFile(inputPath)
36
37 # lines.flatMap(WORD_RE.findall) doesn't work on Spark 1.6.2; apparently
38 # it can't serialize instance methods?
39 counts = (
40 lines.flatMap(lambda line: WORD_RE.findall(line))
41 .map(lambda word: (word, 1))
42 .reduceByKey(add))
43
44 counts.saveAsTextFile(outputPath)
45
46 sc.stop()
47
48
49if __name__ == '__main__':
50 main()
مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار Hadoop):
1# Copyright 2009-2010 Yelp
2# Copyright 2013 David Marin
3# Copyright 2018 Yelp
4# Copyright 2019 Yelp
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17"""Determine the most used word in the input, ignoring common "stop" words.
18Shows how to do a multi-step job, and how to load a support file
19from the same directory.
20"""
21import re
22
23from mrjob.job import MRJob
24from mrjob.protocol import JSONValueProtocol
25from mrjob.step import MRStep
26
27WORD_RE = re.compile(r"[\w']+")
28
29
30class MRMostUsedWord(MRJob):
31 FILES = ['stop_words.txt']
32
33 OUTPUT_PROTOCOL = JSONValueProtocol
34
35 def configure_args(self):
36 super(MRMostUsedWord, self).configure_args()
37
38 # allow for alternate stop words file
39 self.add_file_arg(
40 '--stop-words-file',
41 dest='stop_words_file',
42 default=None,
43 help='alternate stop words file. lowercase words, one per line',
44 )
45
46 def mapper_init(self):
47 stop_words_path = self.options.stop_words_file or 'stop_words.txt'
48
49 with open(stop_words_path) as f:
50 self.stop_words = set(line.strip() for line in f)
51
52 def mapper_get_words(self, _, line):
53 # yield each word in the line
54 for word in WORD_RE.findall(line):
55 word = word.lower()
56 if word not in self.stop_words:
57 yield (word, 1)
58
59 def combiner_count_words(self, word, counts):
60 # sum the words we've seen so far
61 yield (word, sum(counts))
62
63 def reducer_count_words(self, word, counts):
64 # send all (num_occurrences, word) pairs to the same reducer.
65 # num_occurrences is so we can easily use Python's max() function.
66 yield None, (sum(counts), word)
67
68 # discard the key; it is just None
69 def reducer_find_max_word(self, _, word_count_pairs):
70 # each item of word_count_pairs is (count, word),
71 # so yielding one results in key=counts, value=word
72 try:
73 yield max(word_count_pairs)
74 except ValueError:
75 pass
76
77 def steps(self):
78 return [
79 MRStep(mapper_init=self.mapper_init,
80 mapper=self.mapper_get_words,
81 combiner=self.combiner_count_words,
82 reducer=self.reducer_count_words),
83 MRStep(reducer=self.reducer_find_max_word)
84 ]
85
86
87if __name__ == '__main__':
88 MRMostUsedWord.run()
مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار spark):
1# Copyright 2019 Yelp
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Like mr_most_used_word.py, but on Spark.
15To make this work on the local[*] master, pass in your own --stop-words-file
16(it won't be able to see stop_words.txt because --files doesn't work
17on local[*] master)
18"""
19import json
20import re
21from operator import add
22
23from mrjob.job import MRJob
24
25WORD_RE = re.compile(r"[\w']+")
26
27
28class MRSparkMostUsedWord(MRJob):
29
30 FILES = ['stop_words.txt']
31
32 def configure_args(self):
33 super(MRSparkMostUsedWord, self).configure_args()
34
35 # allow for alternate stop words file
36 self.add_file_arg(
37 '--stop-words-file',
38 dest='stop_words_file',
39 default=None,
40 help='alternate stop words file. lowercase words, one per line',
41 )
42
43 def spark(self, input_path, output_path):
44 from pyspark import SparkContext
45
46 sc = SparkContext()
47
48 lines = sc.textFile(input_path)
49
50 # do a word frequency count
51 words_and_ones = lines.mapPartitions(self.get_words)
52 word_counts = words_and_ones.reduceByKey(add)
53
54 # pick pair with highest count (now in count, word format)
55 max_count, word = word_counts.map(lambda w_c: (w_c[1], w_c[0])).max()
56
57 # output our word
58 output = sc.parallelize([json.dumps(word)])
59 output.saveAsTextFile(output_path)
60
61 sc.stop()
62
63 def get_words(self, line_iterator):
64 # this only happens once per partition
65 stop_words = self.load_stop_words()
66
67 for line in line_iterator:
68 for word in WORD_RE.findall(line):
69 word = word.lower()
70 if word not in stop_words:
71 yield (word, 1)
72
73 def load_stop_words(self):
74 # this should only be called inside executors (i.e. inside functions
75 # passed to RDDs)
76 stop_words_path = self.options.stop_words_file or 'stop_words.txt'
77
78 with open(stop_words_path) as f:
79 return set(line.strip() for line in f)
80
81
82if __name__ == '__main__':
83 MRSparkMostUsedWord.run()
دستهبندی دادههای متنی با استفاده از MRJob (ابزار Hadoop):
1# Copyright 2009-2010 Yelp
2# Copyright 2013 David Marin
3# Copyright 2017 Yelp
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""A text classifier that uses a modified version of Naive Bayes that is not
18sensitive to document length.
19This is a somewhat contrived example in that it does everything in one job;
20generally you'd run one job to generate n-gram scores, put them in a sqlite
21database, and run a second job to score documents. But this is simple, and
22it works!
23This takes as its input documents encoded by encode_document() below. For each
24document, you specify its text, and whether it belongs or does not belong
25to one or more categories. You can also specify a unique ID for each document.
26This job outputs the documents, with the field 'cat_to_score' filled in.
27Generally, positive scores indicate the document is in the category, and
28negative scores indicate it is not, but it's up to you to determine a
29threshold for each category. This job also outputs scores for each ngram,
30so that you can classify other documents.
31About half of the documents are placed in a test set (based on SHA1 hash of
32their text), which means they will not be used to train the classifier. The
33'in_test_set' of each document will be filled accordingly. You can turn
34this off with the --no-test-set flag. (You can also effectively put docs
35in the training set by specifying no category information.)
36Some terminology:
37An "ngram" is a word or phrase. "foo" is a 1-gram; "foo bar baz" is a 3-gram.
38"tf" refers to term frequency, that is, the number of times an ngram appears.
39"df" referse to document frequency, that is, the number of documents an ngram
40appears in at least once.
41"""
42from collections import defaultdict
43import hashlib
44import math
45import re
46
47from mrjob.job import MRJob
48from mrjob.protocol import JSONValueProtocol
49from mrjob.step import MRStep
50
51
52def encode_document(text, cats=None, id=None):
53 """Encode a document as a JSON so that MRTextClassifier can read it.
54 Args:
55 text -- the text of the document (as a unicode)
56 cats -- a dictionary mapping a category name (e.g. 'sports') to True if
57 the document is in the category, and False if it's not. None indicates
58 that we have no information about this documents' categories
59 id -- a unique ID for the document (any kind of JSON-able value should
60 work). If not specified, we'll auto-generate one.
61 """
62 text = unicode(text)
63 cats = dict((unicode(cat), bool(is_in_cat))
64 for cat, is_in_cat
65 in (cats or {}).iteritems())
66
67 return JSONValueProtocol.write(
68 None, {'text': text, 'cats': cats, 'id': id}) + '\n'
69
70
71def count_ngrams(text, max_ngram_size, stop_words):
72 """Break text down into ngrams, and return a dictionary mapping
73 (n, ngram) to number of times that ngram occurs.
74 n: ngram size ("foo" is a 1-gram, "foo bar baz" is a 3-gram)
75 ngram: the ngram, as a space-separated string or None to indicate the
76 ANY ngram (basically the number of words in the document).
77 Args:
78 text -- text, as a unicode
79 max_ngram_size -- maximum size of ngrams to consider
80 stop_words -- a collection of words (in lowercase) to remove before
81 parsing out ngrams (e.g. "the", "and")
82 """
83 if not isinstance(stop_words, set):
84 stop_words = set(stop_words)
85
86 words = [word.lower() for word in WORD_RE.findall(text)
87 if word.lower() not in stop_words]
88
89 ngram_counts = defaultdict(int)
90
91 for i in range(len(words)):
92 for n in range(1, max_ngram_size + 1):
93 if i + n <= len(words):
94 ngram = ' '.join(words[i:i + n])
95 ngram_counts[(n, ngram)] += 1
96
97 # add counts for ANY ngram
98 for n in range(1, max_ngram_size + 1):
99 ngram_counts[(n, None)] = len(words) - n + 1
100
101 return ngram_counts
102
103
104WORD_RE = re.compile(r"[\w']+", re.UNICODE)
105
106DEFAULT_MAX_NGRAM_SIZE = 4
107
108DEFAULT_STOP_WORDS = [
109 'a', 'about', 'also', 'am', 'an', 'and', 'any', 'are', 'as', 'at', 'be',
110 'but', 'by', 'can', 'com', 'did', 'do', 'does', 'for', 'from', 'had',
111 'has', 'have', 'he', "he'd", "he'll", "he's", 'her', 'here', 'hers',
112 'him', 'his', 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is',
113 'it', "it's", 'its', 'just', 'me', 'mine', 'my', 'of', 'on', 'or', 'org',
114 'our', 'ours', 'she', "she'd", "she'll", "she's", 'some', 'than', 'that',
115 'the', 'their', 'them', 'then', 'there', 'these', 'they', "they'd",
116 "they'll", "they're", 'this', 'those', 'to', 'us', 'was', 'we', "we'd",
117 "we'll", "we're", 'were', 'what', 'where', 'which', 'who', 'will', 'with',
118 'would', 'you', 'your', 'yours',
119]
120
121
122class MRTextClassifier(MRJob):
123 INPUT_PROTOCOL = JSONValueProtocol
124
125 def steps(self):
126 """Conceptually, the steps are:
127 1. Parse documents into ngrams
128 2. Group by ngram to get a frequency count for each ngram, and to
129 exclude very rare ngrams
130 3. Send all ngram information to one "global" reducer so we can
131 assign scores for each category and ngram
132 4. Group scores and documents by ngram and compute score for that
133 ngram for that document. Exclude very common ngrams to save memory.
134 5. Average together scores for each document to get its score for
135 each category.
136 The documents themselves are passed through from step 1 to step 5.
137 Ngram scoring information is passed through from step 4 to step 5.
138 """
139 return [MRStep(mapper=self.parse_doc,
140 reducer=self.count_ngram_freq),
141 MRStep(reducer=self.score_ngrams),
142 MRStep(reducer=self.score_documents_by_ngram),
143 MRStep(reducer=self.score_documents)]
144
145 def configure_args(self):
146 """Add command-line options specific to this script."""
147 super(MRTextClassifier, self).configure_args()
148
149 self.add_passthru_arg(
150 '--min-df', dest='min_df', default=2, type=int,
151 help=('min number of documents an n-gram must appear in for us to'
152 ' count it. Default: %(default)s'))
153 self.add_passthru_arg(
154 '--max-df', dest='max_df', default=10000000, type=int,
155 help=('max number of documents an n-gram may appear in for us to'
156 ' count it (this keeps reducers from running out of memory).'
157 ' Default: %(default)s'))
158 self.add_passthru_arg(
159 '--max-ngram-size', dest='max_ngram_size',
160 default=DEFAULT_MAX_NGRAM_SIZE, type=int,
161 help='maximum phrase length to consider')
162 self.add_passthru_arg(
163 '--stop-words', dest='stop_words',
164 default=', '.join(DEFAULT_STOP_WORDS),
165 help=("comma-separated list of words to ignore. For example, "
166 "--stop-words 'in, the' would cause 'hole in the wall' to be"
167 " parsed as ['hole', 'wall']. Default: %(default)s"))
168 self.add_passthru_arg(
169 '--short-doc-threshold', dest='short_doc_threshold',
170 type=int, default=None,
171 help=('Normally, for each n-gram size, we take the average score'
172 ' over all n-grams that appear. This allows us to penalize'
173 ' short documents by using this threshold as the denominator'
174 ' rather than the actual number of n-grams.'))
175 self.add_passthru_arg(
176 '--no-test-set', dest='no_test_set',
177 action='store_true', default=False,
178 help=("Choose about half of the documents to be the testing set"
179 " (don't use them to train the classifier) based on a SHA1"
180 " hash of their text"))
181
182 def load_options(self, args):
183 """Parse stop_words option."""
184 super(MRTextClassifier, self).load_options(args)
185
186 self.stop_words = set()
187 if self.options.stop_words:
188 self.stop_words.update(
189 s.strip().lower() for s in self.options.stop_words.split(','))
190
191 def parse_doc(self, _, doc):
192 """Mapper: parse documents and emit ngram information.
193 Input: JSON-encoded documents (see :py:func:`encode_document`)
194 Output:
195 ``('ngram', (n, ngram)), (count, cats)`` OR
196 ``('doc', doc_id), doc``
197 n: ngram length
198 ngram: ngram encoded encoded as a string (e.g. "pad thai")
199 or None to indicate ANY ngram.
200 count: # of times an ngram appears in the document
201 cats: a map from category name to a boolean indicating whether it's
202 this document is in the category
203 doc_id: (hopefully) unique document ID
204 doc: the encoded document. We'll fill these fields:
205 ngram_counts: map from (n, ngram) to # of times ngram appears
206 in the document, using (n, None) to represent the total
207 number of times ANY ngram of that size appears (essentially
208 number of words)
209 in_test_set: boolean indicating if this doc is in the test set
210 id: SHA1 hash of doc text (if not already filled)
211 """
212 # only compute doc hash if we need it
213 if doc.get('id') is not None and self.options.no_test_set:
214 doc_hash = '0' # don't need doc hash
215 else:
216 doc_hash = hashlib.sha1(doc['text'].encode('utf-8')).hexdigest()
217
218 # fill in ID if missing
219 if doc.get('id') is None:
220 doc['id'] = doc_hash
221
222 # pick test/training docs
223 if self.options.no_test_set:
224 doc['in_test_set'] = False
225 else:
226 doc['in_test_set'] = bool(int(doc_hash[-1], 16) % 2)
227
228 # map from (n, ngram) to number of times it appears
229 ngram_counts = count_ngrams(
230 doc['text'], self.options.max_ngram_size, self.stop_words)
231
232 # yield the number of times the ngram appears in this doc
233 # and the categories for this document, so we can train the classifier
234 if not doc['in_test_set']:
235 for (n, ngram), count in ngram_counts.iteritems():
236 yield ('ngram', (n, ngram)), (count, doc['cats'])
237
238 # yield the document itself, for safekeeping
239 doc['ngram_counts'] = ngram_counts.items()
240 yield ('doc', doc['id']), doc
241
242 def count_ngram_freq(self, type_and_key, values):
243 """Reducer: Combine information about how many times each ngram
244 appears for docs in/not in each category. Dump ngrams that appear
245 in very few documents (according to --min-df switch). If two documents
246 have the same ID, increment a counter and only keep one; otherwise
247 pass docs through unchanged.
248 Input (see parse_doc() for details):
249 ('ngram', (n, ngram)), (count, cats) OR
250 ('doc', doc_id), doc
251 Output:
252 ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
253 ('doc', doc_id), doc
254 n: ngram length
255 ngram: ngram encoded encoded as a string (e.g. "pad thai")
256 or None to indicate ANY ngram.
257 cat_to_df: list of tuples of ((cat_name, is_in_category), df); df
258 is # of documents of this type that the ngram appears in
259 cat_to_tf: list of tuples of ((cat_name, is_in_category), df); tf
260 is # of time the ngram appears in docs of this type
261 doc_id: unique document ID
262 doc: the encoded document
263 """
264 key_type, key = type_and_key
265
266 # pass documents through
267 if key_type == 'doc':
268 doc_id = key
269 docs = list(values)
270 # if two documents end up with the same key, only keep one
271 if len(docs) > 1:
272 self.increment_counter(
273 'Document key collision', str(doc_id))
274 yield ('doc', doc_id), docs[0]
275 return
276
277 assert key_type == 'ngram'
278 n, ngram = key
279
280 # total # of docs this ngram appears in
281 total_df = 0
282 # map from (cat, is_in_cat) to
283 # number of documents in this cat it appears in (df), or
284 # number of times it appears in documents of this type (tf)
285 cat_to_df = defaultdict(int)
286 cat_to_tf = defaultdict(int)
287
288 for count, cats in values:
289 total_df += 1
290 for cat in cats.iteritems():
291 cat_to_df[cat] += 1
292 cat_to_tf[cat] += count
293
294 # don't bother with very rare ngrams
295 if total_df < self.options.min_df:
296 return
297
298 yield (('global', None),
299 ((n, ngram), (cat_to_df.items(), cat_to_tf.items())))
300
301 def score_ngrams(self, type_and_key, values):
302 """Reducer: Look at all ngrams together, and assign scores by
303 ngram and category. Also farm out documents to the reducer for
304 any ngram they contain, and pass documents through to the next
305 step.
306 To score an ngram for a category, we compare the probability of any
307 given ngram being our ngram for documents in the category against
308 documents not in the category. The score is just the log of the
309 ratio of probabilities (the "log difference")
310 Input (see count_ngram_freq() for details):
311 ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
312 ('doc', doc_id), doc
313 Output:
314 ('doc', doc_id), document OR
315 ('ngram', (n, ngram)), ('doc_id', doc_id) OR
316 ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
317 n: ngram length
318 ngram: ngram encoded encoded as a string (e.g. "pad thai")
319 or None to indicate ANY ngram.
320 cat_to_score: map from (cat_name, is_in_category) to score for
321 this ngram
322 doc_id: unique document ID
323 doc: the encoded document
324 """
325 key_type, key = type_and_key
326 if key_type == 'doc':
327 doc_id = key
328 doc = list(values)[0]
329 # pass document through
330 yield ('doc', doc_id), doc
331
332 # send document to reducer for every ngram it contains
333 for (n, ngram), count in doc['ngram_counts']:
334 # don't bother even creating a reducer for the ANY ngram
335 # because we'd have to send all documents to it.
336 if ngram is None:
337 continue
338 yield (('ngram', (n, ngram)),
339 ('doc_id', doc_id))
340
341 return
342
343 assert key_type == 'global'
344 ngram_to_info = dict(
345 ((n, ngram),
346 (dict((tuple(cat), df) for cat, df in cat_to_df),
347 dict((tuple(cat), tf) for cat, tf in cat_to_tf)))
348 for (n, ngram), (cat_to_df, cat_to_tf)
349 in values)
350
351 # m = # of possible ngrams of any given type. This is not a very
352 # rigorous estimate, but it's good enough
353 m = len(ngram_to_info)
354
355 for (n, ngram), info in ngram_to_info.iteritems():
356 # do this even for the special ANY ngram; it's useful
357 # as a normalization factor.
358 cat_to_df, cat_to_tf = info
359
360 # get the total # of documents and terms for ngrams of this size
361 cat_to_d, cat_to_t = ngram_to_info[(n, None)]
362
363 # calculate the probability of any given term being
364 # this term for documents of each type
365 cat_to_p = {}
366 for cat, t in cat_to_t.iteritems():
367 tf = cat_to_tf.get(cat) or 0
368 # use Laplace's rule of succession to estimate p. See:
369 # http://en.wikipedia.org/wiki/Rule_of_succession#Generalization_to_any_number_of_possibilities
370 cat_to_p[cat] = (tf + (2.0 / m)) / (t + 2)
371
372 cats = set(cat for cat, in_cat in cat_to_t)
373 cat_to_score = {}
374 for cat in cats:
375 p_if_in = cat_to_p.get((cat, True), 1.0 / m)
376 p_if_out = cat_to_p.get((cat, False), 1.0 / m)
377 # take the log difference of probabilities
378 score = math.log(p_if_in) - math.log(p_if_out)
379 cat_to_score[cat] = score
380
381 yield (('ngram', (n, ngram)),
382 ('cat_to_score', cat_to_score))
383
384 def score_documents_by_ngram(self, type_and_key, types_and_values):
385 """Reducer: For all documents that contain a given ngram, send
386 scoring info to that document. Also pass documents and scoring
387 info through as-is
388 Input (see score_ngrams() for details):
389 ('doc', doc_id), doc OR
390 ('ngram', (n, ngram)), ('doc_id', doc_id) OR
391 ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
392 Output:
393 ('doc', doc_id), ('doc', doc)
394 ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
395 ('cat_to_score', (n, ngram)), cat_to_score
396 n: ngram length
397 ngram: ngram encoded encoded as a string (e.g. "pad thai")
398 or None to indicate ANY ngram.
399 cat_to_score: map from (cat_name, is_in_category) to score for
400 this ngram
401 doc_id: unique document ID
402 doc: the encoded document
403 """
404 key_type, key = type_and_key
405
406 # pass documents through
407 if key_type == 'doc':
408 doc_id = key
409 doc = list(types_and_values)[0]
410 yield ('doc', doc_id), ('doc', doc)
411 return
412
413 assert key_type == 'ngram'
414 n, ngram = key
415
416 doc_ids = []
417 cat_to_score = None
418
419 for value_type, value in types_and_values:
420 if value_type == 'cat_to_score':
421 cat_to_score = value
422 continue
423
424 assert value_type == 'doc_id'
425 doc_ids.append(value)
426
427 if len(doc_ids) > self.options.max_df:
428 self.increment_counter('Exceeded max df', repr((n, ngram)))
429 return
430
431 # skip ngrams that are too rare to score
432 if cat_to_score is None:
433 return
434
435 # send score info for this ngram to this document
436 for doc_id in doc_ids:
437 yield ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
438
439 # keep scoring info
440 yield ('cat_to_score', (n, ngram)), cat_to_score
441
442 def score_documents(self, type_and_key, types_and_values):
443 """Reducer: combine all scoring information for each document, and
444 add it to the document. Also pass ngram scores through as-is.
445 To score a document, we essentially take a weighted average of all
446 the scores for ngrams of each size, and then sum together those
447 averages. ngrams that aren't scored (because they're very rare or
448 very common) are considered to have a score of zero. Using averages
449 allows us to be insensitive to document size. There is a penalty
450 for very small documents.
451 Input (see score_ngrams() for details):
452 ('doc', doc_id), ('doc', doc)
453 ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
454 ('cat_to_score', (n, ngram)), cat_to_score
455 Output:
456 ('doc', doc_id), doc
457 ('cat_to_score', (n, ngram)), cat_to_score
458 n: ngram length
459 ngram: ngram encoded encoded as a string (e.g. "pad thai")
460 or None to indicate ANY ngram.
461 cat_to_score: map from (cat_name, is_in_category) to score for
462 this ngram
463 doc_id: unique document ID
464 doc: the encoded document. this will contain an extra field
465 'cat_to_score', and will no longer have the 'ngram_counts' field.
466 """
467 key_type, key = type_and_key
468
469 # pass through cat_to_score
470 if key_type == 'cat_to_score':
471 cat_to_score = list(types_and_values)[0]
472 yield ('cat_to_score', key), cat_to_score
473 return
474
475 assert key_type == 'doc'
476 doc_id = key
477
478 # store the document and scoring info
479 doc = None
480 ngrams_and_scores = []
481
482 for value_type, value in types_and_values:
483 if value_type == 'doc':
484 doc = value
485 continue
486
487 assert value_type == 'scores'
488 ((n, ngram), cat_to_score) = value
489 ngrams_and_scores.append(((n, ngram), cat_to_score))
490
491 # total scores for each ngram size
492 ngram_counts = dict(((n, ngram), count)
493 for (n, ngram), count in doc['ngram_counts'])
494
495 cat_to_n_to_total_score = defaultdict(lambda: defaultdict(float))
496
497 for (n, ngram), cat_to_score in ngrams_and_scores:
498 tf = ngram_counts[(n, ngram)]
499 for cat, score in cat_to_score.iteritems():
500 cat_to_n_to_total_score[cat][n] += score * tf
501
502 # average scores for each ngram size
503 cat_to_score = {}
504 for cat, n_to_total_score in cat_to_n_to_total_score.iteritems():
505 total_score_for_cat = 0
506 for n, total_score in n_to_total_score.iteritems():
507 total_t = ngram_counts[(n, None)]
508 total_score_for_cat += (
509 total_score /
510 max(total_t, self.options.short_doc_threshold, 1))
511 cat_to_score[cat] = total_score_for_cat
512
513 # add scores to the document, and get rid of ngram_counts
514 doc['cat_to_score'] = cat_to_score
515 del doc['ngram_counts']
516
517 yield ('doc', doc_id), doc
518
519if __name__ == '__main__':
520 MRTextClassifier.run()
برنامه نگاشت شمارههای تلفن به آدرسهای URL با استفاده از از MRJob (ابزار Hadoop):
1"""An example of how to parse non-line-based data.
2Map +1 (U.S. and Canadian) phone numbers to the most plausible
3URL for their webpage.
4This is similar to the article "Analyzing the Web for the Price of a Sandwich"
5(https://engineeringblog.yelp.com/2015/03/analyzing-the-web-for-the-price-of-a-sandwich.html)
6except that it doesn't include Yelp biz IDs, and it doesn't need to access
7S3 because it can read the input files directly.
8Sample command line:
9.. code-block:: sh
10 python mr_phone_to_url.py -r emr --bootstrap 'sudo pip install warcio' --output-dir s3://your-bucket/path/ --no-output s3://commoncrawl/crawl-data/CC-MAIN-2018-09/segments/*/wet/*.wet.gz
11To find the latest crawl:
12``aws s3 ls s3://commoncrawl/crawl-data/ | grep CC-MAIN``
13WET data is often added after a release; usually the second-most recent
14release is a safe bet.
15"""
16import re
17from itertools import islice
18
19from mrjob.job import MRJob
20from mrjob.py2 import urlparse
21from mrjob.step import MRStep
22
23PHONE_RE = re.compile(
24 br'(?:[\D\b]|^)(1?[2-9]\d{2}[\-. ()+]+\d{3}[\-. ()+]+\d{4})(?:[\D\b]|$)')
25PHONE_SEP_RE = re.compile(br'[\-. ()+]')
26
27# hosts with more than this many phone numbers are assumed to be directories
28MAX_PHONES_PER_HOST = 1000
29
30
31def standardize_phone_number(number):
32 """put *number* in a standard format, and convert it to a :py:class:`str`.
33 """
34 number_sep = PHONE_SEP_RE.split(number)
35 number = b''.join(number_sep).decode('ascii')
36 if len(number) > 7:
37 if number[-1] not in '0123456789':
38 number = number[:-1]
39 if number[0] not in '0123456789':
40 number = number[1:]
41 if len(number) <= 10:
42 return "+1" + number
43 else:
44 return "+" + number
45
46
47class MRPhoneToURL(MRJob):
48 """Use Common Crawl .wet files to map from phone number to the most
49 likely URL."""
50
51 def steps(self):
52 return [
53 MRStep(mapper_raw=self.extract_phone_and_url_mapper,
54 reducer=self.count_by_host_reducer),
55 MRStep(reducer=self.pick_best_url_reducer),
56 ]
57
58 def extract_phone_and_url_mapper(self, wet_path, wet_uri):
59 """Read in .wet file, and extract phone ant URL
60 """
61 from warcio.archiveiterator import ArchiveIterator
62
63 with open(wet_path, 'rb') as f:
64 for record in ArchiveIterator(f):
65 if record.rec_type != 'conversion':
66 continue
67
68 headers = record.rec_headers
69 if headers.get_header('Content-Type') != 'text/plain':
70 continue
71
72 url = headers.get_header('WARC-Target-URI')
73 if not url:
74 continue
75
76 host = urlparse(url).netloc
77
78 payload = record.content_stream().read()
79 for phone in PHONE_RE.findall(payload):
80 phone = standardize_phone_number(phone)
81 yield host, (phone, url)
82
83 def count_by_host_reducer(self, host, phone_urls):
84 phone_urls = list(islice(phone_urls, MAX_PHONES_PER_HOST + 1))
85
86 # don't bother with directories, etc.
87 host_phone_count = len(phone_urls)
88 if host_phone_count > MAX_PHONES_PER_HOST:
89 return
90
91 for phone, url in phone_urls:
92 yield phone, (url, host_phone_count)
93
94 def pick_best_url_reducer(self, phone, urls_with_count):
95 # pick the url that appears on a host with the least number of
96 # phone numbers, breaking ties by choosing the shortest URL
97 # and the one that comes first alphabetically
98 urls_with_count = sorted(
99 urls_with_count, key=lambda uc: (uc[1], -len(uc[0]), uc[0]))
100
101 yield phone, urls_with_count[0][0]
102
103
104if __name__ == '__main__':
105 MRPhoneToURL.run()
مثالهای ارائه شده تنها بخش کوچکی از قابلیتهای ابزارهایی نظیر هدوپ (Hadoop) و Spark هستند. این دسته از مدلهای برنامهنویسی، علاوه بر اینکه امکان ذخیرهسازی و پردازش توزیع شده دادههای مختلف را به برنامهنویسان و دانشمندان داده میدهند، قابلیت مدیریت و پردازش کلان داده را برای سیستم فراهم میکنند.
اگر نوشته بالا برای شما مفید بوده است، آموزشهای زیر نیز به شما پیشنهاد میشوند:
- مجموعه آموزشهای داده کاوی و یادگیری ماشین
- آموزش اصول و روشهای داده کاوی (Data Mining)
- مجموعه آموزشهای هوش مصنوعی
- آموزش مقدماتی Hadoop (هدوپ) برای تجزیه و تحلیل کلان داده
- مفاهیم کلان داده (Big Data) و انواع تحلیل داده — راهنمای جامع
- کلان داده یا مِه داده (Big Data) — از صفر تا صد
^^