#!/usr/bin/python
# -*- coding: utf8 -*-
# author              : Brian Ernst
# python_version      : 2.7.6 and 3.4.0
# =================================

# TODO: setup batches before pushing to threads and use p4 --parallel
# http://www.perforce.com/perforce/r14.2/manuals/cmdref/p4_sync.html

from p4Helper import *

import time, traceback


#==============================================================
class P4SyncMissing:
    """Force-sync every depot file that is missing from the local workspace.

    `run` streams the output of `p4 fstat ...`, collects the client paths that
    do not exist on disk, groups them into small batches and hands each batch
    to a pool of workers that execute `p4 sync -f` on it.
    """

    def run( self, args ):
        """Parse the command line and sync all missing files.

        args -- argument list handed to optparse; the script entry point
                passes sys.argv.

        Side effects: stores the work queue in self.queue and the batching
        buckets in self.buckets, spawns worker objects and `p4` processes, and
        writes progress to the console unless --quiet is given.
        """
        start = time.time()

        fail_if_no_p4()

        #http://docs.python.org/library/optparse.html
        parser = optparse.OptionParser( )

        parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None )
        # type="int" is required: without it a value given on the command line
        # stays a string and breaks the numeric comparison / range() below.
        parser.add_option( "-t", "--threads", dest="thread_count", type="int", help="Number of threads to crawl your drive and poll p4.", default=12 )
        parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False )
        parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False )

        ( options, args ) = parser.parse_args( args )

        directory = normpath( options.directory if options.directory is not None else os.getcwd( ) )

        with Console( auto_flush_time=1 ) as c:
            with P4Workspace( directory ):
                if not options.quiet:
                    c.writeflush( "Retreiving missing files..." )
                    c.writeflush( " Setting up threads..." )

                # Setup threading
                WRK = enum( 'SHUTDOWN', 'SYNC' )

                # NOTE(review): the handlers' return values (False / True) are
                # presumably how Worker decides whether to keep running --
                # confirm against p4Helper.Worker.
                def shutdown( data ):
                    """Handler for WRK.SHUTDOWN; ignores its payload."""
                    return False

                def sync( files ):
                    """Handler for WRK.SYNC: force-sync one batch of client paths.

                    The p4 command is attempted at most twice. Failures are
                    reported on the console (unless --quiet) and never raised.
                    """
                    files_len = len( files )
                    # NOTE(review): paths are only wrapped in double quotes; a
                    # path containing a quote character would break the command.
                    files_flat = ' '.join( '"' + p4FriendlyPath( f ) + '"' for f in files )

                    if options.verbose:
                        if files_len > 1:
                            c.write( " Syncing batch of " + str( files_len ) + " ..." )
                            for f in files:
                                c.write( " " + os.path.relpath( f, directory ) )
                        else:
                            for f in files:
                                c.write( " Syncing " + os.path.relpath( f, directory ) + " ..." )

                    max_attempts = 2
                    ret = -1
                    attempt = 0
                    while ret != 0 and attempt < max_attempts:
                        ret = try_call_process( "p4 sync -f " + files_flat )
                        attempt += 1
                        # Only announce a retry when one is actually going to happen.
                        if ret != 0 and attempt < max_attempts and not options.quiet:
                            c.write( "Failed, trying again to sync " + files_flat )

                    if not options.quiet:
                        if ret != 0:
                            c.write( "Failed to sync " + files_flat )
                        else:
                            if files_len > 1:
                                c.write( " Synced batch of " + str( files_len ) )
                            for f in files:
                                c.write( " Synced " + os.path.relpath( f, directory ) )
                    return True

                commands = {
                    WRK.SHUTDOWN : shutdown,
                    WRK.SYNC : sync
                }

                threads = [ ]
                # A value <= 0 is treated as an offset from the CPU count
                # (e.g. -2 means "all cores but two").
                thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + options.thread_count
                # Never run with zero workers: queued batches would never be synced.
                thread_count = max( 1, thread_count )

                count = 0   # files queued for syncing
                total = 0   # files examined (with a non-rejected head action)
                self.queue = multiprocessing.JoinableQueue( )

                for i in range( thread_count ):
                    t = Worker( c, self.queue, commands )
                    t.daemon = True
                    threads.append( t )
                    t.start( )

                if not options.quiet:
                    c.writeflush( " Done." )

                make_drive_upper = os.name == 'nt' or sys.platform == 'cygwin'

                # "..." is the Perforce wildcard for everything below the cwd.
                command = "p4 fstat ..."

                if not options.quiet:
                    c.writeflush( " Checking files in depot, this may take some time for large depots..." )

                # NOTE(review): stderr is only read after stdout has been fully
                # consumed; a very large amount of stderr output could fill its
                # pipe and stall p4 -- consider draining it concurrently.
                proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory )

                clientFile_tag = "... clientFile "
                headAction_tag = "... headAction "
                headType_tag = "... headType "

                # http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html
                accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked
                # Both slash directions are listed because the action string is
                # passed through normpath() below.
                rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ]
                file_type_binary = 'binary+l'
                file_type_text = 'text'

                # Parser state for the fstat record currently being read.
                client_file = None
                file_action = None
                file_type = None

                # todo: use fewer threads, increase bucket size and use p4 threading
                class Bucket:
                    """A list of pending paths plus the size at which it should be flushed."""

                    def __init__( self, limit ):
                        self.queue = []
                        self.queue_size = 0
                        self.queue_limit = limit

                    def append( self, obj ):
                        self.queue.append( obj )
                        self.queue_size += 1

                    def is_full( self ):
                        return self.queue_size >= self.queue_limit

                # Text files are batched ten at a time; everything else goes
                # into the "binary" bucket and is batched in pairs.
                self.buckets = {}
                self.buckets[file_type_text] = Bucket( 10 )
                self.buckets[file_type_binary] = Bucket( 2 )

                def push_queued( bucket ):
                    """Send the bucket's pending paths to the workers and empty it."""
                    if bucket.queue_size == 0:
                        return
                    if options.verbose:
                        for f in bucket.queue:
                            c.write( " Checking " + os.path.relpath( f, directory ) )
                    self.queue.put( ( WRK.SYNC, bucket.queue ) )
                    # Rebind instead of clearing: the old list now belongs to the queue.
                    bucket.queue = []
                    bucket.queue_size = 0

                for line in proc.stdout:
                    line = get_str_from_process_stdout( line )

                    #push work when finding out type
                    if client_file and file_action is not None and line.startswith( headType_tag ):
                        file_type = normpath( line[ len( headType_tag ) : ].strip( ) )
                        if file_type == file_type_text:
                            self.buckets[file_type_text].append( client_file )
                        else:
                            self.buckets[file_type_binary].append( client_file )
                        count += 1

                        #check sizes and push
                        for b in self.buckets.values( ):
                            if b.is_full( ):
                                push_queued( b )

                    elif client_file and line.startswith( headAction_tag ):
                        file_action = normpath( line[ len( headAction_tag ) : ].strip( ) )
                        if file_action in rejected_actions:
                            # Deleted/purged at head: nothing to sync.
                            file_action = None
                        else:
                            total += 1
                            if os.path.exists( client_file ):
                                # Already on disk, so it is not missing.
                                file_action = None

                    elif line.startswith( clientFile_tag ):
                        client_file = line[ len( clientFile_tag ) : ].strip( )
                        if make_drive_upper:
                            drive, path = splitdrive( client_file )
                            client_file = ''.join( [ drive.upper( ), path ] )

                    elif not line.rstrip( ):
                        # A blank line ends the current fstat record.
                        client_file = None

                # Flush whatever is left in partially filled buckets.
                for b in self.buckets.values( ):
                    push_queued( b )
                proc.wait( )

                for line in proc.stderr:
                    # Decode like stdout: on Python 3 pipe output is bytes and
                    # the substring test below would raise TypeError.
                    line = get_str_from_process_stdout( line )
                    if "no such file" in line:
                        continue
                    #raise Exception(line)
                    c.write( line )#log as error

                if not options.quiet:
                    c.writeflush( " Done. Checked " + str(total) + " file(s)." )
                    c.writeflush( " Queued " + str(count) + " file(s), now waiting for threads..." )

                # One shutdown message per worker, then wait for all of them.
                for i in range( thread_count ):
                    self.queue.put( ( WRK.SHUTDOWN, None ) )

                for t in threads:
                    t.join( )

            if not options.quiet:
                print( " Done." )

        if not options.quiet:
            end = time.time()
            delta = end - start
            output = " Done. Finished in " + str(delta) + "s"
            print( output )


if __name__ == "__main__":
    try:
        P4SyncMissing().run(sys.argv)
    except Exception:
        # Deliberately not a bare except: SystemExit (raised by optparse for
        # --help and usage errors) and KeyboardInterrupt must pass through
        # instead of being reported as an unexpected error.
        print( "\nUnexpected error!" )
        traceback.print_exc( file = sys.stdout )