package MGA::Mirrors::DB;

# $Id$

use strict;
use warnings;
use Config::IniFiles;
use URI;
use DBI;
use File::Temp qw(tempfile);
use Net::DNS;

# Path of the system-wide configuration file, used when there is no
# ./mga-mirror.ini in the current directory.
sub configfile { '/etc/mga-mirror.ini' }

# Constructor.  Loads ./mga-mirror.ini if it exists, configfile() otherwise,
# then connects to PostgreSQL using the [db] section (pgconn, user, password).
# The connection is opened with AutoCommit off.
# Returns the new object, or nothing when the configuration cannot be loaded
# or the connection fails.
sub new {
    my ($class) = @_;

    my $file = (-f './mga-mirror.ini') ? './mga-mirror.ini' : configfile();
    my $conf = Config::IniFiles->new(-file => $file) or return;

    my $db = DBI->connect(
        'dbi:Pg:' . $conf->val('db', 'pgconn', ''),
        $conf->val('db', 'user')     || undef,
        $conf->val('db', 'password') || undef,
        {
            AutoCommit => 0,
            PrintError => 1,
        }
    ) or return;

    bless {
        db   => $db,
        conf => $conf,
    }, $class;
}

# Resolve $hostname and return the list of its A and AAAA addresses.
# Record types for which the lookup returns no packet are skipped.
sub host_ips {
    my ($self, $hostname) = @_;

    my $resolver = Net::DNS::Resolver->new;
    my @addresses;
    foreach my $type (qw(A AAAA)) {
        my $packet = $resolver->search($hostname, $type) or next;
        foreach my $rr ($packet->answer) {
            # The answer section may hold other record types (e.g. CNAME).
            $rr->type eq $type or next;
            push(@addresses, $rr->address);
        }
    }
    return @addresses;
}

# Accessor for the DBI database handle.
sub db { $_[0]->{db} }

# Return the countries row (hashref) of the first address in @ips that falls
# inside a geoip range, or nothing when none matches.
sub locate_ips {
    my ($self, @ips) = @_;

    # $1 is a PostgreSQL positional placeholder; q{} does not interpolate it.
    my $find = $self->db->prepare(q{
        select countries.* from geoip
        join countries on geoip.code = countries.code
        where ipmin <= $1 and ipmax >= $1
    });
    foreach my $ip (@ips) {
        $find->execute($ip);
        my $res = $find->fetchrow_hashref;
        if ($res) {
            $find->finish;
            return $res;
        }
    }
    return;
}

# Return the list of protocol names, sorted by name.
sub protocol_list {
    my ($self) = @_;

    my $select = $self->db->prepare(q{
        select name from protocol
        order by name
    });
    $select->execute;

    # Read rows in query order: going through a hash (as fetchall_hashref
    # does) would throw away the "order by".  Duplicates are dropped so the
    # result still contains each name once.
    my %seen;
    return grep { !$seen{$_}++ } map { $_->[0] } @{ $select->fetchall_arrayref };
}

# Return the name of the bandwidth row whose value equals $value.
# Returns nothing for a false $value and undef when no row matches.
sub bandwidth_name {
    my ($self, $value) = @_;
    $value or return;

    my $select = $self->db->prepare(q{
        select name from bandwidth
        where value = ?
    });
    $select->execute($value);
    my $res = $select->fetchrow_hashref;
    $select->finish;
    return $res ? $res->{name} : undef;
}

# Return all bandwidth rows, ordered by value, as an arrayref of hashrefs.
sub bandwidth_list {
    my ($self) = @_;

    my $list = $self->db->prepare(q{
        select * from bandwidth
        order by value
    });
    $list->execute;
    return $list->fetchall_arrayref({});
}

# Return the countries row (hashref) for country code $code, or undef.
sub country_info {
    my ($self, $code) = @_;

    my $list = $self->db->prepare(q{
        select * from countries
        where code = ?
    });
    $list->execute($code);
    my $res = $list->fetchrow_hashref;
    $list->finish;
    return $res;
}

