summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorroot <root@gmona.(none)>2010-05-26 10:01:43 +0300
committerroot <root@gmona.(none)>2010-05-26 10:01:43 +0300
commit2c4497353a913c7669d0c51ce617967878b1ac65 (patch)
tree59fe61e351010aa2085c0237c6b348a71061acef /twrapper
parentSome dirs added (diff)
downloadidfetch-2c4497353a913c7669d0c51ce617967878b1ac65.tar.gz
idfetch-2c4497353a913c7669d0c51ce617967878b1ac65.tar.bz2
idfetch-2c4497353a913c7669d0c51ce617967878b1ac65.zip
twrapper kills child processes on exit. Log file for wget replaced with pipe
Diffstat (limited to 'twrapper')
-rwxr-xr-xtwrapper/twrapper.py197
1 files changed, 129 insertions, 68 deletions
diff --git a/twrapper/twrapper.py b/twrapper/twrapper.py
index 9e0951c..224f637 100755
--- a/twrapper/twrapper.py
+++ b/twrapper/twrapper.py
@@ -12,6 +12,7 @@ import os
import re
import time
import sys
+import signal
from portage.process import spawn
import idfetch_settings
import pickle
@@ -20,7 +21,7 @@ import time
from threading import Thread
CLEAN_LINE=" "
MAX_ACTIVE_DOWNLOADS=8
-TASK_SPACE=4
+TASK_SPACE=3
def time_msg():
stdscr.addstr(0,50,"["+time.ctime()+"] q - Quit")
@@ -31,16 +32,11 @@ def fit_string(text):
return text[0:max_x-1]
def msg(index,msg_text):
global exit_flag
-# print(msg_text)
if exit_flag:
pass
else:
stdscr.addstr(index,0,fit_string(msg_text))
stdscr.refresh()
-# stdscr.nodelay(1)
-# key = stdscr.getch()
-# if key== ord('q'):
-# sys.exit()
def download_msg(index,msg_text):
msg(TASK_SPACE*index,str(index)+")"+msg_text)
@@ -53,13 +49,8 @@ def total_msg(msg_text):
def progress_msg(index,msg_text1,msg_text2):
-# global exit_flag
-# if exit_flag:
-# pass
-# else:
- stdscr.addstr(TASK_SPACE*index+1,0,fit_string(" "+msg_text1))
- stdscr.addstr(TASK_SPACE*index+2,0,fit_string(" "+msg_text2))
- stdscr.refresh()
+ msg(TASK_SPACE*index+1,msg_text1)
+
def error_msg(msg_text):
msg(max_y-1,"WARNING: "+msg_text)
@@ -91,40 +82,102 @@ class fetchit(Thread):
self.trials_log_file_name = idfetch_settings.TASK_DIR+'/logs/log_trials/'+self.download_file+".log"
def start(self,place_in_the_list):
self.place_in_the_list=place_in_the_list
-# msg(14+place_in_the_list,"lkajkldfjjlsdkjflaskjdfkalskdjallllllllfalskdjflaskdjffff"+str(self.place_in_the_list))
Thread.start(self)
def run(self):
# msg(0,'DOWNLOADING: '+self.download_file)
+ global exit_flag
self.download_file_url_list_file=open(idfetch_settings.TASK_DIR+'/urls/'+self.download_file+".urllist")
self.download_file_url_list=pickle.load(self.download_file_url_list_file)
self.download_file_url_list_file.close()
-# print(self.download_file_url_list)
-# while downloadfile_url_list:
-# print(downloadfile_url_list.pop())
-# return
-# a=os.spawnl(os.P_NOWAIT,"/bin/touch","/usr/lib/portage/swrapper/1.txt")
self.trials_log_file= open (self.trials_log_file_name,"w")
- #wget_output = os.popen("/usr/bin/wget "+self.download_file_url_list.pop()+" -o "+log_file,"r")
- while (self.download_file_url_list) and (self.status !=0) :
+ while (self.download_file_url_list) and (self.status !=0) and (not(exit_flag)) :
self.current_url=self.download_file_url_list.pop()
self.trials_log_file.write("[DL "+str(self.index)+"]: "+self.current_url+"\n")
download_msg(self.place_in_the_list,"[DL "+str(self.index)+"]: "+self.current_url)
-# wget_run_status=os.spawnl(os.P_WAIT,"/usr/bin/wget","wget","--connect-timeout=1","--progress=bar:force",self.current_url, "-o",self.wget_log_file_name)
- wget_run_status=os.spawnl(os.P_WAIT,"/usr/bin/wget","wget","--connect-timeout=1",\
- self.current_url,"--directory-prefix="+idfetch_settings.DIST_DIR,\
- '--tries='+str(idfetch_settings.WGET_TRIES),\
- "--read-timeout="+str(idfetch_settings.WGET_READ_TIMEOUT),"-o",self.wget_log_file_name)
-# wget_run_status = os.popen("/usr/bin/wget "+self.download_file_url_list.pop(),"r")
- wget_output= open(self.wget_log_file_name,"r")
- while 1:
- line = wget_output.readline()
- if not line: break
- igot = re.findall('saved',line)
- if igot:
- self.status = 0
- break
-# int(igot[0])
-# msg(10+self.index,"exited")
+ # preparing to fork
+ #if fd_pipes is None:
+ self.fd_pipes = {
+ 0:sys.stdin.fileno(),
+ 1:sys.stdout.fileno(),
+ 2:sys.stderr.fileno(),
+ }
+
+
+ if 1:
+ # Using a log file requires that stdout and stderr
+ # are assigned to the process we're running.
+ if 1 not in self.fd_pipes or 2 not in self.fd_pipes:
+ raise ValueError(self.fd_pipes)
+
+ # Create a pipe
+ (self.pr, self.pw) = os.pipe()
+
+ # Create a tee process, giving it our stdout and stderr
+ # as well as the read end of the pipe.
+ # mypids.extend(spawn(('tee', '-i', '-a', logfile),
+ # returnpid=True, fd_pipes={0:pr,
+ # 1:fd_pipes[1], 2:fd_pipes[2]}))
+
+ # We don't need the read end of the pipe, so close it.
+ ### os.close(pr)
+
+ # Assign the write end of the pipe to our stdout and stderr.
+ self.fd_pipes[1] = self.pw
+ self.fd_pipes[2] = self.pw
+ self.pid = os.fork()
+ if self.pid:
+ # we are the parent
+# global
+ mypids.append(self.pid)
+ os.close(self.pw) # use os.close() to close a file descriptor
+ self.pr = os.fdopen(self.pr) # turn r into a file object
+# print "parent: reading"
+# print "--------------->"
+ # for i in range(1,50):
+ self.line=self.pr.readline()
+ while self.line:
+ progress_msg(self.place_in_the_list, self.line,"")
+# time.ctime(1)
+ self.line=self.pr.readline()
+ self.igot = re.findall('saved',self.line)
+ if self.igot:
+ self.status = 0
+ break
+### os.waitpid(self.pid, 0) # make sure the child process gets cleaned up
+ try:
+ os.waitpid(self.pid, os.WNOHANG)
+# os.waitpid(self.pid, 0)
+ except OSError:
+ # This pid has been cleaned up outside
+ # of spawn().
+ pass
+ else:
+ # we are the child
+ os.close(self.pr)
+ self.w = os.fdopen(self.pw, 'w')
+# print "child: writing"
+ self.my_fds = {}
+ # To protect from cases where direct assignment could
+ # clobber needed fds ({1:2, 2:1}) we first dupe the fds
+ # into unused fds.
+ for self.fd in self.fd_pipes:
+ self.my_fds[self.fd] = os.dup(self.fd_pipes[self.fd])
+ # Then assign them to what they should be.
+ for self.fd in self.my_fds:
+ os.dup2(self.my_fds[self.fd], self.fd)
+ # Then close _all_ fds that haven't been explictly
+ # requested to be kept open.
+ # for fd in get_open_fds():
+ if self.fd not in self.my_fds:
+ try:
+ os.close(self.fd)
+ except OSError:
+ pass
+ os.execv('/usr/bin/wget', ["wget","--connect-timeout=1", self.current_url,\
+ "--directory-prefix="+idfetch_settings.DIST_DIR,\
+ "--tries="+str(idfetch_settings.WGET_TRIES),\
+ "--read-timeout="+str(idfetch_settings.WGET_READ_TIMEOUT)])
+ sys.exit(0)
if self.status ==0:
self.trials_log_file.write("[FIN "+str(self.index)+"]: "+self.current_url+"\n")
download_msg(self.place_in_the_list,"[FIN "+str(self.index)+"]: "+self.current_url)
@@ -136,24 +189,10 @@ class fetchit(Thread):
download_msg(self.place_in_the_list,"[ERROR+LIST_IS_EMPTY "+str(self.index)+"]: "+self.current_url)
self.trials_log_file.close()
-#### a=os.spawnl(os.P_NOWAIT,"/usr/bin/wget","wget",downloadfile_url_list.pop(), "-o",log_file)
-# a=os.system("/bin/touch /usr/lib/portage/swrapper/touch.txt")
-# a=os.system("wget www.mail.ru")
-# print("boom:",a)
-# spawn(["/bin/bash", "-c", "exec \"$@\""],'','')
- def show_fetch_progress(self):
- try:
- fileHandle = open(self.wget_log_file_name,"r")
- lineList = fileHandle.readlines()
- fileHandle.close()
- if len(lineList)>1:
- progress_msg(self.place_in_the_list,lineList[-2],lineList[-1])
- except:
- #error_msg("ERROR while reading"+self.wget_log_file_name)
- pass
def get_place_in_the_list(self):
return self.place_in_the_list
-def do_tasks(task_list,exit_flag):
+def do_tasks(task_list):
+ global exit_flag
msg(1,"TASK DIR: "+idfetch_settings.TASK_DIR)
msg(0,"DOWNLOADING with twrapper...")
@@ -185,10 +224,8 @@ def do_tasks(task_list,exit_flag):
key='x'
stdscr.nodelay(1)
while (key != ord('q')) and (key != ord('Q')):
- key = stdscr.getch()
+
for current_fetch_distfile_thread in running_fetch_distfile_thread_list:
-# status_msg(current_fetch.index,"Status: "+report[current_fetch.status])
- current_fetch_distfile_thread.show_fetch_progress()
if current_fetch_distfile_thread.status==0:
if to_start_fetch_distfile_thread_list:
#start next one on the same place in the list
@@ -200,36 +237,60 @@ def do_tasks(task_list,exit_flag):
starting_fetch_distfile_thread.start(current_fetch_distfile_thread.get_place_in_the_list())
time_msg()
time.sleep(0.5)
+ key = stdscr.getch()
+ if (key == ord('q')) or (key == ord('Q')):
+ msg(28,">>>>>>>>>>>>>>>>> EXITING <<<<<<<<<<<<<<<<<<<<<<<<<<")
exit_flag=1
-def main(task_list,exit_flag):
+def main(task_list):
try:
+
task_list=open_task_list()
- do_tasks(task_list,exit_flag)
+ do_tasks(task_list)
+ msg(30,">>>>>>>>>>>> more exited <<<<<<<<<<<<<<")
+
finally:
+ for pid in mypids:
+ try:
+ if os.waitpid(pid, os.WNOHANG) == (0, 0):
+ os.kill(pid, signal.SIGTERM)
+ os.waitpid(pid, 0)
+ except OSError:
+ # This pid has been cleaned up outside
+ # of spawn().
+ pass
curses.nocbreak()
stdscr.keypad(0)
curses.echo()
curses.endwin()
print("twrapper exited")
- sys.exit()
-# if key== ord('q'):
-# for current_fetch in fetchlist:
-# current_fetch.join()
-## print("Status for",current_fetch.download_file,"is",report[current_fetch.status])
+def cleanup():
+ while spawned_pids:
+ pid = spawned_pids.pop()
+ try:
+ if os.waitpid(pid, os.WNOHANG) == (0, 0):
+ os.kill(pid, signal.SIGTERM)
+ os.waitpid(pid, 0)
+ except OSError:
+ # This pid has been cleaned up outside
+ # of spawn().
+ pass
+
if __name__ == '__main__':
exit_flag=0
task_list=[]
+ # mypids will hold the pids of all processes created.
+ mypids = []
stdscr = curses.initscr()
max_y,max_x=stdscr.getmaxyx()
curses.noecho()
curses.cbreak()
stdscr.keypad(1)
-# win = curses.newwin(10, 30, 20, 0)
-# win.addstr(1, 1, "Current")
-# win.refresh()
-# time.sleep(5)
- main(task_list,exit_flag)
+ curses.curs_set(0)
+ try:
+ main(task_list)
+ finally:
+ sys.exit()