Одновременная выборка данных в Mojolicious

avatar
h q
8 апреля 2018 в 07:22
665
2
2

Я пытаюсь запустить несколько подпрограмм параллельно (для получения данных из внешних систем). Для имитации я использую sleep в моем примере ниже. Мой вопрос: как я могу добиться этого в Mojolicious?

#!/usr/bin/perl
use Mojolicious::Lite;
use Benchmark qw(:hireswallclock);

sub  add1 { my $a = shift; sleep 1; return $a+1; }
sub mult2 { my $b = shift; sleep 1; return $b*2; }
sub power { my ($x, $y) = @_; sleep 1; return $x ** $y; }

any '/' => sub {    
    my ( $self ) = @_;

    my $n = int(rand(5));

    my $t0 = Benchmark->new;
    my $x = mult2($n); # Need to run in parallel
    my $y =  add1($n); # Need to run in parallel
    my $z = power($x,$y);
    my $t1 = Benchmark->new;
    my $t = timediff($t1,$t0)->real();

    $self->render(text => "n=$n, x=$x, y=$y, z=$z;<br>T=$t seconds");
};

app->start;

Другими словами, я хотел бы сократить время выполнения до 2 секунд (вместо 3), запустив (add1 и mult2) параллельно.

В этом потоке используется Mojo::IOLoop->timer, который не подходит для моего случая? Если да, то я не знаю, как его использовать. Спасибо!

Источник
mpapec
8 апреля 2018 в 08:40
1

metacpan.org/pod/Mojo::IOLoop::ReadWriteFork

Ответы (2)

avatar
amon
8 апреля 2018 в 11:37
7

Чтобы избежать длительного ожидания, вы можете использовать неблокирующие операции Mojolicious. Вместо выполнения синхронного запроса к внешней системе используйте неблокирующие методы, которые вместо этого запускают некоторый обратный вызов после завершения. Например. чтобы избежать sleep, мы бы использовали Mojo::IOLoop->timer(...).

Вот вариант вашего кода, в котором используются неблокирующие операции и обещания для правильной последовательности функций:

use Mojolicious::Lite;
use Benchmark qw(:hireswallclock);

# example using non-blocking APIs
sub add1 {
    my ($a) = @_;
    my $promise = Mojo::Promise->new;
    Mojo::IOLoop->timer(1 => sub { $promise->resolve($a + 1) });
    return $promise;
}

# example using blocking APIs in a subprocess
sub mult2 {
    my ($b) = @_;
    my $promise = Mojo::Promise->new;
    Mojo::IOLoop->subprocess(
        sub {  # first callback is executed in subprocess
            sleep 1;
            return $b * 2;
        },
        sub {  # second callback resolves promise with subprocess result
            my ($self, $err, @result) = @_;
            return $promise->reject($err) if $err;
            $promise->resolve(@result);
        },
    );
    return $promise;
}

sub power {
    my ($x, $y) = @_;
    my $result = Mojo::Promise->new;
    Mojo::IOLoop->timer(1 => sub { $result->resolve($x ** $y) });
    return $result;
}

any '/' => sub {
    my ( $self ) = @_;

    my $n = int(rand(5));

    my $t0 = Benchmark->new;
    my $x_promise = mult2($n);
    my $y_promise = add1($n);
    my $z_promise = Mojo::Promise->all($x_promise, $y_promise)
        ->then(sub {
            my ($x, $y) = map { $_->[0] } @_;
            return power($x, $y);
        });
    Mojo::Promise->all($x_promise, $y_promise, $z_promise)
        ->then(sub {
            my ($x, $y, $z) = map { $_->[0] } @_;
            my $t1 = Benchmark->new;
            my $t = timediff($t1, $t0)->real();

            $self->render(text => "n=$n, x=$x, y=$y, z=$z;\nT=$t seconds\n");
        })
        ->wait;
};

app->start;

Это намного сложнее, чем ваш код, но выполняется в течение двух секунд вместо трех секунд и не блокирует другие запросы, которые выполняются в то же время. Таким образом, вы можете запросить этот маршрут тридцать раз за один раз и через две секунды получить тридцать ответов!

Обратите внимание, что Promise->all возвращает обещание со значениями всех ожидаемых обещаний, но помещает значения каждого обещания в массив ref, поэтому нам нужно распаковать их, чтобы получить фактические значения.

