داده کاوی، هوش مصنوعی ۲۳۳۱ بازدید

اگر کاربران با دنیای علم و فناوری آشنایی داشته باشند، بدون شک با واژگان و اصطلاحاتی نظیر «علم داده» (Data Science)، «علم تجزیه و تحلیل» (Analytics Science)، «یادگیری ماشین» (Machine Learning)، «کلان داده» (Big Data) مواردی مشابه برخورد داشته‌اند. در دنیای فناوری امروز، از این اصطلاحات برای تعریف مجموعه‌ای از فناوری‌های مهم و پیشرفته حوزه در «علوم کامپیوتر» (Computer Science) استفاده می‌شود که پایه و اساس پیشرفت‌های فناوری در آینده نیز به شمار می‌آیند. در این مطلب، با مهم‌ترین ابزارهای علم داده جهت پردازش کلان داده آشنا خواهید. همچنین، مثال‌های ساده و عملی جهت پردازش کلان داده نمایش داده خواهند شد. علاوه بر این، استفاده از «زبان برنامه‌نویسی پایتون» (Python Programming language) برای تحلیل داده آموزش داده خواهد شد (از این دسته از تکنیک‌ها در مقیاس وسیع‌تر، جهت پردازش کلان داده نیز استفاده می‌شود).

علم داده یکی از حوزه‌های گسترده علوم کامپیوتر محسوب می‌شود و زیر شاخه‌های گسترده‌ای نظیر «جمع‌آوری داده» (Data Collection)، «پاک‌سازی داده» (Data Cleaning)، «استاندارد سازی داده» (Data Standardization)، «تحلیل داده» (Data Analysis) و «گزارش» (Reporting) را شامل می‌شود. بزرگ‌ترین شرکت‌های حوزه فناوری، از تکنیک‌های علم داده و پردازش کلان داده برای استخراج دانش از «داده‌های غیر ساخت یافته» (Unstructured Data) و «داده‎‌های ساخت یافته» (Structured Data) سازمانی استفاده می‌کنند.

پردازش کلان داده

پردازش کلان داده

تحلیل و پردازش کلان داده هم اکنون به یکی از ترندهای تحقیقاتی در دنیا تبدیل شده است. به همین خاطر است که فرصت‌های شغلی بسیار خوبی، برای برنامه‌نویسان و توسعه‌دهندگانی که با تکنیک‌های پردازش کلان داده آشنایی دارند، فراهم شده است. یک دانشمند علم داده و یا یک برنامه‌نویس مسلط به پردازش کلان داده و تحلیل آن می‌تواند از مجموعه ابزارهای تولید شده برای پردازش کلان داده، جهت «تحلیل زبان» (Language Analysis)، «پیشنهاد فایل‌های ویدئویی» (Recommending Videos) و یا مشخص کردن محصولات جدید با توجه به داده‌های بازاریابی یا داده‌های جمع‌آوری شده از مشتریان استفاده کند.

به طور کلی، زمانی که تکنیک‌های موجود در حوزه پردازش کلان داده در علم داده مورد بررسی قرار می‌گیرند، منظور تکنیک‌هایی هستند که در حوزه خاصی به نام «علم داده در مقیاس وسیع» (large Scale Data Science) استفاده می‌شوند. تعریف کلمه کلان در اصطلاح پردازش کلان داده، به نوع پروژه‌های برنامه‌نویسی پردازش داده یا حجم داده‌هایی که قرار است پردازش و تحلیل شوند، بستگی دارد.

بسیاری از مسائل تحلیل داده، که برای حل آن‌ها پروژه‌های برنامه‌نویسی تعریف می‌شود، توسط راهکارهای «مرسوم» (Conventional) و متدهای یادگیری ماشین به راحتی قابل حل هستند. به عبارت دیگر، زمانی که داده‌های استفاده شده برای مدل‌سازی یا آموزش مدل یادگیری، به اندازه‌ای باشند که بتوان از سیستم‌های محاسباتی معمول نظیر کامپیوترهای شخصی برای پردازش آن‌ها استفاده کرد، معمولا نیازی به استفاده از روش‌های پردازش کلان داده وجود نخواهد داشت. با این حال، این امکان وجود دارد که بتوان از روش‌های تحلیل و پردازش کلان داده برای حل مسائل تحلیل داده استفاده کرد. به طور کلی، زمانی از تکنیک‌های پردازش کلان داده برای حل مسئله استفاده می‌شود که نیاز به پردازش حجم عظیمی از داده‌ها وجود دارد و سیستم‌های محاسباتی مرسوم، توان پردازشی لازم برای تحلیل و پردازش کلان داده را نداشته باشند.

پردازش کلان داده

مهم‌ترین ابزارهای پردازش کلان داده

یکی از مهم‌ترین مدل‌های ذخیره‌سازی توزیع شده داده‌ها و پردازش کلان داده، که به وفور توسط دانشمندان علم داده مورد استفاده قرار می‌گیرد، مدل برنامه‌نویسی به نام «نگاشت-کاهش» (MapReduce) است. مدل MapReduce، روشی بهینه برای مدیریت و پردازش کلان داده محسوب می‌شود که به دانشمندان علم داده و توسعه‌دهنده برنامه کاربردی اجازه می‌‌دهد تا ابتدا داده را با استفاده از یک «صفت» (Attribute)، فیلتر یا دسته‌بندی خاص نگاشت کنند و سپس، با استفاده از یک مکانیزم «تبدیل» (Transformation) یا «تجمیع» (Aggregation)، داده‌های نگاشت شده را کاهش دهند.

به عنوان نمونه، در صورتی که داده‌های جمع‌اوری شده مرتبط با مجموعه‌ای از گربه‌ها باشند، ابتدا گربه‌ها با استفاده از صفتی نظیر رنگ آن‌ها نگاشت می‌شوند. در مرحله بعد، داده‌ها، از طریق جمع کردن گروه‌بندی‌های انجام شده، کاهش پیدا می‌کنند. در پایان فرایند MapReduce، یک لیست یا مجموعه‌ای مرتبط با رنگ گربه‌ها و تعداد گربه‌های موجود در هر کدام از گروه‌بندی‌های رنگی در اختیار برنامه‌نویس یا دانشمند داده قرار می‌دهد.

تقریبا تمامی کتابخانه‌های برنامه‌نویسی توسعه داده شده برای مقاصد پردازش کلان داده و علم داده، از مدل برنامه‌نویسی MapReduce پشتیبانی می‌کنند. همچنین، تعداد زیادی کتابخانه برنامه‌نویسی وجود دارند که امکان مدیریت، پردازش و انجام عملیات «نگاشت-کاهش» (MapReduce) داده‌ها را به شکل توزیع شده (خوشه یا مجموعه‌ای از کامپیوترهای ) در اختیار برنامه‌نویسان و فعالان حوزه علم داده قرار می‌دهند. افرادی که به دنبال استفاده از مدل‌های MapReduce، جهت مدیریت و پردازش کلان‌ داده هستند، به راحتی می‌توانند از ابزارها، بسته‌ها و کتابخانه‌های توسعه داده شده در زبان پایتون، جهت پیاده‌سازی کاربردهای مرتبط با پردازش کلان داده استفاده کنند.

پردازش کلان داده

کتابخانه Hadoop

