Invoke-Parallel need help to clone the current Runspace

Hi PowerShell Community!

If I hand over the Invoke-Parallel Function to other peoples, they struggle about the extra level of abstraction.
They do not know that each Runspace (Session State) has its own environment.
So the Variables, Functions, Modules, Snapins und so on from the current Runspace are not accessible from inside an new Runspace.
To make the use of Invoke-Parallel easier I Try to clone the current Runspace into the InitialSessionState.

I present here my first working attempt.
Do you have Ideas or do you find Bugs. What shall implement additional. Do I have forgotten something to clone?
Some things you have to clone from $Host.Runspace some things you have to clone from $Host.RunspaceInitialSessionState some things you can get from current Thread (like the Apartment State)
Which clone source is right which source is wrong?


Function Invoke-Parallel {
<#
.SYNOPSIS
    A parallel ForEach that uses runspaces
 
.PARAMETER ScriptBlock
    ScriptBlock to execute for each InputObject
 
.PARAMETER ScriptFile
    Script file to execute for each InputObject
 
.PARAMETER InputObject
    Object(s) to run script against in parallel
 
.PARAMETER Throttle
    Maximum number of threads to run at one time.  Default: 5
 
.PARAMETER Timeout
    Stop each thread after this many seconds.  Default: 0
 
.PARAMETER SleepTimer
    When looping through open threads, wait this many milliseconds before looping again.  Default: 200
 
.EXAMPLE
    (0..50) | Invoke-Parallel -Throttle 4 { $_; sleep (Get-Random -Minimum 0 -Maximum 5) }
 
    Send the number 0 through 50 to scriptblock.  For each, display the number and then sleep for 0 to 5 seconds.  Only execute 4 threads at a time.
.EXAMPLE
    (0..50) | Invoke-Parallel -Throttle 4 -ScriptBlock {$_; Start-Sleep -Seconds (Get-Random -Minimum 0 -Maximum 10)} -Timeout 5
    
    Send the number 0 through 50 to scriptblock.  For each, display the number and then sleep for 0 to 10 seconds.  Only execute 4 threads at a time.
    If a thread runs (sleeps) more than 5 seconds the thread is terminated (stopped).
.EXAMPLE
    $servers | Invoke-Parallel -Throttle 20 -Timeout 60 -sleeptimer 200 -verbose -scriptFile C:\query.ps1
 
    Run query.ps1 against each computer in $servers.  Run 20 threads at a time, timeout a thread if it takes longer than 60 minutes to run, give verbose output.
 
.FUNCTIONALITY
    PowerShell Language
 
.NOTES
    Improved by Peter Kriegel
    19.February.2014
    added synchronized Hashtable to support real Timeout behavior
    22.May.2014
    added 
 
Credit to Tome Tanasovski (ForEach-Parallel)
http://powertoe.wordpress.com/2012/05/03/foreach-parallel
 
Credit to rambling cookie monster (ForEach-Parallel, Invoke-Parallel)
http://gallery.technet.microsoft.com/Foreach-Parallel-Parallel-a8f3d22b
http://ramblingcookiemonster.wordpress.com/2012/11/25/parallel-powershell-part-ii/

Credit to Alexey Shuvalov (for Invoke-RunspaceJob)
http://gallery.technet.microsoft.com/scriptcenter/Invoke-RunspaceJob-8d487eab

Credits to Boe Prox for his several good Blog post about runspaces!
http://learn-powershell.net
 
#>
  [cmdletbinding()]
  param(
    [Parameter(Mandatory=$False,position=0,ParameterSetName='ScriptBlock')]
    [System.Management.Automation.ScriptBlock]$ScriptBlock,
    
    [Parameter(Mandatory=$False,ParameterSetName='ScriptFile')]
    [ValidateScript({Test-Path $_ -pathtype leaf})]
    $ScriptFile,
    
    [Parameter(Mandatory=$true,ValueFromPipeline=$True)]
    [PSObject[]]$InputObject,
    
    [ValidateRange(2,2147483647)]
    [int]$Throttle=5,
    
    [double]$SleepTimer = 200,
    
    [double]$Timeout = 0
  )
  BEGIN {

     $MyVars = Get-Variable -Scope 1
         
    #Build the text to be replaced in the $ScriptToRun HereString depending on the parameterset used
    switch ($PSCmdlet.ParameterSetName){
      'ScriptBlock' {$ExternalScript = $Scriptblock.ToString()}
      'ScriptFile' {$ExternalScript = $(Get-Content $scriptFile | Out-String)}
      Default {Write-Error ('Must provide ScriptBlock or ScriptFile'); Return}
    }
    
    $ScriptToRun = @'
    param(
        [String]$RunspaceID,
        [PSObject]$InputObject
    )
 
    # Write thread Informations to the Synchronized Hashtable
    $SynchronizedHash.$RunspaceID = @{
            StartTime = (Get-Date);
            Status = 'Running'
        }
    # Run the Scriptblock to do the work    
    & {<ReplacePlace>} $InputObject
 
'@.Replace('<ReplacePlace>',$ExternalScript)
    
    
    $ScriptBlock = $ExecutionContext.InvokeCommand.NewScriptBlock($ScriptToRun)
    
    #Define the initial sessionstate, create the runspacepool
    Write-Verbose "Creating runspace pool with $Throttle threads"

###############################################################################
# Begin to clone current Runspace >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    
    $sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

    $sessionState.ApartmentState = [System.Threading.Thread]::CurrentThread.GetApartmentState()
    # $sessionState.Assamblies =  [appdomain]::currentdomain.getassemblies() ?????
    $sessionState.LanguageMode = $Host.runspace.LanguageMode

    # Add Variables from Current current runspace to the InitialSessionState

    # InitialSessionState comes with predefined Variables
    # '$','^','ConfirmPreference','DebugPreference','ErrorActionPreference','ErrorView','FormatEnumerationLimit','NestedPromptLevel','OutputEncoding','ProgressPreference','PSEmailServer','PSSessionApplicationName','PSSessionConfigurationName','PSSessionOption','StackTrace','VerbosePreference','WarningPreference','WhatIfPreference'
    # We remove this Variables and re-add them with Variable from Current Runspace
    $sessionState.Variables.Clear()

    # this Variables are added by Runspace.Open() Method and produce Stop errors if you add them twice
    $VorbiddenVars = @('?','args','ConsoleFileName','Error','ExecutionContext','false','HOME','Host','input','InputObject','MaximumAliasCount','MaximumDriveCount','MaximumErrorCount','MaximumFunctionCount','MaximumHistoryCount','MaximumVariableCount','MyInvocation','null','PID','PSBoundParameters','PSCommandPath','PSCulture','PSDefaultParameterValues','PSHOME','PSScriptRoot','PSUICulture','PSVersionTable','PWD','ShellId','SynchronizedHash','true')

   # Add Variables from Parent Scope (current runspace) into the InitialSessionState 
    ForEach($Var in $MyVars) {
       If($VorbiddenVars -notcontains $Var.Name) {
        Write-Verbose "Adding Variable $($Var.Name) to InitialSessionState"
         $sessionstate.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList $Var.name,$Var.Value,$Var.description,$Var.options,$Var.attributes))
        }
    }

   # Add Functions from current runspace to the InitialSessionState
   ForEach($Function in (Get-ChildItem Function:)) {
      Write-Verbose "Adding Function $($Function.Name) to InitialSessionState"
      $sessionState.Commands.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList $Function.Name, $Function.Definition))
    }

    # Add Modules from current runspace to the InitialSessionState
    ForEach($ModulePath in (Get-Module | Select-Object -ExpandProperty ModuleBase)) {
      Write-Verbose "Adding Module $($ModulePath) to InitialSessionState"
      $sessionState.ImportPSModulesFromPath($ModulePath)
    }

    # Add Snapins from current runspace to the InitialSessionState
    ForEach($SnapinName in (Get-PSSnapin | Select-Object -ExpandProperty Name)) {
      # Skip Snapin 'Microsoft.PowerShell.Core' (that produced an Error is loaded by default)
      If(-not ($SnapinName -eq 'Microsoft.PowerShell.Core')) {
        $PSSnapInException =  $Null
        Write-Verbose "Adding PSSnapin $($SnapinName) to InitialSessionState"
        $sessionState.ImportPSSnapIn($SnapinName, [Ref]$PSSnapInException)
        If($PSSnapInException) {
          Throw $PSSnapInException
        }
      }
    }

    # create a thread save synchronized Hashtable to Share Data beween the Runspaces and to get the StartTime and the Status out of a thread
    $SynchronizedHash = [HashTable]::Synchronized(@{})
    # add synchronized Hashtable to the initialsession state, so all threads can use the same synchronized Hashtable
    $sessionState.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'SynchronizedHash', $SynchronizedHash,''))
    
    # create the runspace pool with minimum 1 Thread and maximum Throttle number Threads, sessionState and current Host
    $pool = [Runspacefactory]::CreateRunspacePool(1, $Throttle, $sessionState, $host)
    $pool.ApartmentState = [System.Threading.Thread]::CurrentThread.GetApartmentState()
    $pool.open()

# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< END clonig of current Runspace #
###############################################################################
    
    #array to hold details on each thread
    $threads = [System.Collections.ArrayList]@()
  
    # Set Flag to run stuff only one time in the Process block
    # if it is $False the stuff is executed
    # if it is $True the stuff is not executed another time
    $Once = $False

    # $Variable to hold the Runspace ID
    $RunspaceID = 0

  }
  
  PROCESS {
    
    If(-not $Once) {
      # Test if function is runnig inside a Pipeline or if it is runnung as standalone call
      If($PSCmdlet.MyInvocation.PipelineLength -eq 1) {
        $InPipeline = $False
      } Else {
        $InPipeline = $True
      }  
    }
      
    ForEach($object in $inputObject){
      
      $RunspaceID = $RunspaceID++
      $SynchronizedHash.$RunspaceID = @{
                    StartTime = (Get-Date);
                    Status = 'NotStarted'
                }
      
      #For each pipeline object, create a new powershell instance, add to runspacepool
      $powershell = [powershell]::Create().AddScript($scriptblock).AddArgument($RunspaceID).AddArgument($object)
      $powershell.runspacepool=$pool
      
      Write-Verbose "Added $object to the runspacepool"
      
      # For performance reasons and to prevent lock of death and race conditions I do not use the $SynchronizedHash to hold the thread details.
      # On every write action on the $SynchronizedHash the thread is blocked!
      # Write actions on $SynchronizedHash must be smal and few as possible!
      # Here I add references to inputobject, PowerShell (runspace) instance, and handle to threads array
      # the relation ship between the $threads array and $SynchronizedHash is the $RunspaceID
      [void]$threads.Add((New-Object psobject -Property @{
                RunspaceID = $RunspaceID;
                Object = $object;
                instance = $powershell;
                handle = $powershell.begininvoke();
                startTime = $startTime
            }))
    }
    
    # Set Flag to run stuff only one time in the Process block
    # if it is $True the stuff is not executed another time
    $Once = $True
  }
  END {
    $notdone = $true
    
    #Loop through threads.
    while ($notdone) {
           
      for ($i=0; $i -lt $threads.count; $i++) {
      
        $thread = $threads[$i]

        if ($thread) {
          $ThreadStartTime = $SynchronizedHash.($thread.RunspaceID).StartTime
          
          #If thread is complete, dispose of it.
          if ($thread.handle.iscompleted) {
            Write-verbose "Closing thread for $($thread.Object)"
            $thread.instance.endinvoke($thread.handle)
            $thread.instance.dispose()
            $threads[$i] = $null
            $threads.RemoveAt($i)
            $SynchronizedHash.($thread.RunspaceID).Status = 'Completed'
          }
          
          #Thread exceeded maxruntime timeout threshold
          elseif( $Timeout -ne 0 -and ( (get-date) - $ThreadStartTime ).TotalSeconds -gt $Timeout ){
            Write-Error "Closing thread for $($thread.Object): Thread exceeded $Timeout seconds limit" -TargetObject $thread.inputObject
            $thread.instance.dispose()
            $threads[$i] = $null
            $threads.RemoveAt($i)
            $SynchronizedHash.($thread.RunspaceID).Status = 'Failed'
          }
          
          #Thread is running, loop again!
          else {
            $notdone = $true
          }
        }
      }
      
      If($threads.count -eq 0 ) {
        # no threads left set quit flag
        $notdone = $False
      } Else {
        #Sleep for specified time before looping again
        Start-Sleep -Milliseconds $sleepTimer
      }
    }
 
    $pool.close()
  }
}

