Hadoop застрял на уменьшении 67% (только с большими данными)

avatar
Abdulaziz Alshehri
9 августа 2021 в 03:37
181
2
1

Я новичок в Hadoop и Linux.

Проблема

  • Hadoop уменьшает зависание (или движется очень-очень медленно), когда входные данные велики (например, 600 тысяч строк или 6 миллионов строк), даже несмотря на то, что функции Map и Reduce довольно просты, 2021-08-08 22:53:12,350 INFO mapreduce.Job: map 100% reduce 67%.
  • В Linux Системный монитор Я вижу, что когда уменьшение достигает 67%, только один процессор продолжает работать на 100%, а остальные спят :) см. эту картинку

Что успешно выполнено

  • Я быстро и успешно выполнил задание MapReduce с небольшими входными данными (600 строк) без каких-либо проблем.

Мапер (Python)

#!/usr/bin/env python3

import sys
from itertools import islice
from operator import itemgetter

def read_input(file):
    # read file except first line
    for line in islice(file, 1, None):
        # split the line into words
        yield line.split(',')

def main(separator='\t'):
    
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    
    for words in data:
        
        # for each row we take only the needed columns
        data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
        data_row[7] = data_row[7].replace('\n', '')

        # taking year and month No.from first column to create the
        # key that will send to reducer
        date = data_row[0].split(' ')[0].split('-')
        key = str(date[0]) + '_' + str(date[1])
        
        # value that will send to reducer
        value = ','.join(data_row)
        
        # print here will send the output pair (key, value)
        print('%s%s%s' % (key, separator, value))
        
if __name__ == "__main__":
    main()

Переходник (Python)

#!/usr/bin/env python3

from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time

def read_mapper_output(file):
    for line in file:
        yield line

def main(separator='\t'):
    
    all_rows_2015 = []
    all_rows_2016 = []
    
    start_time = time.time()
    
    names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 
             'pickup_longitude',     'pickup_latitude',       'dropoff_longitude', 
             'dropoff_latitude',     'total_amount']
    df = pd.DataFrame(columns=names)
    
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin)
    for words in data:

        # get key & value from Mapper
        key, value = words.split(separator)
        row = value.split(',')

        # split data with belong to 2015 from data belong to 2016
        if key in '2015_01 2015_02 2015_03': 
            all_rows_2015.append(row)
            if len(all_rows_2015) >= 10:
                df=df.append(pd.DataFrame(all_rows_2015, columns=names))
                all_rows_2015 = []
        elif key in '2016_01 2016_02 2016_03':
            all_rows_2016.append(row)
            if len(all_rows_2016) >= 10:
                df=df.append(pd.DataFrame(all_rows_2016, columns=names))
                all_rows_2016 = []
    
    print(df.to_string())
    print("--- %s seconds ---" % (time.time() - start_time))

if __name__ == "__main__":
    main()

Подробнее

I'm using Hadoop v3.2.1 on Linux installed on VMware to run MapReduce job in Python.

Уменьшить задание в цифрах:

Размер входных данных Количество строк Сократить время выполнения задания
~98 КБ 600 строк ~0,1 сек хорошо
~953 КБ 6000 строк ~1 сек хорошо
~9,5 Мб 60 000 строк ~52 сек хорошо
~94 Мб 600 000 строк ~5647 сек (~94 мин) очень медленно
~11 ГБ 76 000 000 строк ?? невозможно

Цель работает с входными данными ~76M строк, это невозможно, поскольку эта проблема остается.

Источник

Ответы (2)

avatar
Iñigo González
9 августа 2021 в 08:27
0

Я вижу здесь некоторые проблемы.

  • На этапе сокращения вы не выполняете никакого суммирования, только подходите к 2015Q1 и 2015Q2 — предполагается, что сокращение используется для суммирования, такого как группировка по ключу или выполнение некоторых вычислений на основе ключей.

    Если вам просто нужно отфильтровать данные, сделайте это на этапе сопоставления, чтобы сократить количество циклов (предполагается, что вы платите за все данные):

  • Вы храните много информации в оперативной памяти внутри фрейма данных. Поскольку вы не знаете, насколько велик ключ, вы испытываете удаление корзины. Это в сочетании с тяжелыми клавишами приведет к тому, что через некоторое время ваш процесс выдаст ошибку страницы для каждого DataFrame.append.

Есть некоторые исправления:

  • Вам действительно нужна фаза сокращения? Поскольку вы просто фильтруете первые три месяца os 2015 и 2016, вы можете сделать это на этапе карты. Это немного ускорит процесс, если вам понадобится уменьшить его позже, поскольку для фазы сокращения потребуется меньше данных.

    def main(separator='\t'):
    
        # input comes from STDIN (standard input)
        data = read_input(sys.stdin)
    
        for words in data:
    
            # for each row we take only the needed columns
            data_row = list(itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18])(words))
    
            # Find out first if you are filtering this data
    
            # taking year and month No.from first column to create the
            # key that will send to reducer
            date = data_row[0].split(' ')[0].split('-')
    
            # Filter out
    
            if (date[1] in [1,2,3]) and (date[0] in [2015,2016]):
    
               # We keep this data. Calulate key and clean up data_row[7]
    
               key = str(date[0]) + '_' + str(date[1])
               data_row[7] = data_row[7].replace('\n', '')
    
               # value that will send to reducer
               value = ','.join(data_row)
    
               # print here will send the output pair (key, value)
               print('%s%s%s' % (key, separator, value))
    
  • Постарайтесь не сохранять данные в памяти во время сокращения. Поскольку вы фильтруете, печатайте() результаты, как только они у вас есть. Если ваши исходные данные не отсортированы, сокращение позволит собрать вместе все данные за один и тот же месяц.

  • У вас есть ошибка на этапе сокращения: вы теряете number_of_records_per_key по модулю 10, потому что не добавляете результаты в фрейм данных. Не добавляйте к фрейму данных и печатайте результат как можно скорее.

avatar
Ben Watson
9 августа 2021 в 07:06
1

"когда уменьшение достигает 67%, только один ЦП продолжает работать в это время на 100%, а остальные спят" - у вас перекос. Один ключ имеет гораздо больше значений, чем любой другой ключ.