From c32c0bfbd15f80545a21a67b06bf3667f6def506 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 12 May 2015 14:47:18 -0600 Subject: [PATCH] Added bucketing based on file type (text/binary) and batching to reduce server calls. --- p4Helper.py | 25 ++--- p4SyncMissingFiles.py | 229 ++++++++++++++++++++++++++---------------- 2 files changed, 156 insertions(+), 98 deletions(-) diff --git a/p4Helper.py b/p4Helper.py index d50bd63..a22d862 100644 --- a/p4Helper.py +++ b/p4Helper.py @@ -4,7 +4,7 @@ # python_version : 2.7.6 and 3.4.0 # ================================= -import datetime, inspect, marshal, multiprocessing, optparse, os, re, stat, subprocess, sys, threading +import datetime, inspect, itertools, marshal, multiprocessing, optparse, os, re, stat, subprocess, sys, threading # trying ntpath, need to test on linux import ntpath @@ -297,19 +297,19 @@ class Console( threading.Thread ): self.auto_flush_time = auto_flush_time * 1000 if auto_flush_time is not None else -1 self.shutting_down = False - def write( self, data, pid = None ): - self.queue.put( ( Console.MSG.WRITE, pid if pid is not None else os.getpid(), data ) ) + def write( self, data ): + self.queue.put( ( Console.MSG.WRITE, threading.current_thread().ident, data ) ) - def writeflush( self, data, pid = None ): - pid = pid if pid is not None else os.getpid() + def writeflush( self, data ): + pid = threading.current_thread().ident self.queue.put( ( Console.MSG.WRITE, pid, data ) ) self.queue.put( ( Console.MSG.FLUSH, pid ) ) - def flush( self, pid = None ): - self.queue.put( ( Console.MSG.FLUSH, pid if pid is not None else os.getpid() ) ) + def flush( self ): + self.queue.put( ( Console.MSG.FLUSH, threading.current_thread().ident ) ) - def clear( self, pid = None ): - self.queue.put( ( Console.MSG.CLEAR, pid if pid is not None else os.getpid() ) ) + def clear( self ): + self.queue.put( ( Console.MSG.CLEAR, threading.current_thread().ident ) ) def __enter__( self ): self.start( ) @@ -325,16 +325,12 @@ class Console( threading.Thread ): event = data[0] if event == Console.MSG.SHUTDOWN: - # flush remaining buffers before shutting down for ( pid, buffer ) in self.buffers.items( ): for line in buffer: print( line ) self.buffers.clear( ) self.buffer_write_times.clear( ) self.queue.task_done( ) - - #print(self.queue.qsize()) - #print(self.queue.empty()) break elif event == Console.MSG.WRITE: @@ -354,7 +350,8 @@ class Console( threading.Thread ): elif event == Console.MSG.FLUSH: pid = data[ 1 ] if pid in self.buffers: - for line in self.buffers[ pid ]: + buffer = self.buffers[ pid ] + for line in buffer: print( line ) self.buffers.pop( pid, None ) self.buffer_write_times[ pid ] = datetime.datetime.now( ) diff --git a/p4SyncMissingFiles.py b/p4SyncMissingFiles.py index 3863dd2..f39a636 100644 --- a/p4SyncMissingFiles.py +++ b/p4SyncMissingFiles.py @@ -13,125 +13,186 @@ import time, traceback #============================================================== -def main( args ): - start = time.clock() +class P4SyncMissing: + def run( self, args ): + start = time.clock() - fail_if_no_p4() + fail_if_no_p4() - #http://docs.python.org/library/optparse.html - parser = optparse.OptionParser( ) + #http://docs.python.org/library/optparse.html + parser = optparse.OptionParser( ) - parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None ) - parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=100 ) - parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False ) - parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False ) - parser.add_option( "-i", "--interactive", action="store_true", dest="interactive", default=False ) + parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None ) + parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=12 ) + parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False ) + parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False ) + parser.add_option( "-i", "--interactive", action="store_true", dest="interactive", default=False ) - ( options, args ) = parser.parse_args( args ) + ( options, args ) = parser.parse_args( args ) - directory = normpath( options.directory if options.directory is not None else os.getcwd( ) ) + directory = normpath( options.directory if options.directory is not None else os.getcwd( ) ) - with Console( auto_flush_num=20, auto_flush_time=1000 ) as c: - with P4Workspace( directory ): - if not options.quiet: - c.writeflush( "Preparing to sync missing files..." ) - c.write( " Setting up threads..." ) + with Console( auto_flush_time=1000 ) as c: + with P4Workspace( directory ): + if not options.quiet: + c.writeflush( "Preparing to sync missing files..." ) + c.writeflush( " Setting up threads..." ) - # Setup threading - WRK = enum( 'SHUTDOWN', 'SYNC' ) + # Setup threading + WRK = enum( 'SHUTDOWN', 'SYNC' ) + + def shutdown( data ): + return False + def sync( files ): + files_flat = ' '.join(files) + #subprocess.check_output( "p4 sync -f " + files_flat + "", shell=False, cwd=None ) + ret = -1 + count = 0 + while ret != 0 and count < 2: + ret = try_call_process( "p4 sync -f " + files_flat ) + count += 1 + if ret != 0 and not options.quiet: + c.write("Failed, trying again to sync " + files_flat) + - def shutdown( data ): - return False - def sync( data ): - if data is not None and not os.path.exists( data ): - subprocess.check_output( "p4 sync -f \"" + data + "\"", shell=False, cwd=None ) if not options.quiet: - c.write( " Synced " + os.path.relpath( data, directory ) ) - return True + files_len = len(files) + if files_len > 1: + c.write( " Synced batch of " + str(len(files)) ) + for f in files: + c.write( " Synced " + os.path.relpath( f, directory ) ) + return True - commands = { - WRK.SHUTDOWN : shutdown, - WRK.SYNC : sync - } + commands = { + WRK.SHUTDOWN : shutdown, + WRK.SYNC : sync + } - threads = [ ] - thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + threads + threads = [ ] + thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + threads - queue = multiprocessing.JoinableQueue( ) + count = 0 + self.queue = multiprocessing.JoinableQueue( ) - for i in range( thread_count ): - t = Worker( c, queue, commands ) - threads.append( t ) - t.start( ) + for i in range( thread_count ): + t = Worker( c, self.queue, commands ) + threads.append( t ) + t.start( ) - make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False + make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False - command = "p4 fstat ..." + command = "p4 fstat ..." - if not options.quiet: - c.writeflush( " Checking files in depot, this may take some time for large depots..." ) + if not options.quiet: + c.writeflush( " Checking files in depot, this may take some time for large depots..." ) - proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory ) + proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory ) - clientFile_tag = "... clientFile " - headAction_tag = "... headAction " + clientFile_tag = "... clientFile " + headAction_tag = "... headAction " + headType_tag = "... headType " - # http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html - accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked - rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ] + # http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html + accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked + rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ] + file_type_binary = 'binary+l' + file_type_text = 'text' - client_file = None + client_file = None + file_action = None + file_type = None + file_type_last = None - for line in proc.stdout: - line = get_str_from_process_stdout( line ) + class Bucket: + def __init__(self, limit): + self.queue = [] + self.queue_size = 0 + self.queue_limit = limit + def append(self,obj): + self.queue.append(obj) + self.queue_size += 1 + def is_full(self): + return self.queue_size >= self.queue_limit - if client_file and line.startswith( headAction_tag ): - action = normpath( line[ len( headAction_tag ) : ].strip( ) ) - if not any(action == a for a in rejected_actions): - if options.verbose: - c.write( " Checking " + os.path.relpath( local_path, directory ) ) - # TODO: directories should be batched and synced in parallel - queue.put( ( WRK.SYNC, local_path ) ) + self.buckets = {} + self.buckets[file_type_text] = Bucket(10) + self.buckets[file_type_binary] = Bucket(2) - if line.startswith( clientFile_tag ): - client_file = None - local_path = normpath( line[ len( clientFile_tag ) : ].strip( ) ) - if make_drive_upper: - drive, path = splitdrive( local_path ) - client_file = ''.join( [ drive.upper( ), path ] ) + def push_queued(bucket): + if bucket.queue_size == 0: + return + if options.verbose: + for f in bucket.queue: + c.write( " Checking " + os.path.relpath( f, directory ) ) + self.queue.put( ( WRK.SYNC, bucket.queue ) ) + bucket.queue = [] + bucket.queue_size = 0 - if len(line.rstrip()) == 0: - client_file = None + for line in proc.stdout: + line = get_str_from_process_stdout( line ) - proc.wait( ) + #push work when finding out type + if client_file and file_action is not None and line.startswith( headType_tag ): - for line in proc.stderr: - if "no such file" in line: - continue - #raise Exception(line) - c.write(line)#log as error + file_type = normpath( line[ len( headType_tag ) : ].strip( ) ) + if file_type == file_type_text: + self.buckets[file_type_text].append(client_file) + else: + self.buckets[file_type_binary].append(client_file) + count += 1 - if not options.quiet: - c.writeflush( " Pushed work, now waiting for threads..." ) + #check sizes and push + for b in self.buckets.values(): + if b.is_full(): + push_queued(b) + + elif client_file and line.startswith( headAction_tag ): + file_action = normpath( line[ len( headAction_tag ) : ].strip( ) ) + if any(file_action == a for a in rejected_actions): + file_action = None + else: + if os.path.exists( client_file ): + file_action = None - for i in range( thread_count ): - queue.put( ( WRK.SHUTDOWN, None ) ) + elif line.startswith( clientFile_tag ): + client_file = normpath( line[ len( clientFile_tag ) : ].strip( ) ) + if make_drive_upper: + drive, path = splitdrive( client_file ) + client_file = ''.join( [ drive.upper( ), path ] ) - for t in threads: - t.join( ) + elif len(line.rstrip()) == 0: + client_file = None - if not options.quiet: - c.write( "Done." ) + for b in self.buckets.values(): + push_queued(b) + proc.wait( ) - end = time.clock() - delta = end - start - output = "\nFinished in " + str(delta) + "s" + for line in proc.stderr: + if "no such file" in line: + continue + #raise Exception(line) + c.write(line)#log as error - c.writeflush( output ) + if not options.quiet: + c.writeflush( " Checking " + str(count) + " file(s), now waiting for threads..." ) + + for i in range( thread_count ): + self.queue.put( ( WRK.SHUTDOWN, None ) ) + + for t in threads: + t.join( ) + + if not options.quiet: + print( "Done." ) + + end = time.clock() + delta = end - start + output = "\nFinished in " + str(delta) + "s" + print( output ) if __name__ == "__main__": try: - main( sys.argv ) + P4SyncMissing().run(sys.argv) except: print( "\nUnexpected error!" ) traceback.print_exc( file = sys.stdout ) \ No newline at end of file