# Function that must show up in the different Runspaces by the current Runspace clone mechanism
Function Test-Function {
    
    param (
      [String]$GiveMeStuff
    )
    
    "Message from Test-Function, I got Stuff: $GiveMeStuff"
}

# Variable that must show up in the different Runspaces by the current Runspace clone mechanism
$TestVariable = 'This is the Value of the TestVariable'

# Test call to Test developed Stuff
1..5 | Invoke-Parallel -Verbose -scriptblock {
  
  param($InputObject)
  
  # Call Function which is Imported into this Runspace by the current Runspace clone mechanism
  Test-Function $InputObject

  # Call Variable which is Imported into this Runspace by the current Runspace clone mechanism
  $TestVariable

  # List Imported Snapins
  Get-PSSnapin
  
  # List Imported Modules
  Get-Module
}

awaiting you comments
Greets Peter Kriegel

Be very careful with that approach. Adding Modules, Functions and Snap-ins to the new runspace is fine, but when you add the same object references for variables to another runspace, you’ll be creating potential race conditions and other threading bugs. Sharing objects between threads in this manner should be probably be a conscious decision by the caller, not something that just happens automatically.

Also, using Get-Variable -Scope 1 to obtain the caller’s variables will work so long as the caller is in the same script or script module as Invoke-Parallel (or if it’s been dot-sourced from a ps1 file to the current session), but won’t do what you intend if Invoke-Parallel has been imported from a psm1 file. You can always get at caller variables via the $PSCmdlet variable in an advanced function (ex: $PSCmdlet.GetVariableValue(‘ErrorActionPreference’) ), but I’m not aware of a public API for enumerating all variables this way.

