p4Scripts/p4Helper.py

433 lines
15 KiB
Python

#!/usr/bin/python
# -*- coding: utf8 -*-
# author : Brian Ernst
# python_version : 2.7.6 and 3.4.0
# =================================
import datetime, inspect, itertools, marshal, multiprocessing, optparse, os, re, stat, subprocess, sys, threading
# trying ntpath, need to test on linux
import ntpath
# Python 2 compatibility: make input() behave like Python 3's by aliasing it
# to raw_input where that builtin exists.
try:
    input = raw_input
except NameError:
    # Python 3: raw_input does not exist and input() already returns the raw string.
    pass
#==============================================================
# Matches a '#' and everything after it up to the end of the string
# (or up to a single trailing newline).
re_remove_comment = re.compile( "#.*$" )

def remove_comment( s ):
    """
    Return s with a trailing '#' comment removed.
    Text before the '#' (including whitespace) is left untouched.
    """
    return re_remove_comment.sub( "", s )
def singular_pulural( val, singular, plural ):
    """
    Pick the word form that matches a count.
    Returns singular when val equals 1, plural for every other value.
    """
    if val == 1:
        return singular
    return plural
#==============================================================
def enum(*sequential, **named):
    """
    Build a lightweight enum-like class named 'Enum'.
    Positional names are numbered 0, 1, 2, ... in order; keyword arguments
    become attributes with the given values and win over positional names
    on a clash.
    """
    members = {}
    for index, label in enumerate(sequential):
        members[label] = index
    members.update(named)
    return type('Enum', (), members)
# Message codes built with enum(): SHUTDOWN=0, PARSE_DIRECTORY=1, RUN_FUNCTION=2.
MSG = enum('SHUTDOWN', 'PARSE_DIRECTORY', 'RUN_FUNCTION')
# NOTE(review): presumably the file name of Perforce ignore files; not referenced in the visible code -- confirm against callers.
p4_ignore = ".p4ignore"
# PID of the process that executed this module's top-level code, captured at import time.
main_pid = os.getpid( )
#==============================================================
#if os.name == 'nt' or sys.platform == 'cygwin'
def basename( path ):
    """
    Return the final component of path using Windows (ntpath) rules,
    regardless of the platform the script runs on.
    """
    # TODO: choose the path module based on platform instead of always ntpath.
    # See https://docs.python.org/2/library/os.path.html :
    #   posixpath  - UNIX-style paths
    #   ntpath     - Windows paths
    #   macpath    - old-style MacOS paths
    #   os2emxpath - OS/2 EMX paths
    # The platform-dependent alternative would be os.path.basename( path ).
    return ntpath.basename( path )
def normpath( path ):
    """
    Normalize path using Windows (ntpath) rules: collapses redundant
    separators and up-level references and converts '/' to '\\'.
    """
    return ntpath.normpath( path )
def join( patha, pathb ):
    """
    Join two path components using Windows (ntpath) rules.
    """
    return ntpath.join( patha, pathb )
def splitdrive( path ):
    """
    Split path into a (drive, tail) pair using Windows (ntpath) rules.
    """
    return ntpath.splitdrive( path )
def p4FriendlyPath(path):
    """
    Return path with the characters Perforce reserves in file specs
    ('@', '#', '*', '%') replaced by their %XX escapes.
    Every other character is passed through unchanged.
    """
    # http://www.perforce.com/perforce/doc.current/manuals/cmdref/filespecs.html#1041962
    escapes = {
        '@' : '%40',
        '#' : '%23',
        '*' : '%2A',
        '%' : '%25',
    }
    # Each character is looked up once, so an inserted '%' is never re-escaped.
    return ''.join( escapes.get( ch, ch ) for ch in path )
#==============================================================
def get_ignore_list( path, files_to_ignore ):
    """
    Collect the ignore entries that apply to path.

    path is split on os.sep and every leading prefix (top directory first,
    full path last) is looked up in files_to_ignore; the entries of each
    prefix found are appended in that order.

    :param path: directory path using os.sep separators
    :param files_to_ignore: mapping of directory path -> list of ignore entries
    :return: a new list with all entries for path and its ancestors
    """
    parts = path.split( os.sep )
    collected = [ ]
    for depth in range( 1, len( parts ) + 1 ):
        ancestor = os.sep.join( parts[ : depth ] )
        if ancestor in files_to_ignore:
            collected += files_to_ignore[ ancestor ]
    return collected
