diff options
-rw-r--r-- | client/Scire/Communicator.pm | 89 | ||||
-rw-r--r-- | client/Scire/Job.pm | 116 | ||||
-rwxr-xr-x | client/scireclient.pl | 260 | ||||
-rw-r--r-- | client/test.pl | 26 | ||||
-rw-r--r-- | etc/scire.conf | 8 | ||||
-rw-r--r-- | etc/scireserver.conf | 12 | ||||
-rwxr-xr-x | server/scireserver.pl | 466 |
7 files changed, 977 insertions, 0 deletions
diff --git a/client/Scire/Communicator.pm b/client/Scire/Communicator.pm new file mode 100644 index 0000000..1a6b982 --- /dev/null +++ b/client/Scire/Communicator.pm @@ -0,0 +1,89 @@ +package Scire::Communicator; + +use IPC::Open2 (open2); + +sub new { + my $proto = shift; + my $class = ref($proto) || $proto; + my $self = { + port => 22, + user => scire, + server_script => "/usr/bin/scireserver.pl", + SERVER_STDOUT => undef, + SERVER_STDIN => undef, + @_ + }; + bless ($self, $class); + $self->build_connection_command(); + return $self; +} + +sub send_command { + my $self = shift; + my $cmd = shift; + my @args = @_; + my $tosend = "${cmd}"; + + for my $arg (@args) { + if($arg =~ /^[0-9]+$/) { + $tosend .= " ${arg}"; + } else { + $arg =~ s/"/\\"/g; + $tosend .= " \"${arg}\""; + } + } + $tosend .= "\n"; + + my ($tmpin, $tmpout) = ($self->{SERVER_STDIN}, $self->{SERVER_STDOUT}); + print $tmpin $tosend; + #FIXME WE NEED A TIMEOUT HERE OF SOME SORT!! + #if the server doesn't give you a newline this just hangs! + my $response = <$tmpout>; + return $self->parse_response($response); +} + +sub parse_response { + my $self = shift; + my $response = shift; + $response =~ /^(OK|ERROR)(?: (.+?))?\s*$/; + my ($status, $message) = ($1, $2); + return ($status, $message); +} + +sub create_connection { + my $self = shift; + # XXX: How do we capture this error? $pid has a valid value even if the + # process fails to run, since it just returns the PID of the forked perl + # process. I tried adding 'or die' after it, but it didn't help since it + # doesn't fail in the main process. When it fails, it outputs an error + # to STDERR: + # open2: exec of ../server/scireserver.pl failed at ./scireclient.pl line 116 + $self->{connection_pid} = open2($self->{SERVER_STDOUT}, $self->{SERVER_STDIN}, $self->{connection_command}); +} + +sub close_connection { + my $self = shift; + close $self->{SERVER_STDIN}; + close $self->{SERVER_STDOUT}; +} + +sub build_connection_command { + my $self = shift; + # This will eventually be something like "ssh scire@${scireserver} /usr/bin/scireserver.pl" + my $connection_command = "ssh "; + $connection_command .= "-o BatchMode yes "; + $connection_command .= "-o SendEnv 'SCIRE_*' "; + $connection_command .= "-o ServerAliveInterval 15 -o ServerAliveCountMax 4 "; + if(defined($self->{port})) { + $connection_command .= "-o Port=$conf{port} "; + } + $connection_command .= "$self->{user}\@$self->{host} $self->{server_script}"; + + if (-d ".svn") { + # Overwrite $connection_command in the case of a dev environment for now + $connection_command = "../server/scireserver.pl"; + } + $self->{connection_command} = $connection_command; +} + +1; diff --git a/client/Scire/Job.pm b/client/Scire/Job.pm new file mode 100644 index 0000000..27a5aa7 --- /dev/null +++ b/client/Scire/Job.pm @@ -0,0 +1,116 @@ +package Scire::Job; + +use POSIX qw/WEXITSTATUS WIFEXITED waitpid setuid setgid/; + +sub new { + my $proto = shift; + my $class = ref($proto) || $proto; + my $filename = shift; + my $self = {}; + bless ($self, $class); + if(defined $filename) { + $self->load_jobfile($filename); + } + return $self; +} + +sub load_jobfile { + my $self = shift; + my $filename = shift; + $self->{filename} = $filename; + my $jobcontents; + my $jobdata; + open JOB, "< ${filename}" or die "Can't open file ${filename}"; + $jobcontents = join("", <JOB>); + close JOB; + $jobdata = eval($jobcontents); + ($@) and print "ERROR: Could not parse job file ${filename}!\n"; + if(defined $jobdata->{script}) { + for(keys %{$jobdata->{script}}) { + $self->{$_} = $jobdata->{script}->{$_}; + } + } + for(keys %{$jobdata}) { + $self->{$_} = $jobdata->{$_} unless($_ eq "script"); + } +} + +sub set_script_file { + my ($self, $scriptfile) = @_; + if(defined $scriptfile and $scriptfile) { + $self->{script_filename} = $scriptfile; + } +} + +sub set_stdout_file { + my ($self, $outfile) = @_; + if(defined $outfile && $outfile) { + $self->{stdout_filename} = $outfile; + } +} + +sub set_stderr_file { + my ($self, $errfile) = @_; + if(defined $errfile && $errfile) { + $self->{stderr_filename} = $errfile; + } +} + +sub run { + my $self = shift; + + # XXX: we might want to check capabilities here instead of UID, but I + # have no idea how to do that + my ($run_as_uid, $run_as_gid) = (0, 0); + if($< == 0) { + # XXX: we'll use setuid to drop privileges here + my @user = getpwnam($self->{run_as}); + if(defined @user) { + $run_as_uid = $user[2]; + $run_as_gid = $user[3]; + } else { + return -2; + } + } + + open SCRIPT, ">", $self->{script_filename}; + print SCRIPT $self->{script_data}; + close SCRIPT; + if($run_as_uid) { + chown $run_as_uid, $run_as_gid, $self->{script_filename}; + } + chmod 0500, $self->{script_filename}; + + my $pid = fork(); + if($pid) { + # XXX: eventually, we'll move the waitpid() call to another function + # called something like is_running() and use WNOHANG instead of blocking + waitpid($pid, 0); + my $status = $?; +# my $exitcode = -1; +# if(WIFEXITED($status)) { + my $exitcode = WEXITSTATUS($status); +# } + return $exitcode; + } else { + # We redirect STDOUT and STDERR first since the new user may not have + # write access to the file locations + if(defined $self->{stdout_filename}) { + open STDOUT, '>', $self->{stdout_filename}; + } + if(defined $self->{stderr_filename}) { + open STDERR, '>', $self->{stderr_filename}; + } + if($run_as_uid) { + setuid($run_as_uid); + setgid($run_as_gid); + } + # XXX: exec() to run our command. our STDOUT and STDERR have been + # redirected to the files specified, and the exit code is returned + # to the main process when we're done executing. This will be changed + # to the path of the script we've written to disk once that code is in + exec '/bin/sh', '-c', $self->{script_filename}; + } +} + +1; diff --git a/client/scireclient.pl b/client/scireclient.pl new file mode 100755 index 0000000..a9cfe20 --- /dev/null +++ b/client/scireclient.pl @@ -0,0 +1,260 @@ +#!/usr/bin/perl + +# $Id$ + +use strict; +use warnings; + +use Scire::Job; +use Scire::Communicator; +use Getopt::Long; +use Data::Dumper; +use File::Path; +use Sys::Hostname; +use POSIX qw/WEXITSTATUS setuid/; + +my $ETC_DIR = "/etc/scire"; +my $SCIRE_CONFIG_FILE = "${ETC_DIR}/scire.conf"; +my %conf; +my $comm; + +run_main(); + +sub run_main { + parse_command_line(); + my $conf_file = (defined($conf{config})) ? $conf{config} : $SCIRE_CONFIG_FILE; + read_config_file($conf_file); + + check_job_dir(); + + my $exitcode = talk_to_server(); + if($exitcode != 0) { + if($conf{daemon}) { + # We'll schedule another pass here later + } else { + debug("We couldn't communicate with the server...exiting!"); + exit(1); + } + } + + my @new_jobs = glob("$conf{job_dir}/queue/*.job"); + for (@new_jobs) { + my $job = Scire::Job->new(); + $job->load_jobfile($_); + $job->set_stdout_file("$conf{job_dir}/queue/$job->{jobid}.out"); + $job->set_stderr_file("$conf{job_dir}/queue/$job->{jobid}.err"); + $job->set_script_file("$conf{job_dir}/queue/$job->{jobid}.script"); + my $exitcode = $job->run(); + if(!$exitcode) { + # Successful job completion + system("mv $conf{job_dir}/queue/$job->{jobid}.* $conf{job_dir}/done/"); + } else { + # Job failed + system("mv $conf{job_dir}/queue/$job->{jobid}.* $conf{job_dir}/failed/"); + } + } + + talk_to_server(); +} + +sub talk_to_server { + # This functions forks a new process just for the purpose of dropping privileges. + my $pid = fork(); + if($pid) { + debug("Waiting for PID ${pid} to finish"); + waitpid($pid, 0); + my $exitcode = WEXITSTATUS($?); + debug("PID ${pid} has finished with status ${exitcode}"); + return $exitcode; + } else { + # We'll need to add a call to setuid() here at some point + + #ok folks so here's how this thang goes down. + #1. Connect. + $comm = Scire::Communicator->new( host => $conf{host}, user => $conf{user}, port => $conf{port} ); + $comm->create_connection(); + + #2. Register with the DB. (only it knows if you're allowed to be active) + # If we do not have a defined key file, we assume this is the first run of this client + # so we register them instead of trying to identify. + if(defined($conf{key_file}) and (-f $conf{key_file})) { + if(!identify_client()) { + exit(1); + } + } else { + register_client(); + exit(0); + } + + #3. Scan the jobs directory. If there are done/failed jobs, report them. Note jobs in running or queue. + my @existing_jobs = scan_jobs_dir(); + #4. Fetch the jobs list + get_jobs(); + #5. ??? + #6. Profit! + + $comm->close_connection(); + exit(0); + } +} + +sub parse_command_line { + GetOptions( + 'debug|d' => \$conf{debug}, + 'daemon|D' => \$conf{daemon}, + 'dry-run' => \$conf{dry_run}, + 'help|h' => \$conf{help}, + 'config|c=s' => \$conf{config}, + 'threads|t=i' => \$conf{max_threads}, + + #config overrides. + 'host=s' => \$conf{host}, + 'port=i' => \$conf{port}, + 'user|u=s' => \$conf{user}, + 'server_script=s' => \$conf{server_script}, + 'job_dir' => \$conf{job_dir}, + ); + if ($conf{help}) { + print "\nusage: scireclient.pl [--debug or -d]\n\t [--dry-run]" + ."\t [--config=CONF or -c] \n\t [--threads=# or -t] \t [--help or -h] \n" + ."\t [[--host=HOST] \t [--port=PORT] \t [--user=USER or -u] \n\t" + ." [--server_script=foo.pl] \t [--job_dir=/tmp/jobs] \n"; + exit 0; + } + +} + +sub check_job_dir { + my @checkdirs = ($conf{job_dir}, "$conf{job_dir}/queue", "$conf{job_dir}/done", "$conf{job_dir}/failed", "$conf{job_dir}/run"); + for my $dir (@checkdirs) { + if (! -d $dir) { + print "WARNING! ${dir} does not exist...creating\n"; + mkpath( $dir, {verbose => 1, mode => 0660}) + or die("Couldn't make ${dir} w/ perms 0660: $!"); + } + } +} + +sub read_config_file { + my $conf_file = shift; + my %config_defaults = ( + "key_file" => "${ETC_DIR}/client_key", + "debug" => 0, + ); + open(FH, "< ${conf_file}") or die("Couldn't open the config file ${conf_file}: $!"); + while (<FH>) { + chomp; + next if /^\s*(?:#|$)/; + if(/^\s*(.+?)\s*=\s*(.+?)\s*(?:#.*)?$/) { + unless(defined($conf{lc($1)})) { #Don't overwrite anything specified in cmdline + $conf{lc($1)} = $2; + } + } + } + close(FH) or die("Couldn't close the config file ${conf_file}: $!"); + for(keys %config_defaults) { + if(!defined $conf{$_}) { + $conf{$_} = $config_defaults{$_}; + } + } +} + +sub register_client { +# my $mac = "00:11:22:33:44:55"; +# my $ip = "192.168.2.3"; + my ($mac, $ip) = get_interface_info(defined $conf{interface} && $conf{interface} ? $conf{interface} : "eth0"); + my $hostname = hostname(); + my ($status, $message) = $comm->send_command("REGISTER", $mac, $ip, $hostname); + die "Could not register client $mac w/ ip $ip and hostname $hostname. Got: $message" if (! defined $status or $status ne "OK"); + debug("Client registered. Status is pending. digest is $message"); + open(FILE, ">$conf{key_file}") or die("Couldn't open key file $conf{key_file} for writing: $!"); + print FILE "$message\n"; + close(FILE); +} + +sub identify_client { + open(FILE, $conf{key_file}) or die("Couldn't open client_key $conf{key_file}: $!"); + my $digest = <FILE>; + chomp $digest; + close(FILE); + my ($status, $message) = $comm->send_command("IDENTIFY", $digest); + unless (defined $status && $status eq "OK") { + print "ERROR Could not identify to server: $message\n"; + return 0; + } + debug("Client identified"); + return 1; +} + +sub get_jobs { + my ($status, $jobs) = $comm->send_command("GET_JOBS"); + unless (defined $status && $status eq "OK") { + print "Could not get jobs list from server: $status\n"; + return 0; + } + if (defined($jobs) && $jobs) { + $jobs =~ s/\s//g; #Remove all whitespace + my @jobs_list = split(/,/, $jobs); + foreach my $job (@jobs_list) { + my ($status, $filename) = $comm->send_command("GET_JOB", $job); + #SCP the file to $conf{job_dir}/queue/ + + system("cp $filename $conf{job_dir}/queue/") and die("Can't copy file: $!"); #Temporary hack. only works locally. + # XXX: Modify this to fetch a file instead + debug("Fetched job $job "); + my ($status2,$message) = $comm->send_command("JOB_FETCHED", $job); + unless (defined $status2 && $status2 eq "OK") { + die("ERROR Could not signal job was fetched: $message\n"); + } + + } + #This function doesn't actually need to do anything with the list of jobs, the executor handles that part. + } +} + +sub scan_jobs_dir { + #Scan the dirs for job files. + my @existing_jobs = glob("$conf{job_dir}/queue/*.job"); + my @failed_jobs = glob("$conf{job_dir}/failed/*.job"); + my @done_jobs = glob("$conf{job_dir}/done/*.job"); + + # XXX: this function should just scan the various job dirs, create a Scire::Job object + # for each job found, and return a structure containing the info, so that another + # function can act on the completed jobs + + #Report on those jobs needing reporting. + foreach my $job_file (@failed_jobs) { + $job_file =~ /(\d+)\.job/; + my $jobid = $1; + my ($status, $message) = $comm->send_command("SET_JOB_STATUS", $jobid, "Failed"); + open(FILE, $job_file) or die "Couldn't open job file $job_file: $!"; + my $job_data = join("", <FILE>); + close(FILE); + + } + #may be able to use same code as above. + foreach my $job_file (@done_jobs) { + $job_file =~ /(\d+)\.job/; + my $jobid = $1; + my ($status, $message) = $comm->send_command("SET_JOB_STATUS", $jobid, "Done"); + # XXX: Send job output + } + + return @existing_jobs; +} + +sub debug { + my $msg = shift; + if($conf{debug}) { + print STDERR $msg."\n"; + } +} + +sub get_interface_info { + my $interface = shift; + + my $info = `/sbin/ifconfig ${interface}`; + $info =~ /^.+HWaddr ([a-zA-Z0-9:]+).+inet addr:([0-9.]+).+$/s; + my ($mac, $ip) = ($1, $2); + return ($mac, $ip); +} diff --git a/client/test.pl b/client/test.pl new file mode 100644 index 0000000..e5fdcbb --- /dev/null +++ b/client/test.pl @@ -0,0 +1,26 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +$| = 1; + +use Scire::Job; +use Scire::Communicator; + +my $job = Scire::Job->new(); +$job->load_jobfile("/tmp/scirejobs/queue/39.job"); +#print $job->{script_data} . "\n"; +$job->set_stdout_file("/tmp/scirejobs/result/39_stdout.txt"); +$job->set_stderr_file("/tmp/scirejobs/result/39_stderr.txt"); +$job->set_script_file("/tmp/scirejobs/run/runjob.sh"); +my $exitcode = $job->run(); +print "Job complete with exit code ${exitcode}\n"; + +exit; + +my $comm = Scire::Communicator->new( host => "localhost" ); +$comm->create_connection(); +$comm->close_connection(); +#my ($status, $message) = $comm->send_command("QUIT"); +#print "$status\n"; diff --git a/etc/scire.conf b/etc/scire.conf new file mode 100644 index 0000000..243e5ca --- /dev/null +++ b/etc/scire.conf @@ -0,0 +1,8 @@ +HOST=localhost +PORT=22 +USER=scire +SERVER_SCRIPT=../server/scireserver.pl +JOB_DIR=/tmp/scirejobs #comments are fine. +KEY_FILE=../etc/client.key +interface=eth0 # This is the default +debug=1 diff --git a/etc/scireserver.conf b/etc/scireserver.conf new file mode 100644 index 0000000..33f256a --- /dev/null +++ b/etc/scireserver.conf @@ -0,0 +1,12 @@ +# $Id$ +# Scire Server config file. +debug=1 +job_dir=/tmp/scirejobs +#logfile=/tmp/scireserver.log +#Database stuff. +db_type=mysql +db_host=localhost +db_name=scire +db_user=foo +db_passwd=bar + diff --git a/server/scireserver.pl b/server/scireserver.pl new file mode 100755 index 0000000..1af39ee --- /dev/null +++ b/server/scireserver.pl @@ -0,0 +1,466 @@ +#!/usr/bin/perl + +# $Id$ + +use strict; +use warnings; +use DBI; +use Data::Dumper; +use Digest::MD5 qw(md5 md5_hex ); +use File::Path; +use Schedule::Cron::Events; + + +$| = 1; +$Data::Dumper::Purity = 1; + +my $ETC_DIR = "/etc/scire"; +my $SCIRE_CONFIG_FILE = "${ETC_DIR}/scireserver.conf"; +my %conf; +my $LOGFILE; + +my $conf_file = (defined($conf{config})) ? $conf{config} : $SCIRE_CONFIG_FILE; +read_config_file($conf_file); +Dumper(\%conf); + +my $identified = 0; #Global variable to determine if already identified or not. +my $client_id = 0; #Clobal variable for the client id. +# Somehow this feels insecure. + +sub logger { + my $line = shift; + if(!defined $LOGFILE) { + open(*LOGFILE, ">>$conf{logfile}") or die "Cannot open logfile $conf{logfile}"; + } + print LOGFILE localtime() . " " . $line . "\n"; +} + +sub debug { + my $line = shift; + if ($conf{debug}) { + if (defined($conf{logfile})) { + logger("DEBUG: ${line}"); + } else { + print STDERR "DEBUG: ${line}\n"; + } + } +} + +#Connect to the Database. +my $connect_string = "DBI:$conf{db_type}:$conf{db_name};host=$conf{db_host}"; +debug("Connecting to $connect_string"); +my $dbh = DBI->connect($connect_string, $conf{db_user}, $conf{db_passwd}, { RaiseError => 1 } ) + or die "Could not connect to database: $DBI::errstr"; + +while(<>) { + my ($command, @args) = parse_command($_); +# chomp( my $line = $_); + debug("DEBUG: line is: $_"); +# SEE http://agaffney.org/mediawiki/index.php/SSH-based_protocol for documentation on the protocol. + + if($command eq "QUIT") { + print "OK\n"; + exit; + } + + if($command eq "REGISTER") { + my ($mac,$ip,$hostname) = @args; + register_client($mac, $ip, $hostname); + next; #End switch here. You can go no further. + } + + if($command eq "IDENTIFY") { + my $fingerprint = $args[0]; + identify_client($fingerprint); + next; #End switch here. You can go no further. + } + unless($identified == 1) { + print "ERROR This client has not yet been authorized. Please identify!\n"; + next; + } + + if ($command eq "GET_JOBS") { + my @jobs = get_jobs(); + print "OK " . join(",", @jobs) . "\n"; + } elsif ($command eq "GET_JOB") { + my $job = $args[0]; + my $jobfile = get_job($job); + print "OK ${jobfile}\n"; + } elsif ($command eq "JOB_FETCHED") { + my $job = $args[0]; + job_fetched($job) and print "OK\n"; + } elsif ($command eq "SET_JOB_STATUS") { + my ($jobid,$status) = @args; + set_job_status($jobid,$client_id,$status) and print "OK\n"; + } elsif ($command eq "RETURN_JOBFILE") { + my $jobid = $args[0]; + my @filenames = ("$conf{job_dir}/$client_id/result/$jobid.stdout", "$conf{job_dir}/$client_id/result/$jobid.stderr"); + print "OK " . join(" ", @filenames) . "\n"; + } elsif ($command eq "JOBFILE_SENT") { + my @jobfiles = @args; + process_jobfile($_) foreach(@jobfiles); + print "OK\n" + } else { + print "ERROR The command $command is unknown. Please try again.\n"; + } +} + + + +sub read_config_file { + my $conf_file = shift; + open(FH, "< ${conf_file}") or die("Couldn't open the config file ${conf_file}: $!"); + while (<FH>) { + chomp; + next if /^\s*(?:#|$)/; + if(/^\s*(.+?)\s*=\s*(.+?)\s*(?:#.*)?$/) { + unless(defined($conf{lc($1)})) { #Don't overwrite anything specified in cmdline + $conf{lc($1)} = $2; + } + } + } + close(FH) or die("Couldn't close the config file ${conf_file}: $!"); + debug("Conf file $conf_file read."); +} + +#New clients must be registered so they can be given a key to use (perhaps for job file transfers?) for authentication. This must be allowed before identifying. +sub register_client { + my ($mac,$ip, $hostname) = @_; + #Validate your inputs! + $mac =~ /^[a-zA-Z0-9\:]+$/ or print "ERROR invalid mac $mac!\n"; + $ip =~ /^[a-zA-Z0-9\.\:]+$/ or print "ERROR invalid ip $ip!\n"; + + my ($query, $status_id, $id, $sth); + + #Generate the digest + my $digest = md5_hex(time()."${mac}${ip}${hostname}"); + + eval { + $query = 'SELECT statusid FROM client_status WHERE statusname = "Pending"'; + $sth = run_query($query); + $status_id = $sth->fetchrow_hashref->{'statusid'}; + }; + ($@) and print "ERROR Could not get status id: $DBI::errstr\n"; + + eval { + run_query('LOCK TABLES `gacl_axo_seq` WRITE'); + $query = 'SELECT id FROM `gacl_axo_seq`'; + $sth = run_query($query); + $id = $sth->fetchrow_hashref->{'id'}; + $id += 1; + $query = 'UPDATE `gacl_axo_seq` SET id=?'; + run_query($query,$id); + run_query('UNLOCK TABLES'); + }; + ($@) and print "ERROR during fetching of id sequence: $DBI::errstr\n"; + + eval { + $query = 'INSERT INTO `gacl_axo` (id,section_value,value,order_value,name,hidden) VALUES (?,"clients",?,"1",?,"0")'; + run_query($query,$id,$hostname,$hostname); + #NOTE: not sure if this query is still valid. may be using id instead of hostname for one of those two now. + + $query = 'INSERT INTO clients (clientid,digest,hostname,mac,ip,status) VALUES (?,?,?,?,?,?)'; + run_query($query,$id,$digest,$hostname,$mac,$ip,$status_id); + }; + ($@) and print "ERROR Could not insert client with $query: $DBI::errstr\n"; + #FIXME look for "duplicate key" and if found fail and notify admin. + + print "OK $digest\n"; +} + +#Identify the client by looking up the fingerprint in the database, and matching it up. +sub identify_client { + my $digest = shift; + #Validate your inputs! + $digest =~ s/"//g; #Clear the quotes. + $digest =~ /^[A-Za-z0-9]+$/ or print "ERROR invalid digest!\n"; + + my $query = 'SELECT client_status.statusname, clients.clientid FROM clients JOIN client_status on (clients.status = client_status.statusid) WHERE clients.digest=?'; + my $sth = run_query($query,$digest); + my $hashref = $sth->fetchrow_hashref(); + debug(Dumper($hashref)); + my $status_name = $hashref->{'statusname'}; + $client_id = $hashref->{'clientid'}; + if (defined($client_id) and ($client_id > 0) and ($status_name eq 'Active')) { + $identified = 1; + print "OK\n"; + } else { + print "ERROR Client could not be identified. Status was $status_name\n"; + } + +} + +sub get_jobs { + + #FIXME expand jobs for $client_id + expand_jobs(); + + my $query = <<'EndOfQuery'; + SELECT jobs.jobid + FROM jobs NATURAL JOIN jobs_clients NATURAL JOIN job_conditions + WHERE jobs_clients.clientid = ? + AND jobs.jobid = jobs_clients.jobid + AND (job_conditions.deploy_time < now()) + AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL)) + ORDER BY jobs.priority,jobs.created +EndOfQuery + + #FIXME ADD JOB DEPENDENCIES TO THIS QUERY. + my $sth = run_query($query,$client_id); + my $jobs_ref = $sth->fetchall_arrayref(); + # Don't ask me...ask the guys in #perl :P + my @jobs = map { @$_ } @$jobs_ref; + return @jobs; +} + +sub get_job { + my $jobid = shift; + #Validate your inputs! + my $query = 'SELECT * FROM jobs LEFT JOIN job_conditions on (jobs.jobid) WHERE jobs.jobid = ?'; + my $sth = run_query($query, $jobid); + my $job = $sth->fetchrow_hashref(); + my $scriptid = $job->{'script'}; + + $query = 'SELECT * FROM scripts WHERE scriptid=?'; + $sth = run_query($query,$scriptid); + $job->{'script'} = $sth->fetchrow_hashref(); + + debug(Dumper($job)); + #Write the job w/ all data to a jobfile with the following path /JOBDIR/CLIENT_ID/queue/JOBID.job + my $path = "$conf{job_dir}/$client_id/queue"; + my $filename = "$path/$jobid.job"; + unless (-d $path) { + print "WARNING! $path does not exist...creating\n"; + mkpath( $path, {verbose => 1, mode => 0660}) + or die("Couldn't make $path w/ perms 0660: $!"); + } + open(FH, ">$filename") or die("Couldn't open $filename: $!"); + my $jobdata = Dumper($job); + print FH $jobdata . "\n"; + close(FH) or die("Couldn't close $filename : $!"); + debug("OK $filename"); + return $filename; +} + +sub job_fetched { + my $jobid = shift; + set_job_status($jobid,$client_id,'Downloaded', 'Job downloaded by client.') or print "ERROR could not set job status to downloaded.\n"; + + eval { + my $query = 'DELETE FROM jobs_clients WHERE jobid=? AND clientid=?'; + run_query($query,$jobid,$client_id); + }; + ($@) and print "ERROR Could not get status id: $DBI::errstr\n"; + + my $filename = "$conf{job_dir}/$client_id/queue/$jobid.job"; + unlink ($filename) or die("ERROR Could not unlink the jobfile from the queue. filename: $filename : $!"); + return 1; +} + +sub set_job_status { + my ($jobid,$id_of_client,$status,$eventmsg) = @_; + #Validate your inputs! + $jobid =~ /^\d+$/ or die("Invalid jobid $jobid"); + $id_of_client ||= $client_id; + $id_of_client =~ /^\d+$/ or die("Invalid id of client $id_of_client"); + $eventmsg ||= "Server status update."; + #fixme validate status + my $status_id; + eval { + my $query = 'SELECT statusid FROM jobs_status WHERE statusname = ?'; + my $sth = run_query($query,$status); + $status_id = $sth->fetchrow_hashref->{'statusid'}; + }; + ($@) and print "ERROR Could not get status id: $DBI::errstr\n"; + $status_id or print "ERROR Invalid status id $status_id\n"; + + eval { + my $query = 'INSERT INTO job_history (jobid,clientid,statusid,eventmsg) VALUES (?,?,?,?)'; + run_query($query,$jobid,$id_of_client,$status_id,$eventmsg); + }; + ($@) and print "ERROR Could not insert into job_history: $DBI::errstr\n"; + + + #If we're marking the completetion or failure of a job, we have more work to do here. + if ($status eq 'Failed') { + mark_job_as_failed($jobid,$id_of_client); + } elsif ($status eq 'Completed') { + mark_job_as_completed($jobid,$id_of_client); + } + + return 1; +} + +sub parse_command { + my $line = shift; + chomp $line; + my @parts = split / (?!(?:[^" ]|[^"] [^"])+")/, $line; + for(0..$#parts) { + $parts[$_] =~ s/(^"|"$)//g; + $parts[$_] =~ s/\\"/"/g; + } + return @parts; +} + +sub run_query { + my ($query, @params) = @_; + debug("Query is $query"); + my $sth = $dbh->prepare($query); + $sth->execute(@params); + return $sth; +} + + +sub expand_jobs { + #Searches for the group jobs that the client must be into and does the expansion. + my @groups = get_client_groups(); + foreach my $groupid (@groups) { + debug("Groupid is $groupid"); + my @members = get_group_clients($groupid); + eval { + my $query = <<'EndOfQuery2'; +SELECT DISTINCT(jobs_clients.jobid) +FROM jobs_clients LEFT JOIN job_conditions on (jobs_clients.jobid=job_conditions.jobid) +WHERE jobs_clients.groupid = ? +AND (job_conditions.deploy_time < now()) +AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL)) +AND ((job_conditions.last_run_date < job_conditions.deploy_time) OR (job_conditions.last_run_date IS NULL)) +EndOfQuery2 + my $sth = run_query($query,$groupid); + run_query('LOCK TABLES `jobs_clients` WRITE, `job_conditions` WRITE, `job_history` WRITE, `jobs_status` WRITE, `jobs` WRITE'); + #FIXME need to lock jobs_clients for READ as well!!! + while( my $jobref = $sth->fetchrow_hashref() ) { + my $jobid = $jobref->{'jobid'}; + foreach my $member (@members) { + $query = 'INSERT INTO jobs_clients (jobid, clientid) VALUES (?,?)'; + my $sth2 = run_query($query,$jobid,$member); + + set_job_status($jobid,$member,'Pending', 'Job expanded.') or print "ERROR could not add expanded jobs to job_history.\n"; + } + $query = 'UPDATE `job_conditions` SET last_run_date = now() WHERE jobid = ?'; + run_query($query,$jobid); + + $query = 'UPDATE `jobs` SET pending=pending+? WHERE jobid = ?'; + run_query($query,$#members,$jobid); #This works because you want one less b/c of removing the group. + + # One last query to remove the row from jobs_clients so someone else doesn't expand it. + $query = 'DELETE FROM `jobs_clients` WHERE groupid=? AND jobid=?'; + run_query($query,$groupid,$jobid); + + } + + run_query('UNLOCK TABLES'); + }; + ($@) and print "ERROR Could not expand jobs: $@ $DBI::errstr\n"; + return undef; + } +} + +sub mark_job_as_failed { + my ($jobid,$id_of_client) = @_; + + #First off, update the pending count and failed count with the result. + eval { + my $query = 'UPDATE jobs SET pending=pending-1, failed=failed+1 WHERE jobid=?'; + run_query($query); + }; + ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n"; +} + +sub mark_job_as_completed { + my ($jobid,$id_of_client) = @_; + my ($query,$sth); + debug("Marking $jobid as completed for client $id_of_client"); + #If we succeeded, we need to check this jobid to see if it is a recurring job, and then set the next_run if necessary. + #This requries looking at the pending count for the job as well as the run_schedule. + + #First off, update the pending count now that we've finished. + eval { + $query = 'UPDATE jobs SET pending=pending-1 WHERE jobid=?'; + run_query($query,$jobid); + }; + ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n"; + + #Now get the pending count and run_schedule. + eval { + $query = 'SELECT pending,run_schedule,expiration_time,deploy_time FROM jobs WHERE jobid=?'; + my $sth = run_query($query,$jobid); + my $rowref = $sth->fetchrow_hashref(); + my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime(time); + my $datetime = sprintf "%4d-%02d-%02d %02d:%02d:%02d\n",$year+1900,$mon+1,$mday,$hour,$min,$sec; + if ($rowref->{'run_schedule'} and ($rowref->{'pending'} == 0) and ( $rowref->{'expiration_time'} > $datetime)) { + #Determine the next run time. + my $cron = new Schedule::Cron::Events( $rowref->{'run_schedule'}, Date => [ ( localtime(time()) )[0..5] ] ); + my ($sec, $min, $hour, $day, $month, $year) = $cron->nextEvent; + printf("Event will start next at %2d:%02d:%02d on %d %s, %d\n", $hour, $min, $sec, $day, $month, ($year+1900)); + + #Get the groups and clients from the recurring_jobs_clients table. + $query = 'SELECT clientid,groupid FROM recurring_jobs_clients WHERE jobid=?'; + my $sth2 = run_query($query,$jobid); + while( my $recrow_ref = $sth2->fetchrow_hashref() ) { + $query = 'INSERT INTO jobs_clients (jobid,clientid,groupid) VALUES (?,?,?)'; + run_query($query,$jobid,$recrow_ref->{'clientid'},$recrow_ref->{'groupid'}); + } + } + + }; + ($@) and print "ERROR Could not get run_schedule and pending count: $@ $DBI::errstr\n"; + +} + +sub process_jobfile { + my $filename = shift; + +} + +######################################################### +# PHPGACL FUNCTIONS +######################################################### + +sub get_client_groups { + my $query; + my @groups; + my $option = 'NO RECURSE'; + # If RECURSE it will get all ancestor groups. defaults to only get direct parents. + + debug("get_object_groups(): Object ID: $client_id, option: $option"); + my $object_type = 'axo'; + my $group_table = 'gacl_axo_groups'; + my $map_table = 'gacl_groups_axo_map'; + + if ($option eq 'RECURSE') { + $query = "SELECT DISTINCT g.id as group_id FROM $map_table gm "; + $query .= "LEFT JOIN $group_table g1 ON g1.id=gm.group_id "; + $query .= "LEFT JOIN $group_table g ON g.lft<=g1.lft AND g.rgt>=g1.rgt"; + } else { + $query = "SELECT gm.group_id FROM $map_table gm "; + } + $query .= " WHERE gm.axo_id=?"; + debug("Query is $query"); + eval { + my $sth = $dbh->prepare($query); + $sth->execute($client_id); + my $groups_ref = $sth->fetchall_arrayref(); + # Don't ask me...ask the guys in #perl :P + @groups = map { @$_ } @$groups_ref; + }; + ($@) and print "ERROR Could not get client groups: $DBI::errstr\n"; + return @groups; +} + +sub get_group_clients { + #This function gets the members of groups. Returns an array containing those clients, empty otherwise. + my $groupid = shift; + my @members; + my $query = 'SELECT axo_id FROM gacl_groups_axo_map WHERE group_id = ?'; + debug("Query is $query"); + eval { + my $sth = $dbh->prepare($query); + $sth->execute($groupid); + my $members_ref = $sth->fetchall_arrayref(); + # Don't ask me...ask the guys in #perl :P + @members = map { @$_ } @$members_ref; + }; + ($@) and print "ERROR Could not get group members: $DBI::errstr\n"; + return @members; +} |