پردازش کلان داده در پایتون — راهنمای جامع

اگر کاربران با دنیای علم و فناوری آشنایی داشته باشند، بدون شک با واژگان و اصطلاحاتی نظیر «علم داده» (Data Science)، «علم تجزیه و تحلیل» (Analytics Science)، «یادگیری ماشین» (Machine Learning)، «کلان داده» (Big Data) مواردی مشابه برخورد داشتهاند. در دنیای فناوری امروز، از این اصطلاحات برای تعریف مجموعهای از فناوریهای مهم و پیشرفته حوزه در «علوم کامپیوتر» (Computer Science) استفاده میشود که پایه و اساس پیشرفتهای فناوری در آینده نیز به شمار میآیند. در این مطلب، با مهمترین ابزارهای علم داده جهت پردازش کلان داده آشنا خواهید. همچنین، مثالهای ساده و عملی جهت پردازش کلان داده نمایش داده خواهند شد. علاوه بر این، استفاده از «زبان برنامهنویسی پایتون» (Python Programming language) برای تحلیل داده آموزش داده خواهد شد (از این دسته از تکنیکها در مقیاس وسیعتر، جهت پردازش کلان داده نیز استفاده میشود).
علم داده یکی از حوزههای گسترده علوم کامپیوتر محسوب میشود و زیر شاخههای گستردهای نظیر «جمعآوری داده» (Data Collection)، «پاکسازی داده» (Data Cleaning)، «استاندارد سازی داده» (Data Standardization)، «تحلیل داده» (Data Analysis) و «گزارش» (Reporting) را شامل میشود. بزرگترین شرکتهای حوزه فناوری، از تکنیکهای علم داده و پردازش کلان داده برای استخراج دانش از «دادههای غیر ساخت یافته» (Unstructured Data) و «دادههای ساخت یافته» (Structured Data) سازمانی استفاده میکنند.
پردازش کلان داده
تحلیل و پردازش کلان داده هم اکنون به یکی از ترندهای تحقیقاتی در دنیا تبدیل شده است. به همین خاطر است که فرصتهای شغلی بسیار خوبی، برای برنامهنویسان و توسعهدهندگانی که با تکنیکهای پردازش کلان داده آشنایی دارند، فراهم شده است. یک دانشمند علم داده و یا یک برنامهنویس مسلط به پردازش کلان داده و تحلیل آن میتواند از مجموعه ابزارهای تولید شده برای پردازش کلان داده، جهت «تحلیل زبان» (Language Analysis)، «پیشنهاد فایلهای ویدئویی» (Recommending Videos) و یا مشخص کردن محصولات جدید با توجه به دادههای بازاریابی یا دادههای جمعآوری شده از مشتریان استفاده کند.
به طور کلی، زمانی که تکنیکهای موجود در حوزه پردازش کلان داده در علم داده مورد بررسی قرار میگیرند، منظور تکنیکهایی هستند که در حوزه خاصی به نام «علم داده در مقیاس وسیع» (large Scale Data Science) استفاده میشوند. تعریف کلمه کلان در اصطلاح پردازش کلان داده، به نوع پروژههای برنامهنویسی پردازش داده یا حجم دادههایی که قرار است پردازش و تحلیل شوند، بستگی دارد.
بسیاری از مسائل تحلیل داده، که برای حل آنها پروژههای برنامهنویسی تعریف میشود، توسط راهکارهای «مرسوم» (Conventional) و متدهای یادگیری ماشین به راحتی قابل حل هستند. به عبارت دیگر، زمانی که دادههای استفاده شده برای مدلسازی یا آموزش مدل یادگیری، به اندازهای باشند که بتوان از سیستمهای محاسباتی معمول نظیر کامپیوترهای شخصی برای پردازش آنها استفاده کرد، معمولا نیازی به استفاده از روشهای پردازش کلان داده وجود نخواهد داشت. با این حال، این امکان وجود دارد که بتوان از روشهای تحلیل و پردازش کلان داده برای حل مسائل تحلیل داده استفاده کرد. به طور کلی، زمانی از تکنیکهای پردازش کلان داده برای حل مسئله استفاده میشود که نیاز به پردازش حجم عظیمی از دادهها وجود دارد و سیستمهای محاسباتی مرسوم، توان پردازشی لازم برای تحلیل و پردازش کلان داده را نداشته باشند.
مهمترین ابزارهای پردازش کلان داده
یکی از مهمترین مدلهای ذخیرهسازی توزیع شده دادهها و پردازش کلان داده، که به وفور توسط دانشمندان علم داده مورد استفاده قرار میگیرد، مدل برنامهنویسی به نام «نگاشت-کاهش» (MapReduce) است. مدل MapReduce، روشی بهینه برای مدیریت و پردازش کلان داده محسوب میشود که به دانشمندان علم داده و توسعهدهنده برنامه کاربردی اجازه میدهد تا ابتدا داده را با استفاده از یک «صفت» (Attribute)، فیلتر یا دستهبندی خاص نگاشت کنند و سپس، با استفاده از یک مکانیزم «تبدیل» (Transformation) یا «تجمیع» (Aggregation)، دادههای نگاشت شده را کاهش دهند.
به عنوان نمونه، در صورتی که دادههای جمعاوری شده مرتبط با مجموعهای از گربهها باشند، ابتدا گربهها با استفاده از صفتی نظیر رنگ آنها نگاشت میشوند. در مرحله بعد، دادهها، از طریق جمع کردن گروهبندیهای انجام شده، کاهش پیدا میکنند. در پایان فرایند MapReduce، یک لیست یا مجموعهای مرتبط با رنگ گربهها و تعداد گربههای موجود در هر کدام از گروهبندیهای رنگی در اختیار برنامهنویس یا دانشمند داده قرار میدهد.
تقریبا تمامی کتابخانههای برنامهنویسی توسعه داده شده برای مقاصد پردازش کلان داده و علم داده، از مدل برنامهنویسی MapReduce پشتیبانی میکنند. همچنین، تعداد زیادی کتابخانه برنامهنویسی وجود دارند که امکان مدیریت، پردازش و انجام عملیات «نگاشت-کاهش» (MapReduce) دادهها را به شکل توزیع شده (خوشه یا مجموعهای از کامپیوترهای ) در اختیار برنامهنویسان و فعالان حوزه علم داده قرار میدهند. افرادی که به دنبال استفاده از مدلهای MapReduce، جهت مدیریت و پردازش کلان داده هستند، به راحتی میتوانند از ابزارها، بستهها و کتابخانههای توسعه داده شده در زبان پایتون، جهت پیادهسازی کاربردهای مرتبط با پردازش کلان داده استفاده کنند.
کتابخانه Hadoop
یکی از محبوبترین کتابخانهها برای انجام عملیات «نگاشت-کاهش» (MapReduce) روی دادههای حجیم، کتابخانه هدوپ است که توسط بنیاد نرمافزاری Apache طراحی شده است. کتابخانه هدوپ با استفاده از «محاسبات خوشهای» (Cluster Computing)، به فعالان حوزه علم داده اجازه میدهد تا بتوانند تا با سرعت و اثرگذاری به مراتب بیشتری، وظایف مرتبط با مدیریت و پردازش کلان داده را انجام دهند. کتابخانهها و بستههای زیادی در زبان پایتون توسعه داده شدهاند که از طریق آنها میتوان دادهها (که به آنها Jobs نیز گفته میشود) را به هدوپ ارسال کرد. انتخاب یکی از این کتابخانهها و بستههای برنامهنویسی جهت مقاصد پردازش کلان داده و علم داده، به عواملی نظیر سادگی استفاده، زیرساختهای لازم برای پیادهسازی و کاربرد خاصی که قرار است در آن مورد استفاده قرار بگیرند، بستگی دارد.
کتابخانه Spark
در صورتی که در کاربرد پردازش کلان داده مورد نظر برنامهنویس و یا دانشمند علم داده، استفاده از دادههایی که در قالب «دادههای جریانی» (Streaming Data) هستند (نظیر دادههای «بلادرنگ» (Real-Time)، دادههای Log و دادههای جمعآوری شده از «واسطهای برنامهنویسی کاربردی» (Application Programming Interface))، عملکرد به مراتب بهتری را برای سیستم به ارمغان میآورند، استفاده از کتابخانه Spark بهترین گزینه خواهد بود. کتابخانه Spark، نظیر کتابخانه هدوپ، توسط بنیاد نرمافزاری Apache توسعه داده شده است.
پایتون و پردازش کلان داده
سؤالی که ممکن در اینجا ذهن خوانندگان و مخاطبان این مطلب را به خود مشغول کند این است که زبان پایتون، جهت پردازش کلان داده و تحلیل آن، چه امکاناتی را در اختیار برنامهنویس یا «دانشمند علم داده» (Data Scientist) قرار میدهد. در چند سال اخیر، پایتون به عنوان یکی از مهمترین ابزارهای دانشمندان علم داده برای برنامهنویسی پروژههای علم داده مطرح شده است؛ تحلیل و پردازش کلان داده هم از این قاعده مستثنی نیست.
اگرچه، بسیاری از دانشمندان علم داده و برنامهنویسان فعال در این حوزه، از زبانهای برنامهنویسی نظیر جاوا، R و Julia و ابزارهایی مانند SPSS و SAS برای تحلیل داده و استخراج «بینش» (Insight) از آنها استفاده میکنند، با این حال، محبوبیت روزافزون زبان پایتون، «کتابخانههای برنامهنویسی» (Libraries) متنوع و پرکاربرد و جامعه برنامهنویسی پویا و فعال آن، سبب شده است تا بسیاری از دانشمندان علم داده، از زبان پایتون برای تحلیل دادهها و پردازش کلان داده استفاده کنند. افزایش روزافزون محبوبیت پایتون برای مقاصد علم داده را میتوان در رشد چشمگیر کتابخانهها، ابزارها و بستههای برنامهنویسی در حوزه علم داده مشاهده کرد.
زبان پایتون، در چند سال اخیر، همواره به عنوان یکی از محبوبترین زبانهای برنامهنویسی شناخته شده است. بسیاری از فرصتهای شغلی لیست شده برای زبان پایتون نیز مرتبط با حوزه علم داده، تحلیل داده، پردازش کلان داده و «هوش مصنوعی» (Artificial Intelligence) هستند. به همین دلیل است که بسیاری از «مهندسان داده» (Data Engineers) به استفاده از پایتون و تسلط بر این زبان برنامهنویسی روی میآورند.
یکی از مهمترین ابزارهای پردازش کلان داده برای دانشمندان داده، کتابخانهای به نام «هدوپ» (Hadoop) است. ابزارهای نرمافزاری منبع باز هدوپ، در ابتدا توسط «بنیاد نرمافزاری آپاچی» (Apache Software Foundation) و با هدف ارائه بستر نرمافزاری لازم برای محاسبات توزیع شده و پردازش کلان داده، در زبان برنامهنویسی جاوا تهیه شدهاند. این دسته از ابزارهای نرمافزاری به برنامهنویسان و توسعهدهندگان اجازه میدهند تا با استفاده از شبکهای متشکل از چندین کامپیوتر (به شکل «توزیع شده» (Distributed)) اقدام به حل مسائلی کنند که حجم عظیمی از داده و محاسبات را شامل میشوند. جهت پردازش کلان داده و تحلیل آنها، هدوپ، چارچوبی نرمافزاری برای ذخیرهسازی توزیع شده و پردازش کلان داده معرفی میکند؛ این چارچوب نرمافزاری بر پایه مدل برنامهنویسی «نگاشت-کاهش» (MapReduce) پیادهسازی شده است.
دو دلیل عمده برای محبوبیت بیش از پیش زبان پایتون، جهت برنامهنویسی علم داده و پردازش کلان داده، وجود دارد:
- کتابخانهها، ابزارها و بستههای برنامهنویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیرهسازی، نمایش، شکل دادن و مصورسازی دادهها در زبان برنامهنویسی پایتون ارائه شده است (در ادامه، این ابزارها شرح داده خواهند شد).
- پس از افزایش روزافزون محبوبیت پایتون و روی آوردن تعداد زیادی از برنامهنویسان و توسعهدهندگان حوزه علم داده به این زبان برنامهنویسی، بنیاد نرمافزاری آپاچی به سرعت امکان استفاده از «اکوسیستم» (Ecosystem) هدوپ را برای دانشمندان علم داده و برنامهنویسان این حوزه فراهم آورد. هماکنون برنامهنویسان زبان پایتون و دانشمندان علم داده مسلط به این زبان میتوانند از ابزارهایی نظیر « استریمینگ هدوپ» (Hadoop Streaming)، افزونه Hadoopy، کتابخانه Pydoop و بسته Pydoop اشاره کرد.
استفاده از هدوپ (Hadoop) در پایتون به جای جاوا
یکی از سؤالاتی که ممکن است هنگام استفاده از هدوپ در پایتون (جهت پردازش کلان داده و تحلیل آنها) برای فعالان حوزه علم داده پدید بیاید این است که چرا لازم است از پایتون به جای جاوا، برای پردازش کلان داده و تحلیل آن استفاده شود. در پاسخ به این سؤال باید گفت که اگرچه زبان جاوا هنوز هم یکی از محبوبترین و پرکاربردترین زبانهای برنامهنویسی در جهان و زبان اصلی توسعه چارچوب هدوپ (و Spark) محسوب میشود، با این حال، میان برخی از ویژگیهای ساختاری این دو زبان تفاوتهایی وجود دارد که ممکن است سبب روی آوردن فعالان این حوزه به استفاده از پایتون شود. از جمله این تفاوتها میتوان به موارد زیر اشاره کرد:
- نصب، پیادهسازی و اجرای محیط برنامهنویسی جاوا (Java Development Environment) نسبت به زبان پایتون سختتر است (تنظیم کردن تنظیمات مرتبط با «متغیرهای محیطی» (Environment Variables)، تنظیمات فایلهای XML مرتبط با وابستگیهای نرمافزاری و سایر موارد). در نقطه مقابل، برای نصب، پیادهسازی و اجرای محیط برنامهنویسی پایتون، تنها کافی است پایتون را روی سیستم نصب و از واسط خط دستوری آن جهت کد نویسی استفاده کرد.
- در محیط برنامهنویسی جاوا، ابتدا باید کدهای نوشته شده «کامپایل» (Compile) و سپس اجرا شوند. در حالی که در زبان پایتون، تنها کافی است کدها توسط برنامهنویس نوشته و سپس خط به خط در واسط خط دستوری اجرا شوند.
- تعداد خط کدهای لازم برای نوشتن یک برنامه خاص در زبان جاوا، به مراتب بیشتر از تعداد خط کدهای لازم برای توسعه همان برنامه در زبان پایتون خواهد بود. به عنوان نمونه، به مثال زیر دقت کنید. در ادامه، تعداد خط کدهای لازم برای شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا و پایتون نمایش داده شده است. اولین قدم جهت پردازش کلان داده در فرم دادههای متنی غیر ساخت یافته، شمارش تعداد دفعات تکرار کلمات ظاهر شده در این دادهها است:
شمارش تعداد دفعات تکرار کلمات با استفاده از زبان جاوا:
package org.myorg; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
شمارش تعداد دفعات تکرار کلمات با استفاده از زبان پایتون:
import re WORD_RE = re.compile(r"[\w']+") class MRWordFreqCount(MRJob): def mapper(self, _, line): for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner(self, word, counts): yield (word, sum(counts)) def reducer(self, word, counts): yield (word, sum(counts)) if __name__ == '__main__': MRWordFreqCount.run() #Source #https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_word_freq_count.py
همانطور که مشاهده میشود، برنامهای که نوشتن آن در زبان جاوا به 57 خط کد احتیاج است، در زبان پایتون، تنها با 15 خط کد قابل نوشتن است. این ویژگی سبب میشود تا «نگهداری» (Maintain) کدها در زبان پایتون به مراتب راحتتر از زبان جاوا باشد.
کتابخانههای مدیریت و پردازش داده در پایتون
همانطور که پیش از این نیز اشاره شد، کتابخانهها، ابزارها و بستههای برنامهنویسی بسیار قدرتمند و پرکاربردی برای پردازش، ذخیرهسازی، نمایش، شکل دادن و مصورسازی دادهها در زبان برنامهنویسی پایتون ارائه شده است. از جمله مهمترین کتابخانههای مدیریت و پردازش داده در پایتون میتوان به موارد زیر اشاره کرد:
کتابخانه Pandas
یکی از محبوبترین کتابخانههای ارائه شده برای مقاصد علم داده، کتابخانه Pandas است. کتابخانه Pandas توسط دانشمندان علم دادهای که به زبانهای برنامهنویسی R و پایتون مسلط هستند، توسعه داده شده است. هم اکنون جامعه بسیار بزرگی از برنامهنویسان و توسعهدهندگان (متشکل از دانشمندان علم داده و تحلیلگران داده) از کتابخانه Pandas و فرایند توسعه آن (برطرف کردن مشکلات احتمالی و اضافه کردن قابلیتهای برنامهنویسی جدید) پشتیبانی میکنند.
کتابخانه Pandas ویژگیهای تعبیه شده بسیار مهم و پرکاربردی نظیر قابلیت خواندن دادهها از منابع مختلف، ساختن «دیتا فریم» (Dataframe) یا ماتریس/جدول متناظر با دادههای خوانده شده و محاسبه «تجزیه و تحلیل تجمیعی» (Aggregate Analytics) بر اساس نوع برنامه کاربردی در حال توسعه را برای برنامهنویسان و توسعهدهندگان فراهم میآورد.
علاوه بر امکانات تعبیه شده برای مدیریت و پردازش داده، کتابخانه Pandas از امکانات «مصورسازی داده» (Data Visualization) بسیار مناسبی نیز بهره میبرد. با استفاده از قابلیتهای مصورسازی تعبیه شده در این کتابخانه، امکان تولید گراف و نمودار از نتایج تولید شده برای برنامهنویسان و دانشمندان علم داده فراهم شده است. همچنین، مجموعهای از توابع داخلی در کتابخانه Pandas تعریف شدهاند که امکان «صادر کردن» (Export) تجزیه و تحلیلهای انجام شده روی دادهها را در قالب فایل «صفحات گسترده اکسل» (Excel Spreadsheet) فراهم میآورند.
کتابخانه Agate
یکی از کتابخانههای برنامهنویسی نوظهور برای مقاصد علم داده، کتابخانه Agate نام دارد. کتابخانه Agate با هدف استفاده در کاربردهای «روزنامهنگاری» (Journalism) پیادهسازی شده است و ویژگیهای بسیار مناسبی جهت تحلیل دادهها در اختیار برنامهنویسان و توسعهدهندگان قرار میدهد. از این کتابخانه میتوان جهت تحلیل و مقایسه فایلهای صفحه گسترده (Spreadsheet)، انجام محاسبات آماری رو دادهها و سایر موارد استفاده کرد.
یادگیری کار کردن با کتابخانه Agate بسیار ساده است و وابستگیهای به مراتب کمتری نسبت به Pandas دارد. همچنین، ویژگیهای جالبی نظیر مصورسازی دادهها و تولید نمودار، این کتابخانه را به ابزاری محبوب برای مقاصد تحلیل داده تبدیل کرده است.
کتابخانه Bokeh
در صورتی که تمایل به مصورسازی دادههای موجود در یک مجموعه داده ساخت یافته دارید، Bokeh ابزار بسیار مناسبی محسوب میشود. این ابزار را میتوان با کتابخانههای تحلیل داده نظیر Pandas ،Agate و سایر موارد مورد استفاده قرار داد. این ابزار، به برنامهنویسان و توسعهدهندگان اجازه میدهد تا با کمترین کد نویسی، نمودارهای گرافیکی و مصورسازیهای خیره کنندهای از دادهها تولید کنند.
کاوش، پردازش و تحلیل دادهها در پایتون با استفاده از کتابخانه Pandas
پیش از این که با ابزارهای پردازش کلان داده در پایتون آشنا شویم، با یک مثال ساده، نحوه تحلیل دادهها در پایتون مورد بررسی قرار میگیرد. پس از جستجو در سایت Kaggle، مجموعه داده حقوق کارکنان دولتی شهر سانفرانسیسکو برای این کار انتخاب شد. این مجموعه داده از طریق لینک [+] قابل دسترسی است.
ابتدا با استفاده از قطعه کد زیر دادهها وارد (Import) و در یک دیتا فریم Pandas خوانده میشوند.
import pandas as pd salaries = pd.read_csv('Salaries.csv') salaries.head()
سپس، با استفاده از تابع describe، توزیع دادههای موجود در مجموعه داده نمایش داده میشود.
salaries.head()
دادههای جمعآوری شده، مربوط به سالهای متعددی است. برای محدودتر کردن حوزه تحلیلی انجام شده از دادهها، تنها دادههای مربوط به سال 2014 مورد بررسی قرار گرفته میشوند.
latest_salaries = salaries[salaries['Year'] == 2014] latest_salaries.describe()
سپس، در این مرحله از فرایند تحلیل داده، مقدار درصد متوسطی از حقوق سالیانه که کارمندان دولتی برای پرداخت اجاره خانه استفاده میکنند، بررسی میشود.
average_yearly_rent = 3880 * 12 average_city_pay = latest_salaries['TotalPay'].mean() average_yearly_rent / average_city_pay
0.61698360153365217
بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، کارمندان دولتی در سانفرانسیسکو، به طور متوسط چیزی حدود 60 درصد از حقوق خود را صرف پرداخت اجاره بها میکنند. در مرحله بعد، تعداد کارمندانی که حقوق آنها از متوسط اجاره بهای منزل در این شهر کمتر است مشخص میشود.
latest_salaries[latest_salaries['TotalPay'] < average_yearly_rent].shape[0]
11360
همانطور که در خروجی نمایش داده شده مشخص است، بیش از 11 هزار کارمند دولتی در سانفرانسیسکو، درآمد کمتر از متوسط اجاره بهای منزل دارند. در مرحله بعدی، تعداد کارمندانی که باید اضافه کار داشته باشند تا بتوانند از حقوق لازم برای پرداخت اجاره بها برخوردار شوند، نمایش داده میشود. برای اینکار، تمامی کسانی که بیش از 70 درصد حقوق خود را برای اجاره بها پرداخت میکنند، به عنوان افرادی در نظر گرفته میشوند که بدون اضافه کاری قادر به پرداخت اجاره بها نیستند.
pd.to_numeric(latest_salaries['BasePay'], errors='coerce').describe()
count 38119.000000 mean 66564.421924 std 44053.783972 min 0.000000 25% NaN 50% NaN 75% NaN max 318835.490000 Name: BasePay, dtype: float64
base_pay_series = pd.to_numeric(latest_salaries['BasePay'], errors='coerce') base_pay_series[base_pay_series * .7 < average_yearly_rent].shape
20074
بنابراین، بیش از 20 هزار کارمند دولتی بدون اضافه کاری قادر به پرداخت اجاره بها و هزینه زندگی در شهر سانفرانسیکو نخواهند بود. در مرحله بعد، با استفاده از تحلیل دادههای موجود، تعداد کارمندانی مشخص میشود که باید بیش از 1000 دلار در سال اضافه کاری داشته باشند تا بتوانند به زندگی خود در این شهر ادامه دهند.
overtime_series = pd.to_numeric(latest_salaries['OvertimePay'], errors='coerce') overtime_series.describe()
count 38119.000000 mean 5401.993737 std 11769.656257 min 0.000000 25% NaN 50% NaN 75% NaN max 173547.730000 Name: OvertimePay, dtype: float64
overtime_series[overtime_series > 1000].shape
15361
بنابراین، همانطور که در خروجی نمایش داده شده مشخص است، بیش از 15 هزار کارمند دولتی، با اضافه کاری بیش از 1000 دلار در سال، قادر به زندگی در این شهر پرخرج هستند. این دسته از تحلیلهای انجام شده روی مجموعه داده انتخابی، تنها بخشی از کارهای قابل انجام توسط کتابخانههای پردازش و تحلیل داده در پایتون هستند. بنابراین، همانطور که مشاهده میشود، پایتون یکی از بهترین و سادهترین زبانها برای تحلیل داده به شمار میآید و امکانات کاملی برای کاوش، پردازش و تحلیل داده در اختیار برنامهنویسان و دانشمندان علم داده قرار میدهد.
کتابخانههای پردازش کلان داده در پایتون
زبان برنامهنویسی پایتون و جامعه برنامهنویسی فعال آن، ابزارها و کتابخانههای متنوعی برای پردازش کلان داده در اختیار برنامهنویسان و دانشمندان علم داده قرار داده است. این ابزارها، مبتنی بر کتابخانه هدوپ و Spark هستند. از جمله، مهمترین ابزارهای پردازش کلان داده در پایتون میتواند به موارد زیر اشاره کرد:
کتابخانه PySpark
کتابخانه PySpark که واسط برنامهنویسی کاربردی Spark در زبان پایتون محسوب میشود، به دانشمندان علم داده و برنامهنویسان فعال در این حوزه اجازه میدهد تا به سرعت، زیرساختهای لازم برای نگاشت و کاهش مجموعههای داده را پیادهسازی کنند. همچنین، از آنجایی که در کتابخانه Spark مجموعهای از الگوریتمهای یادگیری ماشین تعبیه شده است، علاوه بر پردازش کلان داده و مدیریت آنها، جهت حل مسائل یادگیری ماشین نیز مورد استفاده قرار میگیرد.
ابزار جریانسازی یا Streaming
ابزار جریانسازی هدوپ یا Hadoop Streaming، یکی از مجبوبترین روشهای استفاده از Hadoop در پایتون محسوب میشود. جریانسازی یا Streaming، یکی از ویژگیهای تعبیه شده در کتابخانه Hadoop است. این ویژگی به برنامهنویس اجازه میدهد تا کدهای نوشته به زبان پایتون (و یا دیگر زبانهای برنامهنویسی) را جهت انجام عملیات نگاشت (Mapping) به تابع stdin پاس بدهند (به عنوان آرگومان، به این تابع ارسال کنند). به عبارت دیگر، توسعهدهندگان از جاوا به عنوان Wrapper استفاده میکنند تا بتوانند کدهای پایتون را به تابع stdin پاس بدهند. سپس، در زمان اجرا، عملیات نگاشت (Mapping) در زبان جاوا و توسط کتابخانه Hadoop انجام میشود.
افزونه Hadoopy
این ابزار، افزونهای برای جریانسازی هدوپ یا Hadoop Streaming محسوب میشود و از کتابخانه Cython برای انجام عملیات نگاشت-کاهش (MapReduce) در پایتون استفاده میکند. مستندسازی خوبی برای این ابزار انجام شده است و کار کردن با آن برای توسعهدهندگان راحت است. با این حال، این پروژه از پایان سال 2012 تا به امروز غیر فعال باقی مانده و بهروزرسانی جدیدی دریافت نکرده است.
بسته Pydoop
این بسته برنامهنویسی به توسعهدهندگان اجازه میدهد تا بتوانند برنامه مرتبط با پردازش کلان داده را در پایتون کد نویسی کنند. سپس، با استفاده از کدهای پیادهسازی شده، به طور مستقیم با دادههای ذخیره شده در خوشه هدوپ (Hadoop Cluster) ارتباط برقرار کرده و عملیات نگاشت-کاهش (MapReduce) انجام دهند. چنین کاری از طریق واسط برنامهنویسی کاربردی HDFS (یا HDFS API) تعبیه شده در بسته Pydoop امکانپذیر شده است. واسط برنامهنویسی کاربردی HDFS به برنامهنویسان اجازه میدهد تا در محیط پایتون، عملیات خواندن و نوشتن دادهها، از معماری فایل سیستم HDFS را انجام دهند.
کتابخانه MRJob
یک کتابخانه برنامهنویسی در زبان پایتون است که امکان انجام عملیات نگاشت-کاهش (MapReduce) و پردازش کلان داده را در اختیار برنامهنویسان و دانشمندان علم داده قرار میدهد. این کتابخانه نرمافزاری، یکی از پراستفادهترین بستههای برنامهنویسی جهت پردازش کلان داده در زبان پایتون محسوب میشود و توسط شرکت Yelp در اختیار برنامهنویسان زبان پایتون قرار گرفته است.
کتابخانه MRJob، از هدوپ، سرویس Cloud Dataproc گوگل و سرویس Elastic MapReduce یا EMR شرکت آمازون نیز پشتیبانی میکند. سرویس EMR، یک وب سرویس ارائه شده توسط شرکت آمازون است که امکان تحلیل و پردازش کلان داده با استفاده از هدوپ و Spark را در اختیار فعالان حوزه علم داده قرار میدهد. مستندسازی خوبی برای این کتابخانه انجام شده است و کار کردن با آن برای توسعهدهندگان و برنامهنویسان راحت است. همچنین، این پروژه کاملا فعال است و از پیادهسازیهای مختلف آن پشتیبانی میشود.
اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob
در این مثال، یک برنامه ساده برای محاسبه دفعات ظاهر شدن تمامی کلمات در متن، در زبان پایتون کد نویسی خواهد شد. سپس، با استفاده از هدوپ (Hadoop)، این قابلیت پدید میآید تا هر نوع داده متنی (با هر اندازه ممکن و در هر قالبی)، مورد تجزیه و تحلیل قرار بگیرد و تعداد دفعات ظاهر شدن تمامی کلمات موجود در آن محاسبه شود. برای چنین کاری، باید یک خوشه هدوپ در سیستم اجرایی باشد. برای اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob، لازم است مراحل زیر دنبال شود:
- ابتدا با استفاده از Pip، کتابخانه MRJob در پایتون نصب میشود.
pip install mrjob
- برنامه شمارش کلمات در متن: برای چنین کاری، در یک برنامه دلخواه ویرایشگر کد، از کدهای زیر استفاده کنید (یک فایل خالی و کدهای زی را در آن کپی کنید). سپس، فایل مورد نظر را با نام word-count.py ذخیره کنید.
کدهای لازم محاسبه دفعات ظاهر شدن تمامی کلمات در متن با استفاده از MRJob (ابزار Hadoop):
# Copyright 2009-2010 Yelp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """The classic MapReduce job: count the frequency of words. """ from mrjob.job import MRJob import re WORD_RE = re.compile(r"[\w']+") class MRWordFreqCount(MRJob): def mapper(self, _, line): for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner(self, word, counts): yield (word, sum(counts)) def reducer(self, word, counts): yield (word, sum(counts)) if __name__ == '__main__': MRWordFreqCount.run()
- اجرای کدهای پایتون در هدوپ (Hadoop) با استفاده از کتابخانه MRJob: پس از اینکه کد نویسی برنامه شمارش تعداد دفعات ظاهر شدن کلمات در متن به پایان رسید، در مرحله بعد لازم است تا کدهای پایتون برای هدوپ (hadoop) ارسال و اجرا شوند. برای این کار از کد دستوری زیر استفاده میشود:
$ python word_count.py [file location] -r hadoop > counts
کدهای کامل پیادهسازی سیستم پردازش دادههای متنی با استفاده از کتابخانه MRJob، در لینک [+] موجود است (در این پروژه، از تکنیکهای پردازش کلان داده، مدل برنامهنویسی MapReduce و کتابخانههای Spark و Hadoop استفاده شده است). در ادامه، تعدادی مثال مرتبط با پردازش دادههای متنی با استفاده از کتابخانه MRJob نمایش داده شده است. ذکر این نکته ضروری است که برای اطمینان از صحت عملکرد کدهای ارائه شده، تمامی ماژولهای تعریف شده در در لینک [+]، باید توسط برنامه کاربردی قابل دسترس باشند.
محاسبه دفعات ظاهر شدن تمامی کلماتی که حاوی حرف u هستند، با استفاده از MRJob (ابزار Hadoop):
# Copyright 2016 Yelp # Copyright 2017 Yelp and Contributors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """The classic MapReduce job, but only for words containing "u" """ from mrjob.job import MRJob import re WORD_RE = re.compile(r"[\w']*u[\w']*", re.I) class MRVWordFreqCount(MRJob): def mapper_pre_filter(self): return 'grep -i u' def mapper(self, _, line): for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner(self, word, counts): yield (word, sum(counts)) def reducer(self, word, counts): yield (word, sum(counts)) if __name__ == '__main__': MRVWordFreqCount.run()
محاسبه تعداد کلمات موجود در یک متن با استفاده از Pyspark:
# Copyright 2016 Yelp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import re import sys from operator import add from pyspark import SparkContext WORD_RE = re.compile(r"[\w']+") def main(): # read in input, output path args = sys.argv[1:] if len(args) != 2: raise ValueError inputPath, outputPath = args sc = SparkContext(appName='mrjob Spark wordcount script') lines = sc.textFile(inputPath) # lines.flatMap(WORD_RE.findall) doesn't work on Spark 1.6.2; apparently # it can't serialize instance methods? counts = ( lines.flatMap(lambda line: WORD_RE.findall(line)) .map(lambda word: (word, 1)) .reduceByKey(add)) counts.saveAsTextFile(outputPath) sc.stop() if __name__ == '__main__': main()
مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار Hadoop):
# Copyright 2009-2010 Yelp # Copyright 2013 David Marin # Copyright 2018 Yelp # Copyright 2019 Yelp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Determine the most used word in the input, ignoring common "stop" words. Shows how to do a multi-step job, and how to load a support file from the same directory. """ import re from mrjob.job import MRJob from mrjob.protocol import JSONValueProtocol from mrjob.step import MRStep WORD_RE = re.compile(r"[\w']+") class MRMostUsedWord(MRJob): FILES = ['stop_words.txt'] OUTPUT_PROTOCOL = JSONValueProtocol def configure_args(self): super(MRMostUsedWord, self).configure_args() # allow for alternate stop words file self.add_file_arg( '--stop-words-file', dest='stop_words_file', default=None, help='alternate stop words file. lowercase words, one per line', ) def mapper_init(self): stop_words_path = self.options.stop_words_file or 'stop_words.txt' with open(stop_words_path) as f: self.stop_words = set(line.strip() for line in f) def mapper_get_words(self, _, line): # yield each word in the line for word in WORD_RE.findall(line): word = word.lower() if word not in self.stop_words: yield (word, 1) def combiner_count_words(self, word, counts): # sum the words we've seen so far yield (word, sum(counts)) def reducer_count_words(self, word, counts): # send all (num_occurrences, word) pairs to the same reducer. # num_occurrences is so we can easily use Python's max() function. yield None, (sum(counts), word) # discard the key; it is just None def reducer_find_max_word(self, _, word_count_pairs): # each item of word_count_pairs is (count, word), # so yielding one results in key=counts, value=word try: yield max(word_count_pairs) except ValueError: pass def steps(self): return [ MRStep(mapper_init=self.mapper_init, mapper=self.mapper_get_words, combiner=self.combiner_count_words, reducer=self.reducer_count_words), MRStep(reducer=self.reducer_find_max_word) ] if __name__ == '__main__': MRMostUsedWord.run()
مشخص کردن کلمات پر استفاده در یک داده متنی ورودی با استفاده از MRJob (ابزار spark):
# Copyright 2019 Yelp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Like mr_most_used_word.py, but on Spark. To make this work on the local[*] master, pass in your own --stop-words-file (it won't be able to see stop_words.txt because --files doesn't work on local[*] master) """ import json import re from operator import add from mrjob.job import MRJob WORD_RE = re.compile(r"[\w']+") class MRSparkMostUsedWord(MRJob): FILES = ['stop_words.txt'] def configure_args(self): super(MRSparkMostUsedWord, self).configure_args() # allow for alternate stop words file self.add_file_arg( '--stop-words-file', dest='stop_words_file', default=None, help='alternate stop words file. lowercase words, one per line', ) def spark(self, input_path, output_path): from pyspark import SparkContext sc = SparkContext() lines = sc.textFile(input_path) # do a word frequency count words_and_ones = lines.mapPartitions(self.get_words) word_counts = words_and_ones.reduceByKey(add) # pick pair with highest count (now in count, word format) max_count, word = word_counts.map(lambda w_c: (w_c[1], w_c[0])).max() # output our word output = sc.parallelize([json.dumps(word)]) output.saveAsTextFile(output_path) sc.stop() def get_words(self, line_iterator): # this only happens once per partition stop_words = self.load_stop_words() for line in line_iterator: for word in WORD_RE.findall(line): word = word.lower() if word not in stop_words: yield (word, 1) def load_stop_words(self): # this should only be called inside executors (i.e. inside functions # passed to RDDs) stop_words_path = self.options.stop_words_file or 'stop_words.txt' with open(stop_words_path) as f: return set(line.strip() for line in f) if __name__ == '__main__': MRSparkMostUsedWord.run()
دستهبندی دادههای متنی با استفاده از MRJob (ابزار Hadoop):
# Copyright 2009-2010 Yelp # Copyright 2013 David Marin # Copyright 2017 Yelp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """A text classifier that uses a modified version of Naive Bayes that is not sensitive to document length. This is a somewhat contrived example in that it does everything in one job; generally you'd run one job to generate n-gram scores, put them in a sqlite database, and run a second job to score documents. But this is simple, and it works! This takes as its input documents encoded by encode_document() below. For each document, you specify its text, and whether it belongs or does not belong to one or more categories. You can also specify a unique ID for each document. This job outputs the documents, with the field 'cat_to_score' filled in. Generally, positive scores indicate the document is in the category, and negative scores indicate it is not, but it's up to you to determine a threshold for each category. This job also outputs scores for each ngram, so that you can classify other documents. About half of the documents are placed in a test set (based on SHA1 hash of their text), which means they will not be used to train the classifier. The 'in_test_set' of each document will be filled accordingly. You can turn this off with the --no-test-set flag. (You can also effectively put docs in the training set by specifying no category information.) Some terminology: An "ngram" is a word or phrase. "foo" is a 1-gram; "foo bar baz" is a 3-gram. "tf" refers to term frequency, that is, the number of times an ngram appears. "df" referse to document frequency, that is, the number of documents an ngram appears in at least once. """ from collections import defaultdict import hashlib import math import re from mrjob.job import MRJob from mrjob.protocol import JSONValueProtocol from mrjob.step import MRStep def encode_document(text, cats=None, id=None): """Encode a document as a JSON so that MRTextClassifier can read it. Args: text -- the text of the document (as a unicode) cats -- a dictionary mapping a category name (e.g. 'sports') to True if the document is in the category, and False if it's not. None indicates that we have no information about this documents' categories id -- a unique ID for the document (any kind of JSON-able value should work). If not specified, we'll auto-generate one. """ text = unicode(text) cats = dict((unicode(cat), bool(is_in_cat)) for cat, is_in_cat in (cats or {}).iteritems()) return JSONValueProtocol.write( None, {'text': text, 'cats': cats, 'id': id}) + '\n' def count_ngrams(text, max_ngram_size, stop_words): """Break text down into ngrams, and return a dictionary mapping (n, ngram) to number of times that ngram occurs. n: ngram size ("foo" is a 1-gram, "foo bar baz" is a 3-gram) ngram: the ngram, as a space-separated string or None to indicate the ANY ngram (basically the number of words in the document). Args: text -- text, as a unicode max_ngram_size -- maximum size of ngrams to consider stop_words -- a collection of words (in lowercase) to remove before parsing out ngrams (e.g. "the", "and") """ if not isinstance(stop_words, set): stop_words = set(stop_words) words = [word.lower() for word in WORD_RE.findall(text) if word.lower() not in stop_words] ngram_counts = defaultdict(int) for i in range(len(words)): for n in range(1, max_ngram_size + 1): if i + n <= len(words): ngram = ' '.join(words[i:i + n]) ngram_counts[(n, ngram)] += 1 # add counts for ANY ngram for n in range(1, max_ngram_size + 1): ngram_counts[(n, None)] = len(words) - n + 1 return ngram_counts WORD_RE = re.compile(r"[\w']+", re.UNICODE) DEFAULT_MAX_NGRAM_SIZE = 4 DEFAULT_STOP_WORDS = [ 'a', 'about', 'also', 'am', 'an', 'and', 'any', 'are', 'as', 'at', 'be', 'but', 'by', 'can', 'com', 'did', 'do', 'does', 'for', 'from', 'had', 'has', 'have', 'he', "he'd", "he'll", "he's", 'her', 'here', 'hers', 'him', 'his', 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is', 'it', "it's", 'its', 'just', 'me', 'mine', 'my', 'of', 'on', 'or', 'org', 'our', 'ours', 'she', "she'd", "she'll", "she's", 'some', 'than', 'that', 'the', 'their', 'them', 'then', 'there', 'these', 'they', "they'd", "they'll", "they're", 'this', 'those', 'to', 'us', 'was', 'we', "we'd", "we'll", "we're", 'were', 'what', 'where', 'which', 'who', 'will', 'with', 'would', 'you', 'your', 'yours', ] class MRTextClassifier(MRJob): INPUT_PROTOCOL = JSONValueProtocol def steps(self): """Conceptually, the steps are: 1. Parse documents into ngrams 2. Group by ngram to get a frequency count for each ngram, and to exclude very rare ngrams 3. Send all ngram information to one "global" reducer so we can assign scores for each category and ngram 4. Group scores and documents by ngram and compute score for that ngram for that document. Exclude very common ngrams to save memory. 5. Average together scores for each document to get its score for each category. The documents themselves are passed through from step 1 to step 5. Ngram scoring information is passed through from step 4 to step 5. """ return [MRStep(mapper=self.parse_doc, reducer=self.count_ngram_freq), MRStep(reducer=self.score_ngrams), MRStep(reducer=self.score_documents_by_ngram), MRStep(reducer=self.score_documents)] def configure_args(self): """Add command-line options specific to this script.""" super(MRTextClassifier, self).configure_args() self.add_passthru_arg( '--min-df', dest='min_df', default=2, type=int, help=('min number of documents an n-gram must appear in for us to' ' count it. Default: %(default)s')) self.add_passthru_arg( '--max-df', dest='max_df', default=10000000, type=int, help=('max number of documents an n-gram may appear in for us to' ' count it (this keeps reducers from running out of memory).' ' Default: %(default)s')) self.add_passthru_arg( '--max-ngram-size', dest='max_ngram_size', default=DEFAULT_MAX_NGRAM_SIZE, type=int, help='maximum phrase length to consider') self.add_passthru_arg( '--stop-words', dest='stop_words', default=', '.join(DEFAULT_STOP_WORDS), help=("comma-separated list of words to ignore. For example, " "--stop-words 'in, the' would cause 'hole in the wall' to be" " parsed as ['hole', 'wall']. Default: %(default)s")) self.add_passthru_arg( '--short-doc-threshold', dest='short_doc_threshold', type=int, default=None, help=('Normally, for each n-gram size, we take the average score' ' over all n-grams that appear. This allows us to penalize' ' short documents by using this threshold as the denominator' ' rather than the actual number of n-grams.')) self.add_passthru_arg( '--no-test-set', dest='no_test_set', action='store_true', default=False, help=("Choose about half of the documents to be the testing set" " (don't use them to train the classifier) based on a SHA1" " hash of their text")) def load_options(self, args): """Parse stop_words option.""" super(MRTextClassifier, self).load_options(args) self.stop_words = set() if self.options.stop_words: self.stop_words.update( s.strip().lower() for s in self.options.stop_words.split(',')) def parse_doc(self, _, doc): """Mapper: parse documents and emit ngram information. Input: JSON-encoded documents (see :py:func:`encode_document`) Output: ``('ngram', (n, ngram)), (count, cats)`` OR ``('doc', doc_id), doc`` n: ngram length ngram: ngram encoded encoded as a string (e.g. "pad thai") or None to indicate ANY ngram. count: # of times an ngram appears in the document cats: a map from category name to a boolean indicating whether it's this document is in the category doc_id: (hopefully) unique document ID doc: the encoded document. We'll fill these fields: ngram_counts: map from (n, ngram) to # of times ngram appears in the document, using (n, None) to represent the total number of times ANY ngram of that size appears (essentially number of words) in_test_set: boolean indicating if this doc is in the test set id: SHA1 hash of doc text (if not already filled) """ # only compute doc hash if we need it if doc.get('id') is not None and self.options.no_test_set: doc_hash = '0' # don't need doc hash else: doc_hash = hashlib.sha1(doc['text'].encode('utf-8')).hexdigest() # fill in ID if missing if doc.get('id') is None: doc['id'] = doc_hash # pick test/training docs if self.options.no_test_set: doc['in_test_set'] = False else: doc['in_test_set'] = bool(int(doc_hash[-1], 16) % 2) # map from (n, ngram) to number of times it appears ngram_counts = count_ngrams( doc['text'], self.options.max_ngram_size, self.stop_words) # yield the number of times the ngram appears in this doc # and the categories for this document, so we can train the classifier if not doc['in_test_set']: for (n, ngram), count in ngram_counts.iteritems(): yield ('ngram', (n, ngram)), (count, doc['cats']) # yield the document itself, for safekeeping doc['ngram_counts'] = ngram_counts.items() yield ('doc', doc['id']), doc def count_ngram_freq(self, type_and_key, values): """Reducer: Combine information about how many times each ngram appears for docs in/not in each category. Dump ngrams that appear in very few documents (according to --min-df switch). If two documents have the same ID, increment a counter and only keep one; otherwise pass docs through unchanged. Input (see parse_doc() for details): ('ngram', (n, ngram)), (count, cats) OR ('doc', doc_id), doc Output: ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR ('doc', doc_id), doc n: ngram length ngram: ngram encoded encoded as a string (e.g. "pad thai") or None to indicate ANY ngram. cat_to_df: list of tuples of ((cat_name, is_in_category), df); df is # of documents of this type that the ngram appears in cat_to_tf: list of tuples of ((cat_name, is_in_category), df); tf is # of time the ngram appears in docs of this type doc_id: unique document ID doc: the encoded document """ key_type, key = type_and_key # pass documents through if key_type == 'doc': doc_id = key docs = list(values) # if two documents end up with the same key, only keep one if len(docs) > 1: self.increment_counter( 'Document key collision', str(doc_id)) yield ('doc', doc_id), docs[0] return assert key_type == 'ngram' n, ngram = key # total # of docs this ngram appears in total_df = 0 # map from (cat, is_in_cat) to # number of documents in this cat it appears in (df), or # number of times it appears in documents of this type (tf) cat_to_df = defaultdict(int) cat_to_tf = defaultdict(int) for count, cats in values: total_df += 1 for cat in cats.iteritems(): cat_to_df[cat] += 1 cat_to_tf[cat] += count # don't bother with very rare ngrams if total_df < self.options.min_df: return yield (('global', None), ((n, ngram), (cat_to_df.items(), cat_to_tf.items()))) def score_ngrams(self, type_and_key, values): """Reducer: Look at all ngrams together, and assign scores by ngram and category. Also farm out documents to the reducer for any ngram they contain, and pass documents through to the next step. To score an ngram for a category, we compare the probability of any given ngram being our ngram for documents in the category against documents not in the category. The score is just the log of the ratio of probabilities (the "log difference") Input (see count_ngram_freq() for details): ('global', None), ((n, ngram), (cat_to_df, cat_to_tf)) OR ('doc', doc_id), doc Output: ('doc', doc_id), document OR ('ngram', (n, ngram)), ('doc_id', doc_id) OR ('ngram', (n, ngram)), ('cat_to_score', cat_to_score) n: ngram length ngram: ngram encoded encoded as a string (e.g. "pad thai") or None to indicate ANY ngram. cat_to_score: map from (cat_name, is_in_category) to score for this ngram doc_id: unique document ID doc: the encoded document """ key_type, key = type_and_key if key_type == 'doc': doc_id = key doc = list(values)[0] # pass document through yield ('doc', doc_id), doc # send document to reducer for every ngram it contains for (n, ngram), count in doc['ngram_counts']: # don't bother even creating a reducer for the ANY ngram # because we'd have to send all documents to it. if ngram is None: continue yield (('ngram', (n, ngram)), ('doc_id', doc_id)) return assert key_type == 'global' ngram_to_info = dict( ((n, ngram), (dict((tuple(cat), df) for cat, df in cat_to_df), dict((tuple(cat), tf) for cat, tf in cat_to_tf))) for (n, ngram), (cat_to_df, cat_to_tf) in values) # m = # of possible ngrams of any given type. This is not a very # rigorous estimate, but it's good enough m = len(ngram_to_info) for (n, ngram), info in ngram_to_info.iteritems(): # do this even for the special ANY ngram; it's useful # as a normalization factor. cat_to_df, cat_to_tf = info # get the total # of documents and terms for ngrams of this size cat_to_d, cat_to_t = ngram_to_info[(n, None)] # calculate the probability of any given term being # this term for documents of each type cat_to_p = {} for cat, t in cat_to_t.iteritems(): tf = cat_to_tf.get(cat) or 0 # use Laplace's rule of succession to estimate p. See: # http://en.wikipedia.org/wiki/Rule_of_succession#Generalization_to_any_number_of_possibilities cat_to_p[cat] = (tf + (2.0 / m)) / (t + 2) cats = set(cat for cat, in_cat in cat_to_t) cat_to_score = {} for cat in cats: p_if_in = cat_to_p.get((cat, True), 1.0 / m) p_if_out = cat_to_p.get((cat, False), 1.0 / m) # take the log difference of probabilities score = math.log(p_if_in) - math.log(p_if_out) cat_to_score[cat] = score yield (('ngram', (n, ngram)), ('cat_to_score', cat_to_score)) def score_documents_by_ngram(self, type_and_key, types_and_values): """Reducer: For all documents that contain a given ngram, send scoring info to that document. Also pass documents and scoring info through as-is Input (see score_ngrams() for details): ('doc', doc_id), doc OR ('ngram', (n, ngram)), ('doc_id', doc_id) OR ('ngram', (n, ngram)), ('cat_to_score', cat_to_score) Output: ('doc', doc_id), ('doc', doc) ('doc', doc_id), ('scores', ((n, ngram), cat_to_score)) ('cat_to_score', (n, ngram)), cat_to_score n: ngram length ngram: ngram encoded encoded as a string (e.g. "pad thai") or None to indicate ANY ngram. cat_to_score: map from (cat_name, is_in_category) to score for this ngram doc_id: unique document ID doc: the encoded document """ key_type, key = type_and_key # pass documents through if key_type == 'doc': doc_id = key doc = list(types_and_values)[0] yield ('doc', doc_id), ('doc', doc) return assert key_type == 'ngram' n, ngram = key doc_ids = [] cat_to_score = None for value_type, value in types_and_values: if value_type == 'cat_to_score': cat_to_score = value continue assert value_type == 'doc_id' doc_ids.append(value) if len(doc_ids) > self.options.max_df: self.increment_counter('Exceeded max df', repr((n, ngram))) return # skip ngrams that are too rare to score if cat_to_score is None: return # send score info for this ngram to this document for doc_id in doc_ids: yield ('doc', doc_id), ('scores', ((n, ngram), cat_to_score)) # keep scoring info yield ('cat_to_score', (n, ngram)), cat_to_score def score_documents(self, type_and_key, types_and_values): """Reducer: combine all scoring information for each document, and add it to the document. Also pass ngram scores through as-is. To score a document, we essentially take a weighted average of all the scores for ngrams of each size, and then sum together those averages. ngrams that aren't scored (because they're very rare or very common) are considered to have a score of zero. Using averages allows us to be insensitive to document size. There is a penalty for very small documents. Input (see score_ngrams() for details): ('doc', doc_id), ('doc', doc) ('doc', doc_id), ('scores', ((n, ngram), cat_to_score)) ('cat_to_score', (n, ngram)), cat_to_score Output: ('doc', doc_id), doc ('cat_to_score', (n, ngram)), cat_to_score n: ngram length ngram: ngram encoded encoded as a string (e.g. "pad thai") or None to indicate ANY ngram. cat_to_score: map from (cat_name, is_in_category) to score for this ngram doc_id: unique document ID doc: the encoded document. this will contain an extra field 'cat_to_score', and will no longer have the 'ngram_counts' field. """ key_type, key = type_and_key # pass through cat_to_score if key_type == 'cat_to_score': cat_to_score = list(types_and_values)[0] yield ('cat_to_score', key), cat_to_score return assert key_type == 'doc' doc_id = key # store the document and scoring info doc = None ngrams_and_scores = [] for value_type, value in types_and_values: if value_type == 'doc': doc = value continue assert value_type == 'scores' ((n, ngram), cat_to_score) = value ngrams_and_scores.append(((n, ngram), cat_to_score)) # total scores for each ngram size ngram_counts = dict(((n, ngram), count) for (n, ngram), count in doc['ngram_counts']) cat_to_n_to_total_score = defaultdict(lambda: defaultdict(float)) for (n, ngram), cat_to_score in ngrams_and_scores: tf = ngram_counts[(n, ngram)] for cat, score in cat_to_score.iteritems(): cat_to_n_to_total_score[cat][n] += score * tf # average scores for each ngram size cat_to_score = {} for cat, n_to_total_score in cat_to_n_to_total_score.iteritems(): total_score_for_cat = 0 for n, total_score in n_to_total_score.iteritems(): total_t = ngram_counts[(n, None)] total_score_for_cat += ( total_score / max(total_t, self.options.short_doc_threshold, 1)) cat_to_score[cat] = total_score_for_cat # add scores to the document, and get rid of ngram_counts doc['cat_to_score'] = cat_to_score del doc['ngram_counts'] yield ('doc', doc_id), doc if __name__ == '__main__': MRTextClassifier.run()
برنامه نگاشت شمارههای تلفن به آدرسهای URL با استفاده از از MRJob (ابزار Hadoop):
"""An example of how to parse non-line-based data. Map +1 (U.S. and Canadian) phone numbers to the most plausible URL for their webpage. This is similar to the article "Analyzing the Web for the Price of a Sandwich" (https://engineeringblog.yelp.com/2015/03/analyzing-the-web-for-the-price-of-a-sandwich.html) except that it doesn't include Yelp biz IDs, and it doesn't need to access S3 because it can read the input files directly. Sample command line: .. code-block:: sh python mr_phone_to_url.py -r emr --bootstrap 'sudo pip install warcio' --output-dir s3://your-bucket/path/ --no-output s3://commoncrawl/crawl-data/CC-MAIN-2018-09/segments/*/wet/*.wet.gz To find the latest crawl: ``aws s3 ls s3://commoncrawl/crawl-data/ | grep CC-MAIN`` WET data is often added after a release; usually the second-most recent release is a safe bet. """ import re from itertools import islice from mrjob.job import MRJob from mrjob.py2 import urlparse from mrjob.step import MRStep PHONE_RE = re.compile( br'(?:[\D\b]|^)(1?[2-9]\d{2}[\-. ()+]+\d{3}[\-. ()+]+\d{4})(?:[\D\b]|$)') PHONE_SEP_RE = re.compile(br'[\-. ()+]') # hosts with more than this many phone numbers are assumed to be directories MAX_PHONES_PER_HOST = 1000 def standardize_phone_number(number): """put *number* in a standard format, and convert it to a :py:class:`str`. """ number_sep = PHONE_SEP_RE.split(number) number = b''.join(number_sep).decode('ascii') if len(number) > 7: if number[-1] not in '0123456789': number = number[:-1] if number[0] not in '0123456789': number = number[1:] if len(number) <= 10: return "+1" + number else: return "+" + number class MRPhoneToURL(MRJob): """Use Common Crawl .wet files to map from phone number to the most likely URL.""" def steps(self): return [ MRStep(mapper_raw=self.extract_phone_and_url_mapper, reducer=self.count_by_host_reducer), MRStep(reducer=self.pick_best_url_reducer), ] def extract_phone_and_url_mapper(self, wet_path, wet_uri): """Read in .wet file, and extract phone ant URL """ from warcio.archiveiterator import ArchiveIterator with open(wet_path, 'rb') as f: for record in ArchiveIterator(f): if record.rec_type != 'conversion': continue headers = record.rec_headers if headers.get_header('Content-Type') != 'text/plain': continue url = headers.get_header('WARC-Target-URI') if not url: continue host = urlparse(url).netloc payload = record.content_stream().read() for phone in PHONE_RE.findall(payload): phone = standardize_phone_number(phone) yield host, (phone, url) def count_by_host_reducer(self, host, phone_urls): phone_urls = list(islice(phone_urls, MAX_PHONES_PER_HOST + 1)) # don't bother with directories, etc. host_phone_count = len(phone_urls) if host_phone_count > MAX_PHONES_PER_HOST: return for phone, url in phone_urls: yield phone, (url, host_phone_count) def pick_best_url_reducer(self, phone, urls_with_count): # pick the url that appears on a host with the least number of # phone numbers, breaking ties by choosing the shortest URL # and the one that comes first alphabetically urls_with_count = sorted( urls_with_count, key=lambda uc: (uc[1], -len(uc[0]), uc[0])) yield phone, urls_with_count[0][0] if __name__ == '__main__': MRPhoneToURL.run()
مثالهای ارائه شده تنها بخش کوچکی از قابلیتهای ابزارهایی نظیر هدوپ (Hadoop) و Spark هستند. این دسته از مدلهای برنامهنویسی، علاوه بر اینکه امکان ذخیرهسازی و پردازش توزیع شده دادههای مختلف را به برنامهنویسان و دانشمندان داده میدهند، قابلیت مدیریت و پردازش کلان داده را برای سیستم فراهم میکنند.
اگر نوشته بالا برای شما مفید بوده است، آموزشهای زیر نیز به شما پیشنهاد میشوند:
- مجموعه آموزشهای داده کاوی و یادگیری ماشین
- آموزش اصول و روشهای داده کاوی (Data Mining)
- مجموعه آموزشهای هوش مصنوعی
- آموزش مقدماتی Hadoop (هدوپ) برای تجزیه و تحلیل کلان داده
- مفاهیم کلان داده (Big Data) و انواع تحلیل داده — راهنمای جامع
- کلان داده یا مِه داده (Big Data) — از صفر تا صد
^^