def match_in_ignore_list( path, ignore_list ):
    """
    Return True when any regular expression in ignore_list matches path
    from its beginning (re.match semantics), otherwise False.
    """
    return any( re.match( pattern, path ) for pattern in ignore_list )
#==============================================================
def call_process( args ):
    """
    Run a command, discard its output, and return its exit code.

    :param args: command passed straight to subprocess.Popen
    :return: the process return code
    """
    proc = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
    # communicate() drains both pipes while waiting. subprocess.call() with
    # PIPE never reads them, so a child that writes more than the OS pipe
    # buffer holds would block forever.
    proc.communicate( )
    return proc.returncode
def try_call_process( args, path=None ):
    """
    Run a command without a shell and report success as an exit-style code.

    :param args: command passed to subprocess.check_output
    :param path: working directory for the command (None keeps the current one)
    :return: 0 when the command exits with status 0, 1 when it exits non-zero
    """
    try:
        # stderr is deliberately not redirected (stderr=subprocess.STDOUT was considered).
        subprocess.check_output( args, shell=False, cwd=path )
    except subprocess.CalledProcessError:
        return 1
    return 0
# True on Python 3, where bytes and str are distinct types, so process output
# has to be converted before it can be treated as text.
use_bytearray_str_conversion = bytes is not str

def get_str_from_process_stdout( line ):
    """
    Convert a chunk of process output to str.
    On Python 3 each byte value is mapped to the character with the same
    code point; on Python 2 the value is returned unchanged.
    """
    if not use_bytearray_str_conversion:
        return line
    return ''.join( chr( byte ) for byte in line )
def parse_info_from_command( args, value, path = None ):
    """
    Run a command and return the remainder of the first stdout line that
    starts with value.

    :param args: command passed straight to subprocess.Popen
    :param value: line prefix to look for
    :param path: working directory for the command (None keeps the current one)
    :rtype : string
    :return: text following the prefix with surrounding whitespace stripped,
             or None when no line starts with value
    """
    proc = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=path )
    # communicate() drains both pipes, closes them and reaps the child, so
    # returning on the first match no longer leaves a running process or
    # open pipe handles behind.
    out, _ = proc.communicate( )
    for raw in out.splitlines( ):
        line = get_str_from_process_stdout( raw )
        if line.startswith( value ):
            return line[ len( value ) : ].strip( )
    return None
def get_p4_py_results( args, path = None ):
    """
    Run 'p4 -G <args>' and return every marshalled object it writes to stdout.

    :param args: p4 arguments as a single string, appended to 'p4 -G '
    :param path: working directory for the command (None keeps the current one)
    :return: list of the objects read, in output order
    """
    results = []
    proc = subprocess.Popen( 'p4 -G ' + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=path )
    try:
        # NOTE: marshal is not safe against malicious input; this trusts the
        # local p4 client's output.
        while True:
            results.append( marshal.load( proc.stdout ) )
    except EOFError:
        # End of stream: every object has been read.
        pass
    finally:
        # Close both pipes and reap the child so no handles or zombie
        # processes are left behind.
        proc.stdout.close()
        proc.stderr.close()
        proc.wait()
    return results
#==============================================================
def fail_if_no_p4():
    """
    Exit the script with status 1 and an explanatory message when the
    Perforce command-line client cannot be run.
    """
    try:
        # Argument list instead of a single string: a string is only split
        # into program + arguments on Windows.
        found = call_process( [ 'p4', '-V' ] ) == 0
    except OSError:
        # Raised when the executable does not exist; that is exactly the
        # case this check is meant to report.
        found = False
    if not found:
        print( 'Perforce Command-line Client(p4) is required for this script.' )
        sys.exit( 1 )
