Я новичок в Hadoop и Linux.
Проблема
- Hadoop уменьшает зависание (или движется очень-очень медленно), когда входные данные велики (например, 600 тысяч строк или 6 миллионов строк), даже несмотря на то, что функции Map и Reduce довольно просты,
2021-08-08 22:53:12,350 INFO mapreduce.Job: map 100% reduce 67%
. - В Linux Системный монитор Я вижу, что когда уменьшение достигает 67%, только один процессор продолжает работать на 100%, а остальные спят :) см. эту картинку
Что успешно выполнено
- Я быстро и успешно выполнил задание MapReduce с небольшими входными данными (600 строк) без каких-либо проблем.
Мапер (Python)
#!/usr/bin/env python3
import sys
from itertools import islice
from operator import itemgetter
def read_input(file):
# read file except first line
for line in islice(file, 1, None):
# split the line into words
yield line.split(',')
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_input(sys.stdin)
for words in data:
# for each row we take only the needed columns
data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
data_row[7] = data_row[7].replace('\n', '')
# taking year and month No.from first column to create the
# key that will send to reducer
date = data_row[0].split(' ')[0].split('-')
key = str(date[0]) + '_' + str(date[1])
# value that will send to reducer
value = ','.join(data_row)
# print here will send the output pair (key, value)
print('%s%s%s' % (key, separator, value))
if __name__ == "__main__":
main()
Переходник (Python)
#!/usr/bin/env python3
from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time
def read_mapper_output(file):
for line in file:
yield line
def main(separator='\t'):
all_rows_2015 = []
all_rows_2016 = []
start_time = time.time()
names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance',
'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
'dropoff_latitude', 'total_amount']
df = pd.DataFrame(columns=names)
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin)
for words in data:
# get key & value from Mapper
key, value = words.split(separator)
row = value.split(',')
# split data with belong to 2015 from data belong to 2016
if key in '2015_01 2015_02 2015_03':
all_rows_2015.append(row)
if len(all_rows_2015) >= 10:
df=df.append(pd.DataFrame(all_rows_2015, columns=names))
all_rows_2015 = []
elif key in '2016_01 2016_02 2016_03':
all_rows_2016.append(row)
if len(all_rows_2016) >= 10:
df=df.append(pd.DataFrame(all_rows_2016, columns=names))
all_rows_2016 = []
print(df.to_string())
print("--- %s seconds ---" % (time.time() - start_time))
if __name__ == "__main__":
main()
Подробнее
I'm using Hadoop v3.2.1 on Linux installed on VMware to run MapReduce job in Python.
Уменьшить задание в цифрах:
Размер входных данных | Количество строк | Сократить время выполнения задания | |
---|---|---|---|
~98 КБ | 600 строк | ~0,1 сек | хорошо |
~953 КБ | 6000 строк | ~1 сек | хорошо |
~9,5 Мб | 60 000 строк | ~52 сек | хорошо |
~94 Мб | 600 000 строк | ~5647 сек (~94 мин) | очень медленно |
~11 ГБ | 76 000 000 строк | ?? | невозможно |
Цель работает с входными данными ~76M строк, это невозможно, поскольку эта проблема остается.