From 97da25ce38f3661fea26055c8a10ba9a7d4c0a54 Mon Sep 17 00:00:00 2001 From: Brian Date: Thu, 8 May 2014 22:55:12 -0600 Subject: [PATCH] Threaded console, threaded cleanup. Yes! Made the threaded console to batch messages and so I could manually flush or clear them. At some point I would consider a safety maximum buffer size that triggers it to auto flush. This worked out really well, though I have to see why in some cases lines appear to double up still, could be something with the process not completing when I expect it to. This is possible a naive thread implementation, since it pushes a directory for every thread which seems too drastic. I'd like to see how much better it works without all the context switches. It's also a matter of figuring out how much to handle yourself before letting another thread join in. Right now the threads don't branch out too much since I think they basically do a breadth-first search, though I have to double check on that. Still to come, trying to safely work with fstat across multiple directories. It's fast, but on the console the script would appear to stall as it parses everything, so I'd still want to break it down somewhat so you can see the script making visible progress. I would also prefer this because then console messages wouldn't be so short and blocky. Improvements to come! --- p4RemoveUnversioned.py | 180 ++++++++++++++++++++++++++--------------- 1 file changed, 117 insertions(+), 63 deletions(-) diff --git a/p4RemoveUnversioned.py b/p4RemoveUnversioned.py index 06c6bf5..d5cbb3a 100644 --- a/p4RemoveUnversioned.py +++ b/p4RemoveUnversioned.py @@ -29,6 +29,8 @@ MSG = enum('SHUTDOWN', 'PARSE_DIRECTORY') p4_ignore = ".p4ignore" +main_pid = os.getpid( ) + def PressEnter( ): print( "\nPress ENTER to continue..." ) @@ -54,13 +56,6 @@ def match_in_ignore_list( path, ignore_list ): return True return False -class Console: - def __init__( self ): - self.mutex = multiprocessing.Semaphore( ) - def Write( self, data ): - with self.mutex: - print( data ) - class PTable( list ): def __init__( self, *args ): list.__init__( self, args ) @@ -71,11 +66,66 @@ class PDict( dict ): dict.__init__( self, args ) self.mutex = multiprocessing.Semaphore( ) +class Console( threading.Thread ): + MSG = enum('WRITE', 'FLUSH', 'SHUTDOWN', 'CLEAR' ) + + def __init__( self ): + threading.Thread.__init__( self ) + self.buffers = {} + self.running = True + self.queue = multiprocessing.JoinableQueue( ) + + def write( self, data ): + self.queue.put( ( Console.MSG.WRITE, os.getpid(), data ) ) + + def flush( self ): + self.queue.put( ( Console.MSG.FLUSH, os.getpid() ) ) + + def clear( self ): + self.queue.put( ( Console.MSG.CLEAR, os.getpid() ) ) + + def __enter__( self ): + self.start( ) + return self + + def __exit__( self, type, value, tb ): + self.running = False + + def run( self ): + # TODO: switch to a queue so we're not spinning and wasting a thread + self.running = True + while True: + data = self.queue.get( ) + event = data[0] + + if event == Console.MSG.SHUTDOWN: + # flush remaining buffers before shutting down + for ( pid, buffer ) in self.buffers.iteritems( ): + for line in buffer: + print( line ) + break + + elif event == Console.MSG.WRITE: + pid, s = data[ 1 : ] + + if pid not in self.buffers: + self.buffers[ pid ] = [] + self.buffers[ pid ].append( s ) + elif event == Console.MSG.FLUSH: + pid = data[ 1 ] + if pid in self.buffers: + for line in self.buffers[ pid ]: + print( line ) + elif event == Console.MSG.CLEAR: + pid = data[ 1 ] + if pid in self.buffers: + del self.buffers[ pid ] class Worker( threading.Thread ): - def __init__( self, queue, files_to_ignore ): + def __init__( self, console, queue, files_to_ignore ): threading.Thread.__init__( self ) + self.console = console self.queue = queue self.files_to_ignore = files_to_ignore @@ -84,50 +134,52 @@ class Worker( threading.Thread ): ( cmd, data ) = self.queue.get( ) if cmd == MSG.SHUTDOWN: + self.console.write("SHUTDOWN") + self.queue.task_done( ) + self.console.flush( ) break if cmd != MSG.PARSE_DIRECTORY or data is None: + self.console.flush( ) self.queue.task_done( ) continue directory = data - current_directory = os.getcwd( ) + self.console.write( "Working on " + directory ) dir_contents = os.listdir( directory ) if p4_ignore in dir_contents: file_regexes = [] # Should automatically ignore .p4ignore even if it's not specified, otherwise it'll be deleted. - path = os.path.join( current_directory, p4_ignore ) + path = os.path.join( directory, p4_ignore ) with open( path ) as f: for line in f: new_line = remove_comment( line.strip( ) ) if len( new_line ) > 0: - file_regexes.append( re.compile( os.path.join( re.escape( current_directory + os.sep ), new_line ) ) ) + file_regexes.append( re.compile( os.path.join( re.escape( directory + os.sep ), new_line ) ) ) - print( "|Appending ignores from " + path ) + self.console.write( "|Appending ignores from " + path ) with self.files_to_ignore.mutex: - if current_directory not in self.files_to_ignore: - self.files_to_ignore[ current_directory ] = [] - self.files_to_ignore[ current_directory ].extend( file_regexes ) + if directory not in self.files_to_ignore: + self.files_to_ignore[ directory ] = [] + self.files_to_ignore[ directory ].extend( file_regexes ) - ignore_list = get_ignore_list( current_directory, self.files_to_ignore ) + ignore_list = get_ignore_list( directory, self.files_to_ignore ) files = [] command = "p4 fstat *" - os.chdir( directory ) - proc = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + proc = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory ) (out, err) = proc.communicate() - os.chdir( current_directory ) for line in err.decode('utf-8').split( os.linesep ): if len( line ) == 0: continue - print(line) + # # dirty hack that grabs the filename from the ends of the printed out (not err) "depo_path - local_path" # # I could use regex to verify the expected string, but that will just slow us down. # basename = os.path.basename( line ) @@ -137,31 +189,32 @@ class Worker( threading.Thread ): if basename == "*": # Directory is empty, we could delete it now continue - path = os.path.join( current_directory, basename ) + path = os.path.join( directory, basename ) if not os.path.isdir( path ): files.append( basename ) for content in dir_contents: - if os.path.isdir( content ): - path = os.path.join( current_directory, content ) + path = os.path.join( directory, content ) + if os.path.isdir( path ): if match_in_ignore_list( path, ignore_list ): - print( "| Ignoring " + content ) + self.console.write( "| Ignoring " + content ) else: - self.queue.put( ( MSG.PARSE_DIRECTORY, content ) ) + self.queue.put( ( MSG.PARSE_DIRECTORY, path ) ) for file in files: - path = os.path.join( current_directory, file ) + path = os.path.join( directory, file ) if match_in_ignore_list( path, ignore_list ): - print( "| Ignoring " + path ) + self.console.write( "| Ignoring " + path ) continue - print( "| " + file + " is unversioned, removing it." ) + self.console.write( "| " + file + " is unversioned, removing it." ) os.chmod( path, stat.S_IWRITE ) os.remove( path ) - print( "|Done." ) + self.console.write( "|Done." ) + self.console.flush( ) self.queue.task_done( ) @@ -174,13 +227,13 @@ def main( args ): #http://docs.python.org/library/optparse.html parser = optparse.OptionParser( ) - parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=10 ) + parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None ) + parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=2 ) parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=True ) ( options, args ) = parser.parse_args( ) - root_path = "." root_full_path = os.getcwd( ) # Files are added from .p4ignore @@ -189,7 +242,7 @@ def main( args ): # make sure script doesn't delete itself with files_to_ignore.mutex: - files_to_ignore[ root_path ] = [ re.compile( os.path.join( re.escape( root_path + os.sep ), os.path.basename( __file__ ) ) ) ] + files_to_ignore[ root_full_path ] = [ re.compile( os.path.join( re.escape( root_full_path + os.sep ), os.path.basename( __file__ ) ) ) ] # Setup threading threads = [] @@ -197,44 +250,45 @@ def main( args ): queue = multiprocessing.JoinableQueue( ) - for i in range( thread_count ): - t = Worker( queue, files_to_ignore ) - threads.append( t ) - t.start( ) + with Console() as c: + for i in range( thread_count ): + t = Worker( c, queue, files_to_ignore ) + threads.append( t ) + t.start( ) - if len( threads ) == 1: - print( "Spawned %s thread." % len( threads ) ) - else: - print( "Spawned %s threads." % len( threads ) ) + if len( threads ) == 1: + print( "Spawned %s thread." % len( threads ) ) + else: + print( "Spawned %s threads." % len( threads ) ) - queue.put( ( MSG.PARSE_DIRECTORY, "." ) ) - queue.join( ) + queue.put( ( MSG.PARSE_DIRECTORY, options.directory if options.directory is not None else os.getcwd( ) ) ) + queue.join( ) - for i in range( thread_count ): - queue.put( ( MSG.SHUTDOWN, None ) ) + for i in range( thread_count ): + queue.put( ( MSG.SHUTDOWN, None ) ) - print( os.linesep + "Removing empty directories...") - # remove empty directories in reverse order - for root, dirs, files in os.walk( root_path, topdown=False ): - ignore_list = get_ignore_list( root, files_to_ignore ) + print( os.linesep + "Removing empty directories...") + # remove empty directories in reverse order + for root, dirs, files in os.walk( root_full_path, topdown=False ): + ignore_list = get_ignore_list( root, files_to_ignore ) - for d in dirs: - path = os.path.join( root, d ) + for d in dirs: + path = os.path.join( root, d ) - if match_in_ignore_list( path, ignore_list ): - # add option of using send2trash - print( "| ignoring " + d ) - dirs.remove( d ) - try: - os.rmdir(path) - print( "| " + d + " was removed." ) - except OSError: - # Fails on non-empty directory - pass - print( "|Done." ) + if match_in_ignore_list( path, ignore_list ): + # add option of using send2trash + print( "| ignoring " + d ) + dirs.remove( d ) + try: + os.rmdir(path) + print( "| " + d + " was removed." ) + except OSError: + # Fails on non-empty directory + pass + print( "|Done." ) - for t in threads: - t.join( ) + for t in threads: + t.join( ) if __name__ == "__main__": try: