p4scripts/p4SyncMissingFiles.py
Brian Ernst 85de0ec1ca I added dry-run to p4SyncMissing some time ago; I don't remember if I finished it, but I don't have time to test, and don't want to lose it, so submitting it. Wow is this a rough testing branch or what.
Add TODOs.
Fix some directory bugs when running scripts with a cwd not in the p4 workspace. TODO: make sure all scripts and options work when run outside p4 workspace.
If the user isn't logged in, you can get weird errors later in the pipeline, and without extra manually added prints you wouldn't know you just need to log in. Added a TODO about detecting whether we need to do a p4 login.
Some of my stuff seems to have stopped working with later versions of Python/p4; I had to update a string to a byte string, and no doubt more of these issues are hiding.
Haven't tested on python 2 in a while, do not consider these working there.
2026-03-05 15:27:28 -08:00

261 lines
No EOL
12 KiB
Python

#!/usr/bin/python
# -*- coding: utf8 -*-
# author : Brian Ernst
# python_version : 2.7.6 and 3.4.0
# =================================
# TODO: setup batches before pushing to threads and use p4 --parallel
# http://www.perforce.com/perforce/r14.2/manuals/cmdref/p4_sync.html
from p4Helper import *
import time, traceback
#==============================================================
class P4SyncMissing:
    """Force-syncs files that the Perforce server believes exist in the
    workspace but are missing from the local disk.

    Missing files are discovered by scraping `p4 fstat ...` output; files
    whose head action implies they should exist locally, but which do not,
    are batched into buckets (text vs. binary) and handed to a pool of
    worker threads that run `p4 sync -f` on each batch.
    """

    def run( self, args ):
        """Parse command-line `args`, find missing files under the chosen
        directory, and sync them (or just list them with --dry-run)."""
        start = time.time()
        fail_if_no_p4()

        #http://docs.python.org/library/optparse.html
        parser = optparse.OptionParser( )
        parser.add_option( "-d", "--dir", dest="directory", help="Desired directory to crawl.", default=None )
        parser.add_option( "-t", "--threads", dest="thread_count", help="Number of threads to crawl your drive and poll p4.", default=12 )
        parser.add_option( "-q", "--quiet", action="store_true", dest="quiet", help="This overrides verbose", default=False )
        parser.add_option( "--dry-run", action="store_true", dest="dry_run", help="Whether the script should not make any changes but note what files it would try to sync.", default=False )
        parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", default=False )
        ( options, args ) = parser.parse_args( args )

        directory = normpath( options.directory if options.directory is not None else os.getcwd( ) )

        with Console( auto_flush_time=1 ) as c:
            with P4Workspace( directory ):
                if not options.quiet:
                    c.writeflush( "Setting up to retrieve missing files..." )
                    c.writeflush( " Setting up threads..." )

                # Setup threading
                WRK = enum( 'SHUTDOWN', 'SYNC' )

                def shutdown( data ):
                    # Returning False tells the worker loop to stop.
                    return False

                def sync( files ):
                    # Worker command: force-sync one batch of client files,
                    # retrying the p4 call once on failure.
                    files_len = len(files)
                    files_flat = ' '.join('"' + p4FriendlyPath( f ) + '"' for f in files)
                    if options.verbose:
                        if files_len > 1:
                            c.write( " Syncing batch of " + str(files_len) + " ...")
                            for f in files:
                                c.write( " " + os.path.relpath( f, directory ) )
                        else:
                            for f in files:
                                c.write( " Syncing " + os.path.relpath( f, directory ) + " ..." )
                    ret = -1
                    count = 0
                    # p4 syncs can fail transiently; try at most twice.
                    while ret != 0 and count < 2:
                        ret = try_call_process( "p4 sync -f " + files_flat )
                        count += 1
                        if ret != 0 and not options.quiet:
                            c.write("Failed, trying again to sync " + files_flat)
                    if ret != 0:
                        if not options.quiet:
                            c.write("Failed to sync " + files_flat)
                    else:
                        if not options.quiet:
                            if files_len > 1:
                                c.write( " Synced batch of " + str(files_len) )
                            for f in files:
                                c.write( " Synced " + os.path.relpath( f, directory ) )
                    return True

                commands = {
                    WRK.SHUTDOWN : shutdown,
                    WRK.SYNC : sync
                }

                threads = [ ]
                # optparse hands back a *string* when -t is given on the
                # command line (only the default is an int); coerce before
                # comparing or doing arithmetic on it.
                thread_count = int( options.thread_count )
                # Non-positive values mean "cpu count minus that many".
                thread_count = thread_count if thread_count > 0 else multiprocessing.cpu_count( ) + thread_count

                if not options.quiet:
                    c.writeflush( f" Setting up {thread_count} threads..." )

                count = 0   # files queued for syncing
                total = 0   # files examined (including rejected/present ones)

                self.queue = multiprocessing.JoinableQueue( )
                for i in range( thread_count ):
                    t = Worker( c, self.queue, commands )
                    t.daemon = True
                    threads.append( t )
                    t.start( )
                if not options.quiet:
                    c.writeflush( " Done." )

                # Windows/cygwin drive letters compare case-insensitively;
                # normalize to upper case so path handling is consistent.
                make_drive_upper = True if os.name == 'nt' or sys.platform == 'cygwin' else False

                command = "p4 fstat ..."
                if not options.quiet:
                    c.writeflush( f" Checking files in workspace (on stream `{get_client_stream()}`), this may take some time for large depots..." )
                proc = subprocess.Popen( command.split( ), stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=directory )

                clientFile_tag = "... clientFile "
                headAction_tag = "... headAction "
                headType_tag = "... headType "

                # http://www.perforce.com/perforce/r12.1/manuals/cmdref/fstat.html
                accepted_actions = [ 'add', 'edit', 'branch', 'move/add', 'move\\add', 'integrate', 'import', 'archive' ] #currently not checked
                rejected_actions = [ 'delete', 'move/delete', 'move\\delete', 'purge' ]

                file_type_binary = 'binary+l'
                file_type_text = 'text'

                client_file = None
                file_action = None
                file_type = None

                # todo: use fewer threads, increase bucket size and use p4 threading
                class Bucket:
                    """Accumulates files until there are enough for one
                    batched sync command."""
                    def __init__(self, limit):
                        self.queue = []
                        self.queue_size = 0
                        self.queue_limit = limit
                    def append(self,obj):
                        self.queue.append(obj)
                        self.queue_size += 1
                    def is_full(self):
                        return self.queue_size >= self.queue_limit

                # Text files batch larger than binaries — presumably because
                # binaries take longer to transfer per file.
                self.buckets = {}
                self.buckets[file_type_text] = Bucket(10)
                self.buckets[file_type_binary] = Bucket(2)

                def push_queued(bucket):
                    # Hand the bucket's contents to the worker pool, then reset it.
                    if bucket.queue_size == 0:
                        return
                    if options.verbose:
                        for f in bucket.queue:
                            c.write( " Queued " + os.path.relpath( f, directory ) )
                    self.queue.put( ( WRK.SYNC, bucket.queue ) )
                    bucket.queue = []
                    bucket.queue_size = 0

                # Wrap with a try block for catching an early termination so we can clear the queue so threads don't keep spinning.
                try:
                    # fstat emits several "... tag value" lines per file,
                    # terminated by a blank line.  We accumulate client_file
                    # and file_action until the headType line arrives, at
                    # which point we can decide what to do with the file.
                    for line in proc.stdout:
                        line = get_str_from_process_stdout( line )

                        # Push work when finding out type and that it needs to be synced.
                        if client_file and file_action is not None and line.startswith( headType_tag ):
                            if not options.dry_run:
                                # Add file to the bucket matching its type.
                                file_type = normpath( line[ len( headType_tag ) : ].strip( ) )
                                if file_type == file_type_text:
                                    self.buckets[file_type_text].append(client_file)
                                else:
                                    self.buckets[file_type_binary].append(client_file)
                                count += 1
                                # We wait to kick off a sync until a bucket
                                # reaches a minimum sufficient size, then we
                                # kick it off for syncing.
                                for b in self.buckets.values():
                                    if b.is_full():
                                        push_queued(b)
                            else:
                                # Dry run: just report what would be synced.
                                # TODO: count these so the final summary is
                                # meaningful in dry-run mode too.
                                c.write( f" {os.path.relpath( client_file, directory )}")
                        elif client_file and line.startswith( headAction_tag ):
                            # Even if we're ignoring the file, count it as being processed.
                            total += 1
                            file_action = normpath( line[ len( headAction_tag ) : ].strip( ) )
                            if any(file_action == a for a in rejected_actions):
                                # headAction says this file shouldn't exist
                                # locally (deleted/purged); clear file_action
                                # so the headType branch skips it.  Yes we're
                                # hijacking file_action, not the best way.
                                file_action = None
                            else:
                                # If the file already exists locally there is
                                # nothing to sync; same hijack as above.
                                if os.path.exists( client_file ):
                                    file_action = None
                        elif line.startswith( clientFile_tag ):
                            client_file = line[ len( clientFile_tag ) : ].strip( )
                            if make_drive_upper:
                                drive, path = splitdrive( client_file )
                                client_file = ''.join( [ drive.upper( ), path ] )
                        elif len(line.rstrip()) == 0:
                            # Blank line terminates one file's fstat record.
                            client_file = None

                    # Flush any partially-filled buckets.
                    for b in self.buckets.values():
                        push_queued(b)

                    proc.wait( )

                    for line in proc.stderr:
                        # stderr arrives as bytes on Python 3; decode with the
                        # same helper used for stdout before substring tests.
                        line = get_str_from_process_stdout( line )
                        if "no such file" in line:
                            continue
                        c.write(line)#log as error

                    if not options.quiet:
                        c.writeflush( " Done. Checked " + str(total) + " file(s)." )
                        c.writeflush( " Queued " + str(count) + " file(s), now waiting for threads..." )

                    self.queue.join()
                except KeyboardInterrupt:
                    # Don't leave threads to keep working.  Note that
                    # JoinableQueue.empty() only *tests* emptiness — it does
                    # not clear anything — so drain the queue explicitly.
                    try:
                        while not self.queue.empty( ):
                            self.queue.get_nowait( )
                            self.queue.task_done( )
                    except Exception:
                        pass # best-effort drain; the queue is inherently racy

                # Wait for threads to finish logging everything.
                for i in range( thread_count ):
                    self.queue.put( ( WRK.SHUTDOWN, None ) )
                for t in threads:
                    t.join( )
                if not options.quiet:
                    print( " Done." )

        if not options.quiet:
            end = time.time()
            delta = end - start
            output = " Done. Finished in " + str(delta) + "s"
            print( output )
if __name__ == "__main__":
    try:
        P4SyncMissing().run(sys.argv)
    except Exception:
        # A bare `except:` here would also swallow SystemExit and
        # KeyboardInterrupt; only trap genuine errors and dump the
        # traceback to stdout so users can report it.
        print( "\nUnexpected error!" )
        traceback.print_exc( file = sys.stdout )