wrapper source code
log in

Advanced search

Message boards : Number crunching : wrapper source code

Author Message
Profile rebirther
Volunteer moderator
Project administrator
Project developer
Project tester
Project scientist
Avatar
Send message
Joined: 2 Jan 13
Posts: 7479
Credit: 43,657,967
RAC: 40,740
Message 1572 - Posted: 18 Jun 2015, 19:29:48 UTC
Last modified: 4 May 2023, 14:46:53 UTC

wrapper.cpp

// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2014 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see <http://www.gnu.org/licenses/>. // BOINC wrapper - lets you use non-BOINC apps with BOINC // See http://boinc.berkeley.edu/trac/wiki/WrapperApp // // cmdline options: // --device N macro-substitute N for $GPU_DEVICE_NUM // in worker cmdlines and env values // --nthreads X macro-substitute X for $NTHREADS // in worker cmdlines and env values // --trickle X send a trickle-up message reporting runtime every X sec // of runtime (use this for credit granting // if your app does its own job management) // // Handles: // - suspend/resume/quit/abort // - reporting CPU time // - loss of heartbeat from client // - checkpointing // (at the level of task; or potentially within task) // // Contributor: Andrew J. Younge (ajy4490@umiacs.umd.edu) #ifndef _WIN32 #include "config.h" #endif #include <stdio.h> #include <vector> #include <string> #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #else #ifdef HAVE_SYS_WAIT_H #include <sys/wait.h> #endif #include <sys/types.h> #include <sys/stat.h> #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #endif #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> #endif #include <unistd.h> #endif #include "version.h" #include "boinc_api.h" #include "boinc_zip.h" #include "diagnostics.h" #include "error_numbers.h" #include "filesys.h" #include "parse.h" #include "proc_control.h" #include "procinfo.h" #include "str_util.h" #include "str_replace.h" #include "util.h" #include "regexp.h" using std::vector; using std::string; //#define DEBUG #if 1 #define debug_msg(x) #else inline void debug_msg(const char* x) { fprintf(stderr, "%s\n", x); } #endif #define JOB_FILENAME "job.xml" #define CHECKPOINT_FILENAME "wrapper_checkpoint.txt" #define POLL_PERIOD 1.0 int nthreads = 1; int gpu_device_num = -1; double runtime = 0; // run time this session double trickle_period = 0; vector<string> unzip_filenames; string zip_filename; vector<regexp*> zip_patterns; APP_INIT_DATA aid; struct TASK { string application; string exec_dir; // optional execution directory; // macro-substituted vector<string> vsetenv; // vector of strings for environment variables // macro-substituted string stdin_filename; string stdout_filename; string stderr_filename; string checkpoint_filename; // name of task's checkpoint file, if any string fraction_done_filename; // name of file where app will write its fraction done string command_line; // macro-substituted double weight; // contribution of this task to overall fraction done bool is_daemon; bool append_cmdline_args; bool multi_process; double time_limit; int priority; // dynamic stuff follows double current_cpu_time; // most recently measured CPU time of this task double final_cpu_time; // final CPU time of this task double starting_cpu; // how much CPU time was used by tasks before this one bool suspended; double elapsed_time; #ifdef _WIN32 HANDLE pid_handle; DWORD pid; struct _stat last_stat; // mod time of checkpoint file #else int pid; struct stat last_stat; double start_rusage; // getrusage() CPU time at start of task #endif bool stat_first; int parse(XML_PARSER&); bool poll(int& status); int run(int argc, char** argv); void kill(); void stop(); void resume(); double cpu_time(); inline bool has_checkpointed() { bool changed = false; if (checkpoint_filename.size() == 0) return false; struct stat new_stat; int retval = stat(checkpoint_filename.c_str(), &new_stat); if (retval) return false; if (!stat_first && new_stat.st_mtime != last_stat.st_mtime) { changed = true; } stat_first = false; last_stat.st_mtime = new_stat.st_mtime; return changed; } inline double fraction_done() { if (fraction_done_filename.size() == 0) return 0; FILE* f = fopen(fraction_done_filename.c_str(), "r"); if (!f) return 0; // read the last line of the file // fseek(f, -32, SEEK_END); double temp, frac = 0; while (!feof(f)) { char buf[256]; char* p = fgets(buf, 256, f); if (p == NULL) break; int n = sscanf(buf, "%lf", &temp); if (n == 1) frac = temp; } fclose(f); if (frac < 0) return 0; if (frac > 1) return 1; return frac; } #ifdef _WIN32 // Windows uses a "null-terminated sequence of null-terminated strings" // to represent env vars. // I guess arg/argv didn't cut it for them. // void set_up_env_vars(char** env_vars, const int nvars) { int bufsize = 0; int len = 0; for (int j = 0; j < nvars; j++) { bufsize += (1 + (int)vsetenv[j].length()); } bufsize++; // add a final byte for array null ptr *env_vars = new char[bufsize]; memset(*env_vars, 0, sizeof(char) * bufsize); char* p = *env_vars; // copy each env string to a buffer for the process for (vector<string>::iterator it = vsetenv.begin(); it != vsetenv.end() && len < bufsize-1; it++ ) { strncpy(p, it->c_str(), it->length()); len = (int)strlen(p); p += len + 1; // move pointer ahead } } #else void set_up_env_vars(char*** env_vars, const int nvars) { *env_vars = new char*[nvars+1]; // need one more than the # of vars, for a NULL ptr at the end memset(*env_vars, 0x00, sizeof(char*) * (nvars+1)); // get all environment vars for this task for (int i = 0; i < nvars; i++) { (*env_vars)[i] = const_cast<char*>(vsetenv[i].c_str()); } } #endif }; vector<TASK> tasks; vector<TASK> daemons; // replace s1 with s2 // void str_replace_all(char* buf, const char* s1, const char* s2) { char buf2[64000]; while (1) { char* p = strstr(buf, s1); if (!p) break; strcpy(buf2, p+strlen(s1)); strcpy(p, s2); strcat(p, buf2); } } // macro-substitute strings from job.xml // $PROJECT_DIR -> project directory // $NTHREADS --> --nthreads arg if present, else 1 // $GPU_DEVICE_NUM --> gpu_device_num from init_data.xml, or --device arg // void macro_substitute(char* buf) { const char* pd = strlen(aid.project_dir)?aid.project_dir:"."; str_replace_all(buf, "$PROJECT_DIR", pd); char nt[256]; sprintf(nt, "%d", nthreads); str_replace_all(buf, "$NTHREADS", nt); if (aid.gpu_device_num >= 0) { gpu_device_num = aid.gpu_device_num; } if (gpu_device_num >= 0) { sprintf(nt, "%d", gpu_device_num); str_replace_all(buf, "$GPU_DEVICE_NUM", nt); } } // make a list of files in the slot directory, // and write to "initial_file_list" // void get_initial_file_list() { char fname[256]; vector<string> initial_files; DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { initial_files.push_back(fname); } dir_close(d); FILE* f = fopen("initial_file_list_temp", "w"); for (unsigned int i=0; i<initial_files.size(); i++) { fprintf(f, "%s\n", initial_files[i].c_str()); } fclose(f); int retval = boinc_rename("initial_file_list_temp", "initial_file_list"); if (retval) { fprintf(stderr, "boinc_rename() error: %d\n", retval); exit(1); } } void read_initial_file_list(vector<string>& files) { char buf[256]; FILE* f = fopen("initial_file_list", "r"); if (!f) return; while (fgets(buf, sizeof(buf), f)) { strip_whitespace(buf); files.push_back(string(buf)); } fclose(f); } // if any zipped input files are present, unzip and remove them // void do_unzip_inputs() { for (unsigned int i=0; i<unzip_filenames.size(); i++) { string zipfilename = unzip_filenames[i]; if (boinc_file_exists(zipfilename.c_str())) { string path; boinc_resolve_filename_s(zipfilename.c_str(), path); int retval = boinc_zip(UNZIP_IT, path, NULL); if (retval) { fprintf(stderr, "boinc_unzip() error: %d\n", retval); exit(1); } retval = boinc_delete_file(zipfilename.c_str()); if (retval) { fprintf(stderr, "boinc_delete_file() error: %d\n", retval); } } } } bool in_vector(string s, vector<string>& v) { for (unsigned int i=0; i<v.size(); i++) { if (s == v[i]) return true; } return false; } // get the list of output files to zip // void get_zip_inputs(ZipFileList &files) { vector<string> initial_files; char fname[256]; read_initial_file_list(initial_files); DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { string filename = string(fname); if (in_vector(filename, initial_files)) continue; for (unsigned int i=0; i<zip_patterns.size(); i++) { regmatch match; if (re_exec_w(zip_patterns[i], fname, 1, &match) == 1) { files.push_back(filename); break; } } } } // if the zipped output file is not present, // create the zip in a temp file, then rename it // void do_zip_outputs() { if (zip_filename.empty()) return; if (boinc_file_exists(zip_filename.c_str())) return; ZipFileList infiles; get_zip_inputs(infiles); int retval = boinc_zip(ZIP_IT, string("temp.zip"), &infiles); if (retval) { fprintf(stderr, "boinc_zip() failed: %d\n", retval); exit(1); } string path; boinc_resolve_filename_s(zip_filename.c_str(), path); retval = boinc_rename("temp.zip", path.c_str()); if (retval) { fprintf(stderr, "failed to rename temp.zip: %d\n", retval); exit(1); } } int TASK::parse(XML_PARSER& xp) { char buf[8192]; weight = 1; current_cpu_time = 0; final_cpu_time = 0; stat_first = true; pid = 0; is_daemon = false; multi_process = false; append_cmdline_args = false; time_limit = 0; priority = PROCESS_PRIORITY_LOWEST; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s TASK::parse(): unexpected text %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); continue; } if (xp.match_tag("/task")) { return 0; } else if (xp.parse_string("application", application)) continue; else if (xp.parse_str("exec_dir", buf, sizeof(buf))) { macro_substitute(buf); exec_dir = buf; continue; } else if (xp.parse_str("setenv", buf, sizeof(buf))) { macro_substitute(buf); vsetenv.push_back(buf); continue; } else if (xp.parse_string("stdin_filename", stdin_filename)) continue; else if (xp.parse_string("stdout_filename", stdout_filename)) continue; else if (xp.parse_string("stderr_filename", stderr_filename)) continue; else if (xp.parse_str("command_line", buf, sizeof(buf))) { macro_substitute(buf); command_line = buf; continue; } else if (xp.parse_string("checkpoint_filename", checkpoint_filename)) continue; else if (xp.parse_string("fraction_done_filename", fraction_done_filename)) continue; else if (xp.parse_double("weight", weight)) continue; else if (xp.parse_bool("daemon", is_daemon)) continue; else if (xp.parse_bool("multi_process", multi_process)) continue; else if (xp.parse_bool("append_cmdline_args", append_cmdline_args)) continue; else if (xp.parse_double("time_limit", time_limit)) continue; else if (xp.parse_int("priority", priority)) continue; } return ERR_XML_PARSE; } int parse_unzip_input(XML_PARSER& xp) { char buf2[256]; string s; while (!xp.get_tag()) { if (xp.match_tag("/unzip_input")) { return 0; } if (xp.parse_string("zipfilename", s)) { unzip_filenames.push_back(s); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_zip_output(XML_PARSER& xp) { char buf[256]; while (!xp.get_tag()) { if (xp.match_tag("/zip_output")) { return 0; } if (xp.parse_string("zipfilename", zip_filename)) { continue; } if (xp.parse_str("filename", buf, sizeof(buf))) { regexp* rp; int retval = re_comp_w(&rp, buf); if (retval) { fprintf(stderr, "re_comp_w() failed: %d\n", retval); exit(1); } zip_patterns.push_back(rp); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_job_file() { MIOFILE mf; char buf[256], buf2[256]; boinc_resolve_filename(JOB_FILENAME, buf, 1024); FILE* f = boinc_fopen(buf, "r"); if (!f) { fprintf(stderr, "%s can't open job file %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), buf ); return ERR_FOPEN; } mf.init_file(f); XML_PARSER xp(&mf); if (!xp.parse_start("job_desc")) return ERR_XML_PARSE; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s unexpected text in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); continue; } if (xp.match_tag("/job_desc")) { fclose(f); return 0; } if (xp.match_tag("task")) { TASK task; int retval = task.parse(xp); if (!retval) { if (task.is_daemon) { daemons.push_back(task); } else { tasks.push_back(task); } } continue; } if (xp.match_tag("unzip_input")) { parse_unzip_input(xp); continue; } if (xp.match_tag("zip_output")) { parse_zip_output(xp); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } fclose(f); return ERR_XML_PARSE; } int start_daemons(int argc, char** argv) { for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; int retval = task.run(argc, argv); if (retval) return retval; } return 0; } void kill_daemons() { vector<int> daemon_pids; for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; if (task.pid) { daemon_pids.push_back(task.pid); } } kill_all(daemon_pids); } #ifdef _WIN32 // CreateProcess() takes HANDLEs for the stdin/stdout. // We need to use CreateFile() to get them. Ugh. // HANDLE win_fopen(const char* path, const char* mode) { SECURITY_ATTRIBUTES sa; memset(&sa, 0, sizeof(sa)); sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; if (!strcmp(mode, "r")) { return CreateFile( path, GENERIC_READ, FILE_SHARE_READ, &sa, OPEN_EXISTING, 0, 0 ); } else if (!strcmp(mode, "w")) { return CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); } else if (!strcmp(mode, "a")) { HANDLE hAppend = CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); SetFilePointer(hAppend, 0, NULL, FILE_END); return hAppend; } else { return 0; } } #endif void slash_to_backslash(char* p) { while (1) { char* q = strchr(p, '/'); if (!q) break; *q = '\\'; } } int TASK::run(int argct, char** argvt) { string stdout_path, stdin_path, stderr_path; char app_path[1024], buf[256]; if (fraction_done_filename.size()) { boinc_delete_file(fraction_done_filename.c_str()); } strcpy(buf, application.c_str()); char* p = strstr(buf, "$PROJECT_DIR"); if (p) { p += strlen("$PROJECT_DIR"); sprintf(app_path, "%s%s", aid.project_dir, p); } else { boinc_resolve_filename(buf, app_path, sizeof(app_path)); } if (!boinc_file_exists(app_path)) { fprintf(stderr, "application %s missing\n", app_path); exit(1); } // Optionally append wrapper's command-line arguments // to those in the job file. // if (append_cmdline_args) { for (int i=1; i<argct; i++){ command_line += string(" "); command_line += argvt[i]; } } fprintf(stderr, "%s wrapper: running %s (%s)\n", boinc_msg_prefix(buf, sizeof(buf)), app_path, command_line.c_str() ); #ifdef _WIN32 PROCESS_INFORMATION process_info; STARTUPINFO startup_info; string command; slash_to_backslash(app_path); memset(&process_info, 0, sizeof(process_info)); memset(&startup_info, 0, sizeof(startup_info)); if (ends_with((string)app_path, ".bat") || ends_with((string)app_path, ".cmd")) { command = string("cmd.exe /c \"") + app_path + string("\" ") + command_line; } else { command = string("\"") + app_path + string("\" ") + command_line; } // pass std handles to app // startup_info.dwFlags = STARTF_USESTDHANDLES; if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); startup_info.hStdOutput = win_fopen(stdout_path.c_str(), "a"); } else { startup_info.hStdOutput = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); startup_info.hStdInput = win_fopen(stdin_path.c_str(), "r"); } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); startup_info.hStdError = win_fopen(stderr_path.c_str(), "a"); } else { startup_info.hStdError = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (startup_info.hStdOutput == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdOutput is invalid\n"); } if ((stdin_filename != "") && (startup_info.hStdInput == INVALID_HANDLE_VALUE)) { fprintf(stderr, "Error: startup_info.hStdInput is invalid\n"); } if (startup_info.hStdError == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdError is invalid\n"); } // setup environment vars if needed // int nvars = (int)vsetenv.size(); char* env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); } BOOL success; success = CreateProcess( NULL, (LPSTR)command.c_str(), NULL, NULL, TRUE, // bInheritHandles CREATE_NO_WINDOW|process_priority_value(priority), (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, &process_info ); if (!success) { char error_msg[1024]; windows_format_error_string(GetLastError(), error_msg, sizeof(error_msg)); fprintf(stderr, "can't run app: %s\n", error_msg); fprintf(stderr, "Error: command is '%s'\n", command.c_str()); fprintf(stderr, "Error: exec_dir is '%s'\n", exec_dir.c_str()); if (env_vars) delete [] env_vars; return ERR_EXEC; } if (env_vars) delete [] env_vars; pid_handle = process_info.hProcess; pid = process_info.dwProcessId; #else int retval; char* argv[256]; char arglist[4096]; FILE* stdout_file; FILE* stdin_file; FILE* stderr_file; struct rusage ru; getrusage(RUSAGE_CHILDREN, &ru); start_rusage = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; pid = fork(); if (pid == -1) { perror("fork(): "); return ERR_FORK; } if (pid == 0) { // we're in the child process here // // open stdout, stdin if file names are given // NOTE: if the application is restartable, // we should deal with atomicity somehow // if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); stdout_file = freopen(stdout_path.c_str(), "a", stdout); if (!stdout_file) { fprintf(stderr, "Can't open %s for stdout; exiting\n", stdout_path.c_str()); return ERR_FOPEN; } } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); stdin_file = freopen(stdin_path.c_str(), "r", stdin); if (!stdin_file) { fprintf(stderr, "Can't open %s for stdin; exiting\n", stdin_path.c_str()); return ERR_FOPEN; } } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); stderr_file = freopen(stderr_path.c_str(), "a", stderr); if (!stderr_file) { fprintf(stderr, "Can't open %s for stderr; exiting\n", stderr_path.c_str()); return ERR_FOPEN; } } // construct argv // TODO: use malloc instead of stack var // argv[0] = app_path; strlcpy(arglist, command_line.c_str(), sizeof(arglist)); parse_command_line(arglist, argv+1); setpriority(PRIO_PROCESS, 0, process_priority_value(priority)); if (!exec_dir.empty()) { retval = chdir(exec_dir.c_str()); if (!retval) { fprintf(stderr, "%s chdir() to %s failed\n", boinc_msg_prefix(buf, sizeof(buf)), exec_dir.c_str() ); exit(1); } } // setup environment variables (if any) // const int nvars = vsetenv.size(); char** env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); retval = execve(app_path, argv, env_vars); } else { retval = execv(app_path, argv); } perror("execv() failed: "); exit(ERR_EXEC); } // pid = 0 i.e. child proc of the fork #endif suspended = false; elapsed_time = 0; return 0; } // return true if task exited // bool TASK::poll(int& status) { char buf[256]; if (time_limit && elapsed_time > time_limit) { fprintf(stderr, "%s task %s reached time limit %.0f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), time_limit ); kill(); status = 0; return true; } #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { status = exit_code; final_cpu_time = current_cpu_time; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); return true; } } #else int wpid; struct rusage ru; wpid = waitpid(pid, &status, WNOHANG); if (wpid) { getrusage(RUSAGE_CHILDREN, &ru); final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; final_cpu_time -= start_rusage; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); if (final_cpu_time < current_cpu_time) { final_cpu_time = current_cpu_time; } return true; } #endif return false; } // kill this task (gracefully if possible) and any other subprocesses // void TASK::kill() { #ifdef _WIN32 kill_descendants(); #else kill_descendants(pid); #endif } void TASK::stop() { if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); } suspended = true; } void TASK::resume() { if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); } suspended = false; } // Get the CPU time of the app while it's running. // This totals the CPU time of all the descendant processes, // so it shouldn't be called too frequently. // double TASK::cpu_time() { #ifndef ANDROID // the Android GUI doesn't show CPU time, // and process_tree_cpu_time() crashes sometimes // double x = process_tree_cpu_time(pid); // if the process has exited, the above could return zero. // So update carefully. // if (x > current_cpu_time) { current_cpu_time = x; } #endif return current_cpu_time; } void poll_boinc_messages(TASK& task) { BOINC_STATUS status; boinc_get_status(&status); //fprintf(stderr, "wrapper: polling\n"); if (status.no_heartbeat) { debug_msg("wrapper: kill"); task.kill(); kill_daemons(); exit(0); } if (status.quit_request) { debug_msg("wrapper: quit"); task.kill(); kill_daemons(); exit(0); } if (status.abort_request) { debug_msg("wrapper: abort"); task.kill(); kill_daemons(); exit(0); } if (status.suspended) { if (!task.suspended) { debug_msg("wrapper: suspend"); task.stop(); } } else { if (task.suspended) { debug_msg("wrapper: resume"); task.resume(); } } } // see if it's time to send trickle-up reporting elapsed time // void check_trickle_period() { char buf[256]; static double last_trickle_report_time = 0; if ((runtime - last_trickle_report_time) < trickle_period) { return; } last_trickle_report_time = runtime; sprintf(buf, "<cpu_time>%f</cpu_time>", last_trickle_report_time ); boinc_send_trickle_up( const_cast<char*>("cpu_time"), buf ); } // Support for multiple tasks. // We keep a checkpoint file that says how many tasks we've completed // and how much CPU time and runtime has been used so far // void write_checkpoint(int ntasks_completed, double cpu, double rt) { boinc_begin_critical_section(); FILE* f = fopen(CHECKPOINT_FILENAME, "w"); if (!f) return; fprintf(f, "%d %f %f\n", ntasks_completed, cpu, rt); fclose(f); boinc_checkpoint_completed(); } int read_checkpoint(int& ntasks_completed, double& cpu, double& rt) { int nt; double c, r; ntasks_completed = 0; cpu = 0; FILE* f = fopen(CHECKPOINT_FILENAME, "r"); if (!f) return ERR_FOPEN; int n = fscanf(f, "%d %lf %lf", &nt, &c, &r); fclose(f); if (n != 2) return 0; ntasks_completed = nt; cpu = c; rt = r; return 0; } int main(int argc, char** argv) { BOINC_OPTIONS options; int retval, ntasks_completed; unsigned int i; double total_weight=0, weight_completed=0; double checkpoint_cpu_time; // total CPU time at last checkpoint char buf[256]; #ifdef _WIN32 SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS); #endif for (int j=1; j<argc; j++) { if (!strcmp(argv[j], "--nthreads")) { nthreads = atoi(argv[++j]); } else if (!strcmp(argv[j], "--device")) { gpu_device_num = atoi(argv[++j]); } else if (!strcmp(argv[j], "--trickle")) { trickle_period = atof(argv[++j]); } } retval = parse_job_file(); if (retval) { fprintf(stderr, "%s can't parse job file: %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); boinc_finish(retval); } do_unzip_inputs(); retval = read_checkpoint(ntasks_completed, checkpoint_cpu_time, runtime); if (retval && !zip_filename.empty()) { // this is the first time we've run. // If we're going to zip output files, // make a list of files present at this point // so we can exclude them. // write_checkpoint(0, 0, 0); get_initial_file_list(); } // do initialization after getting initial file list, // in case we're supposed to zip stderr.txt // memset(&options, 0, sizeof(options)); options.main_program = true; options.check_heartbeat = true; options.handle_process_control = true; boinc_init_options(&options); fprintf(stderr, "%s wrapper (%d.%d.%d): starting\n", boinc_msg_prefix(buf, sizeof(buf)), BOINC_MAJOR_VERSION, BOINC_MINOR_VERSION, WRAPPER_RELEASE ); boinc_get_init_data(aid); if (ntasks_completed > (int)tasks.size()) { fprintf(stderr, "%s Checkpoint file: ntasks_completed too large: %d > %d\n", boinc_msg_prefix(buf, sizeof(buf)), ntasks_completed, (int)tasks.size() ); boinc_finish(1); } for (i=0; i<tasks.size(); i++) { total_weight += tasks[i].weight; } retval = start_daemons(argc, argv); if (retval) { fprintf(stderr, "%s start_daemons(): %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); kill_daemons(); boinc_finish(retval); } // loop over tasks // for (i=0; i<tasks.size(); i++) { TASK& task = tasks[i]; if ((int)i<ntasks_completed) { weight_completed += task.weight; continue; } double frac_done = weight_completed/total_weight; double cpu_time = 0; task.starting_cpu = checkpoint_cpu_time; retval = task.run(argc, argv); if (retval) { boinc_finish(retval); } int counter = 0; while (1) { int status; if (task.poll(status)) { if (status) { fprintf(stderr, "%s app exit status: 0x%x\n", boinc_msg_prefix(buf, sizeof(buf)), status ); // On Unix, if the app is non-executable, // the child status will be 0x6c00. // If we return this the client will treat it // as recoverable, and restart us. // We don't want this, so return an 8-bit error code. // kill_daemons(); boinc_finish(EXIT_CHILD_FAILED); } break; } poll_boinc_messages(task); double task_fraction_done = task.fraction_done(); double delta = task_fraction_done*task.weight/total_weight; // getting CPU time of task tree is inefficient, // so do it only every 10 sec // if (counter%10 == 0) { cpu_time = task.cpu_time(); } #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); #endif boinc_report_app_status( task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); if (task.has_checkpointed()) { cpu_time = task.cpu_time(); checkpoint_cpu_time = task.starting_cpu + cpu_time; write_checkpoint(i, checkpoint_cpu_time, runtime); } if (trickle_period) { check_trickle_period(); } boinc_sleep(POLL_PERIOD); if (!task.suspended) { task.elapsed_time += POLL_PERIOD; runtime += POLL_PERIOD; } counter++; } checkpoint_cpu_time = task.starting_cpu + task.final_cpu_time; #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); #endif boinc_report_app_status( task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); write_checkpoint(i+1, checkpoint_cpu_time, runtime); weight_completed += task.weight; } kill_daemons(); do_zip_outputs(); boinc_finish(0); } #ifdef _WIN32 int WINAPI WinMain(HINSTANCE hInst, HINSTANCE hPrevInst, LPSTR Args, int WinMode) { LPSTR command_line; char* argv[100]; int argc; command_line = GetCommandLine(); argc = parse_command_line(command_line, argv); return main(argc, argv); } #endif