یکی از محبوب‌ترین کتابخانه‌ها برای انجام عملیات «نگاشت-کاهش» (MapReduce) روی داده‌های حجیم، کتابخانه هدوپ است که توسط بنیاد نرم‌افزاری Apache طراحی شده است. کتابخانه هدوپ با استفاده از «محاسبات خوشه‌ای» (Cluster Computing)، به فعالان حوزه علم داده اجازه می‌دهد تا بتوانند تا با سرعت و اثرگذاری به مراتب بیشتری، وظایف مرتبط با مدیریت و پردازش کلان داده را انجام دهند. کتابخانه‌ها و بسته‌های زیادی  در زبان پایتون توسعه داده شده‌اند که از طریق آن‌ها می‌توان داده‌ها (که به آن‌ها Jobs نیز گفته می‌شود) را به هدوپ ارسال کرد. انتخاب یکی از این کتابخانه‌ها و بسته‌های برنامه‌نویسی جهت مقاصد پردازش کلان داده و علم داده، به عواملی نظیر سادگی استفاده، زیرساخت‌های لازم برای پیاده‌سازی و کاربرد خاصی که قرار است در آن مورد استفاده قرار بگیرند، بستگی دارد.

پردازش کلان داده

کتابخانه Spark

در صورتی که در کاربرد پردازش کلان داده مورد نظر برنامه‌نویس و یا دانشمند علم داده، استفاده از داده‌هایی که در قالب «داده‌های جریانی» (Streaming Data) هستند (نظیر داده‌های «بلادرنگ» (Real-Time)، داده‌های Log و داده‌های جمع‌آوری شده از «واسط‌های برنامه‌نویسی کاربردی» (Application Programming Interface))، عملکرد به مراتب بهتری را برای سیستم به ارمغان می‌آورند، استفاده از کتابخانه Spark بهترین گزینه خواهد بود. کتابخانه Spark، نظیر کتابخانه هدوپ، توسط بنیاد نرم‌افزاری Apache توسعه داده شده است.

پردازش کلان داده

پایتون و پردازش کلان داده

سؤالی که ممکن در اینجا ذهن خوانندگان و مخاطبان این مطلب را به خود مشغول کند این است که زبان پایتون، جهت پردازش کلان داده و تحلیل آن، چه امکاناتی را در اختیار برنامه‌نویس یا «دانشمند علم داده» (Data Scientist) قرار می‌دهد. در چند سال اخیر، پایتون به عنوان یکی از مهم‌ترین ابزارهای دانشمندان علم داده برای برنامه‌نویسی پروژه‌های علم داده مطرح شده است؛ تحلیل و پردازش کلان داده هم از این قاعده مستثنی نیست.

اگرچه، بسیاری از دانشمندان علم داده و برنامه‌نویسان فعال در این حوزه، از زبان‌های برنامه‌نویسی نظیر جاوا، R و Julia و ابزارهایی مانند SPSS و SAS برای تحلیل داده و استخراج «بینش» (Insight) از آن‌ها استفاده می‌کنند، با این حال، محبوبیت روزافزون زبان پایتون، «کتابخانه‌های برنامه‌نویسی» (Libraries) متنوع و پرکاربرد و جامعه برنامه‌نویسی پویا و فعال آن، سبب شده است تا بسیاری از دانشمندان علم داده، از زبان پایتون برای تحلیل داده‌ها و پردازش کلان داده استفاده کنند. افزایش روزافزون محبوبیت پایتون برای مقاصد علم داده را می‌توان در رشد چشمگیر کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی در حوزه علم داده مشاهده کرد.

زبان پایتون، در چند سال اخیر، همواره به عنوان یکی از محبوب‌ترین زبان‌های برنامه‌نویسی شناخته شده است. بسیاری از فرصت‌های شغلی لیست شده برای زبان پایتون نیز مرتبط با حوزه علم داده، تحلیل داده، پردازش کلان داده و «هوش مصنوعی» (Artificial Intelligence) هستند. به همین دلیل است که بسیاری از «مهندسان داده» (Data Engineers) به استفاده از پایتون و تسلط بر این زبان برنامه‌نویسی روی می‌آورند.

پردازش کلان داده

یکی از مهم‌ترین ابزارهای پردازش کلان داده برای دانشمندان داده، کتابخانه‌ای به نام «هدوپ» (Hadoop) است. ابزارهای نرم‌افزاری منبع باز هدوپ، در ابتدا توسط «بنیاد نرم‌افزاری آپاچی» (Apache Software Foundation) و با هدف ارائه بستر نرم‌افزاری لازم برای محاسبات توزیع شده و پردازش کلان داده، در زبان برنامه‌نویسی جاوا تهیه شده‌اند. این دسته از ابزارهای نرم‌افزاری به برنامه‌نویسان و توسعه‌دهندگان اجازه می‌دهند تا با استفاده از شبکه‌ای متشکل از چندین کامپیوتر (به شکل «توزیع شده» (Distributed)) اقدام به حل مسائلی کنند که حجم عظیمی از داده و محاسبات را شامل می‌شوند. جهت پردازش کلان داده و تحلیل آن‌ها، هدوپ، چارچوبی نرم‌افزاری برای ذخیره‌سازی توزیع شده و پردازش کلان داده معرفی می‌کند؛ این چارچوب نرم‌افزاری بر پایه مدل برنامه‌نویسی «نگاشت-کاهش» (MapReduce) پیاده‌سازی شده است.

دو دلیل عمده برای محبوبیت بیش از پیش زبان پایتون، جهت برنامه‌نویسی علم داده و پردازش کلان داده، وجود دارد:

  • کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیره‌سازی، نمایش، شکل دادن و مصورسازی داده‌ها در زبان برنامه‌نویسی پایتون ارائه شده است (در ادامه، این ابزارها شرح داده خواهند شد).
  • پس از افزایش روزافزون محبوبیت پایتون و روی آوردن تعداد زیادی از برنامه‌نویسان و توسعه‌دهندگان حوزه علم داده به این زبان برنامه‌نویسی، بنیاد نرم‌افزاری آپاچی به سرعت امکان استفاده از «اکوسیستم» (Ecosystem) هدوپ را برای دانشمندان علم داده و برنامه‌نویسان این حوزه فراهم آورد. هم‌اکنون برنامه‌نویسان زبان پایتون و دانشمندان علم داده مسلط به این زبان می‌توانند از ابزارهایی نظیر « استریمینگ هدوپ» (Hadoop Streaming)، افزونه Hadoopy، کتابخانه Pydoop و بسته Pydoop اشاره کرد.

پردازش کلان داده

استفاده از هدوپ (Hadoop) در پایتون به جای جاوا

یکی از سؤالاتی که ممکن است هنگام استفاده از هدوپ در پایتون (جهت پردازش کلان داده و تحلیل آن‌ها) برای فعالان حوزه علم داده پدید بیاید این است که چرا لازم است از پایتون به جای جاوا، برای پردازش کلان داده و تحلیل آن استفاده شود. در پاسخ به این سؤال باید گفت که اگرچه زبان جاوا هنوز هم یکی از محبوب‌ترین و پرکاربردترین زبان‌های برنامه‌نویسی در جهان و زبان اصلی توسعه چارچوب هدوپ (و Spark) محسوب می‌شود، با این حال، میان برخی از ویژگی‌های ساختاری این دو زبان تفاوت‌هایی وجود دارد که ممکن است سبب روی آوردن فعالان این حوزه به استفاده از پایتون شود. از جمله این تفاوت‌ها می‌توان به موارد زیر اشاره کرد:

  • نصب، پیاده‌سازی و اجرای محیط برنامه‌نویسی جاوا (Java Development Environment) نسبت به زبان پایتون سخت‌تر است (تنظیم کردن تنظیمات مرتبط با «متغیرهای محیطی» (Environment Variables)، تنظیمات فایل‌های XML مرتبط با وابستگی‌‎های نرم‌افزاری و سایر موارد). در نقطه مقابل، برای نصب، پیاده‌سازی و اجرای محیط برنامه‌نویسی پایتون، تنها کافی است پایتون را روی سیستم نصب و از واسط خط دستوری آن جهت کد نویسی استفاده کرد.
  • در محیط برنامه‌نویسی جاوا، ابتدا باید کدهای نوشته شده «کامپایل» (Compile) و سپس اجرا شوند. در حالی که در زبان پایتون، تنها کافی است کدها توسط برنامه‌نویس نوشته و سپس خط به خط در واسط خط دستوری اجرا شوند.
  • تعداد خط کدهای لازم برای نوشتن یک برنامه خاص در زبان جاوا، به مراتب بیشتر از تعداد خط کدهای لازم برای توسعه همان برنامه در زبان پایتون خواهد بود. به عنوان نمونه، به مثال زیر دقت کنید. در ادامه، تعداد خط کدهای لازم برای شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا و پایتون نمایش داده شده است. اولین قدم جهت پردازش کلان داده در فرم داده‌های متنی غیر ساخت یافته، شمارش تعداد دفعات تکرار کلمات ظاهر شده در این داده‌ها است:

شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا:

package org.myorg;
	
	import java.io.IOException;
	import java.util.*;
	
	import org.apache.hadoop.fs.Path;
	import org.apache.hadoop.conf.*;
	import org.apache.hadoop.io.*;
	import org.apache.hadoop.mapred.*;
	import org.apache.hadoop.util.*;
	
	public class WordCount {
	
	   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
	     private final static IntWritable one = new IntWritable(1);
	     private Text word = new Text();
	
	     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
	       String line = value.toString();
	       StringTokenizer tokenizer = new StringTokenizer(line);
	       while (tokenizer.hasMoreTokens()) {
	         word.set(tokenizer.nextToken());
	         output.collect(word, one);
	       }
	     }
	   }
	
	   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
	     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
	       int sum = 0;
	       while (values.hasNext()) {
	         sum += values.next().get();
	       }
	       output.collect(key, new IntWritable(sum));
	     }
	   }
	
	   public static void main(String[] args) throws Exception {
	     JobConf conf = new JobConf(WordCount.class);
	     conf.setJobName("wordcount");
	
	     conf.setOutputKeyClass(Text.class);
	     conf.setOutputValueClass(IntWritable.class);
	
	     conf.setMapperClass(Map.class);
	     conf.setCombinerClass(Reduce.class);
	     conf.setReducerClass(Reduce.class);
	
	     conf.setInputFormat(TextInputFormat.class);
	     conf.setOutputFormat(TextOutputFormat.class);
	
	     FileInputFormat.setInputPaths(conf, new Path(args[0]));
	     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
	
	     JobClient.runJob(conf);
	   }
	}

شمارش تعداد دفعات تکرار کلمات با استفاده از زبان پایتون:

import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

if __name__ == '__main__':
    MRWordFreqCount.run()

#Source
#https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py

همانطور که مشاهده می‌شود، برنامه‌ای که نوشتن آن در زبان جاوا به 57 خط کد احتیاج است، در زبان پایتون، تنها با 15 خط کد قابل نوشتن است. این ویژگی سبب می‌شود تا «نگه‌داری» (Maintain) کدها در زبان پایتون به مراتب راحت‌تر از زبان جاوا باشد.

کتابخانه‌های مدیریت و پردازش داده در پایتون

همانطور که پیش از این نیز اشاره شد، کتابخانه‌ها، ابزارها و بسته‌های برنامه‌نویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیره‌سازی، نمایش، شکل دادن و مصورسازی داده‌ها در زبان برنامه‌نویسی پایتون ارائه شده است. از جمله مهم‌ترین کتابخانه‌های مدیریت و پردازش داده در پایتون می‌توان به موارد زیر اشاره کرد:

کتابخانه Pandas

یکی از محبوب‌ترین کتابخانه‌های ارائه شده برای مقاصد علم داده، کتابخانه Pandas است. کتابخانه Pandas توسط دانشمندان علم داده‌ای که به زبان‌های برنامه‌نویسی R و پایتون مسلط هستند، توسعه داده شده است. هم اکنون جامعه بسیار بزرگی از برنامه‌نویسان و توسعه‌دهندگان (متشکل از دانشمندان علم داده و تحلیل‌گران داده) از کتابخانه Pandas و فرایند توسعه آن (برطرف کردن مشکلات احتمالی و اضافه کردن قابلیت‌های برنامه‌نویسی جدید) پشتیبانی می‌کنند.

کتابخانه Pandas ویژگی‌های تعبیه شده بسیار مهم و پرکاربردی نظیر قابلیت خواندن داده‌ها از منابع مختلف، ساختن «دیتا فریم» (Dataframe) یا ماتریس/جدول متناظر با داده‌های خوانده شده و محاسبه «تجزیه و تحلیل تجمیعی» (Aggregate Analytics) بر اساس نوع برنامه کاربردی در حال توسعه را برای برنامه‌نویسان و توسعه‌دهندگان فراهم می‌آورد.

علاوه بر امکانات تعبیه شده برای مدیریت و پردازش داده، کتابخانه Pandas از امکانات «مصور‌سازی داده» (Data Visualization) بسیار مناسبی نیز بهره می‌برد. با استفاده از قابلیت‌های مصورسازی تعبیه شده در این کتابخانه، امکان تولید گراف و نمودار از نتایج تولید شده برای برنامه‌نویسان و دانشمندان علم داده فراهم شده است. همچنین، مجموعه‌ای از توابع داخلی در کتابخانه Pandas تعریف شده‌اند که امکان «صادر کردن» (Export) تجزیه و تحلیل‌های انجام شده روی داده‌ها را در قالب فایل «صفحات گسترده اکسل» (Excel Spreadsheet) فراهم می‌آورند.

پردازش کلان داده

کتابخانه Agate

یکی از کتابخانه‌های برنامه‌نویسی نوظهور برای مقاصد علم داده، کتابخانه Agate نام دارد. کتابخانه Agate با هدف استفاده در کاربردهای «روزنامه‌نگاری» (Journalism) پیاده‌سازی شده است و ویژگی‌های بسیار مناسبی جهت تحلیل داده‌ها در اختیار برنامه‌نویسان و توسعه‌دهندگان قرار می‌دهد. از این کتابخانه می‌توان جهت تحلیل و مقایسه فایل‌های صفحه گسترده (Spreadsheet)، انجام محاسبات آماری رو داده‌ها و سایر موارد استفاده کرد.

یادگیری کار کردن با کتابخانه Agate بسیار ساده است و وابستگی‌های به مراتب کمتری نسبت به Pandas دارد. همچنین، ویژگی‌های جالبی نظیر مصورسازی داده‌ها و تولید نمودار، این کتابخانه را به ابزاری محبوب برای مقاصد تحلیل داده تبدیل کرده است.

کتابخانه Bokeh

در صورتی که تمایل به مصورسازی داده‌های موجود در یک مجموعه داده ساخت یافته دارید، Bokeh ابزار بسیار مناسبی محسوب می‌شود. این ابزار را می‌توان با کتابخانه‌های تحلیل داده نظیر Pandas ،Agate و سایر موارد مورد استفاده قرار داد. این ابزار، به برنامه‌نویسان و توسعه‌دهندگان اجازه می‌دهد تا با کمترین کد نویسی، نمودارهای گرافیکی و مصورسازی‌های خیره کننده‌ای از داده‌ها تولید کنند.

پردازش کلان داده

کاوش، پردازش و تحلیل داده‌ها در پایتون با استفاده از کتابخانه Pandas

پیش از این که با ابزارهای پردازش کلان داده در پایتون آشنا شویم، با یک مثال ساده، نحوه تحلیل داده‌ها در پایتون مورد بررسی قرار می‌گیرد. پس از جستجو در سایت Kaggle، مجموعه داده حقوق کارکنان دولتی شهر سان‌فرانسیسکو برای این کار انتخاب شد. این مجموعه داده از طریق لینک [+] قابل دسترسی است.

ابتدا با استفاده از قطعه کد زیر داده‌ها وارد (Import) و در یک دیتا فریم Pandas خوانده می‌شوند.

import pandas as pd
salaries = pd.read_csv('Salaries.csv')
salaries.head()

پردازش کلان داده

