swarming package

Simulation package for interacting particle systems

Solves a system of ordinary differential equations for particle positions X and velocities V, subject to forces that may depend on all X and V, using an explicit second-order Runge-Kutta scheme.

The package can also record and visualize the system state. Initial conditions, force terms (right hand side, RHS) and RHS parameters can be replaced at every time step, which allows parameters to change during a running simulation, e.g. in an interactive app.
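As a rough illustration of the integration scheme mentioned above, the following sketch advances positions and velocities by one explicit second-order Runge-Kutta step, using the midpoint variant. The names rk2_step and spring_force are hypothetical and not part of the package, and the package may use a different second-order variant (e.g. Heun's method).

```python
import numpy as np

def spring_force(X, V):
    """Hypothetical force: linear spring toward the origin with weak damping."""
    return -1.0 * X - 0.1 * V

def rk2_step(X, V, force, dt):
    """One explicit midpoint (second-order Runge-Kutta) step for X' = V, V' = force(X, V)."""
    # Stage 1: slopes at the current state
    kx1, kv1 = V, force(X, V)
    # Stage 2: slopes at the midpoint, estimated with half an Euler step
    X_mid = X + 0.5 * dt * kx1
    V_mid = V + 0.5 * dt * kv1
    kx2, kv2 = V_mid, force(X_mid, V_mid)
    # Full step using the midpoint slopes
    return X + dt * kx2, V + dt * kv2

X = np.array([[1.0, 0.0], [0.0, 2.0]])   # two particles in 2d, one row each
V = np.zeros_like(X)
X_new, V_new = rk2_step(X, V, spring_force, dt=0.1)
```

The n by 2 array layout matches the X and V arrays described for swarming.model.Model.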

swarming.rand(d0, d1, ..., dn)

Random values in a given shape.

Note

This is a convenience function for users porting code from Matlab, and wraps random_sample. That function takes a tuple to specify the size of the output, which is consistent with other NumPy functions like numpy.zeros and numpy.ones.

Create an array of the given shape and populate it with random samples from a uniform distribution over [0, 1).

Parameters
d0, d1, …, dn: int, optional

The dimensions of the returned array, must be non-negative. If no argument is given a single Python float is returned.

Returns
out: ndarray, shape (d0, d1, ..., dn)

Random values.

See also

random

Examples

>>> np.random.rand(3,2)
array([[ 0.14022471,  0.96360618],  #random
       [ 0.37601032,  0.25528411],  #random
       [ 0.49313049,  0.94909878]]) #random
swarming.randn(d0, d1, ..., dn)

Return a sample (or samples) from the “standard normal” distribution.

Note

This is a convenience function for users porting code from Matlab, and wraps standard_normal. That function takes a tuple to specify the size of the output, which is consistent with other NumPy functions like numpy.zeros and numpy.ones.

Note

New code should use the standard_normal method of a default_rng() instance instead; please see the random-quick-start.

If positive int_like arguments are provided, randn generates an array of shape (d0, d1, ..., dn), filled with random floats sampled from a univariate “normal” (Gaussian) distribution of mean 0 and variance 1. A single float randomly sampled from the distribution is returned if no argument is provided.

Parameters
d0, d1, …, dn: int, optional

The dimensions of the returned array, must be non-negative. If no argument is given a single Python float is returned.

Returns
Z: ndarray or float

A (d0, d1, ..., dn)-shaped array of floating-point samples from the standard normal distribution, or a single such float if no parameters were supplied.

See also

standard_normal

Similar, but takes a tuple as its argument.

normal

Also accepts mu and sigma arguments.

Generator.standard_normal

which should be used for new code.

Notes

For random samples from \(N(\mu, \sigma^2)\), use:

sigma * np.random.randn(...) + mu

Examples

>>> np.random.randn()
2.1923875335537315  # random

Two-by-four array of samples from N(3, 6.25):

>>> 3 + 2.5 * np.random.randn(2, 4)
array([[-4.49401501,  4.00950034, -1.81814867,  7.29718677],   # random
       [ 0.39924804,  4.68456316,  4.99394529,  4.84057254]])  # random
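The note above recommends the standard_normal method of a default_rng() instance for new code. A minimal sketch of the same N(3, 6.25) example in that style follows; the seed value is arbitrary and only there for reproducibility.

```python
import numpy as np

rng = np.random.default_rng(seed=42)   # seed chosen only for reproducibility

mu, sigma = 3.0, 2.5
# Generator.standard_normal takes the shape as a tuple, unlike randn(d0, d1, ...)
samples = mu + sigma * rng.standard_normal((2, 4))

# Generator.normal accepts the mean and standard deviation directly
samples_direct = rng.normal(loc=mu, scale=sigma, size=(2, 4))
```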

Submodules

swarming.doe module

Some helpers to conduct parameter studies for a given RHS and to evaluate them.

class swarming.doe.Parameters(param_list=[('Ca', 20, 10, 30), ('Cr', 50, 40, 60), ('la', 100, 40, 300), ('lr', 2, 1, 10), ('noise', 0.0, 0.0, 2.0)])

Bases: object

Creates a Latin hypercube sampling of the given parameters within predefined ranges, using pyDOE.

Parameters
param_list: list of tuples

List of 4-tuples; each tuple contains (parameter name, reference value, lower bound, upper bound). Defaults to the content of swarming.DEFAULT_PARAMS.

Attributes
params: pandas.DataFrame

Dataframe containing the same information as given in parameter param_list.

n_params: int

Number of parameters

names: list of str

List of parameter names, to be given to swarming.doe.get_density_plots().

Methods

sampling([n_samples])

Latin hypercube sampling for given number of parameters and number of samples.

sampling_dicts([n_samples])

Converts sampling to defined parameter range for each parameter.

property n_params
property names
sampling(n_samples=100)

Latin hypercube sampling for given number of parameters and number of samples.

Parameters
n_samples: int

number of samples to generate, default 100

Returns
numpy.array

n_samples by n_params array with sampled values between 0 and 1

sampling_dicts(n_samples=100)

Converts sampling to defined parameter range for each parameter.

Parameters
n_samples: int

number of samples to generate, default 100

Returns
list of dicts

each dictionary has the parameter names as keys; the values are the sampled values in the defined parameter ranges.
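The two-stage idea behind sampling() and sampling_dicts() can be sketched without pyDOE: first draw a Latin hypercube in the unit cube, then rescale each column to its parameter range. The helpers latin_hypercube and to_dicts below are hypothetical stand-ins written in plain NumPy, not the package's implementation.

```python
import numpy as np

# Same 4-tuple layout as param_list: (name, reference value, lower bound, upper bound)
param_list = [("Ca", 20, 10, 30), ("Cr", 50, 40, 60), ("noise", 0.0, 0.0, 2.0)]

def latin_hypercube(n_samples, n_params, rng):
    """Unit-cube Latin hypercube: one sample per stratum in every dimension."""
    # Random point inside each of the n_samples equal-width strata of [0, 1)
    u = (np.arange(n_samples)[:, None] + rng.random((n_samples, n_params))) / n_samples
    # Shuffle the strata independently for each column
    for j in range(n_params):
        u[:, j] = rng.permutation(u[:, j])
    return u

def to_dicts(unit_samples, param_list):
    """Rescale unit samples to [lower, upper] and return one dict per sample."""
    names = [p[0] for p in param_list]
    lower = np.array([p[2] for p in param_list], dtype=float)
    upper = np.array([p[3] for p in param_list], dtype=float)
    scaled = lower + unit_samples * (upper - lower)
    return [dict(zip(names, row)) for row in scaled]

rng = np.random.default_rng(0)
unit = latin_hypercube(5, len(param_list), rng)
sample_dicts = to_dicts(unit, param_list)
```

Each resulting dictionary could then be passed as keyword arguments to a right hand side function.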

swarming.doe.get_density_plots(df, ncols=2, param_names=None, plot_width=300, plot_height=270, palette='Viridis256', size=1)

For a dataframe containing particle positions, calculates a “density” by binning them in a 2-d histogram generated with bokeh.plotting.figure.hexbin(). The parameters belonging to the positions are displayed on top of each density plot.

Parameters
df: pandas.DataFrame

columns should contain the parameter names given in parameter param_names, as well as X1 and X2, containing in each dataframe row x and y positions of all particles.

ncols: int

number of columns to order figures into

param_names: list of str

list of parameter names to extract from df columns to display.

plot_width: int

width of each figure in pixels.

plot_height: int

height of each figure in pixels.

palette: str or palette from ``bokeh.palettes``

color palette used to indicate the counts in the density plot

size: float

max distance of a point to center of hexagon, to be counted as contained in that hexagon (hexplot)

Returns
bokeh.layouts.grid

grid of figures with density plots and parameter values, to be given to bokeh.io.show

swarming.model module

Module containing most of the computation logic and the visualizations, using Bokeh and ColumnDataSource to allow for updating figures displaying the system.

class swarming.model.InitialCondition(condition='circular', n_particles=100, x_range=(-40, 40), y_range=(-40, 40), rhs=None)

Bases: swarming.model.Model

Subclass to prepare a system of interacting particles in a defined initial state or to reset it to one. Is based on swarming.model.Model.

Parameters
condition: str

name of the initial condition to create. Must be a property name of this class

n_particles: int

number of particles to generate

x_range: tuple

x range where to distribute particles

y_range: tuple

y range where to distribute particles

rhs: function

function reference for right hand side for system, see swarming.model.Model.

Attributes
n: int

number of particles; takes effect when an initial condition is set

xr: tuple

x range where to distribute particles

yr: tuple

y range where to distribute particles

distx: float

length of x range

disty: float

length of y range

circular: tuple of np.array

positions and velocities of a donut-shaped distribution of particles

square: tuple of np.array

positions and velocities of a square-shaped distribution of particles, all velocities aligned into one direction

randomspeed: tuple of np.array

like “square” with random velocity vectors

nospeed: tuple of np.array

like “square”, with zero velocity vectors
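To make the square-based conditions above more concrete, here is a minimal sketch of how such positions and velocities could be generated. The helper square_positions, the common direction (1, 0), the uniform position distribution and the normal distribution of the random velocities are all assumptions for illustration; the package may construct these states differently.

```python
import numpy as np

def square_positions(n, x_range, y_range, rng):
    """Uniformly distribute n particles in the rectangle x_range x y_range."""
    x = rng.uniform(x_range[0], x_range[1], size=n)
    y = rng.uniform(y_range[0], y_range[1], size=n)
    return np.column_stack([x, y])

rng = np.random.default_rng(1)
n, xr, yr = 100, (-40, 40), (-40, 40)
X = square_positions(n, xr, yr, rng)

V_square = np.tile([1.0, 0.0], (n, 1))        # all velocities aligned (direction assumed)
V_nospeed = np.zeros((n, 2))                  # particles at rest
V_randomspeed = rng.standard_normal((n, 2))   # random velocity vectors (distribution assumed)
```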

Methods

add_history()

Appends current simulation time, positions, velocities and mean positions and velocities into system history.

calc_chg_from_history([lookback])

For the recorded mean positions and velocities, calculate modulus of discrete change rates over time.

cds_dict()

Creates a data dictionary to insert into ColumnDataSource for current system state.

cds_static(X, V[, vscale])

For given positions and velocities, create a dictionary that can be inserted into a bokeh ColumnDataSource, providing data for plotting particle positions, as well as velocity vectors for a kind of quiver plot.

evolve(time_step, n_steps[, snapshot])

Get parameters for right hand side, get a right hand side for this parameter set and calculate a certain amount of time steps with it.

plot([plot_width, plot_height, plot_mean])

Plot all particles with velocity vectors from current system state.

plot_density([plot_width, plot_height, …])

Plot particle density from current system state.

plot_initial([ncols, attrs, plot_width, …])

Plot all registered initial conditions as an example.

plot_trajectory([plot_width, plot_height])

Plots the trajectory of the mean position and mean velocity of the whole system over time, as recorded so far.

record_for_time(max_time, time_step, …)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a certain maximum simulation time is reached, recording intermediate states into system history.

record_until(max_steps, time_step, n_steps)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a maximum number of time steps has been reached, recording intermediate states into system history.

set_initial([condition])

Prepare system in defined condition.

step(time_step[, noise])

Computes one time step with explicit Runge-Kutta method of 2nd order.

update_cds()

Replaces the data dict of ColumnDataSource self.cds with current velocities and positions.

property circular
property distx
property disty
property nospeed
plot_initial(ncols=3, attrs=None, plot_width=300, plot_height=300)

Plot all registered initial conditions as an example. Does not change system state.

Parameters
ncols: int

number of columns for grid plot of initial conditions

attrs: list of str

list of names of initial conditions to plot. All conditions are plotted by default.

plot_width: int

plot width for each subfigure in pixels

plot_height: int

plot height for each subfigure in pixels

Returns
bokeh.layouts.grid

grid of figures containing selected initial conditions.

property randomspeed
set_initial(condition='circular')

Prepare system in defined condition. Resets particle positions and velocities immediately.

Returns
swarming.model.Model

reference to class instance to allow for command chaining.

property square
class swarming.model.Model(X, V, rhs=None)

Bases: object

Base class for simulating a system of potentially interacting particles, as a system of ordinary differential equations in terms of particle positions X and velocities V. The system behavior is mainly determined by the choice of the right hand side, which can be parameterized with coefficients, and which is applied to positions and velocities in every time step. There is further functionality to plot current model states with bokeh, and to record the complete trajectories of particles in the phase space.

Parameters
X: numpy.array

n by 2 array containing in each “row” the 2d positions of one of the n particles to simulate. Can be an initial distribution or an intermediate state. See also swarming.model.InitialCondition

V: numpy.array

n by 2 array containing in each “row” the 2d velocity vector of one of the n particles to simulate.

rhs: function

function returning a reference to a parameterized right hand side to be evaluated in every timestep for positions and velocities.

Attributes
X: numpy.array

n by 2 array containing in each “row” the current 2d positions of one of the n particles in the system

V: numpy.array

n by 2 array containing in each “row” the current 2d velocity vectors of one of the n particles in the system

time: float

current simulation time until which the model has been run since instantiation.

rhs: function

function returning a reference to a parameterized right hand side to be evaluated in every timestep for positions and velocities. Can be changed during system evolution to modify parameters

apply_rhs: function

the return of rhs called with a set of parameters, to be applied to particles and velocities in the simulation

VSCALE: float

factor by which velocity vectors are stretched, for visualization only.

mean_x: numpy.array

array of length 2 containing the mean position of all particles at the current time.

mean_v: numpy.array

array of length 2 containing the mean velocity of all particles at the current time.

cds: bokeh.ColumnDataSource

reference to column data source that is used for plotting system state. Can be updated with current simulated data by calling swarming.model.Model.update_cds(). Helpful for updating bokeh plots that use this data source, e.g. in an interactive Bokeh app.

history: pandas.DataFrame

contains particle positions, velocities, and the mean position and mean velocity of all particles at certain time steps, which are determined in swarming.model.Model.record_for_time() or swarming.model.Model.record_until().

n_particles: int

amount of particles, i.e. number of rows of X: self.X.shape[0]

Methods

add_history()

Appends current simulation time, positions, velocities and mean positions and velocities into system history.

calc_chg_from_history([lookback])

For the recorded mean positions and velocities, calculate modulus of discrete change rates over time.

cds_dict()

Creates a data dictionary to insert into ColumnDataSource for current system state.

cds_static(X, V[, vscale])

For given positions and velocities, create a dictionary that can be inserted into a bokeh ColumnDataSource, providing data for plotting particle positions, as well as velocity vectors for a kind of quiver plot.

evolve(time_step, n_steps[, snapshot])

Get parameters for right hand side, get a right hand side for this parameter set and calculate a certain amount of time steps with it.

plot([plot_width, plot_height, plot_mean])

Plot all particles with velocity vectors from current system state.

plot_density([plot_width, plot_height, …])

Plot particle density from current system state.

plot_trajectory([plot_width, plot_height])

Plots the trajectory of the mean position and mean velocity of the whole system over time, as recorded so far.

record_for_time(max_time, time_step, …)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a certain maximum simulation time is reached, recording intermediate states into system history.

record_until(max_steps, time_step, n_steps)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a maximum number of time steps has been reached, recording intermediate states into system history.

step(time_step[, noise])

Computes one time step with explicit Runge-Kutta method of 2nd order.

update_cds()

Replaces the data dict of ColumnDataSource self.cds with current velocities and positions.

add_history()

Appends current simulation time, positions, velocities and mean positions and velocities into system history.

calc_chg_from_history(lookback=None)

For the recorded mean positions and velocities, calculate modulus of discrete change rates over time. Corresponds to the modulus of the discrete first derivatives of mean_x and mean_v.

Parameters
lookback: int

Maximum number of time points from history to take into account. If not given, use all.

Returns
chgx: numpy.array

lookback sized array of mean position change rates

chgv: numpy.array

lookback sized array of mean velocity change rates

time: numpy.array

the timepoints for which change rates were calculated
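The change rates described above can be approximated with finite differences. The following is a minimal sketch; the function change_rates and its array arguments are hypothetical, and it returns one value fewer than the number of time points used because each rate needs two consecutive records.

```python
import numpy as np

def change_rates(time, mean_x, mean_v, lookback=None):
    """Modulus of finite-difference change rates of mean position and mean velocity."""
    if lookback is not None:
        # keep only the most recent `lookback` recorded time points
        time, mean_x, mean_v = time[-lookback:], mean_x[-lookback:], mean_v[-lookback:]
    dt = np.diff(time)
    chgx = np.linalg.norm(np.diff(mean_x, axis=0), axis=1) / dt
    chgv = np.linalg.norm(np.diff(mean_v, axis=0), axis=1) / dt
    # each difference is attributed to the later of its two time points
    return chgx, chgv, time[1:]

t = np.array([0.0, 1.0, 2.0, 4.0])
mx = np.array([[0.0, 0.0], [3.0, 4.0], [3.0, 4.0], [3.0, 8.0]])
mv = np.zeros((4, 2))
chgx, chgv, tt = change_rates(t, mx, mv)   # chgx is [5.0, 0.0, 2.0]
```

Small values of both rates over the lookback window would indicate that the mean motion of the system has settled.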

property cds
cds_dict()

Creates a data dictionary to insert into ColumnDataSource for current system state. See also swarming.model.Model.cds_static().

Returns
dict

containing data for ColumnDataSource

static cds_static(X, V, vscale=1.0)

For given positions and velocities, create a dictionary that can be inserted into a bokeh ColumnDataSource, providing data for plotting particle positions, as well as velocity vectors for a kind of quiver plot.

Parameters
X: numpy.array

n by 2 array containing in each “row” the current 2d positions of one of the n particles in the system

V: numpy.array

n by 2 array containing in each “row” the current 2d velocity vectors of one of the n particles in the system

vscale: float

factor by which velocity vectors will be stretched in visualization, for better visibility.

Returns
dict

containing data for ColumnDataSource, with fields x1, x2 (positions), x1s, x2s (start and end coordinates of velocity vectors).
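A dictionary with the fields named above could be assembled as in the following sketch. The function quiver_dict is hypothetical, and the layout of x1s and x2s as one [start, end] pair per particle is an assumption, chosen because that is the list-of-lists form bokeh's multi_line glyph accepts.

```python
import numpy as np

def quiver_dict(X, V, vscale=1.0):
    """Build plot data: particle positions plus one line segment per velocity vector."""
    X = np.asarray(X, dtype=float)
    V = np.asarray(V, dtype=float)
    tips = X + vscale * V   # end points of the stretched velocity vectors
    return {
        "x1": X[:, 0].tolist(),
        "x2": X[:, 1].tolist(),
        # one [start, end] coordinate pair per particle
        "x1s": np.column_stack([X[:, 0], tips[:, 0]]).tolist(),
        "x2s": np.column_stack([X[:, 1], tips[:, 1]]).tolist(),
    }

data = quiver_dict([[0.0, 0.0], [1.0, 2.0]], [[1.0, 0.0], [0.0, -1.0]], vscale=2.0)
```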

evolve(time_step, n_steps, snapshot=False, **kwargs)

Get parameters for right hand side, get a right hand side for this parameter set and calculate a certain amount of time steps with it.

Parameters
time_step: float

time step size to compute

n_steps: int

number of time steps to calculate

snapshot: bool

whether or not to record the particle positions and velocities into the system history after calculating for n_steps time steps.

kwargs: dict

parameters for right hand side function

Returns
swarming.model.Model

reference to class instance to allow for command chaining.

property history
property mean_v
property mean_x
property n_particles
plot(plot_width=500, plot_height=500, plot_mean=False)

Plot all particles with velocity vectors from current system state. Uses self.cds.

Parameters
plot_width: int

plot width in pixels

plot_height: int

plot height in pixels

plot_mean: bool

whether or not to mark the system's mean position and velocity vector.

Returns
bokeh.plotting.figure

to be displayed with show or inserted into a layout.

plot_density(plot_width=500, plot_height=400, palette='Viridis256', size=1)

Plot particle density from current system state.

Parameters
plot_width: int

plot width in pixels

plot_height: int

plot height in pixels

palette: str or bokeh palette

color palette or name of color palette to use

size: float

determines the size of hexagonal histogram cells in plot coordinates.

Returns
bokeh.plotting.figure

to be displayed with show or inserted into a layout.

plot_trajectory(plot_width=300, plot_height=300)

Plots the trajectory of the mean position and mean velocity of the whole system over time, as recorded so far.

Parameters
plot_width: int

plot width in pixels

plot_height: int

plot height in pixels

Returns
bokeh.plotting.figure

to be displayed with show or inserted into a layout.

record_for_time(max_time, time_step, n_steps, **kwargs)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a certain maximum simulation time is reached, recording intermediate states into system history.

Parameters
max_time: float

maximum simulation time for which to compute. Adds to self.time, so if self.time = 10 and max_time = 20 is given, self.time will be 30 in the end.

time_step: float

time step size to compute

n_steps: int

number of time steps to calculate for each recorded system state

kwargs: dict

parameters for right hand side function

Returns
swarming.model.Model

reference to class instance to allow for command chaining.
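The batching and the relative meaning of max_time can be illustrated with a toy stand-in. ToyModel below is hypothetical: it only tracks time and a history list, and its loop is one plausible way to implement the wrapper, not the package's code.

```python
class ToyModel:
    """Hypothetical stand-in that only tracks simulation time and a history list."""

    def __init__(self):
        self.time = 0.0
        self.history = []

    def evolve(self, time_step, n_steps, snapshot=False):
        # A real model would integrate the particle system here
        self.time += time_step * n_steps
        if snapshot:
            self.history.append(self.time)
        return self  # returning self enables command chaining

    def record_for_time(self, max_time, time_step, n_steps):
        # max_time is relative: it is added to the current simulation time
        end_time = self.time + max_time
        while self.time < end_time - 1e-12:
            self.evolve(time_step, n_steps, snapshot=True)
        return self

# Advance to time 10 without recording, then record for 20 more time units
model = ToyModel().evolve(0.5, 20)
model.record_for_time(20, time_step=0.5, n_steps=10)
```

With these values one state is recorded every 5 time units and the final simulation time is 30, matching the example given for max_time.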

record_until(max_steps, time_step, n_steps, lookback=None, tolx=0.01, tolv=0.01, **kwargs)

Wrapper for swarming.model.Model.evolve() to run batches of timesteps, until a maximum number of time steps has been reached, recording intermediate states into system history.

Parameters
max_steps: int

maximum simulation steps to perform for one call of this function

time_step: float

time step size to compute

n_steps: int

number of time steps to calculate for each recorded system state

lookback: int

amount of previous time steps to take into account for not yet implemented stopping criterion

tolx: float

tolerance in position to take into account for not yet implemented stopping criterion

tolv: float

tolerance in velocity to take into account for not yet implemented stopping criterion

kwargs: dict

parameters for right hand side function

Returns
swarming.model.Model

reference to class instance to allow for command chaining.

step(time_step, noise=0.0)

Computes one time step with an explicit second-order Runge-Kutta method. With a noise parameter greater than zero, this becomes an Ito scheme for stochastic differential equations. As a user, you normally call swarming.model.Model.evolve(), swarming.model.Model.record_until() or swarming.model.Model.record_for_time() to advance the system by a defined number of time steps.

Parameters
time_step: float

time step size to compute

noise: float

coefficient to apply an Ito diffusion to the system and turn it into a system of stochastic differential equations

Returns
swarming.model.Model

reference to class instance to allow for command chaining.
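One way to picture how a noise coefficient turns the deterministic step into a stochastic one is to add a Wiener increment to the velocities after the Runge-Kutta update. The sketch below does this with a midpoint step; the function stochastic_step, the choice to perturb only the velocities, and the force-free test system are assumptions and may differ from what the package does.

```python
import numpy as np

def stochastic_step(X, V, accel, dt, noise=0.0, rng=None):
    """Midpoint step for X' = V, V' = accel(X, V), plus an optional Ito noise increment."""
    rng = np.random.default_rng() if rng is None else rng
    # Deterministic part: explicit midpoint rule
    X_mid = X + 0.5 * dt * V
    V_mid = V + 0.5 * dt * accel(X, V)
    X_new = X + dt * V_mid
    V_new = V + dt * accel(X_mid, V_mid)
    if noise > 0.0:
        # Wiener increment: its standard deviation scales with sqrt(dt), not dt
        V_new = V_new + noise * np.sqrt(dt) * rng.standard_normal(V.shape)
    return X_new, V_new

def free(X, V):
    """Hypothetical force-free system."""
    return np.zeros_like(V)

X0, V0 = np.zeros((1000, 2)), np.ones((1000, 2))
X1, V1 = stochastic_step(X0, V0, free, dt=0.01, noise=0.5, rng=np.random.default_rng(0))
```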

update_cds()

Replaces the data dict of ColumnDataSource self.cds with current velocities and positions. If self.cds is attached as source to any bokeh figure, it will update, too.

Returns
swarming.model.Model

reference to class instance to allow for command chaining.

swarming.model.force_of_distance(r, Ca=20.0, Cr=50.0, la=100, lr=2)

Force function for swarm behavior for illustration purposes

Parameters
r: float, array

vector of distances for which to compute forces

Ca: float

attraction force

Cr: float

repulsion force

la: float

attraction range

lr: float

repulsion range

Returns
numpy.array

array of forces the same size as r
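The formula used by force_of_distance is not stated here. A common choice with exactly these four parameters is a Morse-type force with short-range repulsion and long-range attraction, sketched below under that assumption; morse_force is a hypothetical name and the sign convention (positive means repulsive) is also assumed.

```python
import numpy as np

def morse_force(r, Ca=20.0, Cr=50.0, la=100.0, lr=2.0):
    """Assumed Morse-type force: positive values repel, negative values attract."""
    r = np.asarray(r, dtype=float)
    repulsion = Cr / lr * np.exp(-r / lr)     # strong and short-ranged
    attraction = Ca / la * np.exp(-r / la)    # weak and long-ranged
    return repulsion - attraction

r = np.array([0.0, 1.0, 10.0, 50.0])
forces = morse_force(r)
```

With the default parameters this force is repulsive at small distances and weakly attractive at large ones.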

swarming.model.harmonic_oscillator_rhs(const=0.01, damping=0.1, **kwargs)

Right hand side for particle system, modelling harmonic oscillator behavior.

Parameters
const: float

coefficient

damping: float

damping coefficient

Returns
function

the parameterized function to act as a right hand side for the system
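The pattern of a function that takes parameters and returns the actual right hand side can be sketched as a closure. The call signature of the returned function, (X, V) mapped to the pair of time derivatives, and the exact force law are assumptions; oscillator_rhs is a hypothetical stand-in for illustration.

```python
import numpy as np

def oscillator_rhs(const=0.01, damping=0.1, **kwargs):
    """Return a function (X, V) -> (dX/dt, dV/dt) for a damped harmonic oscillator."""
    def apply_rhs(X, V):
        # restoring force toward the origin plus velocity-proportional damping
        return V, -const * X - damping * V
    return apply_rhs

# Extra keyword arguments are swallowed by **kwargs, so a parameter set meant
# for another right hand side (here Ca) does not raise an error.
apply_rhs = oscillator_rhs(const=0.5, damping=0.0, Ca=20.0)
dX, dV = apply_rhs(np.array([[2.0, 0.0]]), np.array([[0.0, 1.0]]))
```

Because the parameters are bound when the closure is created, calling the factory again with new values yields a new right hand side without touching the integrator.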

swarming.model.rand(d0, d1, ..., dn)

Random values in a given shape.

Note

This is a convenience function for users porting code from Matlab, and wraps random_sample. That function takes a tuple to specify the size of the output, which is consistent with other NumPy functions like numpy.zeros and numpy.ones.

Create an array of the given shape and populate it with random samples from a uniform distribution over [0, 1).

Parameters
d0, d1, …, dn: int, optional

The dimensions of the returned array, must be non-negative. If no argument is given a single Python float is returned.

Returns
out: ndarray, shape (d0, d1, ..., dn)

Random values.

See also

random

Examples

>>> np.random.rand(3,2)
array([[ 0.14022471,  0.96360618],  #random
       [ 0.37601032,  0.25528411],  #random
       [ 0.49313049,  0.94909878]]) #random
swarming.model.randn(d0, d1, ..., dn)

Return a sample (or samples) from the “standard normal” distribution.

Note

This is a convenience function for users porting code from Matlab, and wraps standard_normal. That function takes a tuple to specify the size of the output, which is consistent with other NumPy functions like numpy.zeros and numpy.ones.

Note

New code should use the standard_normal method of a default_rng() instance instead; please see the random-quick-start.

If positive int_like arguments are provided, randn generates an array of shape (d0, d1, ..., dn), filled with random floats sampled from a univariate “normal” (Gaussian) distribution of mean 0 and variance 1. A single float randomly sampled from the distribution is returned if no argument is provided.

Parameters
d0, d1, …, dn: int, optional

The dimensions of the returned array, must be non-negative. If no argument is given a single Python float is returned.

Returns
Z: ndarray or float

A (d0, d1, ..., dn)-shaped array of floating-point samples from the standard normal distribution, or a single such float if no parameters were supplied.

See also

standard_normal

Similar, but takes a tuple as its argument.

normal

Also accepts mu and sigma arguments.

Generator.standard_normal

which should be used for new code.

Notes

For random samples from \(N(\mu, \sigma^2)\), use:

sigma * np.random.randn(...) + mu

Examples

>>> np.random.randn()
2.1923875335537315  # random

Two-by-four array of samples from N(3, 6.25):

>>> 3 + 2.5 * np.random.randn(2, 4)
array([[-4.49401501,  4.00950034, -1.81814867,  7.29718677],   # random
       [ 0.39924804,  4.68456316,  4.99394529,  4.84057254]])  # random
swarming.model.rep_attr_rhs(alpha=0.07, beta=0.05, Ca=20.0, Cr=50.0, la=100, lr=2, noise=0.0, **kwargs)

Right hand side for particle system, modelling swarming behavior by balancing a repulsive and attractive force.

Parameters
alpha: float

propulsion coefficient

beta: float

braking/damping coefficient

Ca: float

attraction force

Cr: float

repulsion force

la: float

attraction range

lr: float

repulsion range

noise: float

coefficient for random noise added to the motion of the system

Returns
function

the parameterized function to act as a right hand side for the system
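How propulsion, braking and pairwise forces might combine in such a right hand side is sketched below. The model form (self-propulsion alpha, braking proportional to beta times squared speed, Morse-type pair forces) is an assumption based on the parameter names, swarm_rhs is a hypothetical name, and the noise parameter is left out of the sketch.

```python
import numpy as np

def swarm_rhs(alpha=0.07, beta=0.05, Ca=20.0, Cr=50.0, la=100.0, lr=2.0, **kwargs):
    """Return (X, V) -> (dX/dt, dV/dt) for an assumed self-propelled swarm model."""
    def apply_rhs(X, V):
        # pairwise separation vectors diff[i, j] = X[i] - X[j] and their lengths
        diff = X[:, None, :] - X[None, :, :]
        dist = np.linalg.norm(diff, axis=-1)
        np.fill_diagonal(dist, 1.0)   # avoid division by zero for i == j
        # assumed Morse-type force magnitude, positive = repulsive
        mag = Cr / lr * np.exp(-dist / lr) - Ca / la * np.exp(-dist / la)
        np.fill_diagonal(mag, 0.0)    # no self-interaction
        interaction = (mag[:, :, None] * diff / dist[:, :, None]).sum(axis=1)
        # self-propulsion balanced by speed-dependent braking
        speed2 = (V ** 2).sum(axis=1, keepdims=True)
        return V, (alpha - beta * speed2) * V + interaction
    return apply_rhs

X = np.array([[0.0, 0.0], [1.0, 0.0]])
V = np.zeros_like(X)
dX, dV = swarm_rhs()(X, V)   # two close particles at rest push each other apart
```

In this form an isolated particle neither accelerates nor brakes once its squared speed equals alpha / beta.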