Crunch3r
Send message
Joined: 28 Jun 15
Posts: 1
Credit: 7,350
RAC: 0
Message 1601 - Posted: 28 Jun 2015, 16:25:21 UTC

Built on 10.8 mountain lion

http://www.boincunited.org/opt_apps/osx/wrapper.zip

Ken_g6
Send message
Joined: 1 Jan 15
Posts: 3
Credit: 5,820,643
RAC: 1
Message 5263 - Posted: 7 Jun 2019, 13:41:15 UTC

Any chance the wrapper could be compiled from a newer version? Like this one? https://github.com/ibethune/llr_wrapper/blob/master/wrapper.C

Most likely the wrapper would have to be modified, but still that code has a lot of benefits the wrapper here doesn't.

Profile rebirther
Volunteer moderator
Project administrator
Project developer
Project tester
Project scientist
Avatar
Send message
Joined: 2 Jan 13
Posts: 7479
Credit: 43,657,967
RAC: 40,740
Message 5266 - Posted: 7 Jun 2019, 15:17:53 UTC - in response to Message 5263.

Any chance the wrapper could be compiled from a newer version? Like this one? https://github.com/ibethune/llr_wrapper/blob/master/wrapper.C

Most likely the wrapper would have to be modified, but still that code has a lot of benefits the wrapper here doesn't.


