summaryrefslogtreecommitdiff
path: root/services/scheduler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'services/scheduler.cpp')
-rw-r--r--services/scheduler.cpp2149
1 files changed, 2149 insertions, 0 deletions
diff --git a/services/scheduler.cpp b/services/scheduler.cpp
new file mode 100644
index 0000000..f4bf792
--- /dev/null
+++ b/services/scheduler.cpp
@@ -0,0 +1,2149 @@
+/* -*- mode: C++; c-file-style: "gnu"; fill-column: 78 -*- */
+/* vim:cinoptions={.5s,g0,p5,t0,(0,^-0.5s,n-0.5s:tw=78:cindent:sw=4: */
+/*
+ This file is part of Icecream.
+
+ Copyright (c) 2004 Michael Matz <matz@suse.de>
+ 2004 Stephan Kulow <coolo@suse.de>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef _GNU_SOURCE
+// getopt_long
+#define _GNU_SOURCE 1
+#endif
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include <sys/signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <time.h>
+#include <getopt.h>
+#include <string>
+#include <list>
+#include <map>
+#include <queue>
+#include <algorithm>
+#include <cassert>
+#include <fstream>
+#include <string>
+#include <stdio.h>
+#include "comm.h"
+#include "logging.h"
+#include "job.h"
+#include "config.h"
+#include "bench.h"
+
+#define DEBUG_SCHEDULER 0
+
+/* TODO:
+ * leak check
+ * are all filedescs closed when done?
+ * simplify lifetime of the various structures (Jobs/Channels/CSs know
+ of each other and sometimes take over ownership)
+ */
+
+/* TODO:
+ - iron out differences in code size between architectures
+ + ia64/i686: 1.63
+ + x86_64/i686: 1.48
+ + ppc/i686: 1.22
+ + ppc64/i686: 1.59
+ (missing data for others atm)
+*/
+
+/* The typical flow of messages for a remote job should be like this:
+ prereq: daemon is connected to scheduler
+ * client does GET_CS
+ * request gets queued
+ * request gets handled
+ * scheduler sends USE_CS
+ * client asks remote daemon
+ * daemon sends JOB_BEGIN
+ * client sends END + closes connection
+ * daemon sends JOB_DONE (this can be swapped with the above one)
+ This means that if the client somehow closes the connection, we can and
+ must remove all traces of jobs resulting from that client from all lists.
+ */
+
+using namespace std; // file-wide: unqualified string/list/map/min/find below resolve to std::
+
+static string pidFilePath; // presumably the location of the scheduler's pid file; not referenced in this part of the file -- TODO confirm
+
+/* Size and timing figures of one compile job.  The same type doubles as a
+   running total: every arithmetic operator zeroes job_id, because a
+   combined value no longer describes one particular job. */
+struct JobStat {
+    unsigned long osize; // output size (uncompressed)
+    unsigned long compile_time_real; // in milliseconds
+    unsigned long compile_time_user;
+    unsigned long compile_time_sys;
+    unsigned int job_id;
+
+    JobStat()
+        : osize(0),
+          compile_time_real(0),
+          compile_time_user(0),
+          compile_time_sys(0),
+          job_id(0)
+    {
+    }
+
+    /* Field-wise addition of OTHER to this total. */
+    JobStat& operator +=(const JobStat &other)
+    {
+        job_id = 0;
+        osize += other.osize;
+        compile_time_real += other.compile_time_real;
+        compile_time_user += other.compile_time_user;
+        compile_time_sys += other.compile_time_sys;
+        return *this;
+    }
+
+    /* Field-wise subtraction of OTHER from this total (unsigned arithmetic,
+       no underflow check). */
+    JobStat& operator -=(const JobStat &other)
+    {
+        job_id = 0;
+        osize -= other.osize;
+        compile_time_real -= other.compile_time_real;
+        compile_time_user -= other.compile_time_user;
+        compile_time_sys -= other.compile_time_sys;
+        return *this;
+    }
+
+    /* Field-wise integer division by D, returning a new value.  D must not
+       be zero. */
+    JobStat operator /(int d) const
+    {
+        JobStat quotient(*this);
+        quotient /= d;
+        return quotient;
+    }
+
+private:
+    /* In-place division; only reachable through operator/ above. */
+    JobStat& operator /=(int d)
+    {
+        job_id = 0;
+        osize /= d;
+        compile_time_real /= d;
+        compile_time_user /= d;
+        compile_time_sys /= d;
+        return *this;
+    }
+};
+
+class Job;
+
+/* One compile server (receiver, compile daemon).  The same class is also
+   used for the other kinds of peers distinguished by `type` (monitors,
+   line-based connections). */
+class CS : public MsgChannel
+{
+public:
+    /* The listener port, on which it takes compile requests.  Zero until
+       handle_login() stores the port announced by the daemon. */
+    unsigned int remote_port;
+    /* Scheduler-assigned id; 0 means "not assigned yet" (see pick_new_id). */
+    unsigned int hostid;
+    string nodename;
+    /* Time at which an environment installation was triggered on this host,
+       or 0 if none is in progress. */
+    time_t busy_installing;
+    string host_platform;
+
+    // unsigned int jobs_done;
+    // unsigned long long rcvd_kb, sent_kb;
+    // unsigned int ms_per_job;
+    // unsigned int bytes_per_ms;
+    // LOAD is load * 1000
+    unsigned int load;
+    int max_jobs;
+    bool noremote;
+    list<Job*> joblist;
+    Environments compiler_versions; // Available compilers
+
+    /* All scalar members are initialized here (in declaration order) so
+       that nothing is read uninitialized before the peer has logged in. */
+    CS (int fd, struct sockaddr *_addr, socklen_t _len, bool text_based)
+        : MsgChannel(fd, _addr, _len, text_based),
+          remote_port(0), hostid(0), busy_installing(0),
+          load(1000), max_jobs(0), noremote(false),
+          state(CONNECTED), type(UNKNOWN), chroot_possible(false)
+    {
+    }
+
+    /* Assigns the next free host id.  Must be called at most once per
+       object (asserts that no id has been assigned yet). */
+    void pick_new_id()
+    {
+        assert( !hostid );
+        hostid = ++hostid_counter;
+    }
+
+    list<JobStat> last_compiled_jobs;
+    list<JobStat> last_requested_jobs;
+    JobStat cum_compiled; // cumulated
+    JobStat cum_requested;
+    enum {CONNECTED, LOGGEDIN} state;
+    enum {UNKNOWN, DAEMON, MONITOR, LINE} type;
+    bool chroot_possible;
+    static unsigned int hostid_counter;
+    map<int, int> client_map; // map client ID for daemon to our IDs
+    bool is_eligible( const Job *job );
+    bool check_remote( const Job *job ) const;
+};
+
+unsigned int CS::hostid_counter = 0; // last id handed out by CS::pick_new_id(); ids therefore start at 1
+
+static map<int, CS *> fd2cs; // keyed by file descriptor (handle_mon_login erases an entry via c->fd)
+static bool exit_main_loop = false; // not referenced in this part of the file; presumably polled by the main loop -- TODO confirm
+
+time_t starttime; // only read here by the debug trace in add_job_stats (time(0) - starttime); set elsewhere
+
+/* One compile job as tracked by the scheduler, from the client's request
+   until it is finished. */
+class Job
+{
+public:
+    unsigned int id;
+    unsigned int local_client_id;
+    enum {PENDING, WAITINGFORCS, COMPILING, WAITINGFORDONE} state;
+    CS *server; // the server building this job
+    CS *submitter; // the daemon that asked for this job
+    Environments environments;
+    time_t starttime; // measured on the compile server's clock
+    time_t start_on_scheduler; // measured on the scheduler's clock
+    /**
+     * The end notifications from client and daemon race with each other.
+     * Nearly always both are seen, but occasionally the daemon's one is
+     * missing and the job would keep occupying a slot forever.  Therefore
+     * the time at which the client finished is remembered here; if the
+     * daemon has not signalled after 10s it gets killed (and reconnects).
+     **/
+    time_t done_time;
+
+    string target_platform;
+    string filename;
+    list<Job*> master_job_for;
+    unsigned int arg_flags;
+    string language; // for debugging
+    string preferred_host; // for debugging daemons
+
+    /* A fresh job is PENDING, has no server yet and all timestamps and
+       flags zeroed. */
+    Job (unsigned int _id, CS *subm)
+        : id(_id),
+          local_client_id( 0 ),
+          state(PENDING),
+          server(0),
+          submitter(subm),
+          starttime(0),
+          start_on_scheduler(0),
+          done_time( 0 ),
+          arg_flags( 0 )
+    {
+    }
+
+    ~Job()
+    {
+        // XXX nothing is released here; it is unclear whether every path
+        // cleans up the channel (formerly: fd2chan.erase (channel->fd);
+        // delete channel;)
+    }
+};
+
+// A subset of connected_hosts representing the compiler servers (css), the connected monitors and the control connections
+static list<CS*> css, monitors, controls;
+static list<string> block_css; // handle_login refuses a peer whose CS::name equals one of these entries
+static unsigned int new_job_id; // last job id handed out; always pre-incremented before use
+static map<unsigned int, Job*> jobs; // every job made by create_new_job, keyed by Job::id
+
+/* XXX Uah. Don't use a queue for the job requests. It is hell
+ to delete anything out of one (for clean up). */
+struct UnansweredList
+{
+ list<Job*> l; // the still unanswered jobs of one submitter, oldest first
+ CS *server; // the submitter all jobs in `l` belong to (set by enqueue_job_request)
+ bool remove_job (Job *); // unlinks one job; returns whether it was found
+};
+static list<UnansweredList*> toanswer; // pending job requests, batched per submitter and served round-robin (see remove_job_request)
+
+static list<JobStat> all_job_stats; // most recent stats farm-wide; add_job_stats keeps at most 2000 entries
+static JobStat cum_job_stats; // running sum over the entries of all_job_stats
+
+static float server_speed (CS *cs, Job *job = 0); // forward declaration, needed by add_job_stats; defined further down
+
+/* Unlinks the first occurrence of JOB from this request list.
+   Returns true if an entry was removed, false if JOB was not queued here.
+   The job itself is not deleted. */
+bool
+UnansweredList::remove_job (Job *job)
+{
+    list<Job*>::iterator pos = find (l.begin(), l.end(), job);
+    if (pos == l.end())
+        return false;
+
+    l.erase (pos);
+    return true;
+}
+
+/* Appends ST to the sliding window WINDOW and to its running sum CUM.
+   Once the window holds more than LIMIT entries the oldest one is dropped
+   and subtracted from CUM again. */
+static void
+record_job_stat (list<JobStat> &window, JobStat &cum, const JobStat &st,
+                 size_t limit)
+{
+    window.push_back (st);
+    cum += st;
+    if (window.size() > limit)
+    {
+        cum -= window.front ();
+        window.pop_front ();
+    }
+}
+
+/* Folds the result MSG of the finished JOB into the statistics of the
+   server that compiled it, of the submitter that requested it and of the
+   whole farm.  The output size is first normalized for debug and
+   optimization flags and clamped against the server's current speed. */
+static void
+add_job_stats (Job *job, JobDoneMsg *msg)
+{
+    /* We don't want to base our timings on failed or too small jobs. */
+    if (msg->out_uncompressed < 4096
+        || msg->exitcode != 0)
+        return;
+
+    JobStat st;
+    st.osize = msg->out_uncompressed;
+    st.compile_time_real = msg->real_msec;
+    st.compile_time_user = msg->user_msec;
+    st.compile_time_sys = msg->sys_msec;
+    st.job_id = job->id;
+
+    if ( job->arg_flags & CompileJob::Flag_g )
+        st.osize = st.osize * 10 / 36; // average over 1900 jobs: factor 3.6 in osize
+    else if ( job->arg_flags & CompileJob::Flag_g3 )
+        st.osize = st.osize * 10 / 45; // average over way less jobs: factor 1.25 over -g
+
+    // The individual -O levels differ far less from each other than -O0
+    // differs from any of them.  The numbers stem from gcc 3.3 and are
+    // only meant as a very rough heuristic.
+    if ( job->arg_flags & CompileJob::Flag_O ||
+         job->arg_flags & CompileJob::Flag_O2 ||
+         job->arg_flags & CompileJob::Flag_Ol2)
+        st.osize = st.osize * 58 / 35;
+
+    CS *const compiler = job->server;
+    if ( compiler->last_compiled_jobs.size() >= 7)
+    {
+        /* Smooth out spikes by not allowing one job to add more than
+           20% of the current speed. */
+        float this_speed = (float) st.osize / (float) st.compile_time_user;
+        /* Speed of the server without any adjustment for a concrete job,
+           hence no second argument. */
+        float cur_speed = server_speed (compiler);
+
+        if ((this_speed / 1.2) > cur_speed)
+            st.osize = (long unsigned) (cur_speed * 1.2 * st.compile_time_user);
+        else if ((this_speed * 1.2) < cur_speed)
+            st.osize = (long unsigned) (cur_speed / 1.2 * st.compile_time_user);
+    }
+
+    record_job_stat (compiler->last_compiled_jobs, compiler->cum_compiled,
+                     st, 200);
+    record_job_stat (job->submitter->last_requested_jobs,
+                     job->submitter->cum_requested, st, 200);
+    record_job_stat (all_job_stats, cum_job_stats, st, 2000);
+
+#if DEBUG_SCHEDULER > 1
+    if ( job->arg_flags < 7000 )
+        trace() << "add_job_stats " << job->language << " "
+                << ( time( 0 ) - starttime ) << " "
+                << st.compile_time_user << " "
+                << ( job->arg_flags & CompileJob::Flag_g ? '1' : '0')
+                << ( job->arg_flags & CompileJob::Flag_g3 ? '1' : '0')
+                << ( job->arg_flags & CompileJob::Flag_O ? '1' : '0')
+                << ( job->arg_flags & CompileJob::Flag_O2 ? '1' : '0')
+                << ( job->arg_flags & CompileJob::Flag_Ol2 ? '1' : '0')
+                << " " << st.osize << " " << msg->out_uncompressed << " "
+                << job->server->nodename << " "
+                << float(msg->out_uncompressed) / st.compile_time_user << " "
+                << server_speed( job->server ) << endl ;
+#endif
+
+}
+
+static bool handle_end (CS *c, Msg *);
+
+/* Sends M to every connected monitor without blocking and then deletes M
+   (this function takes ownership of the message).  A monitor that cannot
+   take the message is handed to handle_end(). */
+static void
+notify_monitors (Msg* m)
+{
+    list<CS*>::iterator next = monitors.begin();
+    while (next != monitors.end())
+    {
+        CS *monitor = *next;
+        /* Step past this entry before handle_end() gets a chance to touch
+           the list. */
+        ++next;
+
+        if (monitor->send_msg (*m, MsgChannel::SendNonBlocking /*| MsgChannel::SendBulkOnly*/))
+            continue;
+
+        /* If we can't send it, don't be clever, simply close this monitor. */
+        trace() << "monitor is blocking... removing" << endl;
+        handle_end (monitor, 0);
+    }
+    delete m;
+}
+
+/* Estimated speed of CS: normalized output size per millisecond of user
+   compile time over its recorded jobs.  Returns 0 when nothing is known
+   yet.  If JOB is given, the value is adjusted for placing that job on CS:
+   the submitter itself gets a 10% bonus, any other host is scaled down by
+   its current load. */
+static float
+server_speed (CS *cs, Job *job)
+{
+    const bool nothing_known = cs->last_compiled_jobs.size() == 0
+                               || cs->cum_compiled.compile_time_user == 0;
+    if (nothing_known)
+        return 0;
+
+    float speed = (float)cs->cum_compiled.osize
+                  / (float) cs->cum_compiled.compile_time_user;
+
+    // the load only matters when we are about to give the host a job
+    if (job)
+    {
+        if (job->submitter != cs)
+            speed *= float(1000 - cs->load) / 1000;
+        else
+            /* Favour the submitter: at equal speed the job should stay
+               local, which saves the preprocessing.  Its load is ignored
+               on the assumption that we caused it ourselves. */
+            speed *= 1.1;
+    }
+
+    // pessimism factor - the first few jobs of a host are assumed not to
+    // be representative
+    if ( cs->last_compiled_jobs.size() < 7 )
+        speed *= ( -0.5 * cs->last_compiled_jobs.size() + 4.5 );
+
+    return speed;
+}
+
+/* Builds the textual "Key:value\n" status report for CS and broadcasts it
+   to all monitors as a MonStatsMsg.  Does nothing if no monitor is
+   connected.  If M is given, the load figures are taken from that stats
+   message, otherwise only the last known load of CS is reported.
+
+   The string fields come from the remote peer and have no length limit,
+   so they are appended directly instead of being formatted into the
+   fixed-size buffer; numeric fields go through snprintf. */
+static void
+handle_monitor_stats( CS *cs, StatsMsg *m = 0)
+{
+    if ( monitors.empty() )
+        return;
+
+    string msg;
+    char buffer[1000];
+
+    /* c_str() keeps the "%s" semantics of stopping at an embedded NUL. */
+    msg += "Name:";
+    msg += cs->nodename.c_str();
+    msg += "\n";
+    msg += "IP:";
+    msg += cs->name.c_str();
+    msg += "\n";
+    snprintf( buffer, sizeof( buffer ), "MaxJobs:%d\n", cs->max_jobs );
+    msg += buffer;
+    msg += "NoRemote:";
+    msg += cs->noremote ? "true" : "false";
+    msg += "\n";
+    msg += "Platform:";
+    msg += cs->host_platform.c_str();
+    msg += "\n";
+    snprintf( buffer, sizeof( buffer ), "Speed:%f\n", server_speed( cs ) );
+    msg += buffer;
+    if ( m )
+    {
+        snprintf( buffer, sizeof( buffer ), "Load:%d\n", m->load );
+        msg += buffer;
+        snprintf( buffer, sizeof( buffer ), "LoadAvg1:%d\n", m->loadAvg1 );
+        msg += buffer;
+        snprintf( buffer, sizeof( buffer ), "LoadAvg5:%d\n", m->loadAvg5 );
+        msg += buffer;
+        snprintf( buffer, sizeof( buffer ), "LoadAvg10:%d\n", m->loadAvg10 );
+        msg += buffer;
+        snprintf( buffer, sizeof( buffer ), "FreeMem:%d\n", m->freeMem );
+        msg += buffer;
+    }
+    else
+    {
+        snprintf( buffer, sizeof( buffer ), "Load:%d\n", cs->load );
+        msg += buffer;
+    }
+    /* notify_monitors() takes ownership of the message. */
+    notify_monitors( new MonStatsMsg( cs->hostid, msg ) );
+}
+
+/* Allocates a job with the next free id on behalf of SUBMITTER, registers
+   it in the global `jobs` map and returns it.  The new job is not queued
+   yet. */
+static Job *
+create_new_job (CS *submitter)
+{
+    ++new_job_id;
+    /* ids are never reused while a job with that id is still known */
+    assert (jobs.find(new_job_id) == jobs.end());
+
+    Job *fresh = new Job (new_job_id, submitter);
+    jobs[new_job_id] = fresh;
+    return fresh;
+}
+
+/* Queues JOB as an unanswered request.  If the most recent batch in
+   `toanswer` already belongs to the same submitter the job joins it,
+   otherwise a new batch for that submitter is appended. */
+static void
+enqueue_job_request (Job *job)
+{
+    UnansweredList *batch = 0;
+
+    if (toanswer.empty() || toanswer.back()->server != job->submitter)
+    {
+        batch = new UnansweredList();
+        batch->server = job->submitter;
+        toanswer.push_back (batch);
+    }
+    else
+    {
+        batch = toanswer.back();
+    }
+
+    batch->l.push_back (job);
+}
+
+/* Returns the job request that is next in line (first job of the first
+   batch) without removing it, or 0 if nothing is queued. */
+static Job *
+get_job_request (void)
+{
+    if (!toanswer.empty())
+    {
+        UnansweredList *head = toanswer.front();
+        /* empty batches are never left in the queue */
+        assert (!head->l.empty());
+        return head->l.front();
+    }
+    return 0;
+}
+
+/* Drops the request that get_job_request() would return.  The batch it
+   came from is deleted when it has become empty; otherwise it moves to
+   the back of the queue, so submitters are served round-robin. */
+static void
+remove_job_request (void)
+{
+    if (toanswer.empty())
+        return;
+
+    UnansweredList *head = toanswer.front();
+    toanswer.pop_front();
+    head->l.pop_front();
+
+    if (!head->l.empty())
+    {
+        toanswer.push_back( head );
+        return;
+    }
+    delete head;
+}
+
+static string dump_job (Job *job);
+
+/* Handles a GetCSMsg arriving on channel C (which must be a CS): creates
+   and queues m->count jobs, logs each one and announces it to the
+   monitors.  The first job created becomes the "master"; every further
+   job of the same request is recorded in its master_job_for list.
+   Returns false if _M is not a GetCSMsg. */
+static bool
+handle_cs_request (MsgChannel *c, Msg *_m)
+{
+    GetCSMsg *m = dynamic_cast<GetCSMsg *>(_m);
+    if (!m)
+        return false;
+
+    CS *submitter = static_cast<CS*>( c );
+    Job *master_job = 0;
+
+    for ( unsigned int n = 0; n < m->count; ++n )
+    {
+        Job *job = create_new_job (submitter);
+        job->environments = m->versions;
+        job->target_platform = m->target;
+        job->arg_flags = m->arg_flags;
+        job->language = ( m->lang == CompileJob::Lang_C ? "C" : "C++" );
+        job->filename = m->filename;
+        job->local_client_id = m->client_id;
+        job->preferred_host = m->preferred_host;
+        enqueue_job_request (job);
+
+        std::ostream& dbg = log_info();
+        dbg << "NEW " << job->id << " client="
+            << submitter->nodename << " versions=[";
+        /* comma separated list of "name(platform)" */
+        const Environments::const_iterator envs_begin = job->environments.begin();
+        const Environments::const_iterator envs_end = job->environments.end();
+        for ( Environments::const_iterator env = envs_begin; env != envs_end; ++env )
+        {
+            if (env != envs_begin)
+                dbg << ", ";
+            dbg << env->second << "(" << env->first << ")";
+        }
+        dbg << "] " << m->filename << " " << job->language << endl;
+
+        notify_monitors (new MonGetCSMsg (job->id, submitter->hostid, m));
+
+        if ( master_job )
+            master_job->master_job_for.push_back( job );
+        else
+            master_job = job;
+    }
+    return true;
+
+}
+
+/* Handles a JobLocalBeginMsg from daemon C: a job that is compiled locally
+   still gets a scheduler-wide id, which is remembered in C's client_map
+   under the daemon's own id and announced to the monitors.
+   Returns false if _M is not a JobLocalBeginMsg. */
+static bool
+handle_local_job (CS *c, Msg *_m)
+{
+    JobLocalBeginMsg *m = dynamic_cast<JobLocalBeginMsg *>(_m);
+    if (m == 0)
+        return false;
+
+    ++new_job_id;
+    trace() << "handle_local_job " << m->outfile << " " << m->id << endl;
+
+    c->client_map[m->id] = new_job_id;
+    Msg *announcement =
+        new MonLocalJobBeginMsg( new_job_id, m->outfile, m->stime, c->hostid );
+    notify_monitors (announcement);
+    return true;
+}
+
+/* Handles a JobLocalDoneMsg from daemon C: translates the daemon's job id
+   back to the scheduler-wide id via C's client_map, tells the monitors
+   and forgets the mapping.  An unknown id is reported as 0.
+   Returns false if _M is not a JobLocalDoneMsg. */
+static bool
+handle_local_job_done (CS *c, Msg *_m)
+{
+    JobLocalDoneMsg *m = dynamic_cast<JobLocalDoneMsg *>(_m);
+    if (m == 0)
+        return false;
+
+    trace() << "handle_local_job_done " << m->job_id << endl;
+
+    int scheduler_id = 0;
+    map<int, int>::const_iterator known = c->client_map.find( m->job_id );
+    if (known != c->client_map.end())
+        scheduler_id = known->second;
+
+    notify_monitors (new JobLocalDoneMsg( scheduler_id ) );
+    c->client_map.erase( m->job_id );
+    return true;
+}
+
+/* Returns true if code built for an environment of platform TARGET can be
+   run on a host of platform PLATFORM: either both are the same, or the
+   pair is listed in the table below (e.g. i386 environments run on
+   x86_64 hosts). */
+static bool
+platforms_compatible( const string &target, const string &platform )
+{
+    if ( target == platform )
+        return true;
+
+    // NOTE: the mapping below doesn't really work, as the unmapped platform
+    // is transferred back to the client, which then asks the daemon for a
+    // platform it can't install (see TODO)
+
+    /* { environment platform, host platform able to run it } */
+    static const char *const runs_on[][2] = {
+        { "i386", "i486" },
+        { "i386", "i586" },
+        { "i386", "i686" },
+        { "i386", "x86_64" },
+
+        { "i486", "i586" },
+        { "i486", "i686" },
+        { "i486", "x86_64" },
+
+        { "i586", "i686" },
+        { "i586", "x86_64" },
+
+        { "i686", "x86_64" },
+
+        { "ppc", "ppc64" },
+        { "s390", "s390x" },
+    };
+    const size_t entries = sizeof( runs_on ) / sizeof( runs_on[0] );
+
+    for ( size_t i = 0; i < entries; ++i )
+    {
+        if ( target == runs_on[i][0] && platform == runs_on[i][1] )
+            return true;
+    }
+
+    return false;
+}
+
+/* Checks whether candidate CS already has one of the environments JOB
+   asks for.  Returns the empty string if not.  Otherwise returns the host
+   platform of the first requested environment that is installed on CS;
+   together with name and target platform this tells the client exactly
+   which environment to use.  A job's own submitter always matches and
+   yields its host platform. */
+static string
+envs_match( CS* cs, const Job *job )
+{
+    if ( job->submitter == cs)
+        return cs->host_platform; // it will compile itself
+
+    /* Walk over everything installed on the candidate ... */
+    for ( Environments::const_iterator installed = cs->compiler_versions.begin();
+          installed != cs->compiler_versions.end(); ++installed )
+    {
+        /* ... and skip what doesn't produce code for the wanted target. */
+        if ( installed->first != job->target_platform )
+            continue;
+
+        /* Among the environments the client offers, look for one with the
+           same name that the candidate is also able to run. */
+        for ( Environments::const_iterator wanted = job->environments.begin();
+              wanted != job->environments.end(); ++wanted )
+        {
+            if ( installed->second != wanted->second )
+                continue;
+            if ( platforms_compatible( wanted->first, cs->host_platform ) )
+                return wanted->first;
+        }
+    }
+    return string();
+}
+
+/* Checks whether one of the environments requested by JOB could be
+   installed on candidate CS, i.e. whether its host platform is compatible
+   with that of CS.  Returns the platform of the first such environment,
+   or the empty string if there is none or if CS is currently busy
+   installing another environment. */
+static string
+can_install( CS* cs, const Job *job )
+{
+    // trace() << "can_install host: '" << cs->host_platform << "' target: '" << job->target_platform << "'" << endl;
+    if ( cs->busy_installing )
+    {
+#if DEBUG_SCHEDULER > 0
+        trace() << cs->nodename << " is busy installing since " << time(0) - cs->busy_installing << " seconds." << endl;
+#endif
+        return string();
+    }
+
+    Environments::const_iterator env = job->environments.begin();
+    while ( env != job->environments.end() )
+    {
+        if ( platforms_compatible( env->first, cs->host_platform ) )
+            return env->first;
+        ++env;
+    }
+    return string();
+}
+
+/* A host may always compile its own jobs; jobs of other submitters are
+   only acceptable if the host did not set `noremote`. */
+bool CS::check_remote( const Job *job ) const
+{
+    if (job->submitter == this)
+        return true;
+    return !noremote;
+}
+
+/* True if this host can take JOB right now: it has a free job slot, can
+   chroot, is not fully loaded, is able to install a requested environment
+   and accepts the job with respect to its remote policy. */
+bool CS::is_eligible( const Job *job )
+{
+    if (int( joblist.size() ) >= max_jobs)
+        return false;
+    if (!chroot_possible)
+        return false;
+    if (load >= 1000)
+        return false;
+    if (can_install( this, job ).empty())
+        return false;
+    return check_remote( job );
+}
+
+/* Chooses the compile server JOB should be sent to, or returns 0 if no
+   server is currently suitable.
+
+   - If the job names a preferred host, only that host is considered.
+   - Without any farm-wide statistics the first eligible server wins.
+   - Otherwise all servers are scanned and three candidates are tracked:
+     `best` (has a matching environment installed), `bestui` (would have
+     to install one first) and `bestpre` (faster than the current
+     candidate, but with all job slots in use).  They are returned in that
+     order of preference, except that `best` is discarded in favour of
+     `bestui` while only few servers have the environment (and the job id
+     is not a multiple of 19), so that new hosts get a chance. */
+static CS *
+pick_server(Job *job)
+{
+    list<CS*>::iterator it;
+
+#if DEBUG_SCHEDULER > 1
+    trace() << "pick_server " << job->id << " " << job->target_platform << endl;
+#endif
+
+#if DEBUG_SCHEDULER > 0
+    /* consistency checking for now */
+    for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+    {
+        CS* cs= *it;
+        for ( list<Job*>::const_iterator it2 = cs->joblist.begin(); it2 != cs->joblist.end(); ++it2 )
+        {
+            Job *job = *it2;
+            assert( jobs.find( job->id ) != jobs.end() );
+        }
+    }
+    for (map<unsigned int, Job*>::const_iterator it = jobs.begin();
+         it != jobs.end(); ++it)
+    {
+        Job *j = it->second;
+
+        CS *cs = j->server;
+        assert( j->state != j->COMPILING ||
+                find( cs->joblist.begin(),
+                      cs->joblist.end(), j ) != cs->joblist.end() );
+    }
+#endif
+
+    /* if the user wants to test/prefer one specific daemon, we look for that one first */
+    if (!job->preferred_host.empty())
+    {
+        for (it = css.begin(); it != css.end(); ++it)
+        {
+            if ((*it)->nodename == job->preferred_host
+                && (*it)->is_eligible( job ))
+                return *it;
+        }
+        return 0;
+    }
+
+    /* If we have no statistics simply use the first server which is usable. */
+    if (!all_job_stats.size ())
+    {
+        for (it = css.begin(); it != css.end(); ++it)
+        {
+            trace() << "no job stats - looking at " << ( *it )->nodename << " load: " << (*it )->load << " can install: " << can_install( *it, job ) << endl;
+            if ((*it)->is_eligible( job ))
+            {
+                trace() << "returning first " << ( *it )->nodename << endl;
+                return *it;
+            }
+        }
+        return 0;
+    }
+
+    /* Now guess about the job. First see, if this submitter already
+       had other jobs. Use them as base. */
+    /* NOTE(review): `guess` is computed but not used further in this
+       function. */
+    JobStat guess;
+    if (job->submitter->last_requested_jobs.size() > 0)
+    {
+        guess = job->submitter->cum_requested
+                / job->submitter->last_requested_jobs.size();
+    }
+    else
+    {
+        /* Otherwise simply average over all jobs. */
+        guess = cum_job_stats / all_job_stats.size();
+    }
+    CS *best = 0;
+    // best uninstalled
+    CS *bestui = 0;
+    // best preloadable host
+    CS *bestpre = 0;
+
+    /* number of usable servers that already have a matching environment */
+    uint matches = 0;
+
+    for (it = css.begin(); it != css.end(); ++it)
+    {
+        CS *cs = *it;
+        /* For now ignore overloaded servers. */
+        /* Pre-loadable (cs->joblist.size()) == (cs->max_jobs) is checked later. */
+        if (int( cs->joblist.size() ) > cs->max_jobs || cs->load >= 1000)
+        {
+#if DEBUG_SCHEDULER > 1
+            trace() << "overloaded " << cs->nodename << " " << cs->joblist.size() << "/" << cs->max_jobs << " jobs, load:" << cs->load << endl;
+#endif
+            continue;
+        }
+
+        // incompatible architecture or busy installing
+        if ( !can_install( cs, job ).size() )
+        {
+#if DEBUG_SCHEDULER > 2
+            trace() << cs->nodename << " can't install " << job->id << endl;
+#endif
+            continue;
+        }
+
+        /* Don't use non-chroot-able daemons for remote jobs. XXX */
+        if (!cs->chroot_possible)
+        {
+            trace() << cs->nodename << " can't use chroot\n";
+            continue;
+        }
+
+        // Check if remote & if remote allowed
+        if (!cs->check_remote( job ))
+        {
+            trace() << cs->nodename << " fails remote job check\n";
+            continue;
+        }
+
+
+#if DEBUG_SCHEDULER > 1
+        trace() << cs->nodename << " compiled " << cs->last_compiled_jobs.size() << " got now: " <<
+                cs->joblist.size() << " speed: " << server_speed (cs) << " compile time " <<
+                cs->cum_compiled.compile_time_user << " produced code " << cs->cum_compiled.osize << endl;
+#endif
+
+        if ( cs->last_compiled_jobs.size() == 0 && cs->joblist.size() == 0 && cs->max_jobs)
+        {
+            /* Make all servers compile a job at least once, so we'll get an
+               idea about their speed. */
+            if (!envs_match (cs, job).empty())
+            {
+                best = cs;
+                matches++;
+            }
+            else
+            {
+                // if there is one server that already got the environment and one that
+                // hasn't compiled at all, pick the one with environment first
+                bestui = cs;
+            }
+            /* an idle, never-used server ends the search */
+            break;
+        }
+
+        if (!envs_match (cs, job).empty())
+        {
+            if ( !best )
+                best = cs;
+            /* Search the server with the earliest projected time to compile
+               the job. (XXX currently this is equivalent to the fastest one) */
+            else
+                if (best->last_compiled_jobs.size() != 0
+                    && server_speed (best, job) < server_speed (cs, job))
+                {
+                    if (int( cs->joblist.size() ) < cs->max_jobs)
+                        best = cs;
+                    else
+                        bestpre = cs;
+                }
+            matches++;
+        }
+        else
+        {
+            if ( !bestui )
+                bestui = cs;
+            /* Search the server with the earliest projected time to compile
+               the job. (XXX currently this is equivalent to the fastest one) */
+            else
+                if (bestui->last_compiled_jobs.size() != 0
+                    && server_speed (bestui, job) < server_speed (cs, job))
+                {
+                    if (int( cs->joblist.size() ) < cs->max_jobs)
+                        bestui = cs;
+                    else
+                        bestpre = cs;
+                }
+        }
+    }
+
+    // to make sure we find the fast computers at least after some time, we overwrite
+    // the install rule for every 19th job - if the farm is only filled a bit
+    if ( bestui && ( matches < 11 && matches < css.size() / 3 ) && job->id % 19 != 0 )
+        best = 0;
+
+    if ( best )
+    {
+#if DEBUG_SCHEDULER > 1
+        trace() << "taking best installed " << best->nodename << " " << server_speed (best) << endl;
+#endif
+        return best;
+    }
+
+    if ( bestui )
+    {
+#if DEBUG_SCHEDULER > 1
+        trace() << "taking best uninstalled " << bestui->nodename << " " << server_speed (bestui) << endl;
+#endif
+        return bestui;
+    }
+
+    if ( bestpre )
+    {
+#if DEBUG_SCHEDULER > 1
+        /* bestui is known to be null here, so the trace must describe
+           bestpre, the server that is actually returned. */
+        trace() << "taking best preload " << bestpre->nodename << " " << server_speed (bestpre) << endl;
+#endif
+    }
+
+    return bestpre;
+}
+
+/* Drops connections that have been silent for too long and returns the
+   number of seconds after which this check has to run again.
+
+   Control connections are closed once they were quiet for
+   MAX_SCHEDULER_PING seconds.  Compile servers are closed when they have
+   been busy installing for MAX_BUSY_INSTALLING seconds.  Servers speaking
+   a protocol older than 27 additionally get a ping after
+   MAX_SCHEDULER_PING seconds of silence and are closed if that goes
+   unanswered. */
+static time_t
+prune_servers ()
+{
+    list<CS*>::iterator it;
+
+    time_t now = time( 0 );
+    time_t min_time = MAX_SCHEDULER_PING;
+
+    it = controls.begin();
+    while (it != controls.end())
+    {
+        CS *control = *it;
+        /* move on before the entry might get closed */
+        ++it;
+
+        if (now - control->last_talk >= MAX_SCHEDULER_PING)
+        {
+            handle_end (control, 0);
+            continue;
+        }
+        min_time = min (min_time, MAX_SCHEDULER_PING - now + control->last_talk);
+    }
+
+    it = css.begin();
+    while (it != css.end())
+    {
+        CS *cs = *it;
+        /* move on before the entry might get closed */
+        ++it;
+
+        if (cs->busy_installing && (now - cs->busy_installing >=
+                                    MAX_BUSY_INSTALLING))
+        {
+            trace() << "busy installing for a long time - removing " << cs->nodename << endl;
+            handle_end (cs, 0);
+            continue;
+        }
+
+        /* protocol version 27 and newer use TCP keepalive */
+        if (IS_PROTOCOL_27(cs))
+            continue;
+
+        if (now - cs->last_talk >= MAX_SCHEDULER_PING)
+        {
+            /* A negative max_jobs marks a server with a ping outstanding. */
+            if (cs->max_jobs >= 0)
+            {
+                trace() << "send ping " << cs->nodename << endl;
+                cs->max_jobs *= -1; // better not give it away
+                if(cs->send_msg( PingMsg() ))
+                {
+                    // give it MAX_SCHEDULER_PONG to answer a ping
+                    cs->last_talk = time( 0 ) - MAX_SCHEDULER_PING
+                                    + 2 * MAX_SCHEDULER_PONG;
+                    min_time = min (min_time, (time_t) 2 * MAX_SCHEDULER_PONG);
+                    continue;
+                }
+            }
+            // R.I.P.
+            trace() << "removing " << cs->nodename << endl;
+            handle_end (cs, 0);
+            continue;
+        }
+
+        min_time = min (min_time, MAX_SCHEDULER_PING - now + cs->last_talk);
+#if DEBUG_SCHEDULER > 1
+        if ((random() % 400) < 0)
+        { // R.I.P.
+            trace() << "FORCED removing " << cs->nodename << endl;
+            handle_end (cs, 0);
+            continue;
+        }
+#endif
+    }
+
+    return min_time;
+}
+
+/* Postpones the request at the head of the queue by moving its whole
+   batch to the back and returns the request that is next in line now.
+   Returns 0 (leaving the queue untouched) if there is only one batch.
+   The queue must not be empty. */
+static Job*
+delay_current_job()
+{
+    assert (!toanswer.empty());
+    if ( toanswer.size() != 1 )
+    {
+        UnansweredList *postponed = toanswer.front();
+        toanswer.pop_front();
+        toanswer.push_back( postponed );
+        return get_job_request();
+    }
+    return 0;
+}
+
+/* Tries to answer one queued job request.  Returns false if nothing is
+   queued or no request can be placed on any server at the moment, true
+   if a request was taken off the queue (even when delivering the answer
+   failed and the submitter got closed).
+
+   A server is chosen with pick_server(); if that finds none, the
+   submitter itself is used as long as it has a free slot.  Requests that
+   cannot be placed are rotated to the back of the queue until every
+   batch has been tried once. */
+static bool
+empty_queue()
+{
+    Job *job = get_job_request ();
+    if (!job)
+        return false;
+
+    assert(!css.empty());
+
+    Job *const first_job = job;
+    CS *cs = 0;
+
+    for (;;)
+    {
+        cs = pick_server (job);
+        if (cs)
+            break;
+
+        /* Ignore the load on the submitter itself if no other host could
+           be found. We only obey to its max job number. */
+        cs = job->submitter;
+        const bool submitter_can_take_it =
+            int( cs->joblist.size() ) < cs->max_jobs
+            && job->preferred_host.empty()
+            /* This should be trivially true. */
+            && can_install (cs, job).size();
+        if (submitter_can_take_it)
+            break;
+
+        job = delay_current_job();
+        if ( job == first_job || !job ) // no job found in the whole toanswer list
+            return false;
+    }
+
+    remove_job_request ();
+
+    job->state = Job::WAITINGFORCS;
+    job->server = cs;
+
+    /* gotit tells the client whether the server already has the
+       environment or needs to get it installed first. */
+    string host_platform = envs_match (cs, job);
+    bool gotit = !host_platform.empty ();
+    if (!gotit)
+        host_platform = can_install (cs, job);
+
+    // mix and match between job ids: look among the first few recent
+    // requests of the submitter for one that is also among the first few
+    // jobs recorded for the chosen server
+    unsigned matched_job_id = 0;
+    unsigned count = 0;
+    const list<JobStat> &requested = job->submitter->last_requested_jobs;
+    const list<JobStat> &compiled = cs->last_compiled_jobs;
+    for (list<JobStat>::const_iterator l = requested.begin();
+         l != requested.end(); ++l)
+    {
+        unsigned rcount = 0;
+        for (list<JobStat>::const_iterator r = compiled.begin();
+             r != compiled.end(); ++r)
+        {
+            if (l->job_id == r->job_id)
+                matched_job_id = l->job_id;
+            if (++rcount > 16)
+                break;
+        }
+
+        if (matched_job_id || ++count > 16)
+            break;
+    }
+
+    UseCSMsg m2(host_platform, cs->name, cs->remote_port, job->id,
+                gotit, job->local_client_id, matched_job_id );
+
+    if (!job->submitter->send_msg (m2))
+    {
+        trace() << "failed to deliver job " << job->id << endl;
+        handle_end( job->submitter, 0 ); // will care for the rest
+        return true;
+    }
+
+#if DEBUG_SCHEDULER >= 0
+    trace() << "put " << job->id << " in joblist of " << cs->nodename
+            << (gotit ? "" : " (will install now)") << endl;
+#endif
+    cs->joblist.push_back( job );
+    /* if it doesn't have the environment, it will get it. */
+    if ( !gotit )
+        cs->busy_installing = time(0);
+
+    /* Only a master job pins the environment of its follow-up jobs. */
+    if ( job->master_job_for.empty() )
+        return true;
+
+    string env;
+    for ( Environments::const_iterator it = job->environments.begin();
+          it != job->environments.end(); ++it )
+    {
+        if ( it->first == cs->host_platform )
+        {
+            env = it->second;
+            break;
+        }
+    }
+    if ( env.empty() )
+        return true;
+
+    for ( list<Job*>::iterator follower = job->master_job_for.begin();
+          follower != job->master_job_for.end(); ++follower )
+    { // remove all other environments
+        ( *follower )->environments.clear();
+        ( *follower )->environments.push_back( make_pair( cs->host_platform, env ) );
+    }
+    return true;
+}
+
+/* Handles the LoginMsg of a freshly connected daemon CS: copies the
+   announced properties into CS, assigns it a host id, refuses hosts on
+   the block list, closes any older connection from the same IP, adds CS
+   to the list of compile servers and finally sends it its configuration.
+   Returns false if _M is not a LoginMsg or if the host is blocked. */
+static bool
+handle_login (CS *cs, Msg *_m)
+{
+    LoginMsg *m = dynamic_cast<LoginMsg *>(_m);
+    if (m == 0)
+        return false;
+
+    std::ostream& dbg = trace();
+
+    cs->remote_port = m->port;
+    cs->compiler_versions = m->envs;
+    /* a daemon that cannot chroot gets no job slots at all */
+    cs->max_jobs = m->chroot_possible ? m->max_kids : 0;
+    cs->noremote = m->noremote;
+    /* fall back to the channel's name if the daemon sent no node name */
+    if ( m->nodename.empty() )
+        cs->nodename = cs->name;
+    else
+        cs->nodename = m->nodename;
+    cs->host_platform = m->host_platform;
+    cs->chroot_possible = m->chroot_possible;
+    cs->pick_new_id();
+
+    list<string>::const_iterator blocked = block_css.begin();
+    while (blocked != block_css.end())
+    {
+        if (cs->name == *blocked)
+            return false;
+        ++blocked;
+    }
+
+    dbg << "login " << m->nodename << " protocol version: " << cs->protocol;
+    handle_monitor_stats( cs );
+
+    /* remove any other clients with the same IP, they must be stale */
+    list<CS*>::iterator it = css.begin();
+    while (it != css.end())
+    {
+        CS* other = *it;
+        /* step on first, the entry may get closed */
+        ++it;
+        if (cs->eq_ip(*other))
+            handle_end(other, 0);
+    }
+
+    css.push_back (cs);
+
+    dbg << " [";
+    for (Environments::const_iterator env = m->envs.begin();
+         env != m->envs.end(); ++env)
+    {
+        dbg << env->second << "(" << env->first << "), ";
+    }
+    dbg << "]" << endl;
+
+    /* Configure the daemon */
+    if (IS_PROTOCOL_24( cs ))
+        cs->send_msg (ConfCSMsg(icecream_bench_code));
+
+    return true;
+}
+
+/* Handles a LoginMsg from a daemon on channel C that is logged in
+   already: its list of environments is replaced, it is no longer
+   considered busy installing, and its configuration is sent again.
+   Always returns false, also after the message has been processed. */
+static bool
+handle_relogin (MsgChannel *c, Msg *_m)
+{
+    LoginMsg *m = dynamic_cast<LoginMsg *>(_m);
+    if (m == 0)
+        return false;
+
+    CS *cs = static_cast<CS *>(c);
+    cs->compiler_versions = m->envs;
+    cs->busy_installing = 0;
+
+    std::ostream &dbg = trace();
+    dbg << "RELOGIN " << cs->nodename << "(" << cs->host_platform << "): [";
+    Environments::const_iterator env = m->envs.begin();
+    while (env != m->envs.end())
+    {
+        dbg << env->second << "(" << env->first << "), ";
+        ++env;
+    }
+    dbg << "]" << endl;
+
+    /* Configure the daemon */
+    if (IS_PROTOCOL_24( c ))
+        c->send_msg (ConfCSMsg(icecream_bench_code));
+
+    return false;
+}
+
+/* Register C as a monitor after it sent a MonLoginMsg.
+   The channel is appended to monitors and switched to bulk transfer,
+   handle_monitor_stats() is called once for every entry of css, and the
+   channel is taken out of fd2cs.
+   Returns false if _M is not a MonLoginMsg. */
+static bool
+handle_mon_login (CS *c, Msg *_m)
+{
+    if (dynamic_cast<MonLoginMsg *>(_m) == 0)
+        return false;
+
+    monitors.push_back (c);
+    /* monitors really want to be fed lazily */
+    c->setBulkTransfer();
+
+    list<CS*>::const_iterator daemon = css.begin();
+    while (daemon != css.end())
+    {
+        handle_monitor_stats( *daemon );
+        ++daemon;
+    }
+
+    /* No data is expected from monitors, so stop tracking the fd. */
+    fd2cs.erase( c->fd );
+    return true;
+}
+
+/* Handle a JobBeginMsg from daemon C.
+   The job must exist in jobs and must have C as its server; it is then
+   marked COMPILING, its start times are recorded and the monitors are
+   notified.
+   Returns false for a foreign message type, an unknown job id or a job
+   that belongs to another server. */
+static bool
+handle_job_begin (CS *c, Msg *_m)
+{
+    JobBeginMsg *begin = dynamic_cast<JobBeginMsg *>(_m);
+    if (begin == 0)
+        return false;
+
+    map<unsigned int, Job*>::iterator found = jobs.find(begin->job_id);
+    if (found == jobs.end())
+    {
+        trace() << "handle_job_begin: no valid job id " << begin->job_id << endl;
+        return false;
+    }
+
+    Job *job = found->second;
+    if (job->server != c)
+    {
+        trace() << "that job isn't handled by " << c->name << endl;
+        return false;
+    }
+
+    job->state = Job::COMPILING;
+    job->starttime = begin->stime;
+    job->start_on_scheduler = time(0);
+    notify_monitors (new MonJobBeginMsg (begin->job_id, begin->stime, c->hostid));
+#if DEBUG_SCHEDULER >= 0
+    trace() << "BEGIN: " << begin->job_id
+            << " client=" << job->submitter->nodename
+            << "(" << job->target_platform << ")"
+            << " server=" << job->server->nodename
+            << "(" << job->server->host_platform << ")" << endl;
+#endif
+
+    return true;
+}
+
+
+/* Handle a JobDoneMsg from channel C.
+
+   Two cases are distinguished by the exit code:
+   - CLIENT_WAS_WAITING_FOR_CS: the daemon cancelled a request it believes is
+     still queued here.  The job is looked up by its local client id among
+     the PENDING jobs submitted by C, m->job_id is rewritten to the
+     scheduler's id, and the job is taken out of the toanswer queues.
+   - anything else: the job is looked up in jobs by m->job_id.
+
+   A message from a channel that is neither the job's server (for server
+   messages) nor its submitter (for client messages) gets the sender ended
+   via handle_end().
+
+   On success the job is removed from its server's joblist, the statistics
+   are recorded, monitors are notified and the job is deleted.
+   Returns true only in that case. */
+static bool
+handle_job_done (CS *c, Msg *_m)
+{
+    JobDoneMsg *m = dynamic_cast<JobDoneMsg *>(_m);
+    if ( !m )
+        return false;
+
+    Job *j = 0;
+
+    if (m->exitcode == CLIENT_WAS_WAITING_FOR_CS)
+    {
+        // the daemon saw a cancel of what he believes is waiting in the scheduler
+        map<unsigned int, Job*>::iterator mit;
+        for (mit = jobs.begin(); mit != jobs.end(); ++mit)
+        {
+            Job *job = mit->second;
+            trace() << "looking for waitcs " << job->server << " " << job->submitter << " " << c << " "
+                    << job->state << " " << job->local_client_id << " " << m->job_id << endl;
+            if (job->server == 0 && job->submitter == c && job->state == Job::PENDING
+                && job->local_client_id == m->job_id )
+            {
+                trace() << "STOP (WAITFORCS) FOR " << mit->first << endl;
+                j = job;
+                m->job_id = j->id; // that's faked
+
+                /* Unfortunately the toanswer queues are also tagged based on the daemon,
+                   so we need to clean them up also. */
+                list<UnansweredList*>::iterator it;
+                for (it = toanswer.begin(); it != toanswer.end(); ++it)
+                    if ((*it)->server == c)
+                    {
+                        UnansweredList *l = *it;
+                        list<Job*>::iterator jit;
+                        for (jit = l->l.begin(); jit != l->l.end(); ++jit)
+                        {
+                            if (*jit == j)
+                            {
+                                l->l.erase(jit);
+                                break;
+                            }
+                        }
+                        if (l->l.empty())
+                        {
+                            /* An emptied queue entry is freed as well, the
+                               same way handle_end() disposes of them. */
+                            it = toanswer.erase (it);
+                            delete l;
+                            break;
+                        }
+                    }
+
+                /* m->job_id no longer holds the local client id, so further
+                   comparisons against it would be meaningless. */
+                break;
+            }
+        }
+    }
+    else
+    {
+        map<unsigned int, Job*>::iterator found = jobs.find(m->job_id);
+        if (found != jobs.end())
+            j = found->second;
+    }
+
+    if (!j)
+    {
+        trace() << "job ID not present " << m->job_id << endl;
+        return false;
+    }
+
+    /* NOTE(review): a job matched in the CLIENT_WAS_WAITING_FOR_CS branch is
+       PENDING by construction, so it always leaves through this return and
+       stays in jobs -- confirm that this is intended. */
+    if (j->state == Job::PENDING)
+    {
+        trace() << "job ID still pending ?! scheduler recently restarted? " << m->job_id << endl;
+        return false;
+    }
+
+    if (m->is_from_server() && j->server != c)
+    {
+        log_info() << "the server isn't the same for job " << m->job_id << endl;
+        /* j->server may be unset here, so do not dereference it blindly. */
+        log_info() << "server: "
+                   << (j->server ? j->server->nodename.c_str() : "<unknown>") << endl;
+        log_info() << "msg came from: " << c->nodename << endl;
+        // the daemon is not following matz's rules: kick him
+        handle_end(c, 0);
+        return false;
+    }
+    if (!m->is_from_server() && j->submitter != c)
+    {
+        log_info() << "the submitter isn't the same for job " << m->job_id << endl;
+        log_info() << "submitter: "
+                   << (j->submitter ? j->submitter->nodename.c_str() : "<>") << endl;
+        log_info() << "msg came from: " << c->nodename << endl;
+        // the daemon is not following matz's rules: kick him
+        handle_end(c, 0);
+        return false;
+    }
+
+    if ( m->exitcode == 0 )
+    {
+        std::ostream &dbg = trace();
+        dbg << "END " << m->job_id
+            << " status=" << m->exitcode;
+
+        /* The ratios are only printed when the divisor is non-zero. */
+        if ( m->in_uncompressed )
+            dbg << " in=" << m->in_uncompressed
+                << "(" << int( m->in_compressed * 100 / m->in_uncompressed ) << "%)";
+        else
+            dbg << " in=0(0%)";
+
+        if ( m->out_uncompressed )
+            dbg << " out=" << m->out_uncompressed
+                << "(" << int( m->out_compressed * 100 / m->out_uncompressed ) << "%)";
+        else
+            dbg << " out=0(0%)";
+
+        dbg << " real=" << m->real_msec
+            << " user=" << m->user_msec
+            << " sys=" << m->sys_msec
+            << " pfaults=" << m->pfaults
+            << " server=" << (j->server ? j->server->nodename.c_str() : "<unknown>")
+            << endl;
+    }
+    else
+        trace() << "END " << m->job_id
+                << " status=" << m->exitcode << endl;
+
+    if (j->server)
+        j->server->joblist.remove (j);
+
+    add_job_stats (j, m);
+    notify_monitors (new MonJobDoneMsg (*m));
+    jobs.erase (m->job_id);
+    delete j;
+
+    return true;
+}
+
+/* Record that C just talked to us.  A negative max_jobs value is flipped
+   back to its positive counterpart.  The message itself is not inspected.
+   Always returns true. */
+static bool
+handle_ping (CS* c, Msg * /*_m*/)
+{
+    c->last_talk = time( 0 );
+
+    const bool limit_negated = c->max_jobs < 0;
+    if ( limit_negated )
+        c->max_jobs = c->max_jobs * -1;
+
+    return true;
+}
+
+/* Handle a StatsMsg from C: store the reported load and forward the
+   statistics to the monitors.
+   Returns false if _M is not a StatsMsg or if C is not part of css. */
+static bool
+handle_stats (CS * c, Msg * _m)
+{
+    StatsMsg *m = dynamic_cast<StatsMsg *>(_m);
+    if (!m)
+        return false;
+
+    /* Before protocol 25, ping and stat handling was clutched together, so
+       a stats message from such a peer also counts as a ping.  handle_ping()
+       ignores its message argument. */
+    if (!IS_PROTOCOL_25(c))
+        handle_ping (c, _m);
+
+    /* Only daemons that completed their login are in css. */
+    list<CS*>::iterator it = std::find (css.begin(), css.end(), c);
+    if (it == css.end())
+        return false;
+
+    c->load = m->load;
+    handle_monitor_stats( c, m );
+    return true;
+}
+
+/* Build a one-line description of JOB: id, a four letter state tag, the node
+   names of submitter and server (placeholders when unset), followed by the
+   file name.  The formatted prefix is limited by a 1000 byte buffer. */
+static string
+dump_job (Job *job)
+{
+    const char *state_tag = "Huh?";
+    if (job->state == Job::PENDING)
+        state_tag = "PEND";
+    else if (job->state == Job::WAITINGFORCS)
+        state_tag = "WAIT";
+    else if (job->state == Job::COMPILING)
+        state_tag = "COMP";
+    else if (job->state == Job::WAITINGFORDONE)
+        state_tag = "DONE";
+
+    const char *submitter_name = "<>";
+    if (job->submitter)
+        submitter_name = job->submitter->nodename.c_str();
+
+    const char *server_name = "<unknown>";
+    if (job->server)
+        server_name = job->server->nodename.c_str();
+
+    char prefix[1000];
+    snprintf (prefix, sizeof (prefix), "%d %s sub:%s on:%s ",
+              job->id, state_tag, submitter_name, server_name);
+    prefix[sizeof (prefix) - 1] = 0;
+
+    string description = prefix;
+    description = description + job->filename;
+    return description;
+}
+
+/* Cut S into the maximal runs of characters that are not in SET and append
+   each run to L, in order.  Separator runs at the start, the end or between
+   tokens produce no empty entries. */
+static void
+split_string (const string &s, const char *set, list<string> &l)
+{
+    string::size_type cursor = 0;
+    for (;;)
+    {
+        const string::size_type first = s.find_first_not_of (set, cursor);
+        if (first == string::npos)
+            return;
+
+        const string::size_type stop = s.find_first_of (set, first);
+        if (stop == string::npos)
+        {
+            /* Last token: it runs to the end of the string. */
+            l.push_back (s.substr (first));
+            return;
+        }
+
+        l.push_back (s.substr (first, stop - first));
+        cursor = stop;
+    }
+}
+
+/* Set up C as a text control connection: mark it as LINE type and logged in,
+   enable bulk transfer, add it to controls and send the two line greeting
+   with uptime, host count and job counters.
+   Returns the result of sending the greeting. */
+static bool
+handle_control_login(CS* c)
+{
+    c->type = CS::LINE;
+    c->last_talk = time (0);
+    c->setBulkTransfer();
+    c->state = CS::LOGGEDIN;
+
+    /* A channel must not be registered twice. */
+    assert(find(controls.begin(), controls.end(), c) == controls.end());
+    controls.push_back(c);
+
+    std::ostringstream greeting;
+    greeting << "200-ICECC " VERSION ": ";
+    greeting << time(0) - starttime << "s uptime, ";
+    greeting << css.size() << " hosts, ";
+    greeting << jobs.size() << " jobs in queue ";
+    greeting << "(" << new_job_id << " total)." << endl;
+    greeting << "200 Use 'help' for help and 'quit' to quit." << endl;
+
+    const bool sent = c->send_msg(TextMsg(greeting.str()));
+    return sent;
+}
+
+/* Execute one command line received on the text control channel C.
+
+   The text is split at blanks, tabs and newlines; the first word is the
+   command (case-insensitive), the remaining words are its arguments.
+   Commands: listcs, listblocks, listjobs, quit/exit, removecs, blockcs,
+   crashme, internals, help.  Anything else is answered with an
+   "Invalid command" line.
+
+   Returns false if _M is not a TextMsg, if a reply could not be sent, or
+   after quit/exit (C has been ended via handle_end() in that case).
+   Otherwise returns the result of sending the final "200 done". */
+static bool
+handle_line (CS *c, Msg *_m)
+{
+    TextMsg *m = dynamic_cast<TextMsg *>(_m);
+    if (!m)
+        return false;
+
+    char buffer[1000];
+    string line;
+    list<string> l;
+    split_string (m->text, " \t\n", l);
+    string cmd;
+
+    c->last_talk = time(0);
+
+    if (l.empty())
+        cmd = "";
+    else
+    {
+        cmd = l.front();
+        l.pop_front();
+        transform(cmd.begin(), cmd.end(), cmd.begin(), ::tolower);
+    }
+    if (cmd == "listcs")
+    {
+        for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+        {
+            CS* cs= *it;
+            /* snprintf keeps every piece within the fixed size buffer. */
+            snprintf (buffer, sizeof (buffer), " (%s:%d) ", cs->name.c_str(), cs->remote_port);
+            line = " " + cs->nodename + buffer;
+            line += "[" + cs->host_platform + "] speed=";
+            snprintf (buffer, sizeof (buffer), "%.2f jobs=%d/%d load=%d", server_speed (cs),
+                      (int)cs->joblist.size(), cs->max_jobs, cs->load);
+            line += buffer;
+            if (cs->busy_installing)
+            {
+                /* The difference is converted so that it matches %ld. */
+                snprintf( buffer, sizeof (buffer), " busy installing since %ld s",
+                          static_cast<long>( time(0) - cs->busy_installing ) );
+                line += buffer;
+            }
+            if(!c->send_msg (TextMsg (line)))
+                return false;
+            for ( list<Job*>::const_iterator it2 = cs->joblist.begin(); it2 != cs->joblist.end(); ++it2 )
+                if(!c->send_msg (TextMsg (" " + dump_job (*it2) ) ))
+                    return false;
+        }
+    }
+    else if (cmd == "listblocks")
+    {
+        for (list<string>::const_iterator it = block_css.begin(); it != block_css.end(); ++it)
+            if(!c->send_msg (TextMsg (" " + (*it) ) ))
+                return false;
+    }
+    else if (cmd == "listjobs")
+    {
+        for (map<unsigned int, Job*>::const_iterator it = jobs.begin();
+             it != jobs.end(); ++it)
+            if(!c->send_msg( TextMsg( " " + dump_job (it->second) ) ))
+                return false;
+    }
+    else if (cmd == "quit" || cmd == "exit" )
+    {
+        /* handle_end() deletes C, so it must not be touched afterwards. */
+        handle_end(c, 0);
+        return false;
+    }
+    else if (cmd == "removecs" || cmd == "blockcs")
+    {
+        if (l.empty()) {
+            if(!c->send_msg (TextMsg (string ("401 Sure. But which hosts?"))))
+                return false;
+        }
+        else
+            for (list<string>::const_iterator si = l.begin(); si != l.end(); ++si)
+                for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+                    if ((*it)->nodename == *si || (*it)->name == *si)
+                    {
+                        if (cmd == "blockcs")
+                            block_css.push_back((*it)->name);
+                        if (c->send_msg (TextMsg (string ("removing host ") + *si)))
+                            handle_end ( *it, 0);
+                        /* handle_end() changed css; leave the inner loop
+                           before the iterator is used again. */
+                        break;
+                    }
+    }
+    else if (cmd == "crashme")
+    {
+        /* Deliberate crash for debugging purposes. */
+        *(int *)0 = 42; // ;-)
+    }
+    else if (cmd == "internals" )
+    {
+        for (list<CS*>::iterator it = css.begin(); it != css.end(); ++it)
+        {
+            /* With arguments, only the named hosts are queried. */
+            if (!l.empty())
+            {
+                list<string>::const_iterator si;
+                for (si = l.begin(); si != l.end(); ++si) {
+                    if ((*it)->nodename == *si || (*it)->name == *si)
+                        break;
+                }
+                if(si == l.end())
+                    continue;
+            }
+
+            Msg *msg = NULL;
+            if(( *it )->send_msg( GetInternalStatus() ))
+                msg = ( *it )->get_msg();
+
+            StatusTextMsg *status = 0;
+            if ( msg && msg->type == M_STATUS_TEXT )
+                status = dynamic_cast<StatusTextMsg*>( msg );
+
+            bool sent;
+            if ( status )
+                sent = c->send_msg( TextMsg( status->text ) );
+            else
+                sent = c->send_msg( TextMsg( ( *it )->nodename + " not reporting\n" ) );
+
+            /* Free the answer before a possible early return. */
+            delete msg;
+            if (!sent)
+                return false;
+        }
+    }
+    else if (cmd == "help")
+    {
+        if (!c->send_msg (TextMsg (
+                "listcs\nlistblocks\nlistjobs\nremovecs\nblockcs\ninternals\nhelp\nquit")))
+            return false;
+    }
+    else
+    {
+        string txt = "Invalid command '";
+        txt += m->text;
+        txt += "'";
+        if(!c->send_msg (TextMsg (txt)))
+            return false;
+    }
+    return c->send_msg (TextMsg (string ("200 done")));
+}
+
+/* Handle the first message M of the not yet logged in channel C.
+   M_LOGIN turns C into a DAEMON, M_MON_LOGIN into a MONITOR; any other
+   message type is an error.  On success C is marked LOGGEDIN.  On failure C
+   is passed to handle_end(), which destroys it.  M is deleted in both
+   cases.  Returns true if the login succeeded. */
+static bool
+try_login (CS *c, Msg *m)
+{
+    bool logged_in;
+
+    if (m->type == M_LOGIN)
+    {
+        c->type = CS::DAEMON;
+        logged_in = handle_login (c, m);
+    }
+    else if (m->type == M_MON_LOGIN)
+    {
+        c->type = CS::MONITOR;
+        logged_in = handle_mon_login (c, m);
+    }
+    else
+    {
+        log_info() << "Invalid first message " << static_cast<char>(m->type) << endl;
+        logged_in = false;
+    }
+
+    if (!logged_in)
+        handle_end (c, m);
+    else
+        c->state = CS::LOGGEDIN;
+
+    delete m;
+    return logged_in;
+}
+
+/* Tear down the channel TOREMOVE and everything that hangs off it.
+   - MONITOR: taken out of monitors.
+   - DAEMON: monitors are told it went offline, it is taken out of css, all
+     toanswer queues tagged with it are deleted together with their jobs,
+     and every job that has it as server or submitter is deleted.
+   - LINE: a good-bye line is sent and it is taken out of controls.
+   Finally the fd2cs entry is erased and TOREMOVE is deleted.
+   M is only used for debug tracing.  Always returns true. */
+static bool
+handle_end (CS *toremove, Msg *m)
+{
+#if DEBUG_SCHEDULER > 1
+    trace() << "Handle_end " << toremove << " " << m << endl;
+#else
+    ( void )m;
+#endif
+
+    if (toremove->type == CS::MONITOR)
+    {
+        assert (find (monitors.begin(), monitors.end(), toremove) != monitors.end());
+        monitors.remove (toremove);
+#if DEBUG_SCHEDULER > 1
+        trace() << "handle_end(moni) " << monitors.size() << endl;
+#endif
+    }
+    else if (toremove->type == CS::DAEMON)
+    {
+        log_info() << "remove daemon " << toremove->nodename << endl;
+
+        notify_monitors(new MonStatsMsg( toremove->hostid, "State:Offline\n" ) );
+
+        /* A daemon disconnected.  It has to leave the css list and all jobs
+           scheduled on it have to go.  Clients running on the machine of
+           the dead daemon may still be connected; they are expected to
+           disconnect soon as well. */
+        css.remove (toremove);
+
+        /* The toanswer queues are tagged with the daemon too, so the ones
+           belonging to it are dropped including their jobs. */
+        list<UnansweredList*>::iterator qit = toanswer.begin();
+        while (qit != toanswer.end())
+        {
+            UnansweredList *queue = *qit;
+            if (queue->server != toremove)
+            {
+                ++qit;
+                continue;
+            }
+
+            for (list<Job*>::iterator jit = queue->l.begin(); jit != queue->l.end(); ++jit)
+            {
+                Job *queued = *jit;
+                trace() << "STOP (DAEMON) FOR " << queued->id << endl;
+                notify_monitors (new MonJobDoneMsg ( JobDoneMsg (queued->id, 255 )));
+                if (queued->server)
+                    queued->server->busy_installing = 0;
+                jobs.erase( queued->id );
+                delete queued;
+            }
+            delete queue;
+            qit = toanswer.erase (qit);
+        }
+
+        map<unsigned int, Job*>::iterator mit = jobs.begin();
+        while (mit != jobs.end())
+        {
+            Job *job = mit->second;
+            if (job->server != toremove && job->submitter != toremove)
+            {
+                ++mit;
+                continue;
+            }
+
+            trace() << "STOP (DAEMON2) FOR " << mit->first << endl;
+            notify_monitors (new MonJobDoneMsg ( JobDoneMsg( job->id, 255 )));
+            /* When the job goes away because its submitter is removed, it
+               also has to leave the joblist of the server running it. */
+            if (job->server && job->server != toremove)
+                job->server->joblist.remove (job);
+            if (job->server)
+                job->server->busy_installing = 0;
+            /* Post-increment keeps the iterator valid across the erase. */
+            jobs.erase( mit++ );
+            delete job;
+        }
+    }
+    else if (toremove->type == CS::LINE)
+    {
+        /* Best effort: a failed good-bye is ignored. */
+        ( void )toremove->send_msg (TextMsg ("200 Good Bye!"));
+        controls.remove (toremove);
+    }
+    else
+    {
+        trace() << "remote end had UNKNOWN type?" << endl;
+    }
+
+    fd2cs.erase (toremove->fd);
+    delete toremove;
+    return true;
+}
+
+/* Fetch one message from C and dispatch it.
+   A channel without a message is ended.  A channel still in CONNECTED state
+   is handed to try_login(), which also disposes of the message.  Otherwise
+   the message goes to the handler for its type and is deleted afterwards.
+   Returns false when C was ended here (no message, M_END, unknown type);
+   in all other cases the result of the handler is passed through. */
+static bool
+handle_activity (CS *c)
+{
+    Msg *msg = c->get_msg (0);
+    if (msg == 0)
+    {
+        handle_end (c, msg);
+        return false;
+    }
+
+    /* First we need to login. */
+    if (c->state == CS::CONNECTED)
+        return try_login (c, msg);
+
+    bool result = true;
+    switch (msg->type)
+    {
+    case M_JOB_BEGIN:
+        result = handle_job_begin (c, msg);
+        break;
+    case M_JOB_DONE:
+        result = handle_job_done (c, msg);
+        break;
+    case M_PING:
+        result = handle_ping (c, msg);
+        break;
+    case M_STATS:
+        result = handle_stats (c, msg);
+        break;
+    case M_END:
+        handle_end (c, msg);
+        result = false;
+        break;
+    case M_JOB_LOCAL_BEGIN:
+        result = handle_local_job (c, msg);
+        break;
+    case M_JOB_LOCAL_DONE:
+        result = handle_local_job_done( c, msg );
+        break;
+    case M_LOGIN:
+        result = handle_relogin (c, msg);
+        break;
+    case M_TEXT:
+        result = handle_line (c, msg);
+        break;
+    case M_GET_CS:
+        result = handle_cs_request (c, msg);
+        break;
+    default:
+        log_info() << "Invalid message type arrived " << static_cast<char>(msg->type) << endl;
+        handle_end (c, msg);
+        result = false;
+        break;
+    }
+
+    delete msg;
+    return result;
+}
+
+/* Create the UDP socket used for scheduler discovery: SO_BROADCAST is
+   enabled and the socket is bound to port 8765 on all interfaces.
+   Returns the descriptor, or -1 after logging the failing call.  The socket
+   is closed again on every failure path. */
+static int
+open_broad_listener ()
+{
+    int listen_fd;
+    struct sockaddr_in myaddr;
+    if ((listen_fd = socket (PF_INET, SOCK_DGRAM, 0)) < 0)
+    {
+        log_perror ("socket()");
+        return -1;
+    }
+    int optval = 1;
+    if (setsockopt (listen_fd, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)) < 0)
+    {
+        /* Log first so that close() cannot disturb errno. */
+        log_perror ("setsockopt()");
+        close (listen_fd);
+        return -1;
+    }
+    /* Start from a zeroed address so no field is left indeterminate. */
+    memset (&myaddr, 0, sizeof (myaddr));
+    myaddr.sin_family = AF_INET;
+    myaddr.sin_port = htons (8765);
+    myaddr.sin_addr.s_addr = INADDR_ANY;
+    if (bind (listen_fd, (struct sockaddr *) &myaddr, sizeof (myaddr)) < 0)
+    {
+        log_perror ("bind()");
+        close (listen_fd);
+        return -1;
+    }
+    return listen_fd;
+}
+
+/* Create a non-blocking TCP listening socket on PORT, bound to all
+   interfaces, with SO_REUSEADDR set and a backlog of 10.
+   Returns the descriptor, or -1 after logging the failing call.  The socket
+   is closed again on every failure path. */
+static int
+open_tcp_listener (short port)
+{
+    int fd;
+    struct sockaddr_in myaddr;
+    if ((fd = socket (PF_INET, SOCK_STREAM, 0)) < 0)
+    {
+        log_perror ("socket()");
+        return -1;
+    }
+    int optval = 1;
+    if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0)
+    {
+        /* Log first so that close() cannot disturb errno. */
+        log_perror ("setsockopt()");
+        close (fd);
+        return -1;
+    }
+    /* Although we select() on fd we need O_NONBLOCK, due to
+       possible network errors making accept() block although select() said
+       there was some activity. */
+    if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0)
+    {
+        log_perror ("fcntl()");
+        close (fd);
+        return -1;
+    }
+    /* Start from a zeroed address so no field is left indeterminate. */
+    memset (&myaddr, 0, sizeof (myaddr));
+    myaddr.sin_family = AF_INET;
+    myaddr.sin_port = htons (port);
+    myaddr.sin_addr.s_addr = INADDR_ANY;
+    if (bind (fd, (struct sockaddr *) &myaddr, sizeof (myaddr)) < 0)
+    {
+        log_perror ("bind()");
+        close (fd);
+        return -1;
+    }
+    if (listen (fd, 10) < 0)
+    {
+        log_perror ("listen()");
+        close (fd);
+        return -1;
+    }
+    return fd;
+}
+
+/* Print REASON (when given) followed by the version banner and the option
+   summary to stderr, then terminate the process with exit status 1. */
+static void
+usage(const char* reason = 0)
+{
+    if (reason != 0)
+        cerr << reason << endl;
+
+    cerr << "ICECREAM scheduler " VERSION "\n"
+         << "usage: scheduler [options] \n"
+            "Options:\n"
+            " -n, --netname <name>\n"
+            " -p, --port <port>\n"
+            " -h, --help\n"
+            " -l, --log-file <file>\n"
+            " -d, --daemonize\n"
+            " -v[v[v]]]\n"
+         << endl;
+
+    exit(1);
+}
+
+/* Signal handler for the termination signals.
+   The first signal only asks the main loop to stop by setting
+   exit_main_loop.  A further signal while that flag is already set
+   terminates the process immediately with status 1.
+   Only write() and _exit() are used on the forced path; iostreams are not
+   async-signal-safe. */
+static void
+trigger_exit( int signum )
+{
+    if (exit_main_loop)
+    {
+        // hmm, we got killed already. try better
+        static const char forced[] = "forced exit.\n";
+        const ssize_t written = write (STDERR_FILENO, forced, sizeof (forced) - 1);
+        ( void )written; // nothing sensible can be done if this fails
+        _exit(1);
+    }
+
+    exit_main_loop = true;
+
+    // make BSD happy
+    signal(signum, trigger_exit);
+}
+
+/* Entry point of the scheduler.
+
+   Parses the command line, sets up logging, optionally detaches, opens the
+   daemon listener on PORT, the text control listener on PORT + 1 and the
+   UDP broadcast listener, writes a pid file and then runs the select() loop
+   until a termination signal sets exit_main_loop.
+
+   Returns 0 on a regular shutdown, 1 if a listener or select() failed and
+   -1 if receiving a broadcast failed with anything but EAGAIN. */
+int
+main (int argc, char * argv[])
+{
+    int listen_fd, remote_fd, broad_fd, text_fd;
+    struct sockaddr_in remote_addr;
+    unsigned int port = 8765;
+    socklen_t remote_len;
+    char *netname = (char*)"ICECREAM";
+    bool detach = false;
+    int debug_level = Error;
+    string logfile;
+
+    while ( true ) {
+        int option_index = 0;
+        static const struct option long_options[] = {
+            { "netname", 1, NULL, 'n' },
+            { "help", 0, NULL, 'h' },
+            /* --port takes an argument, just like -p in the short option
+               string below. */
+            { "port", 1, NULL, 'p' },
+            { "daemonize", 0, NULL, 'd'},
+            { "log-file", 1, NULL, 'l'},
+            { 0, 0, 0, 0 }
+        };
+
+        const int c = getopt_long( argc, argv, "n:p:hl:vdr", long_options, &option_index );
+        if ( c == -1 ) break; // eoo
+
+        switch ( c ) {
+        case 0:
+            {
+                ( void ) long_options[option_index].name;
+            }
+            break;
+        case 'd':
+            detach = true;
+            break;
+        case 'l':
+            if ( optarg && *optarg )
+                logfile = optarg;
+            else
+                usage( "Error: -l requires argument" );
+            break;
+        case 'v':
+            /* Each -v enables the next level: Warning, Info, Debug. */
+            if ( debug_level & Warning )
+            {
+                if ( debug_level & Info ) // for second call
+                    debug_level |= Debug;
+                else
+                    debug_level |= Info;
+            }
+            else
+                debug_level |= Warning;
+            break;
+        case 'n':
+            if ( optarg && *optarg )
+                netname = optarg;
+            else
+                usage("Error: -n requires argument");
+            break;
+        case 'p':
+            if ( optarg && *optarg )
+            {
+                const int requested = atoi( optarg );
+                /* port + 1 is bound as well, so 65535 is not usable. */
+                if ( requested <= 0 || requested > 65534 )
+                    usage("Error: Invalid port specified");
+                port = requested;
+            }
+            else
+                usage("Error: -p requires argument");
+            break;
+
+        default:
+            usage();
+        }
+    }
+
+    /* A detached scheduler has no terminal, so it logs to a file. */
+    if ( !logfile.size() && detach )
+        logfile = "/var/log/icecc_scheduler";
+
+    setup_debug( debug_level, logfile );
+    if ( detach )
+        daemon( 0, 0 );
+
+    listen_fd = open_tcp_listener (port);
+    if (listen_fd < 0)
+        return 1;
+    text_fd = open_tcp_listener (port + 1);
+    if (text_fd < 0)
+        return 1;
+    broad_fd = open_broad_listener ();
+    if (broad_fd < 0)
+        return 1;
+
+    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    {
+        log_warning() << "signal(SIGPIPE, ignore) failed: " << strerror(errno) << endl;
+        return 1;
+    }
+
+    starttime = time( 0 );
+
+    /* Write <RUNDIR>/<program name>.pid; it is unlinked on regular exit. */
+    ofstream pidFile;
+    string progName = argv[0];
+    progName = progName.substr(progName.rfind('/')+1);
+    pidFilePath = string(RUNDIR)+string("/")+progName+string(".pid");
+    pidFile.open(pidFilePath.c_str());
+    pidFile << getpid() << endl;
+    pidFile.close();
+
+    signal(SIGTERM, trigger_exit);
+    signal(SIGINT, trigger_exit);
+    signal(SIGALRM, trigger_exit);
+
+    /* Earliest time at which the two TCP listeners are polled again. */
+    time_t next_listen = 0;
+
+    while (!exit_main_loop)
+    {
+        struct timeval tv;
+        tv.tv_usec = 0;
+        tv.tv_sec = prune_servers ();
+
+        while (empty_queue())
+            continue;
+
+        fd_set read_set;
+        int max_fd = 0;
+        FD_ZERO (&read_set);
+        if (time(0) >= next_listen)
+        {
+            max_fd = listen_fd;
+            FD_SET (listen_fd, &read_set);
+            if (text_fd > max_fd)
+                max_fd = text_fd;
+            FD_SET (text_fd, &read_set);
+        }
+        if (broad_fd > max_fd)
+            max_fd = broad_fd;
+        FD_SET (broad_fd, &read_set);
+        for (map<int, CS*>::const_iterator it = fd2cs.begin(); it != fd2cs.end();)
+        {
+            int i = it->first;
+            CS *c = it->second;
+            bool ok = true;
+            ++it;
+            /* handle_activity() can delete c and make the iterator
+               invalid. */
+            while (ok && c->has_msg ())
+                if (!handle_activity (c))
+                    ok = false;
+            if (ok)
+            {
+                if (i > max_fd)
+                    max_fd = i;
+                FD_SET (i, &read_set);
+            }
+        }
+        /* From here on max_fd holds the number of ready descriptors that
+           still have to be served. */
+        max_fd = select (max_fd + 1, &read_set, NULL, NULL, &tv);
+        if (max_fd < 0 && errno == EINTR)
+            continue;
+        if (max_fd < 0)
+        {
+            log_perror ("select()");
+            return 1;
+        }
+        if (FD_ISSET (listen_fd, &read_set))
+        {
+            max_fd--;
+            bool pending_connections = true;
+            while (pending_connections)
+            {
+                remote_len = sizeof (remote_addr);
+                remote_fd = accept (listen_fd,
+                                    (struct sockaddr *) &remote_addr,
+                                    &remote_len );
+                if (remote_fd < 0)
+                    pending_connections = false;
+
+                if (remote_fd < 0 && errno != EAGAIN && errno != EINTR && errno
+                    != EWOULDBLOCK)
+                {
+                    log_perror ("accept()");
+                    /* don't quit because of ECONNABORTED, this can happen during
+                     * floods */
+                }
+                if (remote_fd >= 0)
+                {
+                    CS *cs = new CS (remote_fd, (struct sockaddr*) &remote_addr, remote_len, false);
+                    trace() << "accepted " << cs->name << endl;
+                    cs->last_talk = time( 0 );
+
+                    if ( !cs->protocol ) // protocol mismatch
+                    {
+                        delete cs;
+                        continue;
+                    }
+                    fd2cs[cs->fd] = cs;
+                    while (!cs->read_a_bit () || cs->has_msg ())
+                        if(! handle_activity (cs))
+                            break;
+                }
+            }
+            next_listen = time(0) + 1;
+        }
+        if (max_fd && FD_ISSET (text_fd, &read_set))
+        {
+            max_fd--;
+            remote_len = sizeof (remote_addr);
+            remote_fd = accept (text_fd,
+                                (struct sockaddr *) &remote_addr,
+                                &remote_len );
+            if (remote_fd < 0 && errno != EAGAIN && errno != EINTR)
+            {
+                log_perror ("accept()");
+                /* Don't quit the scheduler just because a debugger couldn't
+                   connect. */
+            }
+            if (remote_fd >= 0)
+            {
+                CS *cs = new CS (remote_fd, (struct sockaddr*) &remote_addr, remote_len, true);
+                fd2cs[cs->fd] = cs;
+                if (!handle_control_login(cs))
+                {
+                    handle_end(cs, 0);
+                    continue;
+                }
+                while (!cs->read_a_bit () || cs->has_msg ())
+                    if (!handle_activity (cs))
+                        break;
+            }
+        }
+        if (max_fd && FD_ISSET (broad_fd, &read_set))
+        {
+            max_fd--;
+            char buf[16];
+            struct sockaddr_in broad_addr;
+            socklen_t broad_len = sizeof (broad_addr);
+            if (recvfrom (broad_fd, buf, 1, 0, (struct sockaddr*) &broad_addr,
+                          &broad_len) != 1)
+            {
+                int err = errno;
+                log_perror ("recvfrom()");
+                /* Some linux 2.6 kernels can return from select with
+                   data available, and then return from read() with EAGAIN
+                   even on a blocking socket (breaking POSIX). Happens
+                   when the arriving packet has a wrong checksum. So
+                   we ignore EAGAIN here, but still abort for all other errors. */
+                if (err != EAGAIN)
+                    return -1;
+            }
+            /* Only answer if daemon would be able to talk to us. */
+            else if (buf[0] >= MIN_PROTOCOL_VERSION)
+            {
+                log_info() << "broadcast from " << inet_ntoa (broad_addr.sin_addr)
+                           << ":" << ntohs (broad_addr.sin_port) << "\n";
+                /* The reply is the received byte plus one, followed by the
+                   zero padded net name. */
+                buf[0]++;
+                memset (buf + 1, 0, sizeof (buf) - 1);
+                snprintf (buf + 1, sizeof (buf) - 1, "%s", netname);
+                buf[sizeof (buf) - 1] = 0;
+                if (sendto (broad_fd, buf, sizeof (buf), 0,
+                            (struct sockaddr*)&broad_addr, broad_len) != sizeof (buf))
+                {
+                    log_perror ("sendto()");
+                }
+            }
+        }
+        for (map<int, CS*>::const_iterator it = fd2cs.begin();
+             max_fd && it != fd2cs.end();)
+        {
+            int i = it->first;
+            CS *c = it->second;
+            /* handle_activity can delete the channel from the fd2cs list,
+               hence advance the iterator right now, so it doesn't become
+               invalid. */
+            ++it;
+            if (FD_ISSET (i, &read_set))
+            {
+                while (!c->read_a_bit () || c->has_msg ())
+                    if(!handle_activity (c))
+                        break;
+                max_fd--;
+            }
+        }
+    }
+    shutdown (broad_fd, SHUT_RDWR);
+    close (broad_fd);
+    unlink(pidFilePath.c_str());
+    return 0;
+}
+