# Keep these in mind if you have issues:
# https://stackoverflow.com/questions/16557908/getting-output-of-a-process-at-runtime
# https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
def get_client_set( path ):
    """
    Return the set of local file paths reported by 'p4 fstat ...' run in path.

    Each '... clientFile' entry is normalized with normpath; on Windows and
    cygwin the drive letter is upper-cased.

    :param path: directory to run the command in
    :return: set of normalized client file paths
    :raises Exception: for any stderr line that does not contain "no such file"
    """
    make_drive_upper = os.name == 'nt' or sys.platform == 'cygwin'
    clientFile_tag = "... clientFile "

    command = "p4 fstat ..."
    proc = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=path )
    # Read both pipes together; reading stdout to the end before touching
    # stderr can deadlock once the child fills the stderr pipe.
    out, err = proc.communicate( )

    files = set( )
    for raw in out.splitlines( ):
        line = get_str_from_process_stdout( raw )
        if not line.startswith( clientFile_tag ):
            continue
        local_path = normpath( line[ len( clientFile_tag ) : ].strip( ) )
        if make_drive_upper:
            drive, tail = splitdrive( local_path )
            local_path = drive.upper( ) + tail
        files.add( local_path )

    for raw in err.splitlines( ):
        # Convert first: on Python 3 the pipe yields bytes and a str
        # containment test against bytes raises TypeError.
        line = get_str_from_process_stdout( raw )
        if not line.strip( ) or "no such file" in line:
            continue
        raise Exception( line )

    return files
def get_client_root( ):
    """
    Return the normalized 'Client root' reported by 'p4 info'.

    :rtype : string
    :return: the normalized root path, or None when 'p4 info' prints no
             'Client root: ' line
    """
    client_root_tag = "Client root: "
    proc = subprocess.Popen( "p4 info", stdout=subprocess.PIPE, stderr=subprocess.PIPE )
    # communicate() drains and closes both pipes and reaps the child, so an
    # early return no longer leaks the running process.
    out, _ = proc.communicate( )
    for raw in out.splitlines( ):
        line = get_str_from_process_stdout( raw )
        if line.startswith( client_root_tag ):
            return normpath( line[ len( client_root_tag ) : ].strip( ) )
    return None
class P4Workspace:
    """
    Use this class when working in a workspace.
    Makes sure the environmentals are setup correctly, and that you aren't working on a non-perforce directory accidentally;
    otherwise you can delete files that shouldn't be deleted. Ex:

    with P4Workspace( cwd ): #sets current workspace to cwd, or fails
        # do stuff here
    # on exit reverts to previous set workspace
    """

    def __init__( self, directory):
        """
        :param directory: directory that must lie inside a workspace root
        """
        self.directory = directory
        # Name of the workspace to restore on exit; None means nothing was switched.
        # Initialised here so __exit__ is safe even if __enter__ never completed.
        self.oldworkspace_name = None

    def __enter__( self ):
        """
        Make sure the active p4 client covers self.directory, switching
        P4CLIENT to a matching workspace of the current user when needed.
        Exits the script with status 1 when p4 info is unavailable, no
        workspace matches, or the switch fails.
        """
        # get user
        result = get_p4_py_results('info')
        if len(result) == 0 or b'userName' not in result[0].keys():
            print("Can't find perforce info, is it even setup? Possibly can't connect to server.")
            sys.exit(1)
        username = get_str_from_process_stdout(result[0][b'userName'])
        client_host = get_str_from_process_stdout(result[0][b'clientHost'])

        # see if current directory is set to current workspace, if not, set it to current workspace.
        # NOTE(review): this is a plain string-prefix test, so a root 'C:\work'
        # also accepts 'C:\workspace2' -- confirm whether that matters.
        client_root = get_client_root()
        ldirectory = self.directory.lower()
        oldworkspace_name = None
        if client_root is None or not ldirectory.startswith(client_root.lower()):
            oldworkspace_name = parse_info_from_command('p4 info', 'Client name: ')

            # get user workspaces, skipping those bound to a different host
            result = get_p4_py_results('workspaces -u ' + username)
            workspaces = []
            for r in result:
                whost = get_str_from_process_stdout(r[b'Host'])
                if whost is not None and len(whost) != 0 and client_host != whost:
                    continue
                workspace = {'root': get_str_from_process_stdout(r[b'Root']), 'name': get_str_from_process_stdout(r[b'client'])}
                workspaces.append(workspace)
            del result

            # check current directory against the user's workspaces, see if it matches one of them.
            for w in workspaces:
                wname = w['name']
                wlower = w['root'].lower()
                if ldirectory.startswith(wlower):
                    # set current workspace; __exit__ reverts it to the previous one
                    if try_call_process( 'p4 set P4CLIENT=' + wname ):
                        sys.exit(1)
                    break
            else:
                # Loop finished without a break: no workspace root matched.
                # TODO: look up workspace/users for this computer
                print( "Couldn't find a workspace root that matches the current directory for the current user." )
                sys.exit(1)
        self.oldworkspace_name = oldworkspace_name
        return self

    def __exit__( self, type, value, tb ):
        """
        Restore the previously active workspace if __enter__ switched it.
        Exits the script with status 1 when the restore command fails.
        """
        # If we changed the current workspace, switch it back.
        if self.oldworkspace_name is not None:
            # Uses print: no console object named 'c' exists in this module,
            # and a NameError here would skip restoring P4CLIENT.
            print("\nReverting back to original client view...")
            # set workspace back to the original one
            if try_call_process( 'p4 set P4CLIENT=' + self.oldworkspace_name ):
                print("There was a problem trying to restore the set p4 client view (workspace).")
                sys.exit(1)