# Return the countries that have at least one host with a toplevel URL,
# ordered by name, as an arrayref of hashrefs.
sub country_list_with_mirror {
    my ($self) = @_;

    my $list = $self->db->prepare(q{
        select * from countries
        where code in (
            select country from hosts
            join toplevel_urls on toplevel_urls.hostname = hosts.hostname
        )
        order by name
    });
    $list->execute;
    return $list->fetchall_arrayref({});
}

# Return all countries, ordered by name, as an arrayref of hashrefs.
sub country_list {
    my ($self) = @_;

    my $list = $self->db->prepare(q{
        select * from countries
        order by name
    });
    $list->execute;
    return $list->fetchall_arrayref({});
}

# Return the distributions rows as a hashref keyed by version.
sub version_list {
    my ($self) = @_;

    my $list = $self->db->prepare(q{
        select * from distributions
    });
    $list->execute;
    return $list->fetchall_hashref([qw(version)]);
}

# Return the distributions rows as a hashref keyed by arch, restricted to
# $version when a true $version is given.
sub arch_list {
    my ($self, $version) = @_;

    my $list = $self->db->prepare(
        q{ select * from distributions }
        . ($version ? q{ where version = ? } : q{})
    );
    $list->execute($version ? $version : ());
    return $list->fetchall_hashref([qw(arch)]);
}

# Return the depth of $hostname in the "syncfrom" chain.
# Returns nothing when the host is unknown or has no syncfrom value.
sub mirror_level {
    my ($self, $hostname) = @_;

    my $minfo = $self->find_mirrors({ hostname => $hostname });
    my $mir   = $minfo ? $minfo->[0] : undef;
    return unless $mir && $mir->{syncfrom};
    return $self->_mirror_level($hostname);
}

# Recursive helper for mirror_level().  @prev holds the hostnames already
# visited; meeting one of them again means the chain loops, in which case
# undef is propagated up.  A host that is unknown or has no syncfrom value
# counts as level 0.
sub _mirror_level {
    my ($self, $hostname, @prev) = @_;

    return if (grep { $hostname eq $_ } @prev);

    my $minfo = $self->find_mirrors({ hostname => $hostname });
    my $mir   = $minfo ? $minfo->[0] : undef;
    return 0 unless $mir && $mir->{syncfrom};

    my $res = $self->_mirror_level($mir->{syncfrom}, $hostname, @prev);
    return defined($res) ? $res + 1 : undef;
}

# Check that every file listed in global_files can be fetched below $uri.
# Returns 1 when all of them are reachable, nothing at the first failure.
sub mirror_validity {
    my ($self, $uri) = @_;

    my $listf = $self->db->prepare(q{
        select * from global_files
    });
    $listf->execute;
    while (my $res = $listf->fetchrow_hashref) {
        my $furi = URI->new($uri . $res->{relpath});
        $self->_check_url($furi) or return;
    }
    return 1;
}

# Re-check every (valid toplevel URL, distribution) pair that has not been
# checked during the last 6 hours, and record the outcome in toplevel_urls
# and distributions_validity.  Each URL itself is validated at most once per
# run; distributions of a URL found invalid are not probed.
sub check_distributions {
    my ($self) = @_;

    my $uneeded_check = $self->db->prepare(q{
        select * from distributions_validity
        where lastcheck > now() - '6 hours'::interval
    });
    $uneeded_check->execute();
    my $uch = $uneeded_check->fetchall_hashref([ qw(urlskey distributionkey) ]);

    my $listd = $self->db->prepare(q{
        select * from toplevel_urls, distributions
        where toplevel_urls.valid = true
    });

    my $addstatus = $self->db->prepare(q{
        insert into distributions_validity (urlskey, distributionkey, exists)
        values (?,?,?)
    });
    my $updstatus = $self->db->prepare(q{
        update distributions_validity set lastcheck = now(), exists = ?
        where urlskey = ? and distributionkey = ?
    });
    my $upd_lastok = $self->db->prepare(q{
        update distributions_validity set lastok = now()
        where urlskey = ? and distributionkey = ?
    });

    my $updurl = $self->db->prepare(q{
        update toplevel_urls set lastcheck = now(), valid = ?
        where key = ?
    });
    my $upd_url_lastok = $self->db->prepare(q{
        update toplevel_urls set lastok = now()
        where key = ?
    });

    # URL key => result of the validity check done during this run.
    my %urls_status = ();

    $listd->execute();
    while (my $res = $listd->fetchrow_hashref) {
        my ($ukey, $dkey) = ($res->{key}, $res->{dkey});

        # Checked recently enough: nothing to do.
        $uch->{$ukey}{$dkey} and next;

        my $url = $self->fmt_url($res);
        if (!exists($urls_status{$ukey})) {
            $self->update_host_ips($res->{hostname});
            my $ok = $self->mirror_validity($url);
            $updurl->execute($ok ? 1 : 0, $ukey);
            $upd_url_lastok->execute($ukey) if ($ok);
            $urls_status{$ukey} = $ok;
        }
        $urls_status{$ukey} or next;

        my $furi   = URI->new(join('/', $url, $res->{relpath}, $res->{relfile}));
        my $exists = $self->_check_url($furi) ? 1 : 0;

        # execute() yields "0E0" when no row was updated: insert it then.
        # $exists is normalised to 1/0 for both statements; the raw false
        # value is an empty string, which is not a valid boolean literal.
        if ($updstatus->execute($exists, $ukey, $dkey) == 0) {
            $addstatus->execute($ukey, $dkey, $exists);
        }
        $upd_lastok->execute($ukey, $dkey) if ($exists);
        $self->db->commit;
    }

    # URLs found invalid leave the loop body through "next", before the
    # commit above.  Commit once more so their status update is not lost.
    $self->db->commit;
}