Hi Dave!

Thank you foryour comments!

I know the bussines to do Thread save things, because i am a .NET developer too.

If you fumble arround with Threads there is allways a risk!
That is the reason because the powerShell Team has decided to use PowerShell Jobs as Processes not as Threads.
I hope with Windows 8 we get an PowerShell-Async Module (for the WinRT API).
If you know the pitfalls, you can use it savely. :wink:
The first pitfall to know is, that a clone is a Snapshot, a copy, of a current state.
So the changes outside the child runspaces are not reflected inside them.

I dont think the PowerShell Team offers a InitialSessionState.Variables.Add() Method to us, if there is a danger with Thread save calls.
The Variables ar not handed over explicitly by Reference [Ref].
I think the Variables are not Shared they are re-created (copied) in every Runspace. It is Expensive but thread save…
I did not investigated in this. Do you realy know for shure, that the Variables are shared in a not thread save manner?
Even the scopes of the Variables can get lost or go wrong.

Thank you for theInformation that the Get-Variable -Scope 1 not work in every circumstances.
That is even a Pitfall to know… :wink: I will put this on my Test List.

(can somebody correct the typo in the Topic headline?)

Thanks Peter

I dont think the PowerShell Team offers a InitialSessionState.Variables.Add() Method to us, if there is a danger with Thread save calls. The Variables ar not handed over explicitly by Reference [Ref]. I think the Variables are not Shared they are re-created (copied) in every Runspace. It is Expensive but thread save…. I did not investigated in this. Do you realy know for shure, that the Variables are shared in a not thread save manner?

