Added bucketing based on file type (text/binary) and batching to reduce
server calls.
This commit is contained in:
parent
49153babed
commit
c32c0bfbd1
25
p4Helper.py
25
p4Helper.py
|
@ -4,7 +4,7 @@
|
||||||
# python_version : 2.7.6 and 3.4.0
|
# python_version : 2.7.6 and 3.4.0
|
||||||
# =================================
|
# =================================
|
||||||
|
|
||||||
import datetime, inspect, marshal, multiprocessing, optparse, os, re, stat, subprocess, sys, threading
|
import datetime, inspect, itertools, marshal, multiprocessing, optparse, os, re, stat, subprocess, sys, threading
|
||||||
|
|
||||||
# trying ntpath, need to test on linux
|
# trying ntpath, need to test on linux
|
||||||
import ntpath
|
import ntpath
|
||||||
|
@ -297,19 +297,19 @@ class Console( threading.Thread ):
|
||||||
self.auto_flush_time = auto_flush_time * 1000 if auto_flush_time is not None else -1
|
self.auto_flush_time = auto_flush_time * 1000 if auto_flush_time is not None else -1
|
||||||
self.shutting_down = False
|
self.shutting_down = False
|
||||||
|
|
||||||
def write( self, data, pid = None ):
|
def write( self, data ):
|
||||||
self.queue.put( ( Console.MSG.WRITE, pid if pid is not None else os.getpid(), data ) )
|
self.queue.put( ( Console.MSG.WRITE, threading.current_thread().ident, data ) )
|
||||||
|
|
||||||
def writeflush( self, data, pid = None ):
|
def writeflush( self, data ):
|
||||||
pid = pid if pid is not None else os.getpid()
|
pid = threading.current_thread().ident
|
||||||
self.queue.put( ( Console.MSG.WRITE, pid, data ) )
|
self.queue.put( ( Console.MSG.WRITE, pid, data ) )
|
||||||
self.queue.put( ( Console.MSG.FLUSH, pid ) )
|
self.queue.put( ( Console.MSG.FLUSH, pid ) )
|
||||||
|
|
||||||
def flush( self, pid = None ):
|
def flush( self ):
|
||||||
self.queue.put( ( Console.MSG.FLUSH, pid if pid is not None else os.getpid() ) )
|
self.queue.put( ( Console.MSG.FLUSH, threading.current_thread().ident ) )
|
||||||
|
|
||||||
def clear( self, pid = None ):
|
def clear( self ):
|
||||||
self.queue.put( ( Console.MSG.CLEAR, pid if pid is not None else os.getpid() ) )
|
self.queue.put( ( Console.MSG.CLEAR, threading.current_thread().ident ) )
|
||||||
|
|
||||||
def __enter__( self ):
|
def __enter__( self ):
|
||||||
self.start( )
|
self.start( )
|
||||||
|
@ -325,16 +325,12 @@ class Console( threading.Thread ):
|
||||||
event = data[0]
|
event = data[0]
|
||||||
|
|
||||||
if event == Console.MSG.SHUTDOWN:
|
if event == Console.MSG.SHUTDOWN:
|
||||||
# flush remaining buffers before shutting down
|
|
||||||
for ( pid, buffer ) in self.buffers.items( ):
|
for ( pid, buffer ) in self.buffers.items( ):
|
||||||
for line in buffer:
|
for line in buffer:
|
||||||
print( line )
|
print( line )
|
||||||
self.buffers.clear( )
|
self.buffers.clear( )
|
||||||
self.buffer_write_times.clear( )
|
self.buffer_write_times.clear( )
|
||||||
self.queue.task_done( )
|
self.queue.task_done( )
|
||||||
|
|
||||||
#print(self.queue.qsize())
|
|
||||||
#print(self.queue.empty())
|
|
||||||
break
|
break
|
||||||
|
|
||||||
elif event == Console.MSG.WRITE:
|
elif event == Console.MSG.WRITE:
|
||||||
|
@ -354,7 +350,8 @@ class Console( threading.Thread ):
|
||||||
elif event == Console.MSG.FLUSH:
|
elif event == Console.MSG.FLUSH:
|
||||||
pid = data[ 1 ]
|
pid = data[ 1 ]
|
||||||
if pid in self.buffers:
|
if pid in self.buffers:
|
||||||
for line in self.buffers[ pid ]:
|
buffer = self.buffers[ pid ]
|
||||||
|
for line in buffer:
|
||||||
print( line )
|
print( line )
|
||||||
self.buffers.pop( pid, None )
|
self.buffers.pop( pid, None )
|
||||||
self.buffer_write_times[ pid ] = datetime.datetime.now( )
|
self.buffer_write_times[ pid ] = datetime.datetime.now( )
|
||||||
|
|
|
@ -13,125 +13,186 @@ import time, traceback
|
||||||
|
|
||||||
|
|
||||||
#==============================================================
|
#==============================================================
|
||||||
def main( args ):
|
class P4SyncMissing:
|
||||||
start = time.clock()
|
def run( self, args ):
|
||||||
|
start = time.clock()
|
||||||
|
|
||||||
fail_if_no_p4()
|
fail_if_no_p4()
|
||||||
|
|
||||||
#http://docs.python.org/library/optparse.html
|
#http://docs.python.org/library/optparse.html
|
||||||
parser = optparse.OptionParser( )
|
parser = optparse.OptionParser( )
|
||||||
|
|
||||||
parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None )
|
parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None )
|
||||||
parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=100 )
|
parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=12 )
|
||||||
parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False )
|
parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False )
|
||||||
parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False )
|
parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False )
|
||||||
parser.add_option( "-i", "--interactive", action="store_true", dest="interactive", default=False )
|
parser.add_option( "-i", "--interactive", action="store_true", dest="interactive", default=False )
|
||||||
|
|
||||||
( options, args ) = parser.parse_args( args )
|
( options, args ) = parser.parse_args( args )
|
||||||
|
|
||||||
directory = normpath( options.directory if options.directory is not None else os.getcwd( ) )
|
directory = normpath( options.directory if options.directory is not None else os.getcwd( ) )
|
||||||
|
|
||||||
with Console( auto_flush_num=20, auto_flush_time=1000 ) as c:
|
with Console( auto_flush_time=1000 ) as c:
|
||||||
with P4Workspace( directory ):
|
with P4Workspace( directory ):
|
||||||
if not options.quiet:
|
if not options.quiet:
|
||||||
c.writeflush( "Preparing to sync missing files..." )
|
c.writeflush( "Preparing to sync missing files..." )
|
||||||
c.write( " Setting up threads..." )
|
c.writeflush( " Setting up threads..." )
|
||||||
|
|
||||||
|
# Setup threading
|
||||||
|
WRK = enum( 'SHUTDOWN', 'SYNC' )
|
||||||
|
|
||||||
|
def shutdown( data ):
|
||||||
|
return False
|
||||||
|
def sync( files ):
|
||||||
|
files_flat = ' '.join(files)
|
||||||
|
#subprocess.check_output( "p4 sync -f " + files_flat + "", shell=False, cwd=None )
|
||||||
|
ret = -1
|
||||||
|
count = 0
|
||||||
|
while ret != 0 and count < 2:
|
||||||
|
ret = try_call_process( "p4 sync -f " + files_flat )
|
||||||
|
count += 1
|
||||||
|
if ret != 0 and not options.quiet:
|
||||||
|
c.write("Failed, trying again to sync " + files_flat)
|
||||||
|
|
||||||
# Setup threading
|
|
||||||
WRK = enum( 'SHUTDOWN', 'SYNC' )
|
|
||||||
|
|
||||||
def shutdown( data ):
|
|
||||||
return False
|
|
||||||
def sync( data ):
|
|
||||||
if data is not None and not os.path.exists( data ):
|
|
||||||
subprocess.check_output( "p4 sync -f \"" + data + "\"", shell=False, cwd=None )
|
|
||||||
if not options.quiet:
|
if not options.quiet:
|
||||||
c.write( " Synced " + os.path.relpath( data, directory ) )
|
files_len = len(files)
|
||||||
return True
|
if files_len > 1:
|
||||||
|
c.write( " Synced batch of " + str(len(files)) )
|
||||||
|
for f in files:
|
||||||
|
c.write( " Synced " + os.path.relpath( f, directory ) )
|
||||||
|
return True
|
||||||
|
|
||||||
commands = {
|
commands = {
|
||||||
WRK.SHUTDOWN : shutdown,
|
WRK.SHUTDOWN : shutdown,
|
||||||
WRK.SYNC : sync
|
WRK.SYNC : sync
|
||||||
}
|
}
|
||||||
|
|
||||||
threads = [ ]
|
threads = [ ]
|
||||||
thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + threads
|
thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + threads
|
||||||
|
|
||||||
queue = multiprocessing.JoinableQueue( )
|
count = 0
|
||||||
|
self.queue = multiprocessing.JoinableQueue( )
|
||||||
|
|
||||||
for i in range( thread_count ):
|
for i in range( thread_count ):
|
||||||
t = Worker( c, queue, commands )
|
t = Worker( c, self.queue, commands )
|
||||||
threads.append( t )
|
threads.append( t )
|
||||||
t.start( )
|
t.start( )
|
||||||
|
|
||||||
make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False
|
make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False
|
||||||
|
|
||||||
command = "p4 fstat ..."
|
command = "p4 fstat ..."
|
||||||
|
|
||||||
if not options.quiet:
|
if not options.quiet:
|
||||||
c.writeflush( " Checking files in depot, this may take some time for large depots..." )
|
c.writeflush( " Checking files in depot, this may take some time for large depots..." )
|
||||||
|
|
||||||
proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory )
|
proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory )
|
||||||
|
|
||||||
clientFile_tag = "... clientFile "
|
clientFile_tag = "... clientFile "
|
||||||
headAction_tag = "... headAction "
|
headAction_tag = "... headAction "
|
||||||
|
headType_tag = "... headType "
|
||||||
|
|
||||||
# http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html
|
# http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html
|
||||||
accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked
|
accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked
|
||||||
rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ]
|
rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ]
|
||||||
|
file_type_binary = 'binary+l'
|
||||||
|
file_type_text = 'text'
|
||||||
|
|
||||||
client_file = None
|
client_file = None
|
||||||
|
file_action = None
|
||||||
|
file_type = None
|
||||||
|
file_type_last = None
|
||||||
|
|
||||||
for line in proc.stdout:
|
class Bucket:
|
||||||
line = get_str_from_process_stdout( line )
|
def __init__(self, limit):
|
||||||
|
self.queue = []
|
||||||
|
self.queue_size = 0
|
||||||
|
self.queue_limit = limit
|
||||||
|
def append(self,obj):
|
||||||
|
self.queue.append(obj)
|
||||||
|
self.queue_size += 1
|
||||||
|
def is_full(self):
|
||||||
|
return self.queue_size >= self.queue_limit
|
||||||
|
|
||||||
if client_file and line.startswith( headAction_tag ):
|
self.buckets = {}
|
||||||
action = normpath( line[ len( headAction_tag ) : ].strip( ) )
|
self.buckets[file_type_text] = Bucket(10)
|
||||||
if not any(action == a for a in rejected_actions):
|
self.buckets[file_type_binary] = Bucket(2)
|
||||||
if options.verbose:
|
|
||||||
c.write( " Checking " + os.path.relpath( local_path, directory ) )
|
|
||||||
# TODO: directories should be batched and synced in parallel
|
|
||||||
queue.put( ( WRK.SYNC, local_path ) )
|
|
||||||
|
|
||||||
if line.startswith( clientFile_tag ):
|
def push_queued(bucket):
|
||||||
client_file = None
|
if bucket.queue_size == 0:
|
||||||
local_path = normpath( line[ len( clientFile_tag ) : ].strip( ) )
|
return
|
||||||
if make_drive_upper:
|
if options.verbose:
|
||||||
drive, path = splitdrive( local_path )
|
for f in bucket.queue:
|
||||||
client_file = ''.join( [ drive.upper( ), path ] )
|
c.write( " Checking " + os.path.relpath( f, directory ) )
|
||||||
|
self.queue.put( ( WRK.SYNC, bucket.queue ) )
|
||||||
|
bucket.queue = []
|
||||||
|
bucket.queue_size = 0
|
||||||
|
|
||||||
if len(line.rstrip()) == 0:
|
for line in proc.stdout:
|
||||||
client_file = None
|
line = get_str_from_process_stdout( line )
|
||||||
|
|
||||||
proc.wait( )
|
#push work when finding out type
|
||||||
|
if client_file and file_action is not None and line.startswith( headType_tag ):
|
||||||
|
|
||||||
for line in proc.stderr:
|
file_type = normpath( line[ len( headType_tag ) : ].strip( ) )
|
||||||
if "no such file" in line:
|
if file_type == file_type_text:
|
||||||
continue
|
self.buckets[file_type_text].append(client_file)
|
||||||
#raise Exception(line)
|
else:
|
||||||
c.write(line)#log as error
|
self.buckets[file_type_binary].append(client_file)
|
||||||
|
count += 1
|
||||||
|
|
||||||
if not options.quiet:
|
#check sizes and push
|
||||||
c.writeflush( " Pushed work, now waiting for threads..." )
|
for b in self.buckets.values():
|
||||||
|
if b.is_full():
|
||||||
|
push_queued(b)
|
||||||
|
|
||||||
for i in range( thread_count ):
|
elif client_file and line.startswith( headAction_tag ):
|
||||||
queue.put( ( WRK.SHUTDOWN, None ) )
|
file_action = normpath( line[ len( headAction_tag ) : ].strip( ) )
|
||||||
|
if any(file_action == a for a in rejected_actions):
|
||||||
|
file_action = None
|
||||||
|
else:
|
||||||
|
if os.path.exists( client_file ):
|
||||||
|
file_action = None
|
||||||
|
|
||||||
for t in threads:
|
elif line.startswith( clientFile_tag ):
|
||||||
t.join( )
|
client_file = normpath( line[ len( clientFile_tag ) : ].strip( ) )
|
||||||
|
if make_drive_upper:
|
||||||
|
drive, path = splitdrive( client_file )
|
||||||
|
client_file = ''.join( [ drive.upper( ), path ] )
|
||||||
|
|
||||||
if not options.quiet:
|
elif len(line.rstrip()) == 0:
|
||||||
c.write( "Done." )
|
client_file = None
|
||||||
|
|
||||||
end = time.clock()
|
for b in self.buckets.values():
|
||||||
delta = end - start
|
push_queued(b)
|
||||||
output = "\nFinished in " + str(delta) + "s"
|
proc.wait( )
|
||||||
|
|
||||||
c.writeflush( output )
|
for line in proc.stderr:
|
||||||
|
if "no such file" in line:
|
||||||
|
continue
|
||||||
|
#raise Exception(line)
|
||||||
|
c.write(line)#log as error
|
||||||
|
|
||||||
|
if not options.quiet:
|
||||||
|
c.writeflush( " Checking " + str(count) + " file(s), now waiting for threads..." )
|
||||||
|
|
||||||
|
for i in range( thread_count ):
|
||||||
|
self.queue.put( ( WRK.SHUTDOWN, None ) )
|
||||||
|
|
||||||
|
for t in threads:
|
||||||
|
t.join( )
|
||||||
|
|
||||||
|
if not options.quiet:
|
||||||
|
print( "Done." )
|
||||||
|
|
||||||
|
end = time.clock()
|
||||||
|
delta = end - start
|
||||||
|
output = "\nFinished in " + str(delta) + "s"
|
||||||
|
print( output )
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
main( sys.argv )
|
P4SyncMissing().run(sys.argv)
|
||||||
except:
|
except:
|
||||||
print( "\nUnexpected error!" )
|
print( "\nUnexpected error!" )
|
||||||
traceback.print_exc( file = sys.stdout )
|
traceback.print_exc( file = sys.stdout )
|
Loading…
Reference in New Issue