# Try to download $furi (a URI object) into a temporary file, using wget for
# http/https/ftp and rsync for rsync URLs.
# Returns true when the transfer command exits with status 0, false
# otherwise, including for any other scheme.
sub _check_url {
    my ($self, $furi) = @_;

    my $scheme = $furi->scheme;
    $scheme = '' unless defined $scheme;

    my ($fh, $filename) = tempfile();
    close($fh);

    # Commands are argument lists so that no shell ever parses the URL.
    my @cmd;
    if ($scheme =~ /\A(?:https?|ftp)\z/) {
        @cmd = (
            'wget', '--no-check-certificate', '-nv', '-t', '1', '-T', '4',
            '-O', $filename, $furi->as_string,
        );
    }
    elsif ($scheme eq 'rsync') {
        @cmd = ('rsync', '--timeout', '4', '-q', $furi->as_string, $filename);
    }

    my $ok = @cmd ? (system(@cmd) == 0) : '';
    unlink($filename);
    return $ok;
}

# Return the protocol row (hashref) named $protocol, or undef.
sub get_protocol_info {
    my ($self, $protocol) = @_;

    my $get = $self->db->prepare(q{
        select * from protocol where name = ?
    });
    $get->execute($protocol);
    my $res = $get->fetchrow_hashref;
    $get->finish;
    return $res;
}

# Return the hosts (joined with their country) that own at least one
# toplevel URL, as an arrayref of hashrefs.
#
# $filters is an optional hashref; each value is a scalar or an arrayref of
# accepted values, and entries with a false value are ignored.  Recognised
# keys: hostname, country, continent (applied to the host) and protocol,
# valid (applied to the host's URLs).  Further arguments are ignored.
sub find_mirrors {
    my ($self, $filters) = @_;

    my $query = q{
        select *,
            coalesce(hosts.latitude, countries.latitude) as latitude,
            coalesce(hosts.longitude, countries.longitude) as longitude
        from hosts
        join countries on countries.code = hosts.country
        where hosts.hostname in (select hostname from toplevel_urls %s)
        %s
    };

    my %host_field = (
        hostname  => 'hosts.hostname',
        country   => 'countries.code',
        # NOTE(review): "contienent_code" looks misspelled, but the schema is
        # not visible from here - confirm the column name before renaming.
        continent => 'countries.contienent_code',
    );
    my %url_field = (
        protocol => 'protocol',
        # The subquery reads toplevel_urls without an alias.
        valid    => 'toplevel_urls.valid',
    );

    my (@mvals, @uvals);
    my (@mw, @uw);
    foreach my $name (keys %{ $filters || {} }) {
        $filters->{$name} or next;
        my $wanted = ref $filters->{$name} ? $filters->{$name} : [ $filters->{$name} ];
        if (my $field = $host_field{$name}) {
            push(@mw, sprintf('%s = any(?)', $field));
            push(@mvals, $wanted);
        }
        if (my $field = $url_field{$name}) {
            push(@uw, sprintf('%s = any(?)', $field));
            push(@uvals, $wanted);
        }
    }

    my $list = $self->db->prepare(sprintf(
        $query,
        (@uw ? 'where ' . join(' and ', @uw) : ''),
        (@mw ? 'and '   . join(' and ', @mw) : ''),
    ));
    # The subquery placeholders come first in the statement text.
    $list->execute(@uvals, @mvals);
    return $list->fetchall_arrayref({});
}