سپس، با استفاده از تابع describe، توزیع داده‌های موجود در مجموعه داده نمایش داده می‌شود.

salaries.head()

پردازش کلان داده

داده‌های جمع‌آوری شده، مربوط به سال‌های متعددی است. برای محدودتر کردن حوزه تحلیلی انجام شده از داده‌ها، تنها داده‌های مربوط به سال 2014 مورد بررسی قرار گرفته می‌شوند.

latest_salaries = salaries[salaries['Year'] == 2014]
latest_salaries.describe()

پردازش کلان داده

سپس، در این مرحله از فرایند تحلیل داده، مقدار درصد متوسطی از حقوق سالیانه که کارمندان دولتی برای پرداخت اجاره خانه استفاده می‌کنند، بررسی می‌شود.

average_yearly_rent = 3880 * 12
average_city_pay = latest_salaries['TotalPay'].mean()
average_yearly_rent / average_city_pay
0.61698360153365217

بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، کارمندان دولتی در سان‌فرانسیسکو، به طور متوسط چیزی حدود 60 درصد از حقوق خود را صرف پرداخت اجاره بها می‌کنند. در مرحله بعد، تعداد کارمندانی که حقوق آن‌ها از متوسط اجاره بهای منزل در این شهر کمتر است مشخص می‌شود.

latest_salaries[latest_salaries['TotalPay'] < average_yearly_rent].shape[0]
11360

همانطور که در خروجی نمایش داده شده مشخص است، بیش از 11 هزار کارمند دولتی در سان‌فرانسیسکو، درآمد کمتر از متوسط اجاره بهای منزل دارند. در مرحله بعدی، تعداد کارمندانی که باید اضافه کار داشته باشند تا بتوانند از حقوق لازم برای پرداخت اجاره بها برخوردار شوند، نمایش داده می‌شود. برای اینکار، تمامی کسانی که بیش از 70 درصد حقوق خود را برای اجاره بها پرداخت می‌کنند، به عنوان افرادی در نظر گرفته می‌شوند که بدون اضافه کاری قادر به پرداخت اجاره بها نیستند.

pd.to_numeric(latest_salaries['BasePay'], errors='coerce').describe()
count     38119.000000
mean      66564.421924
std       44053.783972
min           0.000000
25%                NaN
50%                NaN
75%                NaN
max      318835.490000
Name: BasePay, dtype: float64
base_pay_series = pd.to_numeric(latest_salaries['BasePay'], errors='coerce')
base_pay_series[base_pay_series * .7 < average_yearly_rent].shape
20074

بنابراین، بیش از 20 هزار کارمند دولتی بدون اضافه کاری قادر به پرداخت اجاره بها و هزینه زندگی در شهر سان‌فرانسیکو نخواهند بود. در مرحله بعد، با استفاده از تحلیل داده‌های موجود، تعداد کارمندانی مشخص می‌شود که باید بیش از 1000 دلار در سال اضافه کاری داشته باشند تا بتوانند به زندگی خود در این شهر ادامه دهند.

overtime_series = pd.to_numeric(latest_salaries['OvertimePay'], errors='coerce')
overtime_series.describe()
count     38119.000000
mean       5401.993737
std       11769.656257
min           0.000000
25%                NaN
50%                NaN
75%                NaN
max      173547.730000
Name: OvertimePay, dtype: float64
overtime_series[overtime_series > 1000].shape
15361

بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، بیش از 15 هزار کارمند دولتی، با اضافه کاری بیش از 1000 دلار در سال، قادر به زندگی در این شهر پرخرج هستند. این دسته از تحلیل‌های انجام شده روی مجموعه داده انتخابی، تنها بخشی از کارهای قابل انجام توسط کتابخانه‌های پردازش و تحلیل داده در پایتون هستند. بنابراین، همانطور که مشاهده می‌شود، پایتون یکی از بهترین و ساده‌ترین زبان‌ها برای تحلیل داده به شمار می‌آید و امکانات کاملی برای کاوش، پردازش و تحلیل داده در اختیار برنامه‌نویسان و دانشمندان علم داده قرار می‌دهد.

کتابخانه‌های پردازش کلان داده در پایتون

زبان برنامه‌نویسی پایتون و جامعه برنامه‌نویسی فعال آن، ابزارها و کتابخانه‌های متنوعی برای پردازش کلان داده در اختیار برنامه‌نویسان و دانشمندان علم داده قرار داده است. این ابزارها، مبتنی بر کتابخانه هدوپ و Spark هستند. از جمله، مهم‌ترین ابزارهای پردازش کلان داده در پایتون می‌تواند به موارد زیر اشاره کرد:

کتابخانه PySpark

کتابخانه PySpark که واسط برنامه‌نویسی کاربردی Spark در زبان پایتون محسوب می‌شود، به دانشمندان علم داده و برنامه‌نویسان فعال در این حوزه اجازه می‌دهد تا به سرعت، زیرساخت‌های لازم برای نگاشت و کاهش مجموعه‌های داده را پیاده‌سازی کنند. همچنین، از آنجایی که در کتابخانه Spark مجموعه‌ای از الگوریتم‌های یادگیری ماشین تعبیه شده است، علاوه بر پردازش کلان داده و مدیریت آن‌ها، جهت حل مسائل یادگیری ماشین نیز مورد استفاده قرار می‌گیرد.

پردازش کلان داده

ابزار جریان‌سازی یا Streaming

ابزار جریان‌سازی هدوپ یا Hadoop Streaming، یکی از مجبوب‌ترین روش‌های استفاده از Hadoop در پایتون محسوب می‌شود. جریان‌سازی یا Streaming، یکی از ویژگی‌های تعبیه شده در کتابخانه Hadoop است. این ویژگی به برنامه‌نویس اجازه می‌دهد تا کدهای نوشته به زبان پایتون (و یا دیگر زبان‌های برنامه‌نویسی) را جهت انجام عملیات نگاشت (Mapping) به تابع stdin پاس بدهند (به عنوان آرگومان، به این تابع ارسال کنند). به عبارت دیگر، توسعه‌دهندگان از جاوا به عنوان Wrapper استفاده می‌کنند تا بتوانند کدهای پایتون را به تابع stdin پاس بدهند. سپس، در زمان اجرا، عملیات نگاشت (Mapping) در زبان جاوا و توسط کتابخانه Hadoop انجام می‌شود.

پردازش کلان داده

افزونه Hadoopy

این ابزار، افزونه‌ای برای جریان‌سازی هدوپ یا Hadoop Streaming محسوب می‌شود و از کتابخانه Cython برای انجام عملیات نگاشت-کاهش (MapReduce) در پایتون استفاده می‌کند. مستندسازی خوبی برای این ابزار انجام شده است و کار کردن با آن برای توسعه‌دهندگان راحت است. با این حال، این پروژه از پایان سال 2012 تا به امروز غیر فعال باقی مانده و به‌روز‌رسانی جدیدی دریافت نکرده است.

بسته Pydoop

این بسته برنامه‌نویسی به توسعه‌دهندگان اجازه می‌دهد تا بتوانند برنامه مرتبط با پردازش کلان داده را در پایتون کد نویسی کنند. سپس، با استفاده از کدهای پیاده‌سازی شده، به طور مستقیم با داده‌های ذخیره شده در خوشه هدوپ (Hadoop Cluster) ارتباط برقرار کرده و عملیات نگاشت-کاهش (MapReduce) انجام دهند. چنین کاری از طریق واسط برنامه‌نویسی کاربردی HDFS (یا HDFS API) تعبیه شده در بسته Pydoop امکان‌پذیر شده است. واسط برنامه‌نویسی کاربردی HDFS به برنامه‌نویسان اجازه می‌دهد تا در محیط پایتون، عملیات خواندن و نوشتن داده‌ها، از معماری فایل سیستم HDFS را انجام دهند.

