1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
package urpm::parallel_ka_run;
# $Id$
#- Copyright (C) 2002, 2003, 2004, 2005 MandrakeSoft SA
#- Copyright (C) 2005 Mandriva SA
use strict;
use urpm::util;
use urpm::msg;
use urpm::parallel;
our @ISA = 'urpm::parallel';
(our $VERSION) = q($Revision$) =~ /(\d+)/;
our $mput_command = $ENV{URPMI_MPUT_COMMAND};
our $rshp_command = $ENV{URPMI_RSHP_COMMAND};
if (!$mput_command) {
($mput_command) = grep { -x $_ } qw(/usr/bin/mput2 /usr/bin/mput);
}
$mput_command ||= 'mput';
if (!$rshp_command) {
($rshp_command) = grep { -x $_ } qw(/usr/bin/rshp2 /usr/bin/rshp);
}
$rshp_command ||= 'rshp';
sub _rshp_urpm {
my ($parallel, $urpm, $rshp_option, $cmd, $para) = @_;
my $command = "$rshp_command $rshp_option $parallel->{options} -- $cmd --no-locales $para";
$urpm->{log}("parallel_ka_run: $command");
$command;
}
sub _rshp_urpm_popen {
my ($parallel, $urpm, $cmd, $para) = @_;
my $command = _rshp_urpm($parallel, $urpm, '-v', $cmd, $para);
open(my $fh, "$command |") or $urpm->{fatal}(1, "Can't fork $rshp_command: $!");
$fh;
}
sub urpm_popen {
my ($parallel, $urpm, $cmd, $para, $do) = @_;
my $fh = _rshp_urpm_popen($parallel, $urpm, $cmd, $para);
while (my $s = <$fh>) {
chomp $s;
my ($node, $s_) = _parse_rshp_output($s) or next;
$do->($node, $s_) or last;
}
close $fh or $urpm->{fatal}(1, N("rshp failed, maybe a node is unreacheable"));
}
sub run_urpm_command {
my ($parallel, $urpm, $cmd, $para) = @_;
system(_rshp_urpm($parallel, $urpm, '', $cmd, $para)) == 0;
}
sub copy_to_dir { &_run_mput }
sub propagate_file {
my ($parallel, $urpm, $file) = @_;
_run_mput($parallel, $urpm, $file, $file);
}
sub _run_mput {
my ($parallel, $urpm, @para) = @_;
my @l = (split(' ', $parallel->{options}), '--', @para);
$urpm->{log}("parallel_ka_run: $mput_command " . join(' ', @l));
system $mput_command, @l;
$? == 0 || $? == 256 or $urpm->{fatal}(1, N("mput failed, maybe a node is unreacheable"));
}
sub _parse_rshp_output {
my ($s) = @_;
#- eg of output of rshp2: <tata2.mandriva.com> [rank:2]:@removing@mpich-1.2.5.2-10mlcs4.x86_64
if ($s =~ /<([^>]*)>.*:->:(.*)/ || $s =~ /<([^>]*)>\s*\[[^]]*\]:(.*)/) {
($1, $2);
} else {
warn "bad rshp output $s\n";
();
}
}
#- allow to bootstrap from urpmi code directly (namespace is urpm).
package urpm;
no warnings 'redefine';
sub handle_parallel_options {
my (undef, $options) = @_;
my ($media, $ka_run_options) = $options =~ /ka-run(?:\(([^\)]*)\))?:(.*)/;
if ($ka_run_options) {
my ($flush_nodes, %nodes);
foreach (split ' ', $ka_run_options) {
if ($_ eq '-m') {
$flush_nodes = 1;
} else {
$flush_nodes and $nodes{/host=([^,]*)/ ? $1 : $_} = undef;
undef $flush_nodes;
}
}
return bless {
media => $media,
options => $ka_run_options,
nodes => \%nodes,
}, "urpm::parallel_ka_run";
}
return undef;
}
1;
|