# Return the toplevel_urls rows matching $filters (optional hashref with the
# keys hostname and protocol; scalar or arrayref values, false values
# ignored) as an arrayref of hashrefs.  Further arguments are ignored.
sub _find_urls {
    my ($self, $filters) = @_;

    my $query = q{
        select toplevel_urls.* from toplevel_urls
        join hosts on hosts.hostname = toplevel_urls.hostname
    };

    my %column_for = (
        hostname => 'hosts.hostname',
        protocol => 'toplevel_urls.protocol',
    );

    my (@vals, @w);
    foreach my $name (keys %{ $filters || {} }) {
        $filters->{$name} or next;
        my $field = $column_for{$name} or next;
        push(@w, sprintf('%s = any(?)', $field));
        push(@vals, ref $filters->{$name} ? $filters->{$name} : [ $filters->{$name} ]);
    }
    $query .= ' where ' . join(' and ', @w) if (@w);

    my $list = $self->db->prepare($query);
    $list->execute(@vals);
    return $list->fetchall_arrayref({});
}

# Same as _find_distributions(), with a formatted "url" entry added to
# every returned row.
sub find_distributions {
    my ($self, $filters) = @_;

    return [
        map { $_->{url} = $self->fmt_url($_); $_ }
        @{ $self->_find_distributions($filters) || [] }
    ];
}

# Return one row per (toplevel URL, distribution) pair recorded in
# distributions_validity, joined with host and country data, as an arrayref
# of hashrefs.  $filters is an optional hashref with the keys hostname,
# protocol, version, arch and country (scalar or arrayref values, false
# values ignored).  Further arguments are ignored.
sub _find_distributions {
    my ($self, $filters) = @_;

    my $query = q{
        select *,
            toplevel_urls.path || distributions.relpath as path,
            coalesce(hosts.latitude, countries.latitude) as latitude,
            coalesce(hosts.longitude, countries.longitude) as longitude
        from hosts
        join toplevel_urls on toplevel_urls.hostname = hosts.hostname
        join distributions_validity on toplevel_urls.key = distributions_validity.urlskey
        join distributions on distributions.dkey = distributions_validity.distributionkey
        join countries on countries.code = hosts.country
    };

    # Only tables that take part in the join above can be referenced here.
    my %column_for = (
        hostname => 'hosts.hostname',
        protocol => 'toplevel_urls.protocol',
        version  => 'distributions.version',
        arch     => 'distributions.arch',
        country  => 'hosts.country',
    );

    my (@vals, @w);
    foreach my $name (keys %{ $filters || {} }) {
        $filters->{$name} or next;
        my $field = $column_for{$name} or next;
        push(@w, sprintf('%s = any(?)', $field));
        push(@vals, ref $filters->{$name} ? $filters->{$name} : [ $filters->{$name} ]);
    }
    $query .= ' where ' . join(' and ', @w) if (@w);

    my $list = $self->db->prepare($query);
    $list->execute(@vals);
    return $list->fetchall_arrayref({});
}

# Return the names of the other hosts that are recorded in the ips table
# with one of the addresses $hostname currently resolves to.
sub find_host_ip_overlap {
    my ($self, $hostname) = @_;

    my @addresses = $self->host_ips($hostname);
    my $list = $self->db->prepare(q{
        select * from ips where ip = any(?)
        and hostname != ?
    });
    $list->execute(\@addresses, $hostname);
    my $res = $list->fetchall_hashref('hostname');
    return keys %{ $res };
}