پردازش کلان داده

کتابخانه ‏MRJob

یک کتابخانه برنامه‌نویسی در زبان پایتون است که امکان انجام عملیات نگاشت-کاهش (MapReduce) و پردازش کلان داده را در اختیار برنامه‌نویسان و دانشمندان علم داده قرار می‌دهد. این کتابخانه نرم‌افزاری، یکی از پراستفاده‌ترین بسته‌های برنامه‌نویسی جهت پردازش کلان داده در زبان پایتون محسوب می‌شود و توسط شرکت Yelp در اختیار برنامه‌نویسان زبان پایتون قرار گرفته است.

کتابخانه ‏MRJob، از هدوپ، سرویس Cloud Dataproc گوگل و سرویس Elastic MapReduce یا EMR شرکت آمازون نیز پشتیبانی می‌کند. سرویس EMR، یک وب سرویس ارائه شده توسط شرکت آمازون است که امکان تحلیل و پردازش کلان داده با استفاده از هدوپ و Spark را در اختیار فعالان حوزه علم داده قرار می‌دهد. مستندسازی خوبی برای این کتابخانه انجام شده است و کار کردن با آن برای توسعه‌دهندگان و برنامه‌نویسان راحت است. همچنین، این پروژه کاملا فعال است و از پیاده‌سازی‌های مختلف آن پشتیبانی می‌شود.

پردازش کلان داده

اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob

در این مثال، یک برنامه ساده برای محاسبه دفعات ظاهر شدن تمامی کلمات در متن، در زبان پایتون کد نویسی خواهد شد. سپس، با استفاده از هدوپ (Hadoop)، این قابلیت پدید می‌آید تا هر نوع داده متنی (با هر اندازه ممکن و در هر قالبی)، مورد تجزیه و تحلیل قرار بگیرد و تعداد دفعات ظاهر شدن تمامی کلمات موجود در آن محاسبه شود. برای چنین کاری، باید یک خوشه هدوپ در سیستم اجرایی باشد. برای اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob، لازم است مراحل زیر دنبال شود:

  • ابتدا با استفاده از Pip، کتابخانه MRJob در پایتون نصب می‌شود.
pip install mrjob
  • برنامه شمارش کلمات در متن: برای چنین کاری، در یک برنامه دلخواه ویرایشگر کد، از کدهای زیر استفاده کنید (یک فایل خالی و کدهای زی را در آن کپی کنید). سپس، فایل مورد نظر را با نام word-count.py ذخیره کنید.

کدهای لازم محاسبه دفعات ظاهر شدن تمامی کلمات در متن با استفاده از MRJob (ابزار Hadoop):

# Copyright 2009-2010 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
  • اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob: پس از اینکه کد نویسی برنامه شمارش تعداد دفعات ظاهر شدن کلمات در متن به پایان رسید، در مرحله بعد لازم است تا کدهای پایتون برای هدوپ (hadoop) ارسال و اجرا شوند. برای این کار از کد دستوری زیر استفاده می‌شود:
$ python word_count.py [file location] -r hadoop > counts

کدهای کامل پیاده‌سازی سیستم پردازش داده‌های متنی با استفاده از کتابخانه MRJob، در لینک [+] موجود است (در این پروژه، از تکنیک‌های پردازش کلان داده، مدل برنامه‌نویسی MapReduce و کتابخانه‌های Spark و Hadoop استفاده شده است). در ادامه، تعدادی مثال مرتبط با پردازش داده‌های متنی با استفاده از کتابخانه MRJob نمایش داده شده است. ذکر این نکته ضروری است که برای اطمینان از صحت عملکرد کدهای ارائه شده، تمامی ماژول‌های تعریف شده در در لینک [+]، باید توسط برنامه کاربردی قابل دسترس باشند.

محاسبه دفعات ظاهر شدن تمامی کلماتی که حاوی حرف u هستند، با استفاده از MRJob (ابزار Hadoop):

# Copyright 2016 Yelp
# Copyright 2017 Yelp and Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The classic MapReduce job, but only for words containing "u"
"""
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']*u[\w']*", re.I)


class MRVWordFreqCount(MRJob):

    def mapper_pre_filter(self):
        return 'grep -i u'

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRVWordFreqCount.run()

محاسبه تعداد کلمات موجود در یک متن با استفاده از Pyspark:

# Copyright 2016 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
from operator import add

from pyspark import SparkContext


WORD_RE = re.compile(r"[\w']+")


def main():
    # read in input, output path
    args = sys.argv[1:]

    if len(args) != 2:
        raise ValueError

    inputPath, outputPath = args

    sc = SparkContext(appName='mrjob Spark wordcount script')

    lines = sc.textFile(inputPath)

    # lines.flatMap(WORD_RE.findall) doesn't work on Spark 1.6.2; apparently
    # it can't serialize instance methods?
    counts = (
        lines.flatMap(lambda line: WORD_RE.findall(line))
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    counts.saveAsTextFile(outputPath)

    sc.stop()


if __name__ == '__main__':
    main()

مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار Hadoop):

# Copyright 2009-2010 Yelp
# Copyright 2013 David Marin
# Copyright 2018 Yelp
# Copyright 2019 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Determine the most used word in the input, ignoring common "stop" words.
Shows how to do a multi-step job, and how to load a support file
from the same directory.
"""
import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):
    FILES = ['stop_words.txt']

    OUTPUT_PROTOCOL = JSONValueProtocol

    def configure_args(self):
        super(MRMostUsedWord, self).configure_args()

        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def mapper_init(self):
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'

        with open(stop_words_path) as f:
            self.stop_words = set(line.strip() for line in f)

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            word = word.lower()
            if word not in self.stop_words:
                yield (word, 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        try:
            yield max(word_count_pairs)
        except ValueError:
            pass

    def steps(self):
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()

مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار spark):

# Copyright 2019 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Like mr_most_used_word.py, but on Spark.
To make this work on the local[*] master, pass in your own --stop-words-file
(it won't be able to see stop_words.txt because --files doesn't work
on local[*] master)
"""
import json
import re
from operator import add

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRSparkMostUsedWord(MRJob):

    FILES = ['stop_words.txt']

    def configure_args(self):
        super(MRSparkMostUsedWord, self).configure_args()

        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def spark(self, input_path, output_path):
        from pyspark import SparkContext

        sc = SparkContext()

        lines = sc.textFile(input_path)

        # do a word frequency count
        words_and_ones = lines.mapPartitions(self.get_words)
        word_counts = words_and_ones.reduceByKey(add)

        # pick pair with highest count (now in count, word format)
        max_count, word = word_counts.map(lambda w_c: (w_c[1], w_c[0])).max()

        # output our word
        output = sc.parallelize([json.dumps(word)])
        output.saveAsTextFile(output_path)

        sc.stop()

    def get_words(self, line_iterator):
        # this only happens once per partition
        stop_words = self.load_stop_words()

        for line in line_iterator:
            for word in WORD_RE.findall(line):
                word = word.lower()
                if word not in stop_words:
                    yield (word, 1)

    def load_stop_words(self):
        # this should only be called inside executors (i.e. inside functions
        # passed to RDDs)
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'

        with open(stop_words_path) as f:
            return set(line.strip() for line in f)


if __name__ == '__main__':
    MRSparkMostUsedWord.run()

دسته‌بندی داده‌های متنی با استفاده از MRJob (ابزار Hadoop):

# Copyright 2009-2010 Yelp
# Copyright 2013 David Marin
# Copyright 2017 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A text classifier that uses a modified version of Naive Bayes that is not
sensitive to document length.
This is a somewhat contrived example in that it does everything in one job;
generally you'd run one job to generate n-gram scores, put them in a sqlite
database, and run a second job to score documents. But this is simple, and
it works!
This takes as its input documents encoded by encode_document() below. For each
document, you specify its text, and whether it belongs or does not belong
to one or more categories. You can also specify a unique ID for each document.
This job outputs the documents, with the field 'cat_to_score' filled in.
Generally, positive scores indicate the document is in the category, and
negative scores indicate it is not, but it's up to you to determine a
threshold for each category. This job also outputs scores for each ngram,
so that you can classify other documents.
About half of the documents are placed in a test set (based on SHA1 hash of
their text), which means they will not be used to train the classifier. The
'in_test_set' of each document will be filled accordingly. You can turn
this off with the --no-test-set flag. (You can also effectively put docs
in the training set by specifying no category information.)
Some terminology:
An "ngram" is a word or phrase. "foo" is a 1-gram; "foo bar baz" is a 3-gram.
"tf" refers to term frequency, that is, the number of times an ngram appears.
"df" referse to document frequency, that is, the number of documents an ngram
appears in at least once.
"""
from collections import defaultdict
import hashlib
import math
import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep


def encode_document(text, cats=None, id=None):
    """Encode a document as a JSON so that MRTextClassifier can read it.
    Args:
    text -- the text of the document (as a unicode)
    cats -- a dictionary mapping a category name (e.g. 'sports') to True if
        the document is in the category, and False if it's not. None indicates
        that we have no information about this documents' categories
    id -- a unique ID for the document (any kind of JSON-able value should
        work). If not specified, we'll auto-generate one.
    """
    text = unicode(text)
    cats = dict((unicode(cat), bool(is_in_cat))
                for cat, is_in_cat
                in (cats or {}).iteritems())

    return JSONValueProtocol.write(
        None, {'text': text, 'cats': cats, 'id': id}) + '\n'


def count_ngrams(text, max_ngram_size, stop_words):
    """Break text down into ngrams, and return a dictionary mapping
    (n, ngram) to number of times that ngram occurs.
    n: ngram size ("foo" is a 1-gram, "foo bar baz" is a 3-gram)
    ngram: the ngram, as a space-separated string or None to indicate the
        ANY ngram (basically the number of words in the document).
    Args:
    text -- text, as a unicode
    max_ngram_size -- maximum size of ngrams to consider
    stop_words -- a collection of words (in lowercase) to remove before
        parsing out ngrams (e.g. "the", "and")
    """
    if not isinstance(stop_words, set):
        stop_words = set(stop_words)

    words = [word.lower() for word in WORD_RE.findall(text)
             if word.lower() not in stop_words]

    ngram_counts = defaultdict(int)

    for i in range(len(words)):
        for n in range(1, max_ngram_size + 1):
            if i + n <= len(words):
                ngram = ' '.join(words[i:i + n])
                ngram_counts[(n, ngram)] += 1

    # add counts for ANY ngram
    for n in range(1, max_ngram_size + 1):
        ngram_counts[(n, None)] = len(words) - n + 1

    return ngram_counts


WORD_RE = re.compile(r"[\w']+", re.UNICODE)

DEFAULT_MAX_NGRAM_SIZE = 4

DEFAULT_STOP_WORDS = [
    'a', 'about', 'also', 'am', 'an', 'and', 'any', 'are', 'as', 'at', 'be',
    'but', 'by', 'can', 'com', 'did', 'do', 'does', 'for', 'from', 'had',
    'has', 'have', 'he', "he'd", "he'll", "he's", 'her', 'here', 'hers',
    'him', 'his', 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is',
    'it', "it's", 'its', 'just', 'me', 'mine', 'my', 'of', 'on', 'or', 'org',
    'our', 'ours', 'she', "she'd", "she'll", "she's", 'some', 'than', 'that',
    'the', 'their', 'them', 'then', 'there', 'these', 'they', "they'd",
    "they'll", "they're", 'this', 'those', 'to', 'us', 'was', 'we', "we'd",
    "we'll", "we're", 'were', 'what', 'where', 'which', 'who', 'will', 'with',
    'would', 'you', 'your', 'yours',
]


