Hi PowerShell Community!
When I hand the Invoke-Parallel function to other people, they struggle with the extra level of abstraction.
They do not know that each runspace (session state) has its own environment.
So the variables, functions, modules, snap-ins and so on from the current runspace are not accessible from inside a new runspace.
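A minimal sketch of that isolation, separate from Invoke-Parallel itself. The names $Greeting and Get-Greeting are made up for this illustration. A second runspace is opened from the default session state and asked whether it can see them; under these assumptions both checks should report False.

```powershell
# Defined in the current (caller) runspace only; both names are hypothetical
$Greeting = 'Hello from the caller'
function Get-Greeting { $Greeting }

# A separate runspace built from the default session state
$runspace = [runspacefactory]::CreateRunspace()
$runspace.Open()
$ps = [powershell]::Create()
$ps.Runspace = $runspace
try {
    # Is the caller's variable visible in the new runspace?
    $seesVariable = [bool]($ps.AddScript('$null -ne $Greeting').Invoke()[0])
    $ps.Commands.Clear()
    # Is the caller's function visible in the new runspace?
    $seesFunction = [bool]($ps.AddScript('[bool](Get-Command Get-Greeting -ErrorAction SilentlyContinue)').Invoke()[0])
}
finally {
    $ps.Dispose()
    $runspace.Dispose()
}
"Variable visible: $seesVariable; function visible: $seesFunction"
```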
To make Invoke-Parallel easier to use, I try to clone the current runspace into the InitialSessionState.
Here is my first working attempt.
Do you have ideas, or do you find bugs? What else should I implement? Have I forgotten something to clone?
Some things have to be cloned from $Host.Runspace, some from $Host.Runspace.InitialSessionState, and some you can get from the current thread (like the apartment state).
Which clone source is right, and which is wrong?
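Before the full function, here is the cloning idea reduced to one variable and one function. It is a sketch only. It uses the same SessionStateVariableEntry and SessionStateFunctionEntry classes as the function below, but it opens a single runspace instead of a pool and does nothing about modules, snap-ins or the variables that Runspace.Open() adds itself.

```powershell
# Items that exist only in the caller's runspace
$TestVariable = 'value from the caller'
function Test-Function { param([string]$GiveMeStuff) "I got: $GiveMeStuff" }

$iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

# Copy one variable (name, value, description) into the InitialSessionState
$var = Get-Variable -Name TestVariable
$iss.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList $var.Name, $var.Value, $var.Description))

# Copy one function (name, body text) into the InitialSessionState
$fn = Get-Item -Path Function:\Test-Function
$iss.Commands.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList $fn.Name, $fn.Definition))

# Open a runspace from that state and use both items inside it
$runspace = [runspacefactory]::CreateRunspace($iss)
$runspace.Open()
$ps = [powershell]::Create()
$ps.Runspace = $runspace
try {
    $result = $ps.AddScript('Test-Function $TestVariable').Invoke()
}
finally {
    $ps.Dispose()
    $runspace.Dispose()
}
$result
```

The function body is copied as text, so anything it refers to (other functions, script-scoped variables) has to be copied as well.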
Function Invoke-Parallel {
<#
.SYNOPSIS
A parallel ForEach that uses runspaces
.PARAMETER ScriptBlock
ScriptBlock to execute for each InputObject
.PARAMETER ScriptFile
Script file to execute for each InputObject
.PARAMETER InputObject
Object(s) to run script against in parallel
.PARAMETER Throttle
Maximum number of threads to run at one time. Default: 5
.PARAMETER Timeout
Stop each thread after this many seconds. Default: 0 (no timeout)
.PARAMETER SleepTimer
When looping through open threads, wait this many milliseconds before looping again. Default: 200
.EXAMPLE
(0..50) | Invoke-Parallel -Throttle 4 { $_; sleep (Get-Random -Minimum 0 -Maximum 5) }
Send the numbers 0 through 50 to the scriptblock. For each, display the number and then sleep for 0 to 5 seconds. Only execute 4 threads at a time.
.EXAMPLE
(0..50) | Invoke-Parallel -Throttle 4 -ScriptBlock {$_; Start-Sleep -Seconds (Get-Random -Minimum 0 -Maximum 10)} -Timeout 5
Send the numbers 0 through 50 to the scriptblock. For each, display the number and then sleep for 0 to 10 seconds. Only execute 4 threads at a time.
If a thread runs (sleeps) for more than 5 seconds, it is stopped.
.EXAMPLE
$servers | Invoke-Parallel -Throttle 20 -Timeout 60 -sleeptimer 200 -verbose -scriptFile C:\query.ps1
Run query.ps1 against each computer in $servers. Run 20 threads at a time, time out a thread if it takes longer than 60 seconds to run, give verbose output.
.FUNCTIONALITY
PowerShell Language
.NOTES
Improved by Peter Kriegel
19.February.2014
added synchronized Hashtable to support real Timeout behavior
22.May.2014
added
Credit to Tome Tanasovski (ForEach-Parallel)
http://powertoe.wordpress.com/2012/05/03/foreach-parallel
Credit to rambling cookie monster (ForEach-Parallel, Invoke-Parallel)
http://gallery.technet.microsoft.com/Foreach-Parallel-Parallel-a8f3d22b
http://ramblingcookiemonster.wordpress.com/2012/11/25/parallel-powershell-part-ii/
Credit to Alexey Shuvalov (for Invoke-RunspaceJob)
http://gallery.technet.microsoft.com/scriptcenter/Invoke-RunspaceJob-8d487eab
Credit to Boe Prox for his many good blog posts about runspaces!
http://learn-powershell.net
#>
[cmdletbinding()]
param(
[Parameter(Mandatory=$False,position=0,ParameterSetName='ScriptBlock')]
[System.Management.Automation.ScriptBlock]$ScriptBlock,
[Parameter(Mandatory=$False,ParameterSetName='ScriptFile')]
[ValidateScript({Test-Path $_ -pathtype leaf})]
$ScriptFile,
[Parameter(Mandatory=$true,ValueFromPipeline=$True)]
[PSObject[]]$InputObject,
[ValidateRange(2,2147483647)]
[int]$Throttle=5,
[double]$SleepTimer = 200,
[double]$Timeout = 0
)
BEGIN {
$MyVars = Get-Variable -Scope 1
#Build the text to be replaced in the $ScriptToRun HereString depending on the parameterset used
switch ($PSCmdlet.ParameterSetName){
'ScriptBlock' {$ExternalScript = $Scriptblock.ToString()}
'ScriptFile' {$ExternalScript = $(Get-Content $scriptFile | Out-String)}
Default {Write-Error ('Must provide ScriptBlock or ScriptFile'); Return}
}
$ScriptToRun = @'
param(
[Int]$RunspaceID, # [Int] so the hashtable key matches the integer key used by the caller
[PSObject]$InputObject
)
# Write thread information to the synchronized hashtable
$SynchronizedHash.$RunspaceID = @{
StartTime = (Get-Date);
Status = 'Running'
}
# Run the Scriptblock to do the work
& {<ReplacePlace>} $InputObject
'@.Replace('<ReplacePlace>',$ExternalScript)
$ScriptBlock = $ExecutionContext.InvokeCommand.NewScriptBlock($ScriptToRun)
#Define the initial sessionstate, create the runspacepool
Write-Verbose "Creating runspace pool with $Throttle threads"
###############################################################################
# Begin to clone current Runspace >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
$sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
$sessionState.ApartmentState = [System.Threading.Thread]::CurrentThread.GetApartmentState()
# $sessionState.Assemblies = [appdomain]::CurrentDomain.GetAssemblies() ?????
$sessionState.LanguageMode = $Host.runspace.LanguageMode
# Add variables from the current runspace to the InitialSessionState.
# InitialSessionState comes with these predefined variables:
# '$','^','ConfirmPreference','DebugPreference','ErrorActionPreference','ErrorView','FormatEnumerationLimit','NestedPromptLevel','OutputEncoding','ProgressPreference','PSEmailServer','PSSessionApplicationName','PSSessionConfigurationName','PSSessionOption','StackTrace','VerbosePreference','WarningPreference','WhatIfPreference'
# We remove these variables and re-add them with the values from the current runspace
$sessionState.Variables.Clear()
# These variables are added by the Runspace.Open() method and produce terminating errors if you add them twice
$ForbiddenVars = @('?','args','ConsoleFileName','Error','ExecutionContext','false','HOME','Host','input','InputObject','MaximumAliasCount','MaximumDriveCount','MaximumErrorCount','MaximumFunctionCount','MaximumHistoryCount','MaximumVariableCount','MyInvocation','null','PID','PSBoundParameters','PSCommandPath','PSCulture','PSDefaultParameterValues','PSHOME','PSScriptRoot','PSUICulture','PSVersionTable','PWD','ShellId','SynchronizedHash','true')
# Add variables from the parent scope (current runspace) into the InitialSessionState
ForEach($Var in $MyVars) {
If($ForbiddenVars -notcontains $Var.Name) {
Write-Verbose "Adding Variable $($Var.Name) to InitialSessionState"
$sessionstate.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList $Var.name,$Var.Value,$Var.description,$Var.options,$Var.attributes))
}
}
# Add Functions from current runspace to the InitialSessionState
ForEach($Function in (Get-ChildItem Function:)) {
Write-Verbose "Adding Function $($Function.Name) to InitialSessionState"
$sessionState.Commands.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList $Function.Name, $Function.Definition))
}
# Add Modules from current runspace to the InitialSessionState
ForEach($ModulePath in (Get-Module | Select-Object -ExpandProperty ModuleBase)) {
Write-Verbose "Adding Module $($ModulePath) to InitialSessionState"
$sessionState.ImportPSModulesFromPath($ModulePath)
}
# Add Snapins from current runspace to the InitialSessionState
ForEach($SnapinName in (Get-PSSnapin | Select-Object -ExpandProperty Name)) {
# Skip snap-in 'Microsoft.PowerShell.Core' (it is loaded by default, and adding it again produces an error)
If(-not ($SnapinName -eq 'Microsoft.PowerShell.Core')) {
$PSSnapInException = $Null
Write-Verbose "Adding PSSnapin $($SnapinName) to InitialSessionState"
$sessionState.ImportPSSnapIn($SnapinName, [Ref]$PSSnapInException)
If($PSSnapInException) {
Throw $PSSnapInException
}
}
}
# Create a thread-safe synchronized hashtable to share data between the runspaces and to get the StartTime and the Status out of a thread
$SynchronizedHash = [HashTable]::Synchronized(@{})
# Add the synchronized hashtable to the InitialSessionState, so all threads use the same synchronized hashtable
$sessionState.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'SynchronizedHash', $SynchronizedHash,''))
# Create the runspace pool with a minimum of 1 thread and a maximum of $Throttle threads, the sessionState and the current host
$pool = [Runspacefactory]::CreateRunspacePool(1, $Throttle, $sessionState, $host)
$pool.ApartmentState = [System.Threading.Thread]::CurrentThread.GetApartmentState()
$pool.open()
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< End cloning of current Runspace #
###############################################################################
#array to hold details on each thread
$threads = [System.Collections.ArrayList]@()
# Set Flag to run stuff only one time in the Process block
# if it is $False the stuff is executed
# if it is $True the stuff is not executed another time
$Once = $False
# Variable to hold the runspace ID
$RunspaceID = 0
}
PROCESS {
If(-not $Once) {
# Test whether the function is running inside a pipeline or as a standalone call
If($PSCmdlet.MyInvocation.PipelineLength -eq 1) {
$InPipeline = $False
} Else {
$InPipeline = $True
}
}
ForEach($object in $inputObject){
# Give each work item its own ID
$RunspaceID++
$SynchronizedHash.$RunspaceID = @{
StartTime = (Get-Date);
Status = 'NotStarted'
}
#For each pipeline object, create a new powershell instance, add to runspacepool
$powershell = [powershell]::Create().AddScript($scriptblock).AddArgument($RunspaceID).AddArgument($object)
$powershell.runspacepool=$pool
Write-Verbose "Added $object to the runspacepool"
# For performance reasons, and to avoid deadlocks and race conditions, I do not use $SynchronizedHash to hold the thread details.
# Every write to $SynchronizedHash takes a lock, which can block the other threads!
# Writes to $SynchronizedHash must be as small and as few as possible!
# Here I add references to the input object, the PowerShell (runspace) instance, and the handle to the $threads array.
# The link between the $threads array and $SynchronizedHash is the $RunspaceID.
[void]$threads.Add((New-Object psobject -Property @{
RunspaceID = $RunspaceID;
Object = $object;
instance = $powershell;
handle = $powershell.begininvoke();
startTime = (Get-Date) # time the work item was queued
}))
}
# Set Flag to run stuff only one time in the Process block
# if it is $True the stuff is not executed another time
$Once = $True
}
END {
$notdone = $true
#Loop through threads.
while ($notdone) {
for ($i=0; $i -lt $threads.count; $i++) {
$thread = $threads[$i]
if ($thread) {
$ThreadStartTime = $SynchronizedHash.($thread.RunspaceID).StartTime
#If thread is complete, dispose of it.
if ($thread.handle.iscompleted) {
Write-verbose "Closing thread for $($thread.Object)"
$thread.instance.endinvoke($thread.handle)
$thread.instance.dispose()
$threads[$i] = $null
$threads.RemoveAt($i)
$SynchronizedHash.($thread.RunspaceID).Status = 'Completed'
}
#Thread exceeded maxruntime timeout threshold
elseif( $Timeout -ne 0 -and ( (get-date) - $ThreadStartTime ).TotalSeconds -gt $Timeout ){
Write-Error "Closing thread for $($thread.Object): Thread exceeded $Timeout seconds limit" -TargetObject $thread.Object
$thread.instance.dispose()
$threads[$i] = $null
$threads.RemoveAt($i)
$SynchronizedHash.($thread.RunspaceID).Status = 'Failed'
}
#Thread is running, loop again!
else {
$notdone = $true
}
}
}
If($threads.count -eq 0 ) {
# no threads left set quit flag
$notdone = $False
} Else {
#Sleep for specified time before looping again
Start-Sleep -Milliseconds $sleepTimer
}
}
$pool.close()
}
}
# Function that must show up in the different Runspaces by the current Runspace clone mechanism
Function Test-Function {
param (
[String]$GiveMeStuff
)
"Message from Test-Function, I got Stuff: $GiveMeStuff"
}
# Variable that must show up in the different Runspaces by the current Runspace clone mechanism
$TestVariable = 'This is the Value of the TestVariable'
# Test call to exercise the code above
1..5 | Invoke-Parallel -Verbose -scriptblock {
param($InputObject)
# Call Function which is Imported into this Runspace by the current Runspace clone mechanism
Test-Function $InputObject
# Call Variable which is Imported into this Runspace by the current Runspace clone mechanism
$TestVariable
# List Imported Snapins
Get-PSSnapin
# List Imported Modules
Get-Module
}
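The shared-state part of the design can also be tried on its own. The sketch below assumes a synchronized hashtable injected through the InitialSessionState under the made-up name $Shared, plus a pool of at most 2 runspaces. Each work item writes one entry under an integer key, and the caller reads the entries back after all items have finished.

```powershell
# One synchronized hashtable, handed to every runspace in the pool by reference
$shared = [hashtable]::Synchronized(@{})
$iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
$iss.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'Shared', $shared, ''))

$pool = [runspacefactory]::CreateRunspacePool(1, 2, $iss, $Host)
$pool.Open()

# Queue three work items; each stores a result under its integer ID
$jobs = foreach ($id in 1..3) {
    $ps = [powershell]::Create().AddScript('param([int]$Id) $Shared[$Id] = "done $Id"').AddArgument($id)
    $ps.RunspacePool = $pool
    New-Object psobject -Property @{ Instance = $ps; Handle = $ps.BeginInvoke() }
}

# Wait for every item, then release its resources
foreach ($job in $jobs) {
    [void]$job.Instance.EndInvoke($job.Handle)
    $job.Instance.Dispose()
}
$pool.Close()
$pool.Dispose()

# The caller sees what the pool runspaces wrote
$shared.Keys | Sort-Object | ForEach-Object { "$_ = $($shared[$_])" }
```

The key type matters here: a hashtable treats the integer 1 and the string '1' as different keys, so the writer and the reader have to agree on one type.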
Awaiting your comments.
Greetings, Peter Kriegel