Add TODOs. Fix some directory bugs when running scripts with a cwd that is not in the p4 workspace. TODO: make sure all scripts and options work when run outside the p4 workspace. If the user isn't logged in, you can get weird errors later in the pipeline, and without extra manually added prints you wouldn't know you just need to log in. Added a TODO about detecting whether we need to do a p4 login. Some of my stuff seems to have stopped working with later versions of Python/p4; I had to update a string to a byte string, and no doubt more of these issues are hiding. I haven't tested on Python 2 in a while; do not consider these scripts working there.
261 lines
No EOL
12 KiB
Python
261 lines
No EOL
12 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf8 -*-
|
|
# author : Brian Ernst
|
|
# python_version : 2.7.6 and 3.4.0
|
|
# =================================
|
|
|
|
# TODO: setup batches before pushing to threads and use p4 --parallel
|
|
# http://www.perforce.com/perforce/r14.2/manuals/cmdref/p4_sync.html
|
|
|
|
from p4Helper import *
|
|
|
|
import time, traceback
|
|
|
|
|
|
#==============================================================
|
|
class P4SyncMissing:
    """Find files the Perforce server says should exist in the workspace but
    are missing on local disk, and force-sync them in batches across a pool
    of worker threads."""

    def run( self, args ):
        """Parse command-line options from `args`, scrape `p4 fstat ...`
        output for head revisions that are missing locally, and queue them
        for `p4 sync -f` on worker threads.

        Options: -d/--dir directory to crawl (default cwd), -t/--threads
        worker count, -q/--quiet, -v/--verbose, --dry-run (list only, no
        sync). Prints progress and timing unless quiet.
        """
        start = time.time()

        # Bail out early with a clear message if the p4 CLI is unavailable.
        fail_if_no_p4()

        #http://docs.python.org/library/optparse.html
        parser = optparse.OptionParser( )

        parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None )
        # type="int" so a command-line "-t 8" arrives as a number; without it
        # optparse hands back the raw string and the > 0 comparison below
        # raises TypeError on Python 3.
        parser.add_option( "-t", "--threads", dest="thread_count", type="int", help="Number of threads to crawl your drive and poll p4.", default=12 )
        parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False )
        parser.add_option( "--dry-run", action="store_true", dest="dry_run", help="Whether the script should not make any changes but note what files it would try to sync.", default=False )
        parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False )

        ( options, args ) = parser.parse_args( args )

        directory = normpath( options.directory if options.directory is not None else os.getcwd( ) )

        with Console( auto_flush_time=1 ) as c:
            with P4Workspace( directory ):
                if not options.quiet:
                    c.writeflush( "Setting up to retreive missing files..." )
                    c.writeflush( " Setting up threads..." )

                # Setup threading
                WRK = enum( 'SHUTDOWN', 'SYNC' )

                def shutdown( data ):
                    # Returning False tells the Worker loop to stop.
                    return False

                def sync( files ):
                    # Force-sync one batch of files; retries once on failure.
                    files_flat = ' '.join('"' + p4FriendlyPath( f ) + '"' for f in files)

                    if options.verbose:
                        files_len = len(files)
                        if files_len > 1:
                            c.write( " Syncing batch of " + str(len(files)) + " ...")
                            for f in files:
                                c.write( " " + os.path.relpath( f, directory ) )
                        else:
                            for f in files:
                                c.write( " Syncing " + os.path.relpath( f, directory ) + " ..." )

                    ret = -1
                    count = 0
                    # Allow one retry before giving up on the batch.
                    while ret != 0 and count < 2:
                        ret = try_call_process( "p4 sync -f " + files_flat )
                        count += 1
                        if ret != 0 and not options.quiet:
                            c.write("Failed, trying again to sync " + files_flat)
                    if ret != 0:
                        if not options.quiet:
                            c.write("Failed to sync " + files_flat)
                    else:
                        if not options.quiet:
                            files_len = len(files)
                            if files_len > 1:
                                c.write( " Synced batch of " + str(len(files)) )
                            for f in files:
                                c.write( " Synced " + os.path.relpath( f, directory ) )
                    return True

                # Dispatch table mapping work-item tags to handlers.
                commands = {
                    WRK.SHUTDOWN : shutdown,
                    WRK.SYNC : sync
                }

                threads = [ ]
                # A non-positive thread count means "cpu_count minus that many".
                thread_count = options.thread_count if options.thread_count > 0 else multiprocessing.cpu_count( ) + options.thread_count

                if not options.quiet:
                    c.writeflush( f" Setting up {thread_count} threads..." )

                count = 0   # files queued for syncing
                total = 0   # files examined (including rejected/present ones)
                self.queue = multiprocessing.JoinableQueue( )

                for i in range( thread_count ):
                    t = Worker( c, self.queue, commands )
                    # Daemon threads so a hard exit doesn't hang on workers.
                    t.daemon = True
                    threads.append( t )
                    t.start( )

                if not options.quiet:
                    c.writeflush( " Done." )

                # p4 may report a different drive-letter case than the local
                # filesystem on Windows/cygwin; normalize to upper-case so the
                # os.path.exists check below behaves.
                make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False

                command = "p4 fstat ..."

                if not options.quiet:
                    c.writeflush( f" Checking files in workspace (on stream `{get_client_stream()}`), this may take some time for large depots..." )

                proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory )

                # fstat tagged-output prefixes we care about.
                clientFile_tag = "... clientFile "
                headAction_tag = "... headAction "
                headType_tag = "... headType "

                # http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html
                accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked
                rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ]
                file_type_binary = 'binary+l'
                file_type_text = 'text'

                # Parser state accumulated across fstat output lines.
                client_file = None
                file_action = None
                file_type = None

                # todo: use fewer threads, increase bucket size and use p4 threading
                class Bucket:
                    # Accumulates file paths until full, at which point the
                    # batch is pushed to the worker queue.
                    def __init__(self, limit):
                        self.queue = []
                        self.queue_size = 0
                        self.queue_limit = limit
                    def append(self,obj):
                        self.queue.append(obj)
                        self.queue_size += 1
                    def is_full(self):
                        return self.queue_size >= self.queue_limit

                # Text files batch larger than (typically bigger) binaries.
                self.buckets = {}
                self.buckets[file_type_text] = Bucket(10)
                self.buckets[file_type_binary] = Bucket(2)

                def push_queued(bucket):
                    # Hand a bucket's accumulated batch to the workers and
                    # reset it for reuse.
                    if bucket.queue_size == 0:
                        return
                    if options.verbose:
                        for f in bucket.queue:
                            c.write( " Queued " + os.path.relpath( f, directory ) )
                    self.queue.put( ( WRK.SYNC, bucket.queue ) )
                    bucket.queue = []
                    bucket.queue_size = 0

                # Wrap with a try block for catching an early termination so we can clear the queue so threads don't keep spinning.
                try:
                    # From here on out we have to scrape a bunch of lines just for
                    # one file. We have to build up a picture from the metadata about
                    # the state of the file.
                    #
                    # This means we're iterating a few times until we have client_file
                    # and file_action. Yes it's bad we wait to check client_file and
                    # file_action as long as there's pending lines, we should check
                    # that outside of the loop, not within.
                    for line in proc.stdout:
                        line = get_str_from_process_stdout( line )

                        # Push work when finding out type and that it needs to be synced.
                        if client_file and file_action is not None and line.startswith( headType_tag ):
                            if not options.dry_run:
                                # Add file to queue to be synced.

                                file_type = normpath( line[ len( headType_tag ) : ].strip( ) )
                                if file_type == file_type_text:
                                    self.buckets[file_type_text].append(client_file)
                                else:
                                    self.buckets[file_type_binary].append(client_file)
                                count += 1

                                # We wait to kick off a sync until we consider it to
                                # be a minimum sufficient size, then once it is, we
                                # kick it for syncing.
                                for b in self.buckets.values():
                                    if b.is_full():
                                        push_queued(b)
                            else:
                                c.write( f" {os.path.relpath( client_file, directory )}")

                        elif client_file and line.startswith( headAction_tag ):
                            # Even if we're ignoring the file, count it as being processed.
                            total += 1

                            file_action = normpath( line[ len( headAction_tag ) : ].strip( ) )

                            if any(file_action == a for a in rejected_actions):
                                # If the headAction indicates we shouldn't waste time
                                # syncing, don't indicate a valid action for our
                                # purposes. Yes we're hijacking the file_action, not
                                # the best way to do this.
                                file_action = None
                            else:
                                # If file exists, we don't need to sync it, don't
                                # indicate valid action for our purposes. Yes we're
                                # hijacking the file_action, not the best way to do this.
                                if os.path.exists( client_file ):
                                    file_action = None

                        elif line.startswith( clientFile_tag ):
                            client_file = line[ len( clientFile_tag ) : ].strip( )
                            if make_drive_upper:
                                drive, path = splitdrive( client_file )
                                client_file = ''.join( [ drive.upper( ), path ] )

                        elif len(line.rstrip()) == 0:
                            # Blank line terminates one file's fstat record.
                            client_file = None

                    # Flush any partially-filled buckets.
                    for b in self.buckets.values():
                        push_queued(b)
                    proc.wait( )

                    for line in proc.stderr:
                        # stderr arrives as bytes under Python 3; decode it the
                        # same way as stdout before substring checks, otherwise
                        # `"no such file" in line` raises TypeError.
                        line = get_str_from_process_stdout( line )
                        if "no such file" in line:
                            continue
                        #raise Exception(line)
                        c.write(line)#log as error

                    if not options.quiet:
                        c.writeflush( " Done. Checked " + str(total) + " file(s)." )
                        c.writeflush( " Queued " + str(count) + " file(s), now waiting for threads..." )

                    self.queue.join()

                except KeyboardInterrupt:
                    # Don't leave threads to keep working. Note that
                    # JoinableQueue.empty() only *reports* emptiness — to stop
                    # the workers we must actually drain the pending items
                    # (marking each done so join accounting stays balanced).
                    import queue as _queue
                    try:
                        while True:
                            self.queue.get_nowait()
                            self.queue.task_done()
                    except _queue.Empty:
                        pass

                # Wait for threads to finish logging everything.
                for i in range( thread_count ):
                    self.queue.put( ( WRK.SHUTDOWN, None ) )
                for t in threads:
                    t.join( )

                if not options.quiet:
                    print( " Done." )

        if not options.quiet:
            end = time.time()
            delta = end - start
            output = " Done. Finished in " + str(delta) + "s"
            print( output )
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the sync and report any unexpected failure with
    # a traceback. Catch only Exception — a bare `except:` would also swallow
    # KeyboardInterrupt/SystemExit and misreport Ctrl-C as an error.
    try:
        P4SyncMissing().run(sys.argv)
    except Exception:
        print( "\nUnexpected error!" )
        traceback.print_exc( file = sys.stdout )