class MRTextClassifier(MRJob):
    INPUT_PROTOCOL = JSONValueProtocol

    def steps(self):
        """Conceptually, the steps are:
        1. Parse documents into ngrams
        2. Group by ngram to get a frequency count for each ngram, and to
           exclude very rare ngrams
        3. Send all ngram information to one "global" reducer so we can
           assign scores for each category and ngram
        4. Group scores and documents by ngram and compute score for that
           ngram for that document. Exclude very common ngrams to save memory.
        5. Average together scores for each document to get its score for
           each category.
        The documents themselves are passed through from step 1 to step 5.
        Ngram scoring information is passed through from step 4 to step 5.
        """
        return [MRStep(mapper=self.parse_doc,
                       reducer=self.count_ngram_freq),
                MRStep(reducer=self.score_ngrams),
                MRStep(reducer=self.score_documents_by_ngram),
                MRStep(reducer=self.score_documents)]

    def configure_args(self):
        """Add command-line options specific to this script."""
        super(MRTextClassifier, self).configure_args()

        self.add_passthru_arg(
            '--min-df', dest='min_df', default=2, type=int,
            help=('min number of documents an n-gram must appear in for us to'
                  ' count it. Default: %(default)s'))
        self.add_passthru_arg(
            '--max-df', dest='max_df', default=10000000, type=int,
            help=('max number of documents an n-gram may appear in for us to'
                  ' count it (this keeps reducers from running out of memory).'
                  ' Default: %(default)s'))
        self.add_passthru_arg(
            '--max-ngram-size', dest='max_ngram_size',
            default=DEFAULT_MAX_NGRAM_SIZE, type=int,
            help='maximum phrase length to consider')
        self.add_passthru_arg(
            '--stop-words', dest='stop_words',
            default=', '.join(DEFAULT_STOP_WORDS),
            help=("comma-separated list of words to ignore. For example, "
                  "--stop-words 'in, the' would cause 'hole in the wall' to be"
                  " parsed as ['hole', 'wall']. Default: %(default)s"))
        self.add_passthru_arg(
            '--short-doc-threshold', dest='short_doc_threshold',
            type=int, default=None,
            help=('Normally, for each n-gram size, we take the average score'
                  ' over all n-grams that appear. This allows us to penalize'
                  ' short documents by using this threshold as the denominator'
                  ' rather than the actual number of n-grams.'))
        self.add_passthru_arg(
            '--no-test-set', dest='no_test_set',
            action='store_true', default=False,
            help=("Choose about half of the documents to be the testing set"
                  " (don't use them to train the classifier) based on a SHA1"
                  " hash of their text"))

    def load_options(self, args):
        """Parse stop_words option."""
        super(MRTextClassifier, self).load_options(args)

        self.stop_words = set()
        if self.options.stop_words:
            self.stop_words.update(
                s.strip().lower() for s in self.options.stop_words.split(','))

    def parse_doc(self, _, doc):
        """Mapper: parse documents and emit ngram information.
        Input: JSON-encoded documents (see :py:func:`encode_document`)
        Output:
        ``('ngram', (n, ngram)), (count, cats)`` OR
        ``('doc', doc_id), doc``
        n: ngram length
        ngram: ngram encoded encoded as a string (e.g. "pad thai")
            or None to indicate ANY ngram.
        count:  # of times an ngram appears in the document
        cats: a map from category name to a boolean indicating whether it's
            this document is in the category
        doc_id: (hopefully) unique document ID
        doc: the encoded document. We'll fill these fields:
            ngram_counts: map from (n, ngram) to  # of times ngram appears
                in the document, using (n, None) to represent the total
                number of times ANY ngram of that size appears (essentially
                number of words)
            in_test_set: boolean indicating if this doc is in the test set
            id: SHA1 hash of doc text (if not already filled)
        """
        # only compute doc hash if we need it
        if doc.get('id') is not None and self.options.no_test_set:
            doc_hash = '0'  # don't need doc hash
        else:
            doc_hash = hashlib.sha1(doc['text'].encode('utf-8')).hexdigest()

        # fill in ID if missing
        if doc.get('id') is None:
            doc['id'] = doc_hash

        # pick test/training docs
        if self.options.no_test_set:
            doc['in_test_set'] = False
        else:
            doc['in_test_set'] = bool(int(doc_hash[-1], 16) % 2)

        # map from (n, ngram) to number of times it appears
        ngram_counts = count_ngrams(
            doc['text'], self.options.max_ngram_size, self.stop_words)

        # yield the number of times the ngram appears in this doc
        # and the categories for this document, so we can train the classifier
        if not doc['in_test_set']:
            for (n, ngram), count in ngram_counts.iteritems():
                yield ('ngram', (n, ngram)), (count, doc['cats'])

        # yield the document itself, for safekeeping
        doc['ngram_counts'] = ngram_counts.items()
        yield ('doc', doc['id']), doc

    def count_ngram_freq(self, type_and_key, values):
        """Reducer: Combine information about how many times each ngram
        appears for docs in/not in each category. Dump ngrams that appear
        in very few documents (according to --min-df switch). If two documents
        have the same ID, increment a counter and only keep one; otherwise
        pass docs through unchanged.
        Input (see parse_doc() for details):
        ('ngram', (n, ngram)), (count, cats) OR
        ('doc', doc_id), doc
        Output:
        ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
        ('doc', doc_id), doc
        n: ngram length
        ngram: ngram encoded encoded as a string (e.g. "pad thai")
            or None to indicate ANY ngram.
        cat_to_df: list of tuples of ((cat_name, is_in_category), df); df
            is  # of documents of this type that the ngram appears in
        cat_to_tf: list of tuples of ((cat_name, is_in_category), df); tf
            is  # of time the ngram appears in docs of this type
        doc_id: unique document ID
        doc: the encoded document
        """
        key_type, key = type_and_key

        # pass documents through
        if key_type == 'doc':
            doc_id = key
            docs = list(values)
            # if two documents end up with the same key, only keep one
            if len(docs) > 1:
                self.increment_counter(
                    'Document key collision', str(doc_id))
            yield ('doc', doc_id), docs[0]
            return

        assert key_type == 'ngram'
        n, ngram = key

        # total # of docs this ngram appears in
        total_df = 0
        # map from (cat, is_in_cat) to
        # number of documents in this cat it appears in (df), or
        # number of times it appears in documents of this type (tf)
        cat_to_df = defaultdict(int)
        cat_to_tf = defaultdict(int)

        for count, cats in values:
            total_df += 1
            for cat in cats.iteritems():
                cat_to_df[cat] += 1
                cat_to_tf[cat] += count

        # don't bother with very rare ngrams
        if total_df < self.options.min_df:
            return

        yield (('global', None),
               ((n, ngram), (cat_to_df.items(), cat_to_tf.items())))

    def score_ngrams(self, type_and_key, values):
        """Reducer: Look at all ngrams together, and assign scores by
        ngram and category. Also farm out documents to the reducer for
        any ngram they contain, and pass documents through to the next
        step.
        To score an ngram for a category, we compare the probability of any
        given ngram being our ngram for documents in the category against
        documents not in the category. The score is just the log of the
        ratio of probabilities (the "log difference")
        Input (see count_ngram_freq() for details):
        ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR
        ('doc', doc_id), doc
        Output:
        ('doc', doc_id), document OR
        ('ngram', (n, ngram)), ('doc_id', doc_id) OR
        ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
        n: ngram length
        ngram: ngram encoded encoded as a string (e.g. "pad thai")
            or None to indicate ANY ngram.
        cat_to_score: map from (cat_name, is_in_category) to score for
            this ngram
        doc_id: unique document ID
        doc: the encoded document
        """
        key_type, key = type_and_key
        if key_type == 'doc':
            doc_id = key
            doc = list(values)[0]
            # pass document through
            yield ('doc', doc_id), doc

            # send document to reducer for every ngram it contains
            for (n, ngram), count in doc['ngram_counts']:
                # don't bother even creating a reducer for the ANY ngram
                # because we'd have to send all documents to it.
                if ngram is None:
                    continue
                yield (('ngram', (n, ngram)),
                       ('doc_id', doc_id))

            return

        assert key_type == 'global'
        ngram_to_info = dict(
            ((n, ngram),
             (dict((tuple(cat), df) for cat, df in cat_to_df),
              dict((tuple(cat), tf) for cat, tf in cat_to_tf)))
            for (n, ngram), (cat_to_df, cat_to_tf)
            in values)

        # m = # of possible ngrams of any given type. This is not a very
        # rigorous estimate, but it's good enough
        m = len(ngram_to_info)

        for (n, ngram), info in ngram_to_info.iteritems():
            # do this even for the special ANY ngram; it's useful
            # as a normalization factor.
            cat_to_df, cat_to_tf = info

            # get the total # of documents and terms for ngrams of this size
            cat_to_d, cat_to_t = ngram_to_info[(n, None)]

            # calculate the probability of any given term being
            # this term for documents of each type
            cat_to_p = {}
            for cat, t in cat_to_t.iteritems():
                tf = cat_to_tf.get(cat) or 0
                # use Laplace's rule of succession to estimate p. See:
                # http://en.wikipedia.org/wiki/Rule_of_succession#Generalization_to_any_number_of_possibilities
                cat_to_p[cat] = (tf + (2.0 / m)) / (t + 2)

            cats = set(cat for cat, in_cat in cat_to_t)
            cat_to_score = {}
            for cat in cats:
                p_if_in = cat_to_p.get((cat, True), 1.0 / m)
                p_if_out = cat_to_p.get((cat, False), 1.0 / m)
                # take the log difference of probabilities
                score = math.log(p_if_in) - math.log(p_if_out)
                cat_to_score[cat] = score

            yield (('ngram', (n, ngram)),
                   ('cat_to_score', cat_to_score))

    def score_documents_by_ngram(self, type_and_key, types_and_values):
        """Reducer: For all documents that contain a given ngram, send
        scoring info to that document. Also pass documents and scoring
        info through as-is
        Input (see score_ngrams() for details):
        ('doc', doc_id), doc OR
        ('ngram', (n, ngram)), ('doc_id', doc_id) OR
        ('ngram', (n, ngram)), ('cat_to_score', cat_to_score)
        Output:
        ('doc', doc_id), ('doc', doc)
        ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
        ('cat_to_score', (n, ngram)), cat_to_score
        n: ngram length
        ngram: ngram encoded encoded as a string (e.g. "pad thai")
            or None to indicate ANY ngram.
        cat_to_score: map from (cat_name, is_in_category) to score for
            this ngram
        doc_id: unique document ID
        doc: the encoded document
        """
        key_type, key = type_and_key

        # pass documents through
        if key_type == 'doc':
            doc_id = key
            doc = list(types_and_values)[0]
            yield ('doc', doc_id), ('doc', doc)
            return

        assert key_type == 'ngram'
        n, ngram = key

        doc_ids = []
        cat_to_score = None

        for value_type, value in types_and_values:
            if value_type == 'cat_to_score':
                cat_to_score = value
                continue

            assert value_type == 'doc_id'
            doc_ids.append(value)

            if len(doc_ids) > self.options.max_df:
                self.increment_counter('Exceeded max df', repr((n, ngram)))
                return

        # skip ngrams that are too rare to score
        if cat_to_score is None:
            return

        # send score info for this ngram to this document
        for doc_id in doc_ids:
            yield ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))

        # keep scoring info
        yield ('cat_to_score', (n, ngram)), cat_to_score

    def score_documents(self, type_and_key, types_and_values):
        """Reducer: combine all scoring information for each document, and
        add it to the document. Also pass ngram scores through as-is.
        To score a document, we essentially take a weighted average of all
        the scores for ngrams of each size, and then sum together those
        averages. ngrams that aren't scored (because they're very rare or
        very common) are considered to have a score of zero. Using averages
        allows us to be insensitive to document size. There is a penalty
        for very small documents.
        Input (see score_ngrams() for details):
        ('doc', doc_id), ('doc', doc)
        ('doc', doc_id), ('scores', ((n, ngram), cat_to_score))
        ('cat_to_score', (n, ngram)), cat_to_score
        Output:
        ('doc', doc_id), doc
        ('cat_to_score', (n, ngram)), cat_to_score
        n: ngram length
        ngram: ngram encoded encoded as a string (e.g. "pad thai")
            or None to indicate ANY ngram.
        cat_to_score: map from (cat_name, is_in_category) to score for
            this ngram
        doc_id: unique document ID
        doc: the encoded document. this will contain an extra field
            'cat_to_score', and will no longer have the 'ngram_counts' field.
        """
        key_type, key = type_and_key

        # pass through cat_to_score
        if key_type == 'cat_to_score':
            cat_to_score = list(types_and_values)[0]
            yield ('cat_to_score', key), cat_to_score
            return

        assert key_type == 'doc'
        doc_id = key

        # store the document and scoring info
        doc = None
        ngrams_and_scores = []

        for value_type, value in types_and_values:
            if value_type == 'doc':
                doc = value
                continue

            assert value_type == 'scores'
            ((n, ngram), cat_to_score) = value
            ngrams_and_scores.append(((n, ngram), cat_to_score))

        # total scores for each ngram size
        ngram_counts = dict(((n, ngram), count)
                            for (n, ngram), count in doc['ngram_counts'])

        cat_to_n_to_total_score = defaultdict(lambda: defaultdict(float))

        for (n, ngram), cat_to_score in ngrams_and_scores:
            tf = ngram_counts[(n, ngram)]
            for cat, score in cat_to_score.iteritems():
                cat_to_n_to_total_score[cat][n] += score * tf

        # average scores for each ngram size
        cat_to_score = {}
        for cat, n_to_total_score in cat_to_n_to_total_score.iteritems():
            total_score_for_cat = 0
            for n, total_score in n_to_total_score.iteritems():
                total_t = ngram_counts[(n, None)]
                total_score_for_cat += (
                    total_score /
                    max(total_t, self.options.short_doc_threshold, 1))
            cat_to_score[cat] = total_score_for_cat

        # add scores to the document, and get rid of ngram_counts
        doc['cat_to_score'] = cat_to_score
        del doc['ngram_counts']

        yield ('doc', doc_id), doc