No, there are many different things included like input / output files, readout files etc.

Profile rebirther
Volunteer moderator
Project administrator
Project developer
Project tester
Project scientist
Avatar
Send message
Joined: 2 Jan 13
Posts: 7479
Credit: 43,657,967
RAC: 40,740
Message 6323 - Posted: 20 Apr 2020, 19:17:11 UTC

Wrapper TF code:

// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2014 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see <http://www.gnu.org/licenses/>. // BOINC wrapper - lets you use non-BOINC apps with BOINC // See https://boinc.berkeley.edu/trac/wiki/WrapperApp // // cmdline options: // --device N macro-substitute N for $GPU_DEVICE_NUM // in worker cmdlines and env values // --nthreads X macro-substitute X for $NTHREADS // in worker cmdlines and env values // --trickle X send a trickle-up message reporting runtime every X sec // of runtime (use this for credit granting // if your app does its own job management) // // Handles: // - suspend/resume/quit/abort // - reporting CPU time // - loss of heartbeat from client // - checkpointing // (at the level of task; or potentially within task) // // Contributor: Andrew J. Younge (ajy4490@umiacs.umd.edu) // comment out the following to disable checking that // executables are signed. // Doing so introduces a security vulnerability. // #define CHECK_EXECUTABLES #ifndef _WIN32 #include "config.h" #endif #include <stdio.h> #include <vector> #include <string> #include <algorithm> #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #else #ifdef HAVE_SYS_WAIT_H #include <sys/wait.h> #endif #include <sys/types.h> #include <sys/stat.h> #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #endif #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> #endif #include <unistd.h> #endif #include "version.h" #ifndef _WIN32 #include "svn_version.h" #endif #include "boinc_api.h" #include "app_ipc.h" #include "graphics2.h" #include "boinc_zip.h" #include "diagnostics.h" #include "error_numbers.h" #include "filesys.h" #include "parse.h" #include "proc_control.h" #include "procinfo.h" #include "str_util.h" #include "str_replace.h" #include "util.h" #include "regexp.h" using std::vector; using std::string; #ifdef DEBUG inline void debug_msg(const char* x) { fprintf(stderr, "[DEBUG] %s\n", x); } #else #define debug_msg(x) #endif #define JOB_FILENAME "job.xml" #define CHECKPOINT_FILENAME "wrapper_checkpoint.txt" #define POLL_PERIOD 1.0 int nthreads = 1; int gpu_device_num = -1; double runtime = 0; // run time this session double trickle_period = 0; bool enable_graphics_support = false; vector<string> unzip_filenames; string zip_filename; vector<regexp*> zip_patterns; APP_INIT_DATA aid; struct TASK { string application; string exec_dir; // optional execution directory; // macro-substituted vector<string> vsetenv; // vector of strings for environment variables // macro-substituted string stdin_filename; string stdout_filename; string stderr_filename; string checkpoint_filename; // name of task's checkpoint file, if any string fraction_done_filename; // name of file where app will write its fraction done string command_line; // macro-substituted double weight; // contribution of this task to overall fraction done bool is_daemon; bool append_cmdline_args; bool multi_process; bool forward_slashes; double time_limit; int priority; // dynamic stuff follows double current_cpu_time; // most recently measured CPU time of this task double final_cpu_time; // final CPU time of this task double starting_cpu; // how much CPU time was used by tasks before this one bool suspended; double elapsed_time; #ifdef _WIN32 HANDLE pid_handle; DWORD pid; struct _stat last_stat; // mod time of checkpoint file #else int pid; struct stat last_stat; double start_rusage; // getrusage() CPU time at start of task #endif bool stat_first; TASK() : checkpoint_filename("wrapper_checkpoint.txt") {} int parse(XML_PARSER&); void substitute_macros(); bool poll(int& status); int run(int argc, char** argv); void kill(); void stop(); void resume(); double cpu_time(); inline double getMfraction() { //printf("Rein in getMfraction\n"); if (FILE* f = fopen("worktodo.txt", "r")) { char* p; char buf[256]; p = fgets(buf, 256, f); fclose(f); if (p == NULL) return 0.0001; char * qch = strtok(p, "="); qch = strtok(NULL, ","); char * cm = "M"; char * ending = ".chp"; char * mfile = (char *)malloc(1 + strlen(cm) + strlen(qch)); strcpy(mfile, cm); strcat(mfile, qch); char * mfilename = (char *)malloc(1 + strlen(mfile) + strlen(ending)); strcpy(mfilename, mfile); strcat(mfilename, ending); //printf("Filename: %s\n", mfilename); if (FILE* ff = fopen(mfilename, "r")) { if (!ff) return 0.0001; char* pp; pp = fgets(buf, 256, ff); fclose(ff); if (pp == NULL) return 0.0001; char * pch; int count = 0; double all = 0, done = 0, frac = 0; pch = strtok(pp, " "); while (pch != NULL) { count++; if (count == 4) all = atof(pch); if (count == 7) done = atof(pch); pch = strtok(NULL, " "); } if (all > 0) frac = done / all; if (frac < 0) return 0.0001; if (frac > 1) return 0.0001; return frac; } else { return 0.0001; } } else { return 0.0001; } } inline bool has_checkpointed() { bool changed = false; if (checkpoint_filename.size() == 0) return false; struct stat new_stat; int retval = stat(checkpoint_filename.c_str(), &new_stat); if (retval) return false; if (!stat_first && new_stat.st_mtime != last_stat.st_mtime) { changed = true; } stat_first = false; last_stat.st_mtime = new_stat.st_mtime; return changed; } inline double fraction_done() { double frac = getMfraction(); return frac; } #ifdef _WIN32 // Windows uses a "null-terminated sequence of null-terminated strings" // to represent env vars. // I guess arg/argv didn't cut it for them. // void set_up_env_vars(char** env_vars, const int nvars) { int bufsize = 0; int len = 0; for (int j = 0; j < nvars; j++) { bufsize += (1 + (int)vsetenv[j].length()); } bufsize++; // add a final byte for array null ptr *env_vars = new char[bufsize]; memset(*env_vars, 0, sizeof(char) * bufsize); char* p = *env_vars; // copy each env string to a buffer for the process for (vector<string>::iterator it = vsetenv.begin(); it != vsetenv.end() && len < bufsize-1; ++it ) { strncpy(p, it->c_str(), it->length()); len = (int)strlen(p); p += len + 1; // move pointer ahead } } #else void set_up_env_vars(char*** env_vars, const int nvars) { *env_vars = new char*[nvars+1]; // need one more than the # of vars, for a NULL ptr at the end memset(*env_vars, 0x00, sizeof(char*) * (nvars+1)); // get all environment vars for this task for (int i = 0; i < nvars; i++) { (*env_vars)[i] = const_cast<char*>(vsetenv[i].c_str()); } } #endif }; vector<TASK> tasks; vector<TASK> daemons; // replace s1 with s2 // void str_replace_all(char* buf, const char* s1, const char* s2) { char buf2[64000]; const size_t s1_len = strlen(s1); while (1) { char* p = strstr(buf, s1); if (!p) break; strcpy(buf2, p+s1_len); strcpy(p, s2); strcat(p, buf2); } } // replace s1 with s2 // http://stackoverflow.com/questions/2896600/how-to-replace-all-occurrences-of-a-character-in-string // void str_replace_all(string &str, const string& s1, const string& s2) { size_t start_pos = 0; while((start_pos = str.find(s1, start_pos)) != string::npos) { str.replace(start_pos, s1.length(), s2); start_pos += s2.length(); // Handles case where 's1' is a substring of 's2' } } // macro-substitute strings from job.xml // $PROJECT_DIR -> project directory // $NTHREADS --> --nthreads arg if present, else 1 // $GPU_DEVICE_NUM --> gpu_device_num from init_data.xml, or --device arg // $PWD --> current directory // void macro_substitute(string &str) { const char* pd = strlen(aid.project_dir)?aid.project_dir:"."; str_replace_all(str, "$PROJECT_DIR", pd); #ifdef DEBUG fprintf(stderr, "[DEBUG] replacing '%s' with '%s'\n", "$PROJECT_DIR", pd); #endif char nt[256]; sprintf(nt, "%d", nthreads); str_replace_all(str, "$NTHREADS", nt); #ifdef DEBUG fprintf(stderr, "[DEBUG] replacing '%s' with '%s'\n", "$NTHREADS", nt); #endif if (aid.gpu_device_num >= 0) { gpu_device_num = aid.gpu_device_num; } if (gpu_device_num >= 0) { sprintf(nt, "%d", gpu_device_num); str_replace_all(str, "$GPU_DEVICE_NUM", nt); #ifdef DEBUG fprintf(stderr, "[DEBUG] replacing '%s' with '%s'\n", "$GPU_DEVICE_NUM", nt); #endif } #ifdef _WIN32 GetCurrentDirectory(sizeof(nt),nt); str_replace_all(str, "$PWD", nt); #ifdef DEBUG fprintf(stderr, "[DEBUG] replacing '%s' with '%s'\n", "$PWD", nt); #endif #else char cwd[1024]; str_replace_all(str, "$PWD", getcwd(cwd, sizeof(cwd))); #ifdef DEBUG fprintf(stderr, "[DEBUG] replacing '%s' with '%s'\n", "$PWD", getcwd(cwd, sizeof(cwd))); #endif #endif } // make a list of files in the slot directory, // and write to "initial_file_list" // void get_initial_file_list() { char fname[256]; vector<string> initial_files; DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { initial_files.push_back(fname); } dir_close(d); FILE* f = fopen("initial_file_list_temp", "w"); for (unsigned int i=0; i<initial_files.size(); i++) { fprintf(f, "%s\n", initial_files[i].c_str()); } fclose(f); int retval = boinc_rename("initial_file_list_temp", "initial_file_list"); if (retval) { fprintf(stderr, "boinc_rename() error: %d\n", retval); exit(1); } } void read_initial_file_list(vector<string>& files) { char buf[256]; FILE* f = fopen("initial_file_list", "r"); if (!f) return; while (fgets(buf, sizeof(buf), f)) { strip_whitespace(buf); files.push_back(string(buf)); } fclose(f); } // if any zipped input files are present, unzip and remove them // void do_unzip_inputs() { for (unsigned int i=0; i<unzip_filenames.size(); i++) { string zipfilename = unzip_filenames[i]; if (boinc_file_exists(zipfilename.c_str())) { string path; boinc_resolve_filename_s(zipfilename.c_str(), path); int retval = boinc_zip(UNZIP_IT, path, NULL); if (retval) { fprintf(stderr, "boinc_unzip() error: %d\n", retval); exit(1); } retval = boinc_delete_file(zipfilename.c_str()); if (retval) { fprintf(stderr, "boinc_delete_file() error: %d\n", retval); } } } } bool in_vector(string s, vector<string>& v) { for (unsigned int i=0; i<v.size(); i++) { if (s == v[i]) return true; } return false; } // get the list of output files to zip // void get_zip_inputs(ZipFileList &files) { vector<string> initial_files; char fname[256]; read_initial_file_list(initial_files); DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { string filename = string(fname); if (in_vector(filename, initial_files)) continue; for (unsigned int i=0; i<zip_patterns.size(); i++) { regmatch match; if (re_exec_w(zip_patterns[i], fname, 1, &match) == 1) { files.push_back(filename); break; } } } dir_close(d); } // if the zipped output file is not present, // create the zip in a temp file, then rename it // void do_zip_outputs() { if (zip_filename.empty()) return; string path; boinc_resolve_filename_s(zip_filename.c_str(), path); if (boinc_file_exists(path.c_str())) return; ZipFileList infiles; get_zip_inputs(infiles); int retval = boinc_zip(ZIP_IT, string("temp.zip"), &infiles); if (retval) { fprintf(stderr, "boinc_zip() failed: %d\n", retval); exit(1); } retval = boinc_rename("temp.zip", path.c_str()); if (retval) { fprintf(stderr, "failed to rename temp.zip: %d\n", retval); exit(1); } } int TASK::parse(XML_PARSER& xp) { char buf[8192]; weight = 1; current_cpu_time = 0; final_cpu_time = 0; stat_first = true; pid = 0; is_daemon = false; multi_process = false; append_cmdline_args = false; forward_slashes = false; time_limit = 0; priority = PROCESS_PRIORITY_LOWEST; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s TASK::parse(): unexpected text %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); continue; } if (xp.match_tag("/task")) { return 0; } else if (xp.parse_string("application", application)) continue; else if (xp.parse_str("exec_dir", buf, sizeof(buf))) { exec_dir = buf; continue; } else if (xp.parse_str("setenv", buf, sizeof(buf))) { vsetenv.push_back(buf); continue; } else if (xp.parse_string("stdin_filename", stdin_filename)) continue; else if (xp.parse_string("stdout_filename", stdout_filename)) continue; else if (xp.parse_string("stderr_filename", stderr_filename)) continue; else if (xp.parse_str("command_line", buf, sizeof(buf))) { command_line = buf; continue; } else if (xp.parse_string("checkpoint_filename", checkpoint_filename)) continue; else if (xp.parse_string("fraction_done_filename", fraction_done_filename)) continue; else if (xp.parse_double("weight", weight)) continue; else if (xp.parse_bool("daemon", is_daemon)) continue; else if (xp.parse_bool("forward_slashes", forward_slashes)) continue; else if (xp.parse_bool("multi_process", multi_process)) continue; else if (xp.parse_bool("append_cmdline_args", append_cmdline_args)) continue; else if (xp.parse_double("time_limit", time_limit)) continue; else if (xp.parse_int("priority", priority)) continue; } return ERR_XML_PARSE; } void TASK::substitute_macros() { if (!exec_dir.empty()) { macro_substitute(exec_dir); } for (unsigned int i = 0; i < vsetenv.size(); i++) { macro_substitute(vsetenv[i]); } if (!command_line.empty()) { macro_substitute(command_line); } } int parse_unzip_input(XML_PARSER& xp) { char buf2[256]; string s; while (!xp.get_tag()) { if (xp.match_tag("/unzip_input")) { return 0; } if (xp.parse_string("zipfilename", s)) { unzip_filenames.push_back(s); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_zip_output(XML_PARSER& xp) { char buf[256]; while (!xp.get_tag()) { if (xp.match_tag("/zip_output")) { return 0; } if (xp.parse_string("zipfilename", zip_filename)) { continue; } if (xp.parse_str("filename", buf, sizeof(buf))) { regexp* rp; int retval = re_comp_w(&rp, buf); if (retval) { fprintf(stderr, "re_comp_w() failed: %d\n", retval); exit(1); } zip_patterns.push_back(rp); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_job_file() { MIOFILE mf; char buf[256], buf2[256]; boinc_resolve_filename(JOB_FILENAME, buf, 1024); FILE* f = boinc_fopen(buf, "r"); if (!f) { fprintf(stderr, "%s can't open job file %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), buf ); return ERR_FOPEN; } mf.init_file(f); XML_PARSER xp(&mf); if (!xp.parse_start("job_desc")) return ERR_XML_PARSE; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s unexpected text in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); continue; } if (xp.match_tag("/job_desc")) { fclose(f); return 0; } if (xp.match_tag("task")) { TASK task; int retval = task.parse(xp); if (!retval) { if (task.is_daemon) { daemons.push_back(task); } else { tasks.push_back(task); } } continue; } if (xp.match_tag("unzip_input")) { parse_unzip_input(xp); continue; } if (xp.match_tag("zip_output")) { parse_zip_output(xp); continue; } if (xp.parse_bool("enable_graphics_support", enable_graphics_support)) continue; fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } fclose(f); return ERR_XML_PARSE; } int start_daemons(int argc, char** argv) { for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; int retval = task.run(argc, argv); if (retval) return retval; } return 0; } void kill_daemons() { vector<int> daemon_pids; for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; if (task.pid) { daemon_pids.push_back(task.pid); } } kill_all(daemon_pids); } #ifdef _WIN32 // CreateProcess() takes HANDLEs for the stdin/stdout. // We need to use CreateFile() to get them. Ugh. // HANDLE win_fopen(const char* path, const char* mode) { SECURITY_ATTRIBUTES sa; memset(&sa, 0, sizeof(sa)); sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; if (!strcmp(mode, "r")) { return CreateFile( path, GENERIC_READ, FILE_SHARE_READ, &sa, OPEN_EXISTING, 0, 0 ); } else if (!strcmp(mode, "w")) { return CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); } else if (!strcmp(mode, "a")) { HANDLE hAppend = CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); SetFilePointer(hAppend, 0, NULL, FILE_END); return hAppend; } else { return 0; } } #endif void slash_to_backslash(char* p) { while (1) { char* q = strchr(p, '/'); if (!q) break; *q = '\\'; } } void backslash_to_slash(char* p) { while (1) { char* q = strchr(p, '\\'); if (!q) break; *q = '/'; } } int TASK::run(int argct, char** argvt) { string stdout_path, stdin_path, stderr_path; char app_path[1024], buf[256]; if (fraction_done_filename.size()) { boinc_delete_file(fraction_done_filename.c_str()); } strcpy(buf, application.c_str()); char* p = strstr(buf, "$PROJECT_DIR"); if (p) { p += strlen("$PROJECT_DIR"); sprintf(app_path, "%s%s", aid.project_dir, p); } else { boinc_resolve_filename(buf, app_path, sizeof(app_path)); } if (!boinc_file_exists(app_path)) { fprintf(stderr, "application %s missing\n", app_path); exit(1); } // Optionally append wrapper's command-line arguments // to those in the job file. // if (append_cmdline_args) { for (int i=1; i<argct; i++){ command_line += string(" "); command_line += argvt[i]; } } // resolve "boinc_resolve(...)" phrases in command-line const size_t boinc_resolve_prefix_len = strlen("boinc_resolve("); while (1) { char lbuf[16384]; char fname[1024]; char *from, *to; strncpy (lbuf, command_line.c_str(), sizeof(lbuf)); lbuf[sizeof(lbuf)-1] = '\0'; from = strstr(lbuf, "boinc_resolve("); if (!from) { break; } to = strchr(from, ')'); if (!to) { fprintf(stderr, "missing ')' after 'boinc_resolve('\n"); exit(1); } *to = 0; boinc_resolve_filename(from + boinc_resolve_prefix_len, fname, sizeof(fname)); #ifdef _WIN32 if(forward_slashes) { backslash_to_slash(fname); } else { slash_to_backslash(fname); } #endif *from = 0; command_line = string(lbuf) + string(fname) + string(to+1); } fprintf(stderr, "%s wrapper: running %s (%s)\n", boinc_msg_prefix(buf, sizeof(buf)), app_path, command_line.c_str() ); #ifdef _WIN32 PROCESS_INFORMATION process_info; STARTUPINFO startup_info; string command; slash_to_backslash(app_path); memset(&process_info, 0, sizeof(process_info)); memset(&startup_info, 0, sizeof(startup_info)); if (ends_with((string)app_path, ".bat") || ends_with((string)app_path, ".cmd")) { command = string("cmd.exe /c \"") + app_path + string("\" ") + command_line; } else { command = string("\"") + app_path + string("\" ") + command_line; } // pass std handles to app // startup_info.dwFlags = STARTF_USESTDHANDLES; if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); startup_info.hStdOutput = win_fopen(stdout_path.c_str(), "a"); } else { startup_info.hStdOutput = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); startup_info.hStdInput = win_fopen(stdin_path.c_str(), "r"); } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); startup_info.hStdError = win_fopen(stderr_path.c_str(), "a"); } else { startup_info.hStdError = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (startup_info.hStdOutput == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdOutput is invalid\n"); } if ((stdin_filename != "") && (startup_info.hStdInput == INVALID_HANDLE_VALUE)) { fprintf(stderr, "Error: startup_info.hStdInput is invalid\n"); } if (startup_info.hStdError == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdError is invalid\n"); } // setup environment vars if needed // int nvars = (int)vsetenv.size(); char* env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); } BOOL success; success = CreateProcess( NULL, (LPSTR)command.c_str(), NULL, NULL, TRUE, // bInheritHandles CREATE_NO_WINDOW|process_priority_value(priority), (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, &process_info ); if (!success) { char error_msg[1024]; windows_format_error_string(GetLastError(), error_msg, sizeof(error_msg)); fprintf(stderr, "can't run app: %s\n", error_msg); fprintf(stderr, "Error: command is '%s'\n", command.c_str()); fprintf(stderr, "Error: exec_dir is '%s'\n", exec_dir.c_str()); if (env_vars) delete [] env_vars; return ERR_EXEC; } if (env_vars) delete [] env_vars; pid_handle = process_info.hProcess; pid = process_info.dwProcessId; #else int retval; char* argv[256]; char arglist[4096]; FILE* stdout_file; FILE* stdin_file; FILE* stderr_file; struct rusage ru; getrusage(RUSAGE_CHILDREN, &ru); start_rusage = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; pid = fork(); if (pid == -1) { perror("fork(): "); return ERR_FORK; } if (pid == 0) { // we're in the child process here // // open stdout, stdin if file names are given // NOTE: if the application is restartable, // we should deal with atomicity somehow // if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); stdout_file = freopen(stdout_path.c_str(), "a", stdout); if (!stdout_file) { fprintf(stderr, "Can't open %s for stdout; exiting\n", stdout_path.c_str()); return ERR_FOPEN; } } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); stdin_file = freopen(stdin_path.c_str(), "r", stdin); if (!stdin_file) { fprintf(stderr, "Can't open %s for stdin; exiting\n", stdin_path.c_str()); return ERR_FOPEN; } } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); stderr_file = freopen(stderr_path.c_str(), "a", stderr); if (!stderr_file) { fprintf(stderr, "Can't open %s for stderr; exiting\n", stderr_path.c_str()); return ERR_FOPEN; } } // construct argv // TODO: use malloc instead of stack var // argv[0] = app_path; strlcpy(arglist, command_line.c_str(), sizeof(arglist)); parse_command_line(arglist, argv+1); setpriority(PRIO_PROCESS, 0, process_priority_value(priority)); if (!exec_dir.empty()) { retval = chdir(exec_dir.c_str()); if (retval) { fprintf(stderr, "%s chdir() to %s failed with %d\n", boinc_msg_prefix(buf, sizeof(buf)), exec_dir.c_str(), retval ); exit(1); } } // setup environment variables (if any) // const int nvars = vsetenv.size(); char** env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); retval = execve(app_path, argv, env_vars); } else { retval = execv(app_path, argv); } perror("execv() failed: "); exit(ERR_EXEC); } // pid = 0 i.e. child proc of the fork #endif suspended = false; elapsed_time = 0; return 0; } // return true if task exited; in that case also return its exit status // (zero means it completed successfully) // bool TASK::poll(int& status) { char buf[256]; if (time_limit && elapsed_time > time_limit) { fprintf(stderr, "%s task %s reached time limit %.0f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), time_limit ); kill(); status = 0; return true; } #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { status = exit_code; final_cpu_time = current_cpu_time; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); return true; } } #else int wpid; struct rusage ru; wpid = waitpid(pid, &status, WNOHANG); if (wpid) { getrusage(RUSAGE_CHILDREN, &ru); final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; final_cpu_time -= start_rusage; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); if (WIFEXITED(status)) { status = WEXITSTATUS(status); } if (final_cpu_time < current_cpu_time) { final_cpu_time = current_cpu_time; } return true; } #endif return false; } // kill this task (gracefully if possible) and any other subprocesses // void TASK::kill() { #ifdef _WIN32 kill_descendants(); #else kill_descendants(pid); #endif } void TASK::stop() { if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); } suspended = true; } void TASK::resume() { if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); } suspended = false; } // Get the CPU time of the app while it's running. // This totals the CPU time of all the descendant processes, // so it shouldn't be called too frequently. // double TASK::cpu_time() { #ifndef ANDROID // the Android GUI doesn't show CPU time, // and process_tree_cpu_time() crashes sometimes // double x = process_tree_cpu_time(pid); // if the process has exited, the above could return zero. // So update carefully. // if (x > current_cpu_time) { current_cpu_time = x; } #endif return current_cpu_time; } void poll_boinc_messages(TASK& task) { BOINC_STATUS status; boinc_get_status(&status); //fprintf(stderr, "wrapper: polling\n"); if (status.no_heartbeat) { debug_msg("wrapper: kill"); task.kill(); kill_daemons(); exit(0); } if (status.quit_request) { debug_msg("wrapper: quit"); task.kill(); kill_daemons(); exit(0); } if (status.abort_request) { debug_msg("wrapper: abort"); task.kill(); kill_daemons(); exit(0); } if (status.suspended) { if (!task.suspended) { debug_msg("wrapper: suspend"); task.stop(); } } else { if (task.suspended) { debug_msg("wrapper: resume"); task.resume(); } } } // see if it's time to send trickle-up reporting elapsed time // void check_trickle_period() { char buf[256]; static double last_trickle_report_time = 0; if ((runtime - last_trickle_report_time) < trickle_period) { return; } last_trickle_report_time = runtime; sprintf(buf, "<cpu_time>%f</cpu_time>", last_trickle_report_time ); boinc_send_trickle_up( const_cast<char*>("cpu_time"), buf ); } // Support for multiple tasks. // We keep a checkpoint file that says how many tasks we've completed // and how much CPU time and runtime has been used so far // void write_checkpoint(int ntasks_completed, double cpu, double rt) { boinc_begin_critical_section(); FILE* f = fopen(CHECKPOINT_FILENAME, "w"); if (!f) { boinc_end_critical_section(); return; } fprintf(f, "%d %f %f\n", ntasks_completed, cpu, rt); fclose(f); boinc_checkpoint_completed(); } // read the checkpoint file; // return nonzero if it's missing or bad format // int read_checkpoint(int& ntasks_completed, double& cpu, double& rt) { int nt; double c, r; ntasks_completed = 0; cpu = 0; rt = 0; FILE* f = fopen(CHECKPOINT_FILENAME, "r"); if (!f) return ERR_FOPEN; int n = fscanf(f, "%d %lf %lf", &nt, &c, &r); fclose(f); if (n != 3) return -1; ntasks_completed = nt; cpu = c; rt = r; return 0; } // Check whether executable files (tasks and daemons) are code-signed. // The client supplies a list of app version files, which are code-signed. // For each executable file: // - check that it's a soft link // - check that it's of the form ../../project_url/x // - check that "x" is in the list of app version files // void check_execs(vector<TASK> &t) { for (unsigned int i=0; i<t.size(); i++) { TASK &task = t[i]; string phys_name = resolve_soft_link( aid.project_dir, task.application.c_str() ); if (phys_name.empty()) { fprintf(stderr, "task executable %s is not a link\n", phys_name.c_str() ); boinc_finish(1); } if (std::find(aid.app_files.begin(), aid.app_files.end(), phys_name) == aid.app_files.end()) { fprintf(stderr, "task executable %s is not in app version\n", task.application.c_str() ); boinc_finish(1); } } } void check_executables() { if (aid.app_files.size() == 0) return; check_execs(tasks); check_execs(daemons); } int main(int argc, char** argv) { BOINC_OPTIONS options; int retval, ntasks_completed; unsigned int i; double total_weight=0, weight_completed=0; double checkpoint_cpu_time; // total CPU time at last checkpoint char buf[256]; // Log banner // fprintf(stderr, "%s wrapper (%d.%d.%d): starting\n", boinc_msg_prefix(buf, sizeof(buf)), BOINC_MAJOR_VERSION, BOINC_MINOR_VERSION, WRAPPER_RELEASE ); #ifdef _WIN32 SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS); #endif for (int j=1; j<argc; j++) { if (!strcmp(argv[j], "--nthreads")) { nthreads = atoi(argv[++j]); } else if (!strcmp(argv[j], "--device")) { gpu_device_num = atoi(argv[++j]); } else if (!strcmp(argv[j], "--trickle")) { trickle_period = atof(argv[++j]); #ifndef _WIN32 } else if (!strcmp(argv[j], "--version") || !strcmp(argv[j], "-v")) { fprintf(stderr, "%s\n", SVN_VERSION); boinc_finish(0); #endif } } retval = parse_job_file(); if (retval) { fprintf(stderr, "%s can't parse job file: %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); boinc_finish(retval); } do_unzip_inputs(); retval = read_checkpoint(ntasks_completed, checkpoint_cpu_time, runtime); if (retval && !zip_filename.empty()) { // this is the first time we've run. // If we're going to zip output files, // make a list of files present at this point so we can exclude them. // write_checkpoint(0, 0, 0); get_initial_file_list(); } // do initialization after getting initial file list, // in case we're supposed to zip stderr.txt // memset(&options, 0, sizeof(options)); options.main_program = true; options.check_heartbeat = true; options.handle_process_control = true; boinc_init_options(&options); fprintf(stderr, "%s wrapper (%d.%d.%d): starting\n", boinc_msg_prefix(buf, sizeof(buf)), BOINC_MAJOR_VERSION, BOINC_MINOR_VERSION, WRAPPER_RELEASE ); boinc_get_init_data(aid); #ifdef CHECK_EXECUTABLES check_executables(); #endif if (ntasks_completed > (int)tasks.size()) { fprintf(stderr, "%s Checkpoint file: ntasks_completed too large: %d > %d\n", boinc_msg_prefix(buf, sizeof(buf)), ntasks_completed, (int)tasks.size() ); boinc_finish(1); } for (i=0; i<tasks.size(); i++) { total_weight += tasks[i].weight; // need to substitute macros after boinc_init_options() and boinc_get_init_data() tasks[i].substitute_macros(); } retval = start_daemons(argc, argv); if (retval) { fprintf(stderr, "%s start_daemons(): %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); kill_daemons(); boinc_finish(retval); } // loop over tasks // for (i=0; i<tasks.size(); i++) { TASK& task = tasks[i]; if ((int)i<ntasks_completed) { weight_completed += task.weight; continue; } double frac_done = weight_completed/total_weight; double cpu_time = 0; task.starting_cpu = checkpoint_cpu_time; retval = task.run(argc, argv); if (retval) { boinc_finish(retval); } int counter = 0; while (1) { int status; if (task.poll(status)) { if (status) { fprintf(stderr, "%s app exit status: 0x%x\n", boinc_msg_prefix(buf, sizeof(buf)), status ); // On Unix, if the app is non-executable, // the child status will be 0x6c00. // If we return this the client will treat it // as recoverable, and restart us. // We don't want this, so return an 8-bit error code. // kill_daemons(); boinc_finish(EXIT_CHILD_FAILED); } break; } poll_boinc_messages(task); double task_fraction_done = task.fraction_done(); double delta = task_fraction_done*task.weight/total_weight; // getting CPU time of task tree is inefficient, // so do it only every 10 sec // if (counter%10 == 0) { cpu_time = task.cpu_time(); } #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); #endif boinc_report_app_status( task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); if (task.has_checkpointed()) { cpu_time = task.cpu_time(); checkpoint_cpu_time = task.starting_cpu + cpu_time; write_checkpoint(i, checkpoint_cpu_time, runtime); } if (trickle_period) { check_trickle_period(); } if (enable_graphics_support) { boinc_write_graphics_status( task.starting_cpu + cpu_time, checkpoint_cpu_time + task.elapsed_time, frac_done + task.weight/total_weight ); } boinc_sleep(POLL_PERIOD); if (!task.suspended) { task.elapsed_time += POLL_PERIOD; runtime += POLL_PERIOD; } counter++; } checkpoint_cpu_time = task.starting_cpu + task.final_cpu_time; #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); #endif boinc_report_app_status( task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); write_checkpoint(i+1, checkpoint_cpu_time, runtime); weight_completed += task.weight; } kill_daemons(); do_zip_outputs(); boinc_finish(0); }

