1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
package urpm::parallel_ssh;
# $Id$
#- Copyright (C) 2002, 2003, 2004, 2005 MandrakeSoft SA
#- Copyright (C) 2005 Mandriva SA
use strict;
use urpm::util;
use urpm::msg;
use urpm::parallel;
our @ISA = 'urpm::parallel';
(our $VERSION) = q($Revision$) =~ /(\d+)/;
sub _localhost { $_[0] eq 'localhost' }
sub _ssh { &_localhost ? '' : "ssh $_[0] " }
sub _host { &_localhost ? '' : "$_[0]:" }
sub _scp {
my ($urpm, $host, @para) = @_;
my $dest = pop @para;
$urpm->{log}("parallel_ssh: scp " . join(' ', @para) . " $host:$dest");
system('scp', @para, _host($host) . $dest) == 0
or $urpm->{fatal}(1, N("scp failed on host %s (%d)", $host, $? >> 8));
}
sub copy_to_dir {
my ($parallel, $urpm, @para) = @_;
my $dir = pop @para;
foreach my $host (keys %{$parallel->{nodes}}) {
if (_localhost($host)) {
if (my @f = grep { dirname($_) ne $dir } @para) {
$urpm->{log}("parallel_ssh: cp @f $urpm->{cachedir}/rpms");
system('cp', @f, $dir) == 0
or $urpm->{fatal}(1, N("cp failed on host %s (%d)", $host, $? >> 8));
}
} else {
_scp($urpm, $host, @para, $dir);
}
}
}
sub propagate_file {
my ($parallel, $urpm, $file) = @_;
foreach (grep { !_localhost($_) } keys %{$parallel->{nodes}}) {
_scp($urpm, $_, '-q', $file, $file);
}
}
sub _ssh_urpm {
my ($urpm, $node, $cmd, $para) = @_;
$cmd ne 'urpme' && _localhost($node) and $para = "--nolock $para";
# it doesn't matter for urpmq, and previous version of urpmq didn't handle it:
$cmd ne 'urpmq' and $para = "--no-locales $para";
$urpm->{log}("parallel_ssh: $node: $cmd $para");
_ssh($node) . " $cmd $para";
}
sub _ssh_urpm_popen {
my ($urpm, $node, $cmd, $para) = @_;
my $command = _ssh_urpm($urpm, $node, $cmd, $para);
open(my $fh, "$command |") or $urpm->{fatal}(1, "Can't fork ssh: $!");
$fh;
}
sub urpm_popen {
my ($parallel, $urpm, $cmd, $para, $do) = @_;
foreach my $node (keys %{$parallel->{nodes}}) {
my $fh = _ssh_urpm_popen($urpm, $node, $cmd, $para);
while (my $s = <$fh>) {
chomp $s;
$urpm->{debug}("parallel_ssh: $node: received: $s") if $urpm->{debug};
$do->($node, $s) and last;
}
close $fh or $urpm->{fatal}(1, N("host %s does not have a good version of urpmi (%d)", $node, $? >> 8));
}
}
sub run_urpm_command {
my ($parallel, $urpm, $cmd, $para) = @_;
foreach my $node (keys %{$parallel->{nodes}}) {
system(_ssh_urpm($urpm, $node, $cmd, $para));
}
}
#- allow to bootstrap from urpmi code directly (namespace is urpm).
package urpm;
no warnings 'redefine';
sub handle_parallel_options {
my (undef, $options) = @_;
my ($id, @nodes) = split /:/, $options;
if ($id =~ /^ssh(?:\(([^\)]*)\))?$/) {
my %nodes; @nodes{@nodes} = undef;
return bless {
media => $1,
nodes => \%nodes,
}, "urpm::parallel_ssh";
}
return undef;
}
1;
|