if __name__ == '__main__':
    MRTextClassifier.run()

برنامه نگاشت شماره‌های تلفن به آدرس‌های URL با استفاده از از MRJob (ابزار Hadoop):

"""An example of how to parse non-line-based data.
Map +1 (U.S. and Canadian) phone numbers to the most plausible
URL for their webpage.
This is similar to the article "Analyzing the Web for the Price of a Sandwich"
(https://engineeringblog.yelp.com/2015/03/analyzing-the-web-for-the-price-of-a-sandwich.html)
except that it doesn't include Yelp biz IDs, and it doesn't need to access
S3 because it can read the input files directly.
Sample command line:
.. code-block:: sh
   python mr_phone_to_url.py -r emr --bootstrap 'sudo pip install warcio' --output-dir s3://your-bucket/path/ --no-output s3://commoncrawl/crawl-data/CC-MAIN-2018-09/segments/*/wet/*.wet.gz
To find the latest crawl:
``aws s3 ls s3://commoncrawl/crawl-data/ | grep CC-MAIN``
WET data is often added after a release; usually the second-most recent
release is a safe bet.
"""
import re
from itertools import islice

from mrjob.job import MRJob
from mrjob.py2 import urlparse
from mrjob.step import MRStep

PHONE_RE = re.compile(
    br'(?:[\D\b]|^)(1?[2-9]\d{2}[\-. ()+]+\d{3}[\-. ()+]+\d{4})(?:[\D\b]|$)')
PHONE_SEP_RE = re.compile(br'[\-. ()+]')

# hosts with more than this many phone numbers are assumed to be directories
MAX_PHONES_PER_HOST = 1000


def standardize_phone_number(number):
    """put *number* in a standard format, and convert it to a :py:class:`str`.
    """
    number_sep = PHONE_SEP_RE.split(number)
    number = b''.join(number_sep).decode('ascii')
    if len(number) > 7:
        if number[-1] not in '0123456789':
            number = number[:-1]
        if number[0] not in '0123456789':
            number = number[1:]
    if len(number) <= 10:
        return "+1" + number
    else:
        return "+" + number


class MRPhoneToURL(MRJob):
    """Use Common Crawl .wet files to map from phone number to the most
    likely URL."""

    def steps(self):
        return [
            MRStep(mapper_raw=self.extract_phone_and_url_mapper,
                   reducer=self.count_by_host_reducer),
            MRStep(reducer=self.pick_best_url_reducer),
        ]

    def extract_phone_and_url_mapper(self, wet_path, wet_uri):
        """Read in .wet file, and extract phone ant URL
        """
        from warcio.archiveiterator import ArchiveIterator

        with open(wet_path, 'rb') as f:
            for record in ArchiveIterator(f):
                if record.rec_type != 'conversion':
                    continue

                headers = record.rec_headers
                if headers.get_header('Content-Type') != 'text/plain':
                    continue

                url = headers.get_header('WARC-Target-URI')
                if not url:
                    continue

                host = urlparse(url).netloc

                payload = record.content_stream().read()
                for phone in PHONE_RE.findall(payload):
                    phone = standardize_phone_number(phone)
                    yield host, (phone, url)

    def count_by_host_reducer(self, host, phone_urls):
        phone_urls = list(islice(phone_urls, MAX_PHONES_PER_HOST + 1))

        # don't bother with directories, etc.
        host_phone_count = len(phone_urls)
        if host_phone_count > MAX_PHONES_PER_HOST:
            return

        for phone, url in phone_urls:
            yield phone, (url, host_phone_count)

    def pick_best_url_reducer(self, phone, urls_with_count):
        # pick the url that appears on a host with the least number of
        # phone numbers, breaking ties by choosing the shortest URL
        # and the one that comes first alphabetically
        urls_with_count = sorted(
            urls_with_count, key=lambda uc: (uc[1], -len(uc[0]), uc[0]))

        yield phone, urls_with_count[0][0]


if __name__ == '__main__':
    MRPhoneToURL.run()

مثال‌های ارائه شده تنها بخش کوچکی از قابلیت‌های ابزارهایی نظیر هدوپ (Hadoop) و Spark هستند. این دسته از مدل‌های برنامه‌نویسی، علاوه بر اینکه امکان ذخیره‌سازی و پردازش توزیع شده داده‌های مختلف را به برنامه‌نویسان و دانشمندان داده می‌دهند، قابلیت مدیریت و پردازش کلان داده را برای سیستم فراهم می‌کنند.

اگر نوشته بالا برای شما مفید بوده است، آموزش‌های زیر نیز به شما پیشنهاد می‌شوند:

^^

بر اساس رای ۵ نفر
آیا این مطلب برای شما مفید بود؟
شما قبلا رای داده‌اید!
اگر بازخوردی درباره این مطلب دارید یا پرسشی دارید که بدون پاسخ مانده است، آن را از طریق بخش نظرات مطرح کنید.

«مرتضی جادریان»، دانشجوی مقطع دکتری مهندسی کامپیوتر گرایش هوش مصنوعی است. او در زمینه سیستم‌های هوشمند، به ویژه سیستم‌های هوشمند اطلاعاتی، روش‌های یادگیری ماشین، سیستم‌های دانش محور و محاسبات تکاملی فعالیت دارد.

نظر شما چیست؟

نشانی ایمیل شما منتشر نخواهد شد.

برچسب‌ها

مشاهده بیشتر