#==============================================================
class PTable( list ):
    """
    A list that carries a multiprocessing semaphore in its mutex attribute.
    The positional arguments become the initial items.
    """
    def __init__( self, *args ):
        super( PTable, self ).__init__( args )
        self.mutex = multiprocessing.Semaphore( )
class PDict( dict ):
    """
    A dict that carries a multiprocessing semaphore in its mutex attribute.
    Each positional argument is a (key, value) pair for the initial contents.
    """
    def __init__( self, *args ):
        super( PDict, self ).__init__( args )
        self.mutex = multiprocessing.Semaphore( )
#==============================================================
# TODO: Create a child thread for triggering autoflush events
# TODO: Hook console into stdout so it catches print
class Console( threading.Thread ):
    """
    Thread that buffers output lines per writer id and prints each buffer
    as a batch.

    Writers call write/flush/clear, which only enqueue messages; the thread's
    run loop owns the buffers and does the printing. Use as a context
    manager: entering starts the thread, exiting prints whatever is still
    buffered and stops the loop.
    """
    MSG = enum('WRITE', 'FLUSH', 'SHUTDOWN', 'CLEAR' )

    @staticmethod
    def wake(thread):
        """
        Timer callback: queue a flush and re-arm the timer unless the console
        is shutting down.
        """
        # NOTE(review): flush() defaults to the calling thread's ident, which
        # here is the timer thread's, so this only flushes a buffer written
        # under that same id -- confirm this is the intended behavior.
        thread.flush()
        if not thread.shutting_down:
            # auto_flush_time is stored in microseconds; Timer expects seconds.
            thread.wake_thread = threading.Timer(thread.auto_flush_time / 1000000.0, Console.wake, [thread])
            thread.wake_thread.daemon = True
            thread.wake_thread.start()

    # auto_flush_time is time in milliseconds since last flush to trigger a flush when writing
    def __init__( self, auto_flush_num = None, auto_flush_time = None ):
        """
        :param auto_flush_num: flush a buffer once it holds this many lines (None disables)
        :param auto_flush_time: milliseconds since the last flush after which a write triggers a flush (None disables)
        """
        threading.Thread.__init__( self )
        self.buffers = {}
        self.buffer_write_times = {}
        self.running = True
        self.queue = multiprocessing.JoinableQueue( )
        self.auto_flush_num = auto_flush_num if auto_flush_num is not None else -1
        # Kept in microseconds internally; -1 means disabled.
        self.auto_flush_time = auto_flush_time * 1000 if auto_flush_time is not None else -1
        self.shutting_down = False
        self.wake_thread = None
        if self.auto_flush_time > 0:
            Console.wake(self)

    def write( self, data, pid = None ):
        """Queue data for the buffer of pid (default: the calling thread's ident)."""
        pid = pid if pid is not None else threading.current_thread().ident
        self.queue.put( ( Console.MSG.WRITE, pid, data ) )

    def writeflush( self, data, pid = None ):
        """Queue data for the buffer of pid, followed by a flush of that buffer."""
        pid = pid if pid is not None else threading.current_thread().ident
        self.queue.put( ( Console.MSG.WRITE, pid, data ) )
        self.queue.put( ( Console.MSG.FLUSH, pid ) )

    def flush( self, pid = None ):
        """Queue a request to print and empty the buffer of pid."""
        pid = pid if pid is not None else threading.current_thread().ident
        self.queue.put( ( Console.MSG.FLUSH, pid ) )

    def clear( self, pid = None ):
        """Queue a request to discard the buffer of pid without printing it."""
        pid = pid if pid is not None else threading.current_thread().ident
        self.queue.put( ( Console.MSG.CLEAR, pid ) )

    def __enter__( self ):
        self.start( )
        return self

    def __exit__( self, type, value, tb ):
        # Stop the periodic timer first, then ask the run loop to drain and
        # wait until every queued message has been handled.
        self.shutting_down = True
        if self.wake_thread:
            self.wake_thread.cancel()
            self.wake_thread.join()
        self.queue.put( ( Console.MSG.SHUTDOWN, ) )
        self.queue.join( )

    def run( self ):
        """Process queued messages until a SHUTDOWN message arrives."""
        while True:
            data = self.queue.get( )
            event = data[0]

            if event == Console.MSG.SHUTDOWN:
                # Print everything still buffered before stopping.
                for buffer in self.buffers.values( ):
                    for line in buffer:
                        print( line )
                self.buffers.clear( )
                self.buffer_write_times.clear( )
                self.queue.task_done( )
                break

            elif event == Console.MSG.WRITE:
                pid, s = data[ 1 : ]

                if pid not in self.buffers:
                    self.buffers[ pid ] = []
                if pid not in self.buffer_write_times:
                    self.buffer_write_times[ pid ] = datetime.datetime.now( )
                self.buffers[ pid ].append( s )

                try:
                    if self.auto_flush_num >= 0 and len( self.buffers[ pid ] ) >= self.auto_flush_num:
                        self.flush( pid )
                    elif self.auto_flush_time >= 0:
                        elapsed = datetime.datetime.now( ) - self.buffer_write_times[ pid ]
                        # total_seconds() covers the whole interval; the
                        # .microseconds attribute is only the sub-second part.
                        if elapsed.total_seconds( ) * 1000000.0 >= self.auto_flush_time:
                            self.flush( pid )
                except TypeError:
                    # str(): pid is normally an int and cannot be concatenated to a str.
                    print('"' + str( pid ) + '"')
                    raise
                # TODO: if buffer is not empty and we don't auto flush on write, sleep until a time then auto flush according to auto_flush_time

            elif event == Console.MSG.FLUSH:
                pid = data[ 1 ]
                if pid in self.buffers:
                    for line in self.buffers[ pid ]:
                        print( line )
                    self.buffers.pop( pid, None )
                    self.buffer_write_times[ pid ] = datetime.datetime.now( )

            elif event == Console.MSG.CLEAR:
                pid = data[ 1 ]
                if pid in self.buffers:
                    self.buffers.pop( pid, None )

            self.queue.task_done( )
#==============================================================
# class Task( threading.Event ):
# def __init__( data, cmd = None ):
# threading.Event.__init__( self )
# self.cmd = cmd if cmd is None MSG.RUN_FUNCTION
# self.data = data
# def isDone( self ):
# return self.isSet()
# def join( self ):
# self.wait( )
#==============================================================
class Worker( threading.Thread ):
    """
    Thread that pulls (cmd, data) pairs from a queue and dispatches each to
    commands[cmd]. The loop ends after the first handler that returns a
    false value; every dequeued item is marked done.
    """
    def __init__( self, console, queue, commands ):
        """
        :param console: accepted for interface compatibility; not stored
        :param queue: queue providing get() and task_done()
        :param commands: mapping of command id -> callable taking the data item
        """
        super( Worker, self ).__init__( )
        self.queue = queue
        self.commands = commands

    def run( self ):
        keep_going = True
        while keep_going:
            cmd, data = self.queue.get( )
            handler = self.commands[ cmd ]
            # A false result from the handler is the signal to stop.
            keep_going = bool( handler( data ) )
            self.queue.task_done( )