Если вы не можете использовать неблокирующие операции, вы можете запустить блокирующий код в подпроцессе, используя Mojo::IOLoop->subprocess(...). Вы по-прежнему можете координировать поток данных через промисы. Например. см. приведенную выше функцию mult2 для примера.

h q
8 апреля 2018 в 11:43
0

Спасибо. Мне нужно изучить ваш код дальше. У меня точно нет sleep в моем коде. Я просто пытался проиллюстрировать это (это требует времени). Мне нужно попробовать ваш пример кода без Mojo::IOLoop->timer. Я также рассмотрю Mojo::IOLoop->subprocess(...).

DavidO
8 апреля 2018 в 17:09
0

Было бы полезно описать, как ждать подпроцесса, чтобы согласовать, когда применять вызов питания.

amon
8 апреля 2018 в 17:50
1

@DavidO Я обновил пример подпроцесса. Вы можете продолжать использовать промисы для запуска кода в правильной последовательности.

DavidO
9 апреля 2018 в 05:32
0

@amon: Спасибо за это. Учитывая ваш пример, я предпринял дополнительный шаг, отделив код бизнес-логики от контроллера, чтобы медленные подпрограммы могли жить без изменений, при этом используя Mojo::IOLoop::subprocess и ::promise. См. gist.github.com/daoswald/17c1c37de52c700d794dc867cae9ca49

amon
9 апреля 2018 в 09:06
0

@DavidO По возможности лучше использовать полностью асинхронный режим, чем пытаться распараллелить синхронные операции. Например. разветвление подпроцесса сравнительно дорого и требует сериализации возвращаемых значений. Некоторые ресурсы (часто соединения с базой данных) нельзя повторно использовать в подпроцессе. В вашем случае вы случайно все еще вызываете функцию power() синхронно, которая блокирует все запросы! Поэтому я думаю, что ваш подход является жизнеспособным в качестве стратегии перехода на полностью асинхронный код, но он еще не дает всех преимуществ.

DavidO
9 апреля 2018 в 13:50
0

Полностью согласен с тем, что специально созданные асинхронные вызовы будут более идеальными в нескольких отношениях, и в моем примере было намеренно вызывать синхронный вызов мощности. Я просто хотел посмотреть, как это будет выглядеть, если нам придется предположить, что методы ОП неприкосновенны. Спасибо за ваш ответ, он был познавательным.

h q
20 апреля 2018 в 08:12
0

Спасибо @amon за ваше проницательное решение. Спасибо DavidO за ваш пример.

avatar
Patrick Michael Mooney
9 апреля 2018 в 22:27
1

Параллельный и параллельный

Чтобы два события произошли одновременно (параллельно), вам потребуется несколько процессоров.

Например, с одним процессором вы можете выполнять только одну математическую операцию в любой момент времени, поэтому они должны выполняться последовательно независимо от одновременных тем в вашем коде.

Нематематические операции, такие как ввод/вывод (например, сеть, жесткий диск) могут выполняться в параллельном режиме, поскольку они, по большей части, не зависят от вашего одного ЦП (я не буду использовать многоядерные системы не поддаются объяснению, поскольку, вообще говоря, Perl не оптимизирован для их использования).

Hypnotoad, производственный веб-сервер Mojolicious, полагается на правильную реализацию неблокирующего ввода-вывода. Таким образом, они предоставили неблокирующий пользовательский агент как часть вашего контроллера.

$controller->ua->get(
    $the_url,
    sub {
        my ( $ua, $tx ) = @_;
        if ( my $result = $tx->success ) {
            # do stuff with the result
        }
        else {
            # throw error
        }
    }
);

Вы можете внедрить Mojo::Promise здесь, чтобы улучшить поток вашего кода.

Если возможно, я бы рекомендовал реализовать неблокирующий UA при получении данных из "внешних систем". Если вы обнаружите, что ваши рабочие процессы Hypnotoad блокируются слишком долго (5 секунд), они, скорее всего, будут уничтожены и заменены, что может нарушить работу вашей системы.

h q
20 апреля 2018 в 07:32
0

Большое спасибо. Я согласен с вами, я не буду выполнять математические операции, а буду получать данные из внешних систем. Не могли бы вы привести рабочий пример?