Profile rebirther
Volunteer moderator
Project administrator
Project developer
Project tester
Project scientist
Avatar
Send message
Joined: 2 Jan 13
Posts: 7479
Credit: 43,657,967
RAC: 40,740
Message 8934 - Posted: 12 Jun 2023, 10:22:27 UTC

Linux wrapper code

// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2014 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see <http://www.gnu.org/licenses/>. // BOINC wrapper - lets you use non-BOINC apps with BOINC // See http://boinc.berkeley.edu/trac/wiki/WrapperApp // // cmdline options: // --device N macro-substitute N for $GPU_DEVICE_NUM // in worker cmdlines and env values // --nthreads X macro-substitute X for $NTHREADS // in worker cmdlines and env values // --trickle X send a trickle-up message reporting runtime every X sec // of runtime (use this for credit granting // if your app does its own job management) // // Handles: // - suspend/resume/quit/abort // - reporting CPU time // - loss of heartbeat from client // - checkpointing // (at the level of task; or potentially within task) // // Contributor: Andrew J. Younge (ajy4490@umiacs.umd.edu) #ifndef _WIN32 #include "config.h" #endif #include <stdio.h> #include <vector> #include <string> #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #else #ifdef HAVE_SYS_WAIT_H #include <sys/wait.h> #endif #include <sys/types.h> #include <sys/stat.h> #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #endif #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> #endif #include <unistd.h> #endif #include "version.h" #include "boinc_api.h" #include "boinc_zip.h" #include "diagnostics.h" #include "error_numbers.h" #include "filesys.h" #include "parse.h" #include "proc_control.h" #include "procinfo.h" #include "str_util.h" #include "str_replace.h" #include "util.h" #include "regexp.h" using std::vector; using std::string; //#define DEBUG #if 1 #define debug_msg(x) #else inline void debug_msg(const char* x) { fprintf(stderr, "%s\n", x); } #endif #define JOB_FILENAME "job.xml" #define CHECKPOINT_FILENAME "wrapper_checkpoint.txt" #define POLL_PERIOD 1.0 int nthreads = 1; int gpu_device_num = -1; double runtime = 0; // run time this session double trickle_period = 0; vector<string> unzip_filenames; string zip_filename; vector<regexp*> zip_patterns; APP_INIT_DATA aid; #ifndef _WIN32 // Define pipes for IPC between parent and child process int pipefd[2]; // pipefd[0] = file descriptor for reading end of pipe // pipefd[1] = file descriptor for writing end of pipe #include <limits.h> char buffer[PIPE_BUF]; // maximum message size // If we want to support non-blocking behaviour of read/write operations #include <fcntl.h> #endif struct TASK { string application; string exec_dir; // optional execution directory; // macro-substituted vector<string> vsetenv; // vector of strings for environment variables // macro-substituted string stdin_filename; string stdout_filename; string stderr_filename; string checkpoint_filename; // name of task's checkpoint file, if any string fraction_done_filename; // name of file where app will write its fraction done string command_line; // macro-substituted double weight; // contribution of this task to overall fraction done bool is_daemon; bool append_cmdline_args; bool multi_process; double time_limit; int priority; // dynamic stuff follows double current_cpu_time; // most recently measured CPU time of this task double final_cpu_time; // final CPU time of this task double starting_cpu; // how much CPU time was used by tasks before this one bool suspended; double elapsed_time; #ifdef _WIN32 HANDLE pid_handle; DWORD pid; struct _stat last_stat; // mod time of checkpoint file #else int pid; struct stat last_stat; double start_rusage; // getrusage() CPU time at start of task #endif bool stat_first; int parse(XML_PARSER&); bool poll(int& status); int run(int argc, char** argv); void kill(); void stop(); void resume(); double cpu_time(); inline bool has_checkpointed() { bool changed = false; if (checkpoint_filename.size() == 0) return false; struct stat new_stat; int retval = stat(checkpoint_filename.c_str(), &new_stat); if (retval) return false; if (!stat_first && new_stat.st_mtime != last_stat.st_mtime) { changed = true; } stat_first = false; last_stat.st_mtime = new_stat.st_mtime; return changed; } inline double fraction_done() { if (fraction_done_filename.size() == 0) return 0; FILE* f = fopen(fraction_done_filename.c_str(), "r"); if (!f) return 0; // read the last line of the file // fseek(f, -32, SEEK_END); double temp, frac = 0; while (!feof(f)) { char buf[256]; char* p = fgets(buf, 256, f); if (p == NULL) break; int n = sscanf(buf, "%lf", &temp); if (n == 1) frac = temp; } fclose(f); if (frac < 0) return 0; if (frac > 1) return 1; return frac; } #ifdef _WIN32 // Windows uses a "null-terminated sequence of null-terminated strings" // to represent env vars. // I guess arg/argv didn't cut it for them. // void set_up_env_vars(char** env_vars, const int nvars) { int bufsize = 0; int len = 0; for (int j = 0; j < nvars; j++) { bufsize += (1 + (int)vsetenv[j].length()); } bufsize++; // add a final byte for array null ptr *env_vars = new char[bufsize]; memset(*env_vars, 0, sizeof(char) * bufsize); char* p = *env_vars; // copy each env string to a buffer for the process for (vector<string>::iterator it = vsetenv.begin(); it != vsetenv.end() && len < bufsize-1; it++ ) { strncpy(p, it->c_str(), it->length()); len = (int)strlen(p); p += len + 1; // move pointer ahead } } #else void set_up_env_vars(char*** env_vars, const int nvars) { *env_vars = new char*[nvars+1]; // need one more than the # of vars, for a NULL ptr at the end memset(*env_vars, 0x00, sizeof(char*) * (nvars+1)); // get all environment vars for this task for (int i = 0; i < nvars; i++) { (*env_vars)[i] = const_cast<char*>(vsetenv[i].c_str()); } } #endif }; vector<TASK> tasks; vector<TASK> daemons; // replace s1 with s2 // void str_replace_all(char* buf, const char* s1, const char* s2) { char buf2[64000]; while (1) { char* p = strstr(buf, s1); if (!p) break; strcpy(buf2, p+strlen(s1)); strcpy(p, s2); strcat(p, buf2); } } // macro-substitute strings from job.xml // $PROJECT_DIR -> project directory // $NTHREADS --> --nthreads arg if present, else 1 // $GPU_DEVICE_NUM --> gpu_device_num from init_data.xml, or --device arg // void macro_substitute(char* buf) { const char* pd = strlen(aid.project_dir)?aid.project_dir:"."; str_replace_all(buf, "$PROJECT_DIR", pd); char nt[256]; sprintf(nt, "%d", nthreads); str_replace_all(buf, "$NTHREADS", nt); if (aid.gpu_device_num >= 0) { gpu_device_num = aid.gpu_device_num; } if (gpu_device_num >= 0) { sprintf(nt, "%d", gpu_device_num); str_replace_all(buf, "$GPU_DEVICE_NUM", nt); } } // make a list of files in the slot directory, // and write to "initial_file_list" // void get_initial_file_list() { char fname[256]; vector<string> initial_files; DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { initial_files.push_back(fname); } dir_close(d); FILE* f = fopen("initial_file_list_temp", "w"); for (unsigned int i=0; i<initial_files.size(); i++) { fprintf(f, "%s\n", initial_files[i].c_str()); } fclose(f); int retval = boinc_rename("initial_file_list_temp", "initial_file_list"); if (retval) { fprintf(stderr, "boinc_rename() error: %d\n", retval); exit(1); } } void read_initial_file_list(vector<string>& files) { char buf[256]; FILE* f = fopen("initial_file_list", "r"); if (!f) return; while (fgets(buf, sizeof(buf), f)) { strip_whitespace(buf); files.push_back(string(buf)); } fclose(f); } // if any zipped input files are present, unzip and remove them // void do_unzip_inputs() { for (unsigned int i=0; i<unzip_filenames.size(); i++) { string zipfilename = unzip_filenames[i]; if (boinc_file_exists(zipfilename.c_str())) { string path; boinc_resolve_filename_s(zipfilename.c_str(), path); int retval = boinc_zip(UNZIP_IT, path, NULL); if (retval) { fprintf(stderr, "boinc_unzip() error: %d\n", retval); exit(1); } retval = boinc_delete_file(zipfilename.c_str()); if (retval) { fprintf(stderr, "boinc_delete_file() error: %d\n", retval); } } } } bool in_vector(string s, vector<string>& v) { for (unsigned int i=0; i<v.size(); i++) { if (s == v[i]) return true; } return false; } // get the list of output files to zip // void get_zip_inputs(ZipFileList &files) { vector<string> initial_files; char fname[256]; read_initial_file_list(initial_files); DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { string filename = string(fname); if (in_vector(filename, initial_files)) continue; for (unsigned int i=0; i<zip_patterns.size(); i++) { regmatch match; if (re_exec_w(zip_patterns[i], fname, 1, &match) == 1) { files.push_back(filename); break; } } } } // if the zipped output file is not present, // create the zip in a temp file, then rename it // void do_zip_outputs() { if (zip_filename.empty()) return; if (boinc_file_exists(zip_filename.c_str())) return; ZipFileList infiles; get_zip_inputs(infiles); int retval = boinc_zip(ZIP_IT, string("temp.zip"), &infiles); if (retval) { fprintf(stderr, "boinc_zip() failed: %d\n", retval); exit(1); } string path; boinc_resolve_filename_s(zip_filename.c_str(), path); retval = boinc_rename("temp.zip", path.c_str()); if (retval) { fprintf(stderr, "failed to rename temp.zip: %d\n", retval); exit(1); } } int TASK::parse(XML_PARSER& xp) { char buf[8192]; weight = 1; current_cpu_time = 0; final_cpu_time = 0; stat_first = true; pid = 0; is_daemon = false; multi_process = false; append_cmdline_args = false; time_limit = 0; priority = PROCESS_PRIORITY_LOWEST; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s TASK::parse(): unexpected text %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); continue; } if (xp.match_tag("/task")) { return 0; } else if (xp.parse_string("application", application)) continue; else if (xp.parse_str("exec_dir", buf, sizeof(buf))) { macro_substitute(buf); exec_dir = buf; continue; } else if (xp.parse_str("setenv", buf, sizeof(buf))) { macro_substitute(buf); vsetenv.push_back(buf); continue; } else if (xp.parse_string("stdin_filename", stdin_filename)) continue; else if (xp.parse_string("stdout_filename", stdout_filename)) continue; else if (xp.parse_string("stderr_filename", stderr_filename)) continue; else if (xp.parse_str("command_line", buf, sizeof(buf))) { macro_substitute(buf); command_line = buf; continue; } else if (xp.parse_string("checkpoint_filename", checkpoint_filename)) continue; else if (xp.parse_string("fraction_done_filename", fraction_done_filename)) continue; else if (xp.parse_double("weight", weight)) continue; else if (xp.parse_bool("daemon", is_daemon)) continue; else if (xp.parse_bool("multi_process", multi_process)) continue; else if (xp.parse_bool("append_cmdline_args", append_cmdline_args)) continue; else if (xp.parse_double("time_limit", time_limit)) continue; else if (xp.parse_int("priority", priority)) continue; } return ERR_XML_PARSE; } int parse_unzip_input(XML_PARSER& xp) { char buf2[256]; string s; while (!xp.get_tag()) { if (xp.match_tag("/unzip_input")) { return 0; } if (xp.parse_string("zipfilename", s)) { unzip_filenames.push_back(s); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_zip_output(XML_PARSER& xp) { char buf[256]; while (!xp.get_tag()) { if (xp.match_tag("/zip_output")) { return 0; } if (xp.parse_string("zipfilename", zip_filename)) { continue; } if (xp.parse_str("filename", buf, sizeof(buf))) { regexp* rp; int retval = re_comp_w(&rp, buf); if (retval) { fprintf(stderr, "re_comp_w() failed: %d\n", retval); exit(1); } zip_patterns.push_back(rp); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf, sizeof(buf)), xp.parsed_tag ); } return ERR_XML_PARSE; } int parse_job_file() { MIOFILE mf; char buf[256], buf2[256]; boinc_resolve_filename(JOB_FILENAME, buf, 1024); FILE* f = boinc_fopen(buf, "r"); if (!f) { fprintf(stderr, "%s can't open job file %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), buf ); return ERR_FOPEN; } mf.init_file(f); XML_PARSER xp(&mf); if (!xp.parse_start("job_desc")) return ERR_XML_PARSE; while (!xp.get_tag()) { if (!xp.is_tag) { fprintf(stderr, "%s unexpected text in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); continue; } if (xp.match_tag("/job_desc")) { fclose(f); return 0; } if (xp.match_tag("task")) { TASK task; int retval = task.parse(xp); if (!retval) { if (task.is_daemon) { daemons.push_back(task); } else { tasks.push_back(task); } } continue; } if (xp.match_tag("unzip_input")) { parse_unzip_input(xp); continue; } if (xp.match_tag("zip_output")) { parse_zip_output(xp); continue; } fprintf(stderr, "%s unexpected tag in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), xp.parsed_tag ); } fclose(f); return ERR_XML_PARSE; } int start_daemons(int argc, char** argv) { for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; int retval = task.run(argc, argv); if (retval) return retval; } return 0; } void kill_daemons() { vector<int> daemon_pids; for (unsigned int i=0; i<daemons.size(); i++) { TASK& task = daemons[i]; if (task.pid) { daemon_pids.push_back(task.pid); } } kill_all(daemon_pids); } #ifdef _WIN32 // CreateProcess() takes HANDLEs for the stdin/stdout. // We need to use CreateFile() to get them. Ugh. // HANDLE win_fopen(const char* path, const char* mode) { SECURITY_ATTRIBUTES sa; memset(&sa, 0, sizeof(sa)); sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; if (!strcmp(mode, "r")) { return CreateFile( path, GENERIC_READ, FILE_SHARE_READ, &sa, OPEN_EXISTING, 0, 0 ); } else if (!strcmp(mode, "w")) { return CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); } else if (!strcmp(mode, "a")) { HANDLE hAppend = CreateFile( path, GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, &sa, OPEN_ALWAYS, 0, 0 ); SetFilePointer(hAppend, 0, NULL, FILE_END); return hAppend; } else { return 0; } } #endif void slash_to_backslash(char* p) { while (1) { char* q = strchr(p, '/'); if (!q) break; *q = '\\'; } } int TASK::run(int argct, char** argvt) { string stdout_path, stdin_path, stderr_path; char app_path[1024], buf[256]; if (fraction_done_filename.size()) { boinc_delete_file(fraction_done_filename.c_str()); } strcpy(buf, application.c_str()); char* p = strstr(buf, "$PROJECT_DIR"); if (p) { p += strlen("$PROJECT_DIR"); sprintf(app_path, "%s%s", aid.project_dir, p); } else { boinc_resolve_filename(buf, app_path, sizeof(app_path)); } if (!boinc_file_exists(app_path)) { fprintf(stderr, "application %s missing\n", app_path); exit(1); } // Optionally append wrapper's command-line arguments // to those in the job file. // if (append_cmdline_args) { for (int i=1; i<argct; i++){ command_line += string(" "); command_line += argvt[i]; } } fprintf(stderr, "%s wrapper: running %s (%s)\n", boinc_msg_prefix(buf, sizeof(buf)), app_path, command_line.c_str() ); #ifdef _WIN32 PROCESS_INFORMATION process_info; STARTUPINFO startup_info; string command; slash_to_backslash(app_path); memset(&process_info, 0, sizeof(process_info)); memset(&startup_info, 0, sizeof(startup_info)); if (ends_with((string)app_path, ".bat") || ends_with((string)app_path, ".cmd")) { command = string("cmd.exe /c \"") + app_path + string("\" ") + command_line; } else { command = string("\"") + app_path + string("\" ") + command_line; } // pass std handles to app // startup_info.dwFlags = STARTF_USESTDHANDLES; if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); startup_info.hStdOutput = win_fopen(stdout_path.c_str(), "a"); } else { // Redirecting child stdout to wrapper stderr here is not a typo startup_info.hStdOutput = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); startup_info.hStdInput = win_fopen(stdin_path.c_str(), "r"); } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); startup_info.hStdError = win_fopen(stderr_path.c_str(), "a"); } else { startup_info.hStdError = (HANDLE)_get_osfhandle(_fileno(stderr)); } if (startup_info.hStdOutput == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdOutput is invalid\n"); } if ((stdin_filename != "") && (startup_info.hStdInput == INVALID_HANDLE_VALUE)) { fprintf(stderr, "Error: startup_info.hStdInput is invalid\n"); } if (startup_info.hStdError == INVALID_HANDLE_VALUE) { fprintf(stderr, "Error: startup_info.hStdError is invalid\n"); } // setup environment vars if needed // int nvars = (int)vsetenv.size(); char* env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); } BOOL success; success = CreateProcess( NULL, (LPSTR)command.c_str(), NULL, NULL, TRUE, // bInheritHandles CREATE_NO_WINDOW|process_priority_value(priority), (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, &process_info ); if (!success) { char error_msg[1024]; windows_format_error_string(GetLastError(), error_msg, sizeof(error_msg)); fprintf(stderr, "can't run app: %s\n", error_msg); fprintf(stderr, "Error: command is '%s'\n", command.c_str()); fprintf(stderr, "Error: exec_dir is '%s'\n", exec_dir.c_str()); if (env_vars) delete [] env_vars; return ERR_EXEC; } if (env_vars) delete [] env_vars; pid_handle = process_info.hProcess; pid = process_info.dwProcessId; #else // Hier muss ein Fehler sein int retval; char* argv[256]; char arglist[4096]; FILE* stdout_file; FILE* stdin_file; FILE* stderr_file; struct rusage ru; getrusage(RUSAGE_CHILDREN, &ru); start_rusage = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; pid = fork(); if (pid == -1) { perror("fork(): "); return ERR_FORK; } if (pid == 0) { // we're in the child process here // // open stdout, stdin if file names are given // NOTE: if the application is restartable, // we should deal with atomicity somehow // close(pipefd[0]); // child doesn't need to read from pipe if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); stdout_file = freopen(stdout_path.c_str(), "a", stdout); if (!stdout_file) { fprintf(stderr, "Can't open %s for stdout; exiting\n", stdout_path.c_str()); return ERR_FOPEN; } } else { // Redirecting child stdout to wrapper stderr here is not a typo if (dup2(pipefd[1], STDOUT_FILENO) == -1) { // child sends stdout to pipe fprintf(stderr, "Can't redirect stdout to pipe; exiting\n"); return ERR_FOPEN; } } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); stdin_file = freopen(stdin_path.c_str(), "r", stdin); if (!stdin_file) { fprintf(stderr, "Can't open %s for stdin; exiting\n", stdin_path.c_str()); return ERR_FOPEN; } } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); stderr_file = freopen(stderr_path.c_str(), "a", stderr); if (!stderr_file) { fprintf(stderr, "Can't open %s for stderr; exiting\n", stderr_path.c_str()); return ERR_FOPEN; } } else { if (dup2(pipefd[1], STDERR_FILENO) == -1) { // child sends stderr to pipe fprintf(stderr, "Can't redirect stderr to pipe; exiting\n"); return ERR_FOPEN; } } close(pipefd[1]); // child doesn't need the file descriptor anymore because redirectsions have been setup above // construct argv // TODO: use malloc instead of stack var // argv[0] = app_path; strlcpy(arglist, command_line.c_str(), sizeof(arglist)); parse_command_line(arglist, argv+1); setpriority(PRIO_PROCESS, 0, process_priority_value(priority)); if (!exec_dir.empty()) { retval = chdir(exec_dir.c_str()); if (!retval) { fprintf(stderr, "%s chdir() to %s failed\n", boinc_msg_prefix(buf, sizeof(buf)), exec_dir.c_str() ); exit(1); } } // setup environment variables (if any) // const int nvars = vsetenv.size(); char** env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); retval = execve(app_path, argv, env_vars); } else { retval = execv(app_path, argv); } perror("execv() failed: "); exit(ERR_EXEC); } else { close(pipefd[1]); // parent doesn't need to write to pipe } #endif suspended = false; elapsed_time = 0; return 0; } // return true if task exited // bool TASK::poll(int& status) { char buf[256]; if (time_limit && elapsed_time > time_limit) { fprintf(stderr, "%s task %s reached time limit %.0f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), time_limit ); kill(); status = 0; return true; } #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { status = exit_code; final_cpu_time = current_cpu_time; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); return true; } } #else int wpid; struct rusage ru; wpid = waitpid(pid, &status, WNOHANG); if (wpid) { getrusage(RUSAGE_CHILDREN, &ru); final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; final_cpu_time -= start_rusage; fprintf(stderr, "%s %s exited; CPU time %f\n", boinc_msg_prefix(buf, sizeof(buf)), application.c_str(), final_cpu_time ); if (final_cpu_time < current_cpu_time) { final_cpu_time = current_cpu_time; } return true; } #endif return false; } // kill this task (gracefully if possible) and any other subprocesses // void TASK::kill() { #ifdef _WIN32 kill_descendants(); #else kill_descendants(pid); #endif } void TASK::stop() { if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); } suspended = true; } void TASK::resume() { if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); } suspended = false; } // Get the CPU time of the app while it's running. // This totals the CPU time of all the descendant processes, // so it shouldn't be called too frequently. // double TASK::cpu_time() { #ifndef ANDROID // the Android GUI doesn't show CPU time, // and process_tree_cpu_time() crashes sometimes // double x = process_tree_cpu_time(pid); // if the process has exited, the above could return zero. // So update carefully. // if (x > current_cpu_time) { current_cpu_time = x; } #endif return current_cpu_time; } void poll_boinc_messages(TASK& task) { BOINC_STATUS status; boinc_get_status(&status); //fprintf(stderr, "wrapper: polling\n"); if (status.no_heartbeat) { debug_msg("wrapper: kill"); task.kill(); kill_daemons(); exit(0); } if (status.quit_request) { debug_msg("wrapper: quit"); task.kill(); kill_daemons(); exit(0); } if (status.abort_request) { debug_msg("wrapper: abort"); task.kill(); kill_daemons(); exit(0); } if (status.suspended) { if (!task.suspended) { debug_msg("wrapper: suspend"); task.stop(); } } else { if (task.suspended) { debug_msg("wrapper: resume"); task.resume(); } } } // see if it's time to send trickle-up reporting elapsed time // void check_trickle_period() { char buf[256]; static double last_trickle_report_time = 0; if ((runtime - last_trickle_report_time) < trickle_period) { return; } last_trickle_report_time = runtime; sprintf(buf, "<cpu_time>%f</cpu_time>", last_trickle_report_time ); boinc_send_trickle_up( const_cast<char*>("cpu_time"), buf ); } // Support for multiple tasks. // We keep a checkpoint file that says how many tasks we've completed // and how much CPU time and runtime has been used so far // void write_checkpoint(int ntasks_completed, double cpu, double rt) { boinc_begin_critical_section(); FILE* f = fopen(CHECKPOINT_FILENAME, "w"); if (!f) return; fprintf(f, "%d %f %f\n", ntasks_completed, cpu, rt); fclose(f); boinc_checkpoint_completed(); } int read_checkpoint(int& ntasks_completed, double& cpu, double& rt) { int nt; double c, r; ntasks_completed = 0; cpu = 0; FILE* f = fopen(CHECKPOINT_FILENAME, "r"); if (!f) return ERR_FOPEN; int n = fscanf(f, "%d %lf %lf", &nt, &c, &r); fclose(f); if (n != 2) return 0; ntasks_completed = nt; cpu = c; rt = r; return 0; } int main(int argc, char** argv) { BOINC_OPTIONS options; int retval, ntasks_completed; unsigned int i; double total_weight=0, weight_completed=0; double checkpoint_cpu_time; // total CPU time at last checkpoint char buf[256]; #ifndef _WIN32 // Setup pipe for IPC //if (pipe2(pipefd, O_NONBLOCK) == -1) { if (pipe2(pipefd, 0) == -1) { perror("pipe2(): "); return ERR_PIPE; } #endif #ifdef _WIN32 SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS); #endif for (int j=1; j<argc; j++) { if (!strcmp(argv[j], "--nthreads")) { nthreads = atoi(argv[++j]); } else if (!strcmp(argv[j], "--device")) { gpu_device_num = atoi(argv[++j]); } else if (!strcmp(argv[j], "--trickle")) { trickle_period = atof(argv[++j]); } } retval = parse_job_file(); if (retval) { fprintf(stderr, "%s can't parse job file: %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); boinc_finish(retval); } do_unzip_inputs(); retval = read_checkpoint(ntasks_completed, checkpoint_cpu_time, runtime); if (retval && !zip_filename.empty()) { // this is the first time we've run. // If we're going to zip output files, // make a list of files present at this point // so we can exclude them. // write_checkpoint(0, 0, 0); get_initial_file_list(); } // do initialization after getting initial file list, // in case we're supposed to zip stderr.txt // memset(&options, 0, sizeof(options)); options.main_program = true; options.check_heartbeat = true; options.handle_process_control = true; boinc_init_options(&options); fprintf(stderr, "%s wrapper (%d.%d.%d): starting\n", boinc_msg_prefix(buf, sizeof(buf)), BOINC_MAJOR_VERSION, BOINC_MINOR_VERSION, WRAPPER_RELEASE ); boinc_get_init_data(aid); if (ntasks_completed > (int)tasks.size()) { fprintf(stderr, "%s Checkpoint file: ntasks_completed too large: %d > %d\n", boinc_msg_prefix(buf, sizeof(buf)), ntasks_completed, (int)tasks.size() ); boinc_finish(1); } for (i=0; i<tasks.size(); i++) { total_weight += tasks[i].weight; } retval = start_daemons(argc, argv); if (retval) { fprintf(stderr, "%s start_daemons(): %d\n", boinc_msg_prefix(buf, sizeof(buf)), retval ); kill_daemons(); boinc_finish(retval); } // loop over tasks // for (i=0; i<tasks.size(); i++) { TASK& task = tasks[i]; if ((int)i<ntasks_completed) { weight_completed += task.weight; continue; } double frac_done = weight_completed/total_weight; double cpu_time = 0; task.starting_cpu = checkpoint_cpu_time; retval = task.run(argc, argv); if (retval) { boinc_finish(retval); } int counter = 0; while (1) { #ifndef _WIN32 char _char; // Read data from pipe, i.e. from child process while (read(pipefd[0], &_char, sizeof(_char)) > 0) { // We need to replace the following character because LLR2 continuously alters // the process progress line in the output in manual mode (using -d) if (_char == '\r') { _char = '\n'; } fprintf(stderr, "%c", _char); } #endif int status; if (task.poll(status)) { if (status) { fprintf(stderr, "%s app exit status: 0x%x\n", boinc_msg_prefix(buf, sizeof(buf)), status ); // On Unix, if the app is non-executable, // the child status will be 0x6c00. // If we return this the client will treat it // as recoverable, and restart us. // We don't want this, so return an 8-bit error code. // kill_daemons(); boinc_finish(EXIT_CHILD_FAILED); } break; } poll_boinc_messages(task); double task_fraction_done = task.fraction_done(); double delta = task_fraction_done*task.weight/total_weight; // getting CPU time of task tree is inefficient, // so do it only every 10 sec // if (counter%10 == 0) { cpu_time = task.cpu_time(); } #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); #endif boinc_report_app_status( task.starting_cpu + cpu_time, checkpoint_cpu_time, frac_done + delta ); if (task.has_checkpointed()) { cpu_time = task.cpu_time(); checkpoint_cpu_time = task.starting_cpu + cpu_time; write_checkpoint(i, checkpoint_cpu_time, runtime); } if (trickle_period) { check_trickle_period(); } boinc_sleep(POLL_PERIOD); if (!task.suspended) { task.elapsed_time += POLL_PERIOD; runtime += POLL_PERIOD; } counter++; } #ifndef _WIN32 close(pipefd[0]); // parent doesn't need to read from pipe anymore #endif checkpoint_cpu_time = task.starting_cpu + task.final_cpu_time; #ifdef DEBUG fprintf(stderr, "%s cpu time %f, checkpoint CPU time %f frac done %f\n", boinc_msg_prefix(buf, sizeof(buf)), task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); #endif boinc_report_app_status( task.starting_cpu + task.final_cpu_time, checkpoint_cpu_time, frac_done + task.weight/total_weight ); write_checkpoint(i+1, checkpoint_cpu_time, runtime); weight_completed += task.weight; } kill_daemons(); do_zip_outputs(); boinc_finish(0); } #ifdef _WIN32 int WINAPI WinMain(HINSTANCE hInst, HINSTANCE hPrevInst, LPSTR Args, int WinMode) { LPSTR command_line; char* argv[100]; int argc; command_line = GetCommandLine(); argc = parse_command_line(command_line, argv); return main(argc, argv); } #endif

Message boards : Number crunching : wrapper source code


Main page · Your account · Message boards


Copyright © 2014-2024 BOINC Confederation / rebirther