# Update the hosts row of $hostname with the column => value pairs of %info,
# inserting the row when the update touches nothing, then refresh the host's
# recorded addresses.  False values are stored as NULL.
# Returns 1 on success; rolls back and returns nothing when the insert fails.
#
# The keys of %info are interpolated into the SQL text as column names, so
# they must come from trusted code, never from user input.
sub add_or_update_host {
    my ($self, $hostname, %info) = @_;

    my @fields = keys %info;
    my @vals   = map { $info{$_} || undef } @fields;

    if (@fields) {
        my $upd = $self->db->prepare(sprintf(
            q{ update hosts set %s where hostname = ? },
            join(', ', map { "$_ = ?" } @fields)
        ));
        # "0E0" (no row updated) compares equal to 0.
        if ($upd->execute(@vals, $hostname) == 0) {
            my $add = $self->db->prepare(sprintf(
                q{ insert into hosts (%s) values (%s) },
                join(', ', (@fields, 'hostname')),
                join(',', ('?') x (scalar(@fields) + 1))
            ));
            $add->execute(@vals, $hostname) or do {
                $self->db->rollback;
                return;
            };
        }
    }
    $self->update_host_ips($hostname);
    return 1;
}

# Record $uri (a URI object or a string) as the toplevel URL of its
# host/protocol pair, updating the existing row or inserting a new one.
# The port is stored as NULL when it is the scheme's default port.
# Returns 1 on success; rolls back and returns nothing when the insert fails.
sub add_or_update_url {
    my ($self, $uri) = @_;

    $uri = URI->new($uri) if (!ref $uri);

    my $port = $uri->port == $uri->default_port ? undef : $uri->port;
    my @row  = ($uri->path, $port, $uri->host, $uri->scheme);

    my $update = $self->db->prepare(q{
        update toplevel_urls set path = ?, port = ?, valid = true
        where hostname = ? and protocol = ?
    });
    if ($update->execute(@row) == 0) {
        my $add = $self->db->prepare(q{
            insert into toplevel_urls (path, port, hostname, protocol)
            values (?,?,?,?)
        });
        $add->execute(@row) or do {
            $self->db->rollback;
            return;
        };
    }
    return 1;
}

# Synchronise the ips table with the addresses $hostname resolves to now:
# rows for addresses that are gone are deleted, missing ones are inserted.
# When the name resolves to nothing, the existing rows are left untouched.
# Always returns 1.
sub update_host_ips {
    my ($self, $hostname) = @_;

    my @addresses = $self->host_ips($hostname);

    if (@addresses) {
        # "!= all" keeps a row only when its ip equals one of the resolved
        # addresses.  "!= any" is true for every row as soon as the list
        # holds two different addresses, which deleted everything.
        my $delete = $self->db->prepare(
            q{delete from ips where hostname = ? and ip != all(?) }
        );
        $delete->execute($hostname, [ @addresses ]);
    }

    my $getip = $self->db->prepare(q{
        select 1 from ips where hostname = ? and ip = ?
    });
    my $addip = $self->db->prepare(q{
        insert into ips (hostname, ip) values (?,?)
    });
    foreach my $ip (@addresses) {
        $getip->execute($hostname, $ip);
        my $known = $getip->fetchrow_arrayref;
        $getip->finish;
        $addip->execute($hostname, $ip) unless $known;
    }
    return 1;
}

# Same as _find_urls(), with a formatted "url" entry added to every row.
sub find_urls {
    my ($self, $filters) = @_;

    return [
        map { $_->{url} = $self->fmt_url($_); $_ }
        @{ $self->_find_urls($filters) || [] }
    ];
}

# Build the URL string for a row carrying protocol, hostname, path and port.
# Runs of slashes in the path are collapsed, and the change is written back
# into $dburl->{path}.  An empty or missing path becomes "/".
sub fmt_url {
    my ($self, $dburl) = @_;

    $dburl->{path} =~ s:/+:/:g if defined $dburl->{path};
    my $uri = URI->new(
        sprintf('%s://%s%s',
            $dburl->{protocol},
            $dburl->{hostname},
            $dburl->{path} || '/',
        )
    );
    $uri->port($dburl->{port});
    return $uri->as_string;
}

1;