summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/Scire/Communicator.pm89
-rw-r--r--client/Scire/Job.pm116
-rwxr-xr-xclient/scireclient.pl260
-rw-r--r--client/test.pl26
-rw-r--r--etc/scire.conf8
-rw-r--r--etc/scireserver.conf12
-rwxr-xr-xserver/scireserver.pl466
7 files changed, 977 insertions, 0 deletions
diff --git a/client/Scire/Communicator.pm b/client/Scire/Communicator.pm
new file mode 100644
index 0000000..1a6b982
--- /dev/null
+++ b/client/Scire/Communicator.pm
@@ -0,0 +1,89 @@
+package Scire::Communicator;
+
+use IPC::Open2 (open2);
+
+sub new {
+ my $proto = shift;
+ my $class = ref($proto) || $proto;
+ my $self = {
+ port => 22,
+ user => scire,
+ server_script => "/usr/bin/scireserver.pl",
+ SERVER_STDOUT => undef,
+ SERVER_STDIN => undef,
+ @_
+ };
+ bless ($self, $class);
+ $self->build_connection_command();
+ return $self;
+}
+
+sub send_command {
+ my $self = shift;
+ my $cmd = shift;
+ my @args = @_;
+ my $tosend = "${cmd}";
+
+ for my $arg (@args) {
+ if($arg =~ /^[0-9]+$/) {
+ $tosend .= " ${arg}";
+ } else {
+ $arg =~ s/"/\\"/g;
+ $tosend .= " \"${arg}\"";
+ }
+ }
+ $tosend .= "\n";
+
+ my ($tmpin, $tmpout) = ($self->{SERVER_STDIN}, $self->{SERVER_STDOUT});
+ print $tmpin $tosend;
+ #FIXME WE NEED A TIMEOUT HERE OF SOME SORT!!
+ #if the server doesn't give you a newline this just hangs!
+ my $response = <$tmpout>;
+ return $self->parse_response($response);
+}
+
+sub parse_response {
+ my $self = shift;
+ my $response = shift;
+ $response =~ /^(OK|ERROR)(?: (.+?))?\s*$/;
+ my ($status, $message) = ($1, $2);
+ return ($status, $message);
+}
+
+sub create_connection {
+ my $self = shift;
+ # XXX: How do we capture this error? $pid has a valid value even if the
+ # process fails to run, since it just returns the PID of the forked perl
+ # process. I tried adding 'or die' after it, but it didn't help since it
+ # doesn't fail in the main process. When it fails, it outputs an error
+ # to STDERR:
+ # open2: exec of ../server/scireserver.pl failed at ./scireclient.pl line 116
+ $self->{connection_pid} = open2($self->{SERVER_STDOUT}, $self->{SERVER_STDIN}, $self->{connection_command});
+}
+
+sub close_connection {
+ my $self = shift;
+ close $self->{SERVER_STDIN};
+ close $self->{SERVER_STDOUT};
+}
+
+sub build_connection_command {
+ my $self = shift;
+ # This will eventually be something like "ssh scire@${scireserver} /usr/bin/scireserver.pl"
+ my $connection_command = "ssh ";
+ $connection_command .= "-o BatchMode yes ";
+ $connection_command .= "-o SendEnv 'SCIRE_*' ";
+ $connection_command .= "-o ServerAliveInterval 15 -o ServerAliveCountMax 4 ";
+ if(defined($self->{port})) {
+ $connection_command .= "-o Port=$conf{port} ";
+ }
+ $connection_command .= "$self->{user}\@$self->{host} $self->{server_script}";
+
+ if (-d ".svn") {
+ # Overwrite $connection_command in the case of a dev environment for now
+ $connection_command = "../server/scireserver.pl";
+ }
+ $self->{connection_command} = $connection_command;
+}
+
+1;
diff --git a/client/Scire/Job.pm b/client/Scire/Job.pm
new file mode 100644
index 0000000..27a5aa7
--- /dev/null
+++ b/client/Scire/Job.pm
@@ -0,0 +1,116 @@
+package Scire::Job;
+
+use POSIX qw/WEXITSTATUS WIFEXITED waitpid setuid setgid/;
+
+sub new {
+ my $proto = shift;
+ my $class = ref($proto) || $proto;
+ my $filename = shift;
+ my $self = {};
+ bless ($self, $class);
+ if(defined $filename) {
+ $self->load_jobfile($filename);
+ }
+ return $self;
+}
+
+sub load_jobfile {
+ my $self = shift;
+ my $filename = shift;
+ $self->{filename} = $filename;
+ my $jobcontents;
+ my $jobdata;
+ open JOB, "< ${filename}" or die "Can't open file ${filename}";
+ $jobcontents = join("", <JOB>);
+ close JOB;
+ $jobdata = eval($jobcontents);
+ ($@) and print "ERROR: Could not parse job file ${filename}!\n";
+ if(defined $jobdata->{script}) {
+ for(keys %{$jobdata->{script}}) {
+ $self->{$_} = $jobdata->{script}->{$_};
+ }
+ }
+ for(keys %{$jobdata}) {
+ $self->{$_} = $jobdata->{$_} unless($_ eq "script");
+ }
+}
+
+sub set_script_file {
+ my ($self, $scriptfile) = @_;
+ if(defined $scriptfile and $scriptfile) {
+ $self->{script_filename} = $scriptfile;
+ }
+}
+
+sub set_stdout_file {
+ my ($self, $outfile) = @_;
+ if(defined $outfile && $outfile) {
+ $self->{stdout_filename} = $outfile;
+ }
+}
+
+sub set_stderr_file {
+ my ($self, $errfile) = @_;
+ if(defined $errfile && $errfile) {
+ $self->{stderr_filename} = $errfile;
+ }
+}
+
+sub run {
+ my $self = shift;
+
+ # XXX: we might want to check capabilities here instead of UID, but I
+ # have no idea how to do that
+ my ($run_as_uid, $run_as_gid) = (0, 0);
+ if($< == 0) {
+ # XXX: we'll use setuid to drop privileges here
+ my @user = getpwnam($self->{run_as});
+ if(defined @user) {
+ $run_as_uid = $user[2];
+ $run_as_gid = $user[3];
+ } else {
+ return -2;
+ }
+ }
+
+ open SCRIPT, ">", $self->{script_filename};
+ print SCRIPT $self->{script_data};
+ close SCRIPT;
+ if($run_as_uid) {
+ chown $run_as_uid, $run_as_gid, $self->{script_filename};
+ }
+ chmod 0500, $self->{script_filename};
+
+ my $pid = fork();
+ if($pid) {
+ # XXX: eventually, we'll move the waitpid() call to another function
+ # called something like is_running() and use WNOHANG instead of blocking
+ waitpid($pid, 0);
+ my $status = $?;
+# my $exitcode = -1;
+# if(WIFEXITED($status)) {
+ my $exitcode = WEXITSTATUS($status);
+# }
+ return $exitcode;
+ } else {
+ # We redirect STDOUT and STDERR first since the new user may not have
+ # write access to the file locations
+ if(defined $self->{stdout_filename}) {
+ open STDOUT, '>', $self->{stdout_filename};
+ }
+ if(defined $self->{stderr_filename}) {
+ open STDERR, '>', $self->{stderr_filename};
+ }
+ if($run_as_uid) {
+ setuid($run_as_uid);
+ setgid($run_as_gid);
+ }
+ # XXX: exec() to run our command. our STDOUT and STDERR have been
+ # redirected to the files specified, and the exit code is returned
+ # to the main process when we're done executing. This will be changed
+ # to the path of the script we've written to disk once that code is in
+ exec '/bin/sh', '-c', $self->{script_filename};
+ }
+}
+
+1;
diff --git a/client/scireclient.pl b/client/scireclient.pl
new file mode 100755
index 0000000..a9cfe20
--- /dev/null
+++ b/client/scireclient.pl
@@ -0,0 +1,260 @@
+#!/usr/bin/perl
+
+# $Id$
+
+use strict;
+use warnings;
+
+use Scire::Job;
+use Scire::Communicator;
+use Getopt::Long;
+use Data::Dumper;
+use File::Path;
+use Sys::Hostname;
+use POSIX qw/WEXITSTATUS setuid/;
+
+my $ETC_DIR = "/etc/scire";
+my $SCIRE_CONFIG_FILE = "${ETC_DIR}/scire.conf";
+my %conf;
+my $comm;
+
+run_main();
+
+sub run_main {
+ parse_command_line();
+ my $conf_file = (defined($conf{config})) ? $conf{config} : $SCIRE_CONFIG_FILE;
+ read_config_file($conf_file);
+
+ check_job_dir();
+
+ my $exitcode = talk_to_server();
+ if($exitcode != 0) {
+ if($conf{daemon}) {
+ # We'll schedule another pass here later
+ } else {
+ debug("We couldn't communicate with the server...exiting!");
+ exit(1);
+ }
+ }
+
+ my @new_jobs = glob("$conf{job_dir}/queue/*.job");
+ for (@new_jobs) {
+ my $job = Scire::Job->new();
+ $job->load_jobfile($_);
+ $job->set_stdout_file("$conf{job_dir}/queue/$job->{jobid}.out");
+ $job->set_stderr_file("$conf{job_dir}/queue/$job->{jobid}.err");
+ $job->set_script_file("$conf{job_dir}/queue/$job->{jobid}.script");
+ my $exitcode = $job->run();
+ if(!$exitcode) {
+ # Successful job completion
+ system("mv $conf{job_dir}/queue/$job->{jobid}.* $conf{job_dir}/done/");
+ } else {
+ # Job failed
+ system("mv $conf{job_dir}/queue/$job->{jobid}.* $conf{job_dir}/failed/");
+ }
+ }
+
+ talk_to_server();
+}
+
+sub talk_to_server {
+ # This functions forks a new process just for the purpose of dropping privileges.
+ my $pid = fork();
+ if($pid) {
+ debug("Waiting for PID ${pid} to finish");
+ waitpid($pid, 0);
+ my $exitcode = WEXITSTATUS($?);
+ debug("PID ${pid} has finished with status ${exitcode}");
+ return $exitcode;
+ } else {
+ # We'll need to add a call to setuid() here at some point
+
+ #ok folks so here's how this thang goes down.
+ #1. Connect.
+ $comm = Scire::Communicator->new( host => $conf{host}, user => $conf{user}, port => $conf{port} );
+ $comm->create_connection();
+
+ #2. Register with the DB. (only it knows if you're allowed to be active)
+ # If we do not have a defined key file, we assume this is the first run of this client
+ # so we register them instead of trying to identify.
+ if(defined($conf{key_file}) and (-f $conf{key_file})) {
+ if(!identify_client()) {
+ exit(1);
+ }
+ } else {
+ register_client();
+ exit(0);
+ }
+
+ #3. Scan the jobs directory. If there are done/failed jobs, report them. Note jobs in running or queue.
+ my @existing_jobs = scan_jobs_dir();
+ #4. Fetch the jobs list
+ get_jobs();
+ #5. ???
+ #6. Profit!
+
+ $comm->close_connection();
+ exit(0);
+ }
+}
+
+sub parse_command_line {
+ GetOptions(
+ 'debug|d' => \$conf{debug},
+ 'daemon|D' => \$conf{daemon},
+ 'dry-run' => \$conf{dry_run},
+ 'help|h' => \$conf{help},
+ 'config|c=s' => \$conf{config},
+ 'threads|t=i' => \$conf{max_threads},
+
+ #config overrides.
+ 'host=s' => \$conf{host},
+ 'port=i' => \$conf{port},
+ 'user|u=s' => \$conf{user},
+ 'server_script=s' => \$conf{server_script},
+ 'job_dir' => \$conf{job_dir},
+ );
+ if ($conf{help}) {
+ print "\nusage: scireclient.pl [--debug or -d]\n\t [--dry-run]"
+ ."\t [--config=CONF or -c] \n\t [--threads=# or -t] \t [--help or -h] \n"
+ ."\t [[--host=HOST] \t [--port=PORT] \t [--user=USER or -u] \n\t"
+ ." [--server_script=foo.pl] \t [--job_dir=/tmp/jobs] \n";
+ exit 0;
+ }
+
+}
+
+sub check_job_dir {
+ my @checkdirs = ($conf{job_dir}, "$conf{job_dir}/queue", "$conf{job_dir}/done", "$conf{job_dir}/failed", "$conf{job_dir}/run");
+ for my $dir (@checkdirs) {
+ if (! -d $dir) {
+ print "WARNING! ${dir} does not exist...creating\n";
+ mkpath( $dir, {verbose => 1, mode => 0660})
+ or die("Couldn't make ${dir} w/ perms 0660: $!");
+ }
+ }
+}
+
+sub read_config_file {
+ my $conf_file = shift;
+ my %config_defaults = (
+ "key_file" => "${ETC_DIR}/client_key",
+ "debug" => 0,
+ );
+ open(FH, "< ${conf_file}") or die("Couldn't open the config file ${conf_file}: $!");
+ while (<FH>) {
+ chomp;
+ next if /^\s*(?:#|$)/;
+ if(/^\s*(.+?)\s*=\s*(.+?)\s*(?:#.*)?$/) {
+ unless(defined($conf{lc($1)})) { #Don't overwrite anything specified in cmdline
+ $conf{lc($1)} = $2;
+ }
+ }
+ }
+ close(FH) or die("Couldn't close the config file ${conf_file}: $!");
+ for(keys %config_defaults) {
+ if(!defined $conf{$_}) {
+ $conf{$_} = $config_defaults{$_};
+ }
+ }
+}
+
+sub register_client {
+# my $mac = "00:11:22:33:44:55";
+# my $ip = "192.168.2.3";
+ my ($mac, $ip) = get_interface_info(defined $conf{interface} && $conf{interface} ? $conf{interface} : "eth0");
+ my $hostname = hostname();
+ my ($status, $message) = $comm->send_command("REGISTER", $mac, $ip, $hostname);
+ die "Could not register client $mac w/ ip $ip and hostname $hostname. Got: $message" if (! defined $status or $status ne "OK");
+ debug("Client registered. Status is pending. digest is $message");
+ open(FILE, ">$conf{key_file}") or die("Couldn't open key file $conf{key_file} for writing: $!");
+ print FILE "$message\n";
+ close(FILE);
+}
+
+sub identify_client {
+ open(FILE, $conf{key_file}) or die("Couldn't open client_key $conf{key_file}: $!");
+ my $digest = <FILE>;
+ chomp $digest;
+ close(FILE);
+ my ($status, $message) = $comm->send_command("IDENTIFY", $digest);
+ unless (defined $status && $status eq "OK") {
+ print "ERROR Could not identify to server: $message\n";
+ return 0;
+ }
+ debug("Client identified");
+ return 1;
+}
+
+sub get_jobs {
+ my ($status, $jobs) = $comm->send_command("GET_JOBS");
+ unless (defined $status && $status eq "OK") {
+ print "Could not get jobs list from server: $status\n";
+ return 0;
+ }
+ if (defined($jobs) && $jobs) {
+ $jobs =~ s/\s//g; #Remove all whitespace
+ my @jobs_list = split(/,/, $jobs);
+ foreach my $job (@jobs_list) {
+ my ($status, $filename) = $comm->send_command("GET_JOB", $job);
+ #SCP the file to $conf{job_dir}/queue/
+
+ system("cp $filename $conf{job_dir}/queue/") and die("Can't copy file: $!"); #Temporary hack. only works locally.
+ # XXX: Modify this to fetch a file instead
+ debug("Fetched job $job ");
+ my ($status2,$message) = $comm->send_command("JOB_FETCHED", $job);
+ unless (defined $status2 && $status2 eq "OK") {
+ die("ERROR Could not signal job was fetched: $message\n");
+ }
+
+ }
+ #This function doesn't actually need to do anything with the list of jobs, the executor handles that part.
+ }
+}
+
+sub scan_jobs_dir {
+ #Scan the dirs for job files.
+ my @existing_jobs = glob("$conf{job_dir}/queue/*.job");
+ my @failed_jobs = glob("$conf{job_dir}/failed/*.job");
+ my @done_jobs = glob("$conf{job_dir}/done/*.job");
+
+ # XXX: this function should just scan the various job dirs, create a Scire::Job object
+ # for each job found, and return a structure containing the info, so that another
+ # function can act on the completed jobs
+
+ #Report on those jobs needing reporting.
+ foreach my $job_file (@failed_jobs) {
+ $job_file =~ /(\d+)\.job/;
+ my $jobid = $1;
+ my ($status, $message) = $comm->send_command("SET_JOB_STATUS", $jobid, "Failed");
+ open(FILE, $job_file) or die "Couldn't open job file $job_file: $!";
+ my $job_data = join("", <FILE>);
+ close(FILE);
+
+ }
+ #may be able to use same code as above.
+ foreach my $job_file (@done_jobs) {
+ $job_file =~ /(\d+)\.job/;
+ my $jobid = $1;
+ my ($status, $message) = $comm->send_command("SET_JOB_STATUS", $jobid, "Done");
+ # XXX: Send job output
+ }
+
+ return @existing_jobs;
+}
+
+sub debug {
+ my $msg = shift;
+ if($conf{debug}) {
+ print STDERR $msg."\n";
+ }
+}
+
+sub get_interface_info {
+ my $interface = shift;
+
+ my $info = `/sbin/ifconfig ${interface}`;
+ $info =~ /^.+HWaddr ([a-zA-Z0-9:]+).+inet addr:([0-9.]+).+$/s;
+ my ($mac, $ip) = ($1, $2);
+ return ($mac, $ip);
+}
diff --git a/client/test.pl b/client/test.pl
new file mode 100644
index 0000000..e5fdcbb
--- /dev/null
+++ b/client/test.pl
@@ -0,0 +1,26 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+$| = 1;
+
+use Scire::Job;
+use Scire::Communicator;
+
+my $job = Scire::Job->new();
+$job->load_jobfile("/tmp/scirejobs/queue/39.job");
+#print $job->{script_data} . "\n";
+$job->set_stdout_file("/tmp/scirejobs/result/39_stdout.txt");
+$job->set_stderr_file("/tmp/scirejobs/result/39_stderr.txt");
+$job->set_script_file("/tmp/scirejobs/run/runjob.sh");
+my $exitcode = $job->run();
+print "Job complete with exit code ${exitcode}\n";
+
+exit;
+
+my $comm = Scire::Communicator->new( host => "localhost" );
+$comm->create_connection();
+$comm->close_connection();
+#my ($status, $message) = $comm->send_command("QUIT");
+#print "$status\n";
diff --git a/etc/scire.conf b/etc/scire.conf
new file mode 100644
index 0000000..243e5ca
--- /dev/null
+++ b/etc/scire.conf
@@ -0,0 +1,8 @@
+HOST=localhost
+PORT=22
+USER=scire
+SERVER_SCRIPT=../server/scireserver.pl
+JOB_DIR=/tmp/scirejobs #comments are fine.
+KEY_FILE=../etc/client.key
+interface=eth0 # This is the default
+debug=1
diff --git a/etc/scireserver.conf b/etc/scireserver.conf
new file mode 100644
index 0000000..33f256a
--- /dev/null
+++ b/etc/scireserver.conf
@@ -0,0 +1,12 @@
+# $Id$
+# Scire Server config file.
+debug=1
+job_dir=/tmp/scirejobs
+#logfile=/tmp/scireserver.log
+#Database stuff.
+db_type=mysql
+db_host=localhost
+db_name=scire
+db_user=foo
+db_passwd=bar
+
diff --git a/server/scireserver.pl b/server/scireserver.pl
new file mode 100755
index 0000000..1af39ee
--- /dev/null
+++ b/server/scireserver.pl
@@ -0,0 +1,466 @@
+#!/usr/bin/perl
+
+# $Id$
+
+use strict;
+use warnings;
+use DBI;
+use Data::Dumper;
+use Digest::MD5 qw(md5 md5_hex );
+use File::Path;
+use Schedule::Cron::Events;
+
+
+$| = 1;
+$Data::Dumper::Purity = 1;
+
+my $ETC_DIR = "/etc/scire";
+my $SCIRE_CONFIG_FILE = "${ETC_DIR}/scireserver.conf";
+my %conf;
+my $LOGFILE;
+
+my $conf_file = (defined($conf{config})) ? $conf{config} : $SCIRE_CONFIG_FILE;
+read_config_file($conf_file);
+Dumper(\%conf);
+
+my $identified = 0; #Global variable to determine if already identified or not.
+my $client_id = 0; #Clobal variable for the client id.
+# Somehow this feels insecure.
+
+sub logger {
+ my $line = shift;
+ if(!defined $LOGFILE) {
+ open(*LOGFILE, ">>$conf{logfile}") or die "Cannot open logfile $conf{logfile}";
+ }
+ print LOGFILE localtime() . " " . $line . "\n";
+}
+
+sub debug {
+ my $line = shift;
+ if ($conf{debug}) {
+ if (defined($conf{logfile})) {
+ logger("DEBUG: ${line}");
+ } else {
+ print STDERR "DEBUG: ${line}\n";
+ }
+ }
+}
+
+#Connect to the Database.
+my $connect_string = "DBI:$conf{db_type}:$conf{db_name};host=$conf{db_host}";
+debug("Connecting to $connect_string");
+my $dbh = DBI->connect($connect_string, $conf{db_user}, $conf{db_passwd}, { RaiseError => 1 } )
+ or die "Could not connect to database: $DBI::errstr";
+
+while(<>) {
+ my ($command, @args) = parse_command($_);
+# chomp( my $line = $_);
+ debug("DEBUG: line is: $_");
+# SEE http://agaffney.org/mediawiki/index.php/SSH-based_protocol for documentation on the protocol.
+
+ if($command eq "QUIT") {
+ print "OK\n";
+ exit;
+ }
+
+ if($command eq "REGISTER") {
+ my ($mac,$ip,$hostname) = @args;
+ register_client($mac, $ip, $hostname);
+ next; #End switch here. You can go no further.
+ }
+
+ if($command eq "IDENTIFY") {
+ my $fingerprint = $args[0];
+ identify_client($fingerprint);
+ next; #End switch here. You can go no further.
+ }
+ unless($identified == 1) {
+ print "ERROR This client has not yet been authorized. Please identify!\n";
+ next;
+ }
+
+ if ($command eq "GET_JOBS") {
+ my @jobs = get_jobs();
+ print "OK " . join(",", @jobs) . "\n";
+ } elsif ($command eq "GET_JOB") {
+ my $job = $args[0];
+ my $jobfile = get_job($job);
+ print "OK ${jobfile}\n";
+ } elsif ($command eq "JOB_FETCHED") {
+ my $job = $args[0];
+ job_fetched($job) and print "OK\n";
+ } elsif ($command eq "SET_JOB_STATUS") {
+ my ($jobid,$status) = @args;
+ set_job_status($jobid,$client_id,$status) and print "OK\n";
+ } elsif ($command eq "RETURN_JOBFILE") {
+ my $jobid = $args[0];
+ my @filenames = ("$conf{job_dir}/$client_id/result/$jobid.stdout", "$conf{job_dir}/$client_id/result/$jobid.stderr");
+ print "OK " . join(" ", @filenames) . "\n";
+ } elsif ($command eq "JOBFILE_SENT") {
+ my @jobfiles = @args;
+ process_jobfile($_) foreach(@jobfiles);
+ print "OK\n"
+ } else {
+ print "ERROR The command $command is unknown. Please try again.\n";
+ }
+}
+
+
+
+sub read_config_file {
+ my $conf_file = shift;
+ open(FH, "< ${conf_file}") or die("Couldn't open the config file ${conf_file}: $!");
+ while (<FH>) {
+ chomp;
+ next if /^\s*(?:#|$)/;
+ if(/^\s*(.+?)\s*=\s*(.+?)\s*(?:#.*)?$/) {
+ unless(defined($conf{lc($1)})) { #Don't overwrite anything specified in cmdline
+ $conf{lc($1)} = $2;
+ }
+ }
+ }
+ close(FH) or die("Couldn't close the config file ${conf_file}: $!");
+ debug("Conf file $conf_file read.");
+}
+
+#New clients must be registered so they can be given a key to use (perhaps for job file transfers?) for authentication. This must be allowed before identifying.
+sub register_client {
+ my ($mac,$ip, $hostname) = @_;
+ #Validate your inputs!
+ $mac =~ /^[a-zA-Z0-9\:]+$/ or print "ERROR invalid mac $mac!\n";
+ $ip =~ /^[a-zA-Z0-9\.\:]+$/ or print "ERROR invalid ip $ip!\n";
+
+ my ($query, $status_id, $id, $sth);
+
+ #Generate the digest
+ my $digest = md5_hex(time()."${mac}${ip}${hostname}");
+
+ eval {
+ $query = 'SELECT statusid FROM client_status WHERE statusname = "Pending"';
+ $sth = run_query($query);
+ $status_id = $sth->fetchrow_hashref->{'statusid'};
+ };
+ ($@) and print "ERROR Could not get status id: $DBI::errstr\n";
+
+ eval {
+ run_query('LOCK TABLES `gacl_axo_seq` WRITE');
+ $query = 'SELECT id FROM `gacl_axo_seq`';
+ $sth = run_query($query);
+ $id = $sth->fetchrow_hashref->{'id'};
+ $id += 1;
+ $query = 'UPDATE `gacl_axo_seq` SET id=?';
+ run_query($query,$id);
+ run_query('UNLOCK TABLES');
+ };
+ ($@) and print "ERROR during fetching of id sequence: $DBI::errstr\n";
+
+ eval {
+ $query = 'INSERT INTO `gacl_axo` (id,section_value,value,order_value,name,hidden) VALUES (?,"clients",?,"1",?,"0")';
+ run_query($query,$id,$hostname,$hostname);
+ #NOTE: not sure if this query is still valid. may be using id instead of hostname for one of those two now.
+
+ $query = 'INSERT INTO clients (clientid,digest,hostname,mac,ip,status) VALUES (?,?,?,?,?,?)';
+ run_query($query,$id,$digest,$hostname,$mac,$ip,$status_id);
+ };
+ ($@) and print "ERROR Could not insert client with $query: $DBI::errstr\n";
+ #FIXME look for "duplicate key" and if found fail and notify admin.
+
+ print "OK $digest\n";
+}
+
+#Identify the client by looking up the fingerprint in the database, and matching it up.
+sub identify_client {
+ my $digest = shift;
+ #Validate your inputs!
+ $digest =~ s/"//g; #Clear the quotes.
+ $digest =~ /^[A-Za-z0-9]+$/ or print "ERROR invalid digest!\n";
+
+ my $query = 'SELECT client_status.statusname, clients.clientid FROM clients JOIN client_status on (clients.status = client_status.statusid) WHERE clients.digest=?';
+ my $sth = run_query($query,$digest);
+ my $hashref = $sth->fetchrow_hashref();
+ debug(Dumper($hashref));
+ my $status_name = $hashref->{'statusname'};
+ $client_id = $hashref->{'clientid'};
+ if (defined($client_id) and ($client_id > 0) and ($status_name eq 'Active')) {
+ $identified = 1;
+ print "OK\n";
+ } else {
+ print "ERROR Client could not be identified. Status was $status_name\n";
+ }
+
+}
+
+sub get_jobs {
+
+ #FIXME expand jobs for $client_id
+ expand_jobs();
+
+ my $query = <<'EndOfQuery';
+ SELECT jobs.jobid
+ FROM jobs NATURAL JOIN jobs_clients NATURAL JOIN job_conditions
+ WHERE jobs_clients.clientid = ?
+ AND jobs.jobid = jobs_clients.jobid
+ AND (job_conditions.deploy_time < now())
+ AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL))
+ ORDER BY jobs.priority,jobs.created
+EndOfQuery
+
+ #FIXME ADD JOB DEPENDENCIES TO THIS QUERY.
+ my $sth = run_query($query,$client_id);
+ my $jobs_ref = $sth->fetchall_arrayref();
+ # Don't ask me...ask the guys in #perl :P
+ my @jobs = map { @$_ } @$jobs_ref;
+ return @jobs;
+}
+
+sub get_job {
+ my $jobid = shift;
+ #Validate your inputs!
+ my $query = 'SELECT * FROM jobs LEFT JOIN job_conditions on (jobs.jobid) WHERE jobs.jobid = ?';
+ my $sth = run_query($query, $jobid);
+ my $job = $sth->fetchrow_hashref();
+ my $scriptid = $job->{'script'};
+
+ $query = 'SELECT * FROM scripts WHERE scriptid=?';
+ $sth = run_query($query,$scriptid);
+ $job->{'script'} = $sth->fetchrow_hashref();
+
+ debug(Dumper($job));
+ #Write the job w/ all data to a jobfile with the following path /JOBDIR/CLIENT_ID/queue/JOBID.job
+ my $path = "$conf{job_dir}/$client_id/queue";
+ my $filename = "$path/$jobid.job";
+ unless (-d $path) {
+ print "WARNING! $path does not exist...creating\n";
+ mkpath( $path, {verbose => 1, mode => 0660})
+ or die("Couldn't make $path w/ perms 0660: $!");
+ }
+ open(FH, ">$filename") or die("Couldn't open $filename: $!");
+ my $jobdata = Dumper($job);
+ print FH $jobdata . "\n";
+ close(FH) or die("Couldn't close $filename : $!");
+ debug("OK $filename");
+ return $filename;
+}
+
+sub job_fetched {
+ my $jobid = shift;
+ set_job_status($jobid,$client_id,'Downloaded', 'Job downloaded by client.') or print "ERROR could not set job status to downloaded.\n";
+
+ eval {
+ my $query = 'DELETE FROM jobs_clients WHERE jobid=? AND clientid=?';
+ run_query($query,$jobid,$client_id);
+ };
+ ($@) and print "ERROR Could not get status id: $DBI::errstr\n";
+
+ my $filename = "$conf{job_dir}/$client_id/queue/$jobid.job";
+ unlink ($filename) or die("ERROR Could not unlink the jobfile from the queue. filename: $filename : $!");
+ return 1;
+}
+
+sub set_job_status {
+ my ($jobid,$id_of_client,$status,$eventmsg) = @_;
+ #Validate your inputs!
+ $jobid =~ /^\d+$/ or die("Invalid jobid $jobid");
+ $id_of_client ||= $client_id;
+ $id_of_client =~ /^\d+$/ or die("Invalid id of client $id_of_client");
+ $eventmsg ||= "Server status update.";
+ #fixme validate status
+ my $status_id;
+ eval {
+ my $query = 'SELECT statusid FROM jobs_status WHERE statusname = ?';
+ my $sth = run_query($query,$status);
+ $status_id = $sth->fetchrow_hashref->{'statusid'};
+ };
+ ($@) and print "ERROR Could not get status id: $DBI::errstr\n";
+ $status_id or print "ERROR Invalid status id $status_id\n";
+
+ eval {
+ my $query = 'INSERT INTO job_history (jobid,clientid,statusid,eventmsg) VALUES (?,?,?,?)';
+ run_query($query,$jobid,$id_of_client,$status_id,$eventmsg);
+ };
+ ($@) and print "ERROR Could not insert into job_history: $DBI::errstr\n";
+
+
+ #If we're marking the completetion or failure of a job, we have more work to do here.
+ if ($status eq 'Failed') {
+ mark_job_as_failed($jobid,$id_of_client);
+ } elsif ($status eq 'Completed') {
+ mark_job_as_completed($jobid,$id_of_client);
+ }
+
+ return 1;
+}
+
+sub parse_command {
+ my $line = shift;
+ chomp $line;
+ my @parts = split / (?!(?:[^" ]|[^"] [^"])+")/, $line;
+ for(0..$#parts) {
+ $parts[$_] =~ s/(^"|"$)//g;
+ $parts[$_] =~ s/\\"/"/g;
+ }
+ return @parts;
+}
+
+sub run_query {
+ my ($query, @params) = @_;
+ debug("Query is $query");
+ my $sth = $dbh->prepare($query);
+ $sth->execute(@params);
+ return $sth;
+}
+
+
+sub expand_jobs {
+ #Searches for the group jobs that the client must be into and does the expansion.
+ my @groups = get_client_groups();
+ foreach my $groupid (@groups) {
+ debug("Groupid is $groupid");
+ my @members = get_group_clients($groupid);
+ eval {
+ my $query = <<'EndOfQuery2';
+SELECT DISTINCT(jobs_clients.jobid)
+FROM jobs_clients LEFT JOIN job_conditions on (jobs_clients.jobid=job_conditions.jobid)
+WHERE jobs_clients.groupid = ?
+AND (job_conditions.deploy_time < now())
+AND ((job_conditions.expiration_time > now()) OR (job_conditions.expiration_time IS NULL))
+AND ((job_conditions.last_run_date < job_conditions.deploy_time) OR (job_conditions.last_run_date IS NULL))
+EndOfQuery2
+ my $sth = run_query($query,$groupid);
+ run_query('LOCK TABLES `jobs_clients` WRITE, `job_conditions` WRITE, `job_history` WRITE, `jobs_status` WRITE, `jobs` WRITE');
+ #FIXME need to lock jobs_clients for READ as well!!!
+ while( my $jobref = $sth->fetchrow_hashref() ) {
+ my $jobid = $jobref->{'jobid'};
+ foreach my $member (@members) {
+ $query = 'INSERT INTO jobs_clients (jobid, clientid) VALUES (?,?)';
+ my $sth2 = run_query($query,$jobid,$member);
+
+ set_job_status($jobid,$member,'Pending', 'Job expanded.') or print "ERROR could not add expanded jobs to job_history.\n";
+ }
+ $query = 'UPDATE `job_conditions` SET last_run_date = now() WHERE jobid = ?';
+ run_query($query,$jobid);
+
+ $query = 'UPDATE `jobs` SET pending=pending+? WHERE jobid = ?';
+ run_query($query,$#members,$jobid); #This works because you want one less b/c of removing the group.
+
+ # One last query to remove the row from jobs_clients so someone else doesn't expand it.
+ $query = 'DELETE FROM `jobs_clients` WHERE groupid=? AND jobid=?';
+ run_query($query,$groupid,$jobid);
+
+ }
+
+ run_query('UNLOCK TABLES');
+ };
+ ($@) and print "ERROR Could not expand jobs: $@ $DBI::errstr\n";
+ return undef;
+ }
+}
+
+sub mark_job_as_failed {
+ my ($jobid,$id_of_client) = @_;
+
+ #First off, update the pending count and failed count with the result.
+ eval {
+ my $query = 'UPDATE jobs SET pending=pending-1, failed=failed+1 WHERE jobid=?';
+ run_query($query);
+ };
+ ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n";
+}
+
+sub mark_job_as_completed {
+ my ($jobid,$id_of_client) = @_;
+ my ($query,$sth);
+ debug("Marking $jobid as completed for client $id_of_client");
+ #If we succeeded, we need to check this jobid to see if it is a recurring job, and then set the next_run if necessary.
+ #This requries looking at the pending count for the job as well as the run_schedule.
+
+ #First off, update the pending count now that we've finished.
+ eval {
+ $query = 'UPDATE jobs SET pending=pending-1 WHERE jobid=?';
+ run_query($query,$jobid);
+ };
+ ($@) and print "ERROR Could not update pending count: $@ $DBI::errstr\n";
+
+ #Now get the pending count and run_schedule.
+ eval {
+ $query = 'SELECT pending,run_schedule,expiration_time,deploy_time FROM jobs WHERE jobid=?';
+ my $sth = run_query($query,$jobid);
+ my $rowref = $sth->fetchrow_hashref();
+ my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst)=localtime(time);
+ my $datetime = sprintf "%4d-%02d-%02d %02d:%02d:%02d\n",$year+1900,$mon+1,$mday,$hour,$min,$sec;
+ if ($rowref->{'run_schedule'} and ($rowref->{'pending'} == 0) and ( $rowref->{'expiration_time'} > $datetime)) {
+ #Determine the next run time.
+ my $cron = new Schedule::Cron::Events( $rowref->{'run_schedule'}, Date => [ ( localtime(time()) )[0..5] ] );
+ my ($sec, $min, $hour, $day, $month, $year) = $cron->nextEvent;
+ printf("Event will start next at %2d:%02d:%02d on %d %s, %d\n", $hour, $min, $sec, $day, $month, ($year+1900));
+
+ #Get the groups and clients from the recurring_jobs_clients table.
+ $query = 'SELECT clientid,groupid FROM recurring_jobs_clients WHERE jobid=?';
+ my $sth2 = run_query($query,$jobid);
+ while( my $recrow_ref = $sth2->fetchrow_hashref() ) {
+ $query = 'INSERT INTO jobs_clients (jobid,clientid,groupid) VALUES (?,?,?)';
+ run_query($query,$jobid,$recrow_ref->{'clientid'},$recrow_ref->{'groupid'});
+ }
+ }
+
+ };
+ ($@) and print "ERROR Could not get run_schedule and pending count: $@ $DBI::errstr\n";
+
+}
+
+sub process_jobfile {
+ my $filename = shift;
+
+}
+
+#########################################################
+# PHPGACL FUNCTIONS
+#########################################################
+
+sub get_client_groups {
+ my $query;
+ my @groups;
+ my $option = 'NO RECURSE';
+ # If RECURSE it will get all ancestor groups. defaults to only get direct parents.
+
+ debug("get_object_groups(): Object ID: $client_id, option: $option");
+ my $object_type = 'axo';
+ my $group_table = 'gacl_axo_groups';
+ my $map_table = 'gacl_groups_axo_map';
+
+ if ($option eq 'RECURSE') {
+ $query = "SELECT DISTINCT g.id as group_id FROM $map_table gm ";
+ $query .= "LEFT JOIN $group_table g1 ON g1.id=gm.group_id ";
+ $query .= "LEFT JOIN $group_table g ON g.lft<=g1.lft AND g.rgt>=g1.rgt";
+ } else {
+ $query = "SELECT gm.group_id FROM $map_table gm ";
+ }
+ $query .= " WHERE gm.axo_id=?";
+ debug("Query is $query");
+ eval {
+ my $sth = $dbh->prepare($query);
+ $sth->execute($client_id);
+ my $groups_ref = $sth->fetchall_arrayref();
+ # Don't ask me...ask the guys in #perl :P
+ @groups = map { @$_ } @$groups_ref;
+ };
+ ($@) and print "ERROR Could not get client groups: $DBI::errstr\n";
+ return @groups;
+}
+
+sub get_group_clients {
+ #This function gets the members of groups. Returns an array containing those clients, empty otherwise.
+ my $groupid = shift;
+ my @members;
+ my $query = 'SELECT axo_id FROM gacl_groups_axo_map WHERE group_id = ?';
+ debug("Query is $query");
+ eval {
+ my $sth = $dbh->prepare($query);
+ $sth->execute($groupid);
+ my $members_ref = $sth->fetchall_arrayref();
+ # Don't ask me...ask the guys in #perl :P
+ @members = map { @$_ } @$members_ref;
+ };
+ ($@) and print "ERROR Could not get group members: $DBI::errstr\n";
+ return @members;
+}