# -*- coding: utf-8 -*-
"""
Core of BMS. All content of this file is imported by bms, and is therefore in bms
This file defines the base of BMS.
"""
import numpy as np
#import numpy.random
import matplotlib.pyplot as plt
#import math
import networkx as nx
import dill
from scipy.optimize import fsolve, root, minimize
#import cma
[docs]class Variable:
""" Defines a variable
:param names: Defines full name and short name.
If names is a string the two names will be identical
otherwise names should be a tuple of strings (full_name,short_name)
:param hidden: inner variable to hide in plots if true
"""
def __init__(self, names='variable', initial_values=[0], hidden=False):
if type(names) == str:
self.name = names
self.short_name = names
else:
try:
self.short_name = names[1]
self.name = names[0]
except:
raise TypeError
self.initial_values = initial_values
self._values = np.array([])
self.max_order = 0
self.hidden = hidden
def _InitValues(self, ns, ts, max_order):
self.max_order = max_order
self._values = self.initial_values[0]*np.ones(ns+max_order+1)
self._ForwardValues()
def _ForwardValues(self):
pass
def _get_values(self):
return self._values[self.max_order:]
values = property(_get_values)
[docs]class Signal(Variable):
""" Abstract class of signal """
def __init__(self, names):
if type(names) == str:
self.name = names
self.short_name = names
else:
try:
self.short_name = names[1]
self.name = names[0]
except:
raise TypeError
self._values = np.array([])
self.max_order = 0
self.hidden = False
def _InitValues(self, ns, ts, max_order):
self.max_order = max_order
self._values = np.zeros(ns+max_order+1)
for i in range(ns+1):
self._values[i+max_order] = self.function(i*ts)
self._ForwardValues()
self.initial_values = [self._values[0]]
def _ForwardValues(self):
"""
Implementation for problems with derivative conditions on variables
"""
pass
[docs]class Block:
""" Abstract class of block: this class should not be instanciate directly
"""
def __init__(self, inputs, outputs, max_input_order, max_output_order):
self.inputs = []
self.outputs = []
self.n_inputs = len(inputs)
self.n_outputs = len(outputs)
for variable in inputs:
self._AddInput(variable)
for variable in outputs:
self._AddOutput(variable)
# self.input_orders=
# self.output_orders=output_orders
self.max_input_order = max_input_order
self.max_output_order = max_output_order
self.max_order = max(self.max_input_order, self.max_output_order)
def _AddInput(self, variable):
"""
Add one more variable as an input of the block
:param variable: variable (or signal as it is also a variable)
"""
if isinstance(variable, Variable):
self.inputs.append(variable)
else:
print('Error: ', variable.name, variable,
' given is not a variable')
raise TypeError
def _AddOutput(self, variable):
"""
Add one more variable as an output of the block
:param variable: variable (or signal as it is also a variable)
"""
if isinstance(variable, Variable):
self.outputs.append(variable)
else:
print(variable)
raise TypeError
[docs] def OutputValues(self, it, nsteps=None):
# Provides values in inputs values for computing at iteration it
if nsteps == None:
nsteps = self.max_output_order
O = np.zeros((self.n_outputs, nsteps))
for iv, variable in enumerate(self.outputs):
O[iv, :] = variable._values[it-nsteps:it]
return O
[docs] def Solve(self, it, ts):
step_outputs = self.Evaluate(it, ts)
for i, step_output in enumerate(step_outputs):
self.outputs[i]._values[it] = step_output
[docs]class ModelError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return 'Model Error: '+self.message
[docs]class DynamicSystem:
"""
Defines a dynamic system that can simulate itself
:param te: time of simulation's end
:param ns: number of steps
:param blocks: (optional) list of blocks defining the model
"""
def __init__(self, te, ns, blocks=[]):
self.te = te
self.ns = ns
self.ts = self.te/float(self.ns) # time step
self.t = np.linspace(0, self.te, num=ns+1) # Time vector
self.blocks = []
self.variables = []
self.signals = []
self.max_order = 0
for block in blocks:
self.AddBlock(block)
self._utd_graph = False # True if graph is up-to-date
[docs] def AddBlock(self, block):
"""
Add the given block to the model and also its input/output variables
"""
if isinstance(block, Block):
self.blocks.append(block)
self.max_order = max(self.max_order, block.max_input_order-1)
self.max_order = max(self.max_order, block.max_output_order)
for variable in block.inputs+block.outputs:
self._AddVariable(variable)
else:
print(block)
raise TypeError
self._utd_graph = False
def _AddVariable(self, variable):
"""
Add a variable to the model. Should not be used by end-user
"""
if isinstance(variable, Signal):
if not variable in self.signals:
self.signals.append(variable)
elif isinstance(variable, Variable):
if not variable in self.variables:
self.variables.append(variable)
else:
raise TypeError
self._utd_graph = False
def _get_Graph(self):
if not self._utd_graph:
# Generate graph
self._graph = nx.DiGraph()
for variable in self.variables:
self._graph.add_node(variable, bipartite=0)
for block in self.blocks:
self._graph.add_node(block, bipartite=1)
for variable in block.inputs:
self._graph.add_edge(variable, block)
for variable in block.outputs:
self._graph.add_edge(block, variable)
self._utd_graph = True
return self._graph
graph = property(_get_Graph)
def _ResolutionOrder(self, variables_to_solve):
"""
return a list of lists of tuples (block,output,ndof) to be solved
"""
# Gp=nx.DiGraph()
#
# for i in range(nvar):
# Gp.add_node('v'+str(i),bipartite=0)
#
# for i in range(neq):
# Gp.add_node('e'+str(i),bipartite=1)
# for j in range(nvar):
# if Mo[i,j]==1:
# Gp.add_edge('e'+str(i),'v'+str(j))
Gp = nx.DiGraph()
for variable in self.variables:
Gp.add_node(variable, bipartite=0)
for block in self.blocks:
for iov, output_variable in enumerate(block.outputs):
Gp.add_node((block, iov), bipartite=1)
Gp.add_edge((block, iov), output_variable)
Gp.add_edge(output_variable, (block, iov))
for input_variable in block.inputs:
if not isinstance(input_variable, Signal):
Gp.add_edge(input_variable, (block, iov))
# for n1,n2 in M.items():
# Gp.add_edge(n1,n2)
sinks = []
sources = []
for node in Gp.nodes():
if Gp.out_degree(node) == 0:
sinks.append(node)
elif Gp.in_degree(node) == 0:
sources.append(node)
G2 = sources[:]
for node in sources:
for node2 in nx.descendants(Gp, node):
if node2 not in G2:
G2.append(node2)
if G2 != []:
print(G2)
raise ModelError('Overconstrained variables')
G3 = sinks[:]
for node in sinks:
for node2 in nx.ancestors(Gp, node):
if node2 not in G3:
G3.append(node2)
if G3 != []:
raise ModelError('Underconstrained variables')
# vars_resolvables=[]
# for var in vars_resoudre:
# if not 'v'+str(var) in G2+G3:
# vars_resolvables.append(var)
# G1=Gp.copy()
# G1.remove_nodes_from(G2+G3)
#
# M1=nx.bipartite.maximum_matching(G1)
# G1p=nx.DiGraph()
#
# G1p.add_nodes_from(G1.nodes())
# for e in G1.edges():
# # equation vers variable
# if e[0][0]=='v':
# G1p.add_edge(e[0],e[1])
# else:
# G1p.add_edge(e[1],e[0])
# # print(len(M))
# for n1,n2 in M1.items():
# # print(n1,n2)
# if n1[0]=='e':
# G1p.add_edge(n1,n2)
# else:
# G1p.add_edge(n2,n1)
scc = list(nx.strongly_connected_components(Gp))
# pos=nx.spring_layout(G1p)
# plt.figure()
# nx.draw(G1p,pos)
# nx.draw_networkx_labels(G1p,pos)
# print(scc)
if scc != []:
C = nx.condensation(Gp, scc)
isc_vars = []
for isc, sc in enumerate(scc):
for var in variables_to_solve:
if var in sc:
isc_vars.append(isc)
break
ancestors_vars = isc_vars[:]
for isc_var in isc_vars:
for ancetre in nx.ancestors(C, isc_var):
if ancetre not in ancestors_vars:
ancestors_vars.append(ancetre)
order_sc = [sc for sc in nx.topological_sort(
C) if sc in ancestors_vars]
order_ev = []
for isc in order_sc:
# liste d'équations et de variables triées pour être séparées
evs = list(scc[isc])
# print(evs)
# levs=int(len(evs)/2)
eqs = []
var = []
for element in evs:
if type(element) == tuple:
eqs.append(element)
else:
var.append(element)
order_ev.append((len(eqs), eqs, var))
return order_ev
raise ModelError
[docs] def Simulate(self, variables_to_solve=None):
if variables_to_solve == None:
variables_to_solve = [
variable for variable in self.variables if not variable.hidden]
order = self._ResolutionOrder(variables_to_solve)
# Initialisation of variables values
for variable in self.variables+self.signals:
variable._InitValues(self.ns, self.ts, self.max_order)
# ==============================================================================
# Enhancement to do: defining functions out of loop (copy args)s
# ==============================================================================
# print(order)
residue = []
for it, t in enumerate(self.t[1:]):
for neqs, equations, variables in order:
if neqs == 1:
equations[0][0].Solve(it+self.max_order+1, self.ts)
else:
# x0=np.zeros(neqs)
x0 = [equations[i][0].outputs[equations[i][1]]._values[it +
self.max_order] for i in range(len(equations))]
# print('===========')
def r(x, equations=equations[:]):
# Writing variables values proposed by optimizer
for i, xi in enumerate(x):
equations[i][0].outputs[equations[i][1]
]._values[it+self.max_order+1] = xi
# Computing regrets
r = []
# s=0
for ieq, (block, neq) in enumerate(equations):
# print(block,it)
# print(block.Evaluate(it+self.max_order+1,self.ts).shape)
# print(block.Evaluate(it+self.max_order+1,self.ts),block)
r.append(x[ieq]-block.Evaluate(it +
self.max_order+1, self.ts)[neq])
# print(block)
# print('xproposed:',x[ieq])
# print('block eval',block.Evaluate(it+self.max_order+1,self.ts)[neq])
# print('value', x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq])
# s+=abs(x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq])
# print(x[ieq],block.Evaluate(it+self.max_order+1,self.ts)[neq])
return r
def f(x, equations=equations[:]):
# Writing variables values proposed by optimizer
for i, xi in enumerate(x):
equations[i][0].outputs[equations[i][1]
]._values[it+self.max_order+1] = xi
# Computing regrets
# r=[]
s = 0
for ieq, (block, neq) in enumerate(equations):
# print(block,it)
# print(block.Evaluate(it+self.max_order+1,self.ts).shape)
# print(block.Evaluate(it+self.max_order+1,self.ts),block)
# r.append(x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq])
# print(block)
s += abs(x[ieq]-block.Evaluate(it +
self.max_order+1, self.ts)[neq])
# print(x[ieq],block.Evaluate(it+self.max_order+1,self.ts)[neq])
# return r
# print(s)
return s
x, d, i, m = fsolve(r, x0, full_output=True)
# res=root(f,x0,method='anderson')
# x=res.x
# res=minimize(f,x0,method='powell')
# if res.fun>1e-3:
# x0=[equations[i][0].outputs[equations[i][1]]._values[it+self.max_order] for i in range(len(equations))]
# x0+=np.random.random(len(equations))
# print('restart')
# res=minimize(f,x0,method='powell')
#
# residue.append(f(res.x))
# print(r(x),i)
# print(f(res.x),res.fun)
# f(x)
# print(r)
# if i!=1:
# print(equations)
# print(i,r(x))
# options={'tolfun':1e-3,'verbose':-9,'ftarget':1e-3}
# res=cma.fmin(f,x0,1,options=options)
# print(f(res[0]),r(res[0]))
# print(equations)
#
# print(m)
# if res.fun>1e-3:
# print('fail',res.fun)
# options={'tolfun':1e-3,'verbose':-9}
# res=cma.fmin(f,x0,1,options=options)
# else:
# print('ok')
# return residue
[docs] def VariablesValues(self, variables, t):
"""
Returns the value of given variables at time t.
Linear interpolation is performed between two time steps.
:param variables: one variable or a list of variables
:param t: time of evaluation
"""
# TODO: put interpolation in variables
if (t < self.te) | (t > 0):
i = t//self.ts # time step
ti = self.ts*i
if type(variables) == list:
values = []
for variable in variables:
# interpolation
values.append(
variables.values[i]*((ti-t)/self.ts+1)+variables.values[i+1]*(t-ti)/self.ts)
return values
else:
# interpolation
i = int(i)# Fixing bug #31
return variables.values[i]*((ti-t)/self.ts+1)+variables.values[i+1]*(t-ti)/self.ts
else:
raise ValueError
[docs] def PlotVariables(self, subplots_variables=None):
if subplots_variables == None:
subplots_variables = [self.signals+self.variables]
subplots_variables = [
[variable for variable in self.signals+self.variables if not variable.hidden]]
# plt.figure()
fig, axs = plt.subplots(len(subplots_variables), sharex=True)
if len(subplots_variables) == 1:
axs = [axs]
for isub, subplot in enumerate(subplots_variables):
legend = []
for variable in subplot:
axs[isub].plot(self.t, variable.values)
legend.append(variable.name)
axs[isub].legend(legend, loc='best')
axs[isub].margins(0.08)
axs[isub].grid()
plt.xlabel('Time')
plt.show()
[docs] def DrawModel(self):
from .interface import ModelDrawer
ModelDrawer(self)
[docs] def Save(self, name_file):
"""
name_file: name of the file without extension.
The extension .bms is added by function
"""
with open(name_file+'.bms', 'wb') as file:
model = dill.dump(self, file)
def __getstate__(self):
dic = self.__dict__.copy()
return dic
def __setstate__(self, dic):
self.__dict__ = dic
[docs]def Load(file):
""" Loads a model from specified file """
with open(file, 'rb') as file:
model = dill.load(file)
return model
[docs]class PhysicalNode:
"""
Abstract class
"""
def __init__(self, cl_solves_potential, cl_solves_fluxes, node_name, potential_variable_name, flux_variable_name):
self.cl_solves_potential = cl_solves_potential
self.cl_solves_fluxes = cl_solves_fluxes
self.name = node_name
self.potential_variable_name = potential_variable_name
self.flux_variable_name = flux_variable_name
self.variable = Variable(potential_variable_name+' '+node_name)
[docs]class PhysicalBlock:
"""
Abstract class to inherit when coding a physical block
"""
def __init__(self, physical_nodes, nodes_with_fluxes, occurence_matrix, commands, name):
self.physical_nodes = physical_nodes
self.name = name
self.nodes_with_fluxes = nodes_with_fluxes
self.occurence_matrix = occurence_matrix
self.commands = commands
self.variables = [Variable(physical_nodes[inode].flux_variable_name+' from ' +
physical_nodes[inode].name+' to '+self.name) for inode in nodes_with_fluxes]
[docs]class PhysicalSystem:
"""
Defines a physical system
"""
def __init__(self, te, ns, physical_blocks, command_blocks):
self.te = te
self.ns = ns
self.physical_blocks = []
self.physical_nodes = []
self.variables = []
self.command_blocks = []
for block in physical_blocks:
self.AddPhysicalBlock(block)
for block in command_blocks:
self.AddCommandBlock(block)
self._utd_ds = False
[docs] def AddPhysicalBlock(self, block):
if isinstance(block, PhysicalBlock):
self.physical_blocks.append(block)
for node in block.physical_nodes:
self._AddPhysicalNode(node)
else:
raise TypeError
self._utd_ds = False
def _AddPhysicalNode(self, node):
if isinstance(node, PhysicalNode):
if not node in self.physical_nodes:
self.physical_nodes.append(node)
self._utd_ds = False
[docs] def AddCommandBlock(self, block):
if isinstance(block, Block):
self.command_blocks.append(block)
for variable in block.inputs+block.outputs:
self._AddVariable(variable)
else:
raise TypeError
self._utd_ds = False
def _AddVariable(self, variable):
if isinstance(variable, Variable):
if not variable in self.variables:
self.variables.append(variable)
self._utd_ds = False
[docs] def GenerateDynamicSystem(self):
# from bms.blocks.continuous import WeightedSum
G = nx.Graph()
# variables={}
# Adding node variables
for node in self.physical_nodes:
G.add_node(node.variable, bipartite=0)
for block in self.physical_blocks:
# print(block.variables)
for variable in block.variables:
# add variable realted to connection
G.add_node(node.variable, bipartite=0)
ne, nv = block.occurence_matrix.shape
# print(block,block.occurence_matrix)
# Add equations of blocs
for ie in range(ne):
G.add_node((block, ie), bipartite=1)
for iv in range(nv):
# print(iv)
if block.occurence_matrix[ie, iv] == 1:
if iv % 2 == 0:
G.add_edge(
(block, ie), block.physical_nodes[iv//2].variable)
else:
G.add_edge((block, ie), block.variables[iv//2])
# Adding equation of physical nodes: conservative law of node from its occurence matrix
# Restricted to nodes to which are brought fluxes
for node in self.physical_nodes:
# linking conservative equation of flux to potential if
if node.cl_solves_potential:
G.add_edge(node, node.variable)
if node.cl_solves_fluxes:
# Linking fluxes of node
G.add_node(node, bipartite=1)
for block in self.physical_blocks:
for inb in block.nodes_with_fluxes:
# print(block,block.physical_nodes[inb].name)
node_block = block.physical_nodes[inb]
if node == node_block:
G.add_edge(
node, block.variables[block.nodes_with_fluxes.index(inb)])
# print(node_block,block.variables[inb].name)
# # Draw graph for debug
# pos=nx.spring_layout(G)
# nx.draw(G,pos)
# names={}
# for node in G.nodes():
# if type(node)==tuple:
# names[node]=(node[0].name,node[1])
# else:
# names[node]=node.name
# nx.draw_networkx_labels(G,pos,names)
# plt.show()
G2 = nx.DiGraph()
G2.add_nodes_from(G)
eq_out_var = {}
for e in nx.bipartite.maximum_matching(G).items():
# print(e[0].__class__.__name__)
# eq -> variable
if e[0].__class__.__name__ == 'Variable':
G2.add_edge(e[1], e[0])
eq_out_var[e[1]] = e[0]
else:
G2.add_edge(e[0], e[1])
eq_out_var[e[0]] = e[1]
for e in G.edges():
if e[0].__class__.__name__ == 'Variable':
G2.add_edge(e[0], e[1])
else:
G2.add_edge(e[1], e[0])
# print('@@@@@@@@@@@@@@@@@@@@@@@@')
sinks = []
sources = []
for node in G2.nodes():
if G2.out_degree(node) == 0:
sinks.append(node)
elif G2.in_degree(node) == 0:
sources.append(node)
# print(sinks,sources)
if sinks != []:
print(sinks)
raise ModelError
if sources != []:
print(sources)
raise ModelError
# Model is solvable: it must say to equations of blocks which is their
# output variable
# print(eq_out_var)
model_blocks = []
for block_node, variable in eq_out_var.items():
# print(block_node,variable)
if type(block_node) == tuple:
# Blocks writes an equation
model_blocks.extend(
block_node[0].PartialDynamicSystem(block_node[1], variable))
else:
# Sum of incomming variables at nodes
# searching attached nodes
variables = []
for block in self.physical_blocks:
try:
ibn = block.physical_nodes.index(block_node)
if ibn in block.nodes_with_fluxes:
variable2 = block.variables[ibn]
if variable2 != variable:
variables.append(variable2)
except ValueError:
pass
# model_blocks.append(WeightedSum(variables,variable,[-1]*len(variables)))
model_blocks.extend(
block_node.ConservativeLaw(variables, variable))
# model_blocks.append(Gain(v1,variable,-1))
model_blocks.extend(self.command_blocks)
return DynamicSystem(self.te, self.ns, model_blocks)
def _get_ds(self):
if not self._utd_ds:
self._dynamic_system = self.GenerateDynamicSystem()
self._utd_ds = True
return self._dynamic_system
dynamic_system = property(_get_ds)
[docs] def Simulate(self):
self.dynamic_system.Simulate()