I’m positive. I even wrote a Lock function for PowerShell once I realized it was possible to share object references across different threads / runspaces: Browse code samples | Microsoft Learn

That gallery post has some test code which demonstrates both the sharing of these objects and how to use the function to synchronize access to them.

Edit: I should probably clarify something. You’re passing around .NET object references and creating PowerShell variables with those references as their value. You can modify the PowerShell variables within a runspace without causing problems, but if you modify the .NET object that the variable referred to, it’s not thread-safe.

$someVariable = ‘New Value’ # is okay.
$someVariable.SomeProperty = ‘New Value’ # is not.

Hey Dave BIG BIG thanks to enlighten me!
This is very good Information!
You Lock Function is marvelous.

Ok! I withdraw the clone process of all Variables and put in the ability for the User to chose some Shared Variables over a Parameter.
DO you have some other suggestions how we can help the User to make this Function more simple to use?

Hi Peter,

Haven’t touched the code in a while, but that is exactly the issue I run into at work. Co-workers without much scripting or dev experience have trouble with the idea of each runspace being an independent environment.

I provide basic example scripts they can borrow and tweak… eventually they make a change big enough to break something (i.e. reference a variable created outside the runspace) and I try to explain why this won’t work. They nod their heads, modify the code, and rinse/repeat a few months down the road : )

If you’re comfortable sharing, would be interested to see the end result!

Cheers!

Kind of late to the party on runspace handlers, but I posted a possible solution for getting accurate start times to use for timing out stalled threads here: Accurate timeout for threads in runspace pool | The Powershell Workbench

Looking over the available scripts for handling runspaces, I note that none seem to have any provisions for returning the stream information from the runspaces.

A different approach to the problem might be to have a runspace handler that has provisions for collecting and returning that data to the user for diagnostics, so they can have a better idea what’s going on in those runspaces. With that they might be able to get their runspace environments set up with the minimum amount of stuff to get their script to run without throwing the kitchen sink (and the overhead that goes with it) at every runspace they open.

Brilliant, I had the same desire yesterday, but I didn’t find this thread before @psCookieMonster replied.
So I re-added it myself, here is code https://gist.github.com/vors/3c3cc7e08f37b1d824a1
Checkout diff https://gist.github.com/vors/3c3cc7e08f37b1d824a1/revisions

For me the whole idea of shared state is a way to enable simple late integration. Make it just work without rewriting a lot of code.

Scenario:
I wrote a script that provision remote VM and execute pester tests against it.
Now I want to run it against few different VMs (with various OS versions).
Provisioning take 15 minutes and most of this time I just wait.
I want to make provisioning parallel with minimal changes.

It’s essentially the same problem as here :
[blockquote]Co-workers without much scripting or dev experience have trouble with the idea of each runspace being an independent environment.[/blockquote]

One of the goals is to enable positive first-time experience for developers.
Literally, the experience that I want: replace “… | % {…}” by “… | Invoke-Parallel -ImportVariables -ImportModules -Script {…}”.

You may still run into some headaches if Invoke-Parallel is being imported as part of a script module (Get-Variable will catch the global scope, but not the caller’s scope stack below that. As long as the caller of Invoke-Parallel is the interactive console and not a script, that won’t matter.)

Painful visiting old and sloppy code! Oh well. Sergei, awesome ideas, incorporated the idea and a piece of your implementation, updated on github and technet. Thank you!