# -*- coding: utf-8 -*-
Core of BMS. All content of this file is imported by bms, and is therefore in bms

This file defines the base of BMS. 


import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
import dill
from scipy.optimize import fsolve, root, minimize
[docs]class Variable: """ Defines a variable :param names: Defines full name and short name. If names is a string the two names will be identical otherwise names should be a tuple of strings (full_name,short_name) :param hidden: inner variable to hide in plots if true """ def __init__(self, names='variable', initial_values=[0], hidden=False): if type(names) == str: = names self.short_name = names else: try: self.short_name = names[1] = names[0] except: raise TypeError self.initial_values = initial_values self._values = np.array([]) self.max_order = 0 self.hidden = hidden def _InitValues(self, ns, ts, max_order): self.max_order = max_order self._values = self.initial_values[0]*np.ones(ns+max_order+1) self._ForwardValues() def _ForwardValues(self): pass def _get_values(self): return self._values[self.max_order:] values = property(_get_values)
[docs]class Signal(Variable): """ Abstract class of signal """ def __init__(self, names): if type(names) == str: = names self.short_name = names else: try: self.short_name = names[1] = names[0] except: raise TypeError self._values = np.array([]) self.max_order = 0 self.hidden = False def _InitValues(self, ns, ts, max_order): self.max_order = max_order self._values = np.zeros(ns+max_order+1) for i in range(ns+1): self._values[i+max_order] = self.function(i*ts) self._ForwardValues() self.initial_values = [self._values[0]] def _ForwardValues(self): """ Implementation for problems with derivative conditions on variables """ pass
[docs]class Block: """ Abstract class of block: this class should not be instanciate directly """ def __init__(self, inputs, outputs, max_input_order, max_output_order): self.inputs = [] self.outputs = [] self.n_inputs = len(inputs) self.n_outputs = len(outputs) for variable in inputs: self._AddInput(variable) for variable in outputs: self._AddOutput(variable) # self.input_orders= # self.output_orders=output_orders self.max_input_order = max_input_order self.max_output_order = max_output_order self.max_order = max(self.max_input_order, self.max_output_order) def _AddInput(self, variable): """ Add one more variable as an input of the block :param variable: variable (or signal as it is also a variable) """ if isinstance(variable, Variable): self.inputs.append(variable) else: print('Error: ',, variable, ' given is not a variable') raise TypeError def _AddOutput(self, variable): """ Add one more variable as an output of the block :param variable: variable (or signal as it is also a variable) """ if isinstance(variable, Variable): self.outputs.append(variable) else: print(variable) raise TypeError
[docs] def InputValues(self, it, nsteps=None): """ Returns the input values at a given iteration for solving the block outputs """ if nsteps == None: nsteps = self.max_input_order # print(self,it) # Provides values in inputs values for computing at iteration it I = np.zeros((self.n_inputs, nsteps)) for iv, variable in enumerate(self.inputs): # print(it-self.max_input_order+1,it+1) # print(variable._values[it-self.max_input_order+1:it+1]) I[iv, :] = variable._values[it-nsteps+1:it+1] return I
[docs] def OutputValues(self, it, nsteps=None): # Provides values in inputs values for computing at iteration it if nsteps == None: nsteps = self.max_output_order O = np.zeros((self.n_outputs, nsteps)) for iv, variable in enumerate(self.outputs): O[iv, :] = variable._values[it-nsteps:it] return O
[docs] def Solve(self, it, ts): step_outputs = self.Evaluate(it, ts) for i, step_output in enumerate(step_outputs): self.outputs[i]._values[it] = step_output
[docs]class ModelError(Exception): def __init__(self, message): self.message = message def __str__(self): return 'Model Error: '+self.message
[docs]class DynamicSystem: """ Defines a dynamic system that can simulate itself :param te: time of simulation's end :param ns: number of steps :param blocks: (optional) list of blocks defining the model """ def __init__(self, te, ns, blocks=[]): self.te = te self.ns = ns self.ts = self.te/float(self.ns) # time step self.t = np.linspace(0, self.te, num=ns+1) # Time vector self.blocks = [] self.variables = [] self.signals = [] self.max_order = 0 for block in blocks: self.AddBlock(block) self._utd_graph = False # True if graph is up-to-date
[docs] def AddBlock(self, block): """ Add the given block to the model and also its input/output variables """ if isinstance(block, Block): self.blocks.append(block) self.max_order = max(self.max_order, block.max_input_order-1) self.max_order = max(self.max_order, block.max_output_order) for variable in block.inputs+block.outputs: self._AddVariable(variable) else: print(block) raise TypeError self._utd_graph = False
def _AddVariable(self, variable): """ Add a variable to the model. Should not be used by end-user """ if isinstance(variable, Signal): if not variable in self.signals: self.signals.append(variable) elif isinstance(variable, Variable): if not variable in self.variables: self.variables.append(variable) else: raise TypeError self._utd_graph = False def _get_Graph(self): if not self._utd_graph: # Generate graph self._graph = nx.DiGraph() for variable in self.variables: self._graph.add_node(variable, bipartite=0) for block in self.blocks: self._graph.add_node(block, bipartite=1) for variable in block.inputs: self._graph.add_edge(variable, block) for variable in block.outputs: self._graph.add_edge(block, variable) self._utd_graph = True return self._graph graph = property(_get_Graph) def _ResolutionOrder(self, variables_to_solve): """ return a list of lists of tuples (block,output,ndof) to be solved """ # Gp=nx.DiGraph() # # for i in range(nvar): # Gp.add_node('v'+str(i),bipartite=0) # # for i in range(neq): # Gp.add_node('e'+str(i),bipartite=1) # for j in range(nvar): # if Mo[i,j]==1: # Gp.add_edge('e'+str(i),'v'+str(j)) Gp = nx.DiGraph() for variable in self.variables: Gp.add_node(variable, bipartite=0) for block in self.blocks: for iov, output_variable in enumerate(block.outputs): Gp.add_node((block, iov), bipartite=1) Gp.add_edge((block, iov), output_variable) Gp.add_edge(output_variable, (block, iov)) for input_variable in block.inputs: if not isinstance(input_variable, Signal): Gp.add_edge(input_variable, (block, iov)) # for n1,n2 in M.items(): # Gp.add_edge(n1,n2) sinks = [] sources = [] for node in Gp.nodes(): if Gp.out_degree(node) == 0: sinks.append(node) elif Gp.in_degree(node) == 0: sources.append(node) G2 = sources[:] for node in sources: for node2 in nx.descendants(Gp, node): if node2 not in G2: G2.append(node2) if G2 != []: print(G2) raise ModelError('Overconstrained variables') G3 = sinks[:] for node in sinks: for node2 in nx.ancestors(Gp, node): if node2 not in G3: G3.append(node2) if G3 != []: raise ModelError('Underconstrained variables') # vars_resolvables=[] # for var in vars_resoudre: # if not 'v'+str(var) in G2+G3: # vars_resolvables.append(var) # G1=Gp.copy() # G1.remove_nodes_from(G2+G3) # # M1=nx.bipartite.maximum_matching(G1) # G1p=nx.DiGraph() # # G1p.add_nodes_from(G1.nodes()) # for e in G1.edges(): # # equation vers variable # if e[0][0]=='v': # G1p.add_edge(e[0],e[1]) # else: # G1p.add_edge(e[1],e[0]) # # print(len(M)) # for n1,n2 in M1.items(): # # print(n1,n2) # if n1[0]=='e': # G1p.add_edge(n1,n2) # else: # G1p.add_edge(n2,n1) scc = list(nx.strongly_connected_components(Gp)) # pos=nx.spring_layout(G1p) # plt.figure() # nx.draw(G1p,pos) # nx.draw_networkx_labels(G1p,pos) # print(scc) if scc != []: C = nx.condensation(Gp, scc) isc_vars = [] for isc, sc in enumerate(scc): for var in variables_to_solve: if var in sc: isc_vars.append(isc) break ancestors_vars = isc_vars[:] for isc_var in isc_vars: for ancetre in nx.ancestors(C, isc_var): if ancetre not in ancestors_vars: ancestors_vars.append(ancetre) order_sc = [sc for sc in nx.topological_sort( C) if sc in ancestors_vars] order_ev = [] for isc in order_sc: # liste d'équations et de variables triées pour être séparées evs = list(scc[isc]) # print(evs) # levs=int(len(evs)/2) eqs = [] var = [] for element in evs: if type(element) == tuple: eqs.append(element) else: var.append(element) order_ev.append((len(eqs), eqs, var)) return order_ev raise ModelError
[docs] def Simulate(self, variables_to_solve=None): if variables_to_solve == None: variables_to_solve = [ variable for variable in self.variables if not variable.hidden] order = self._ResolutionOrder(variables_to_solve) # Initialisation of variables values for variable in self.variables+self.signals: variable._InitValues(self.ns, self.ts, self.max_order) # ============================================================================== # Enhancement to do: defining functions out of loop (copy args)s # ============================================================================== # print(order) residue = [] for it, t in enumerate(self.t[1:]): for neqs, equations, variables in order: if neqs == 1: equations[0][0].Solve(it+self.max_order+1, self.ts) else: # x0=np.zeros(neqs) x0 = [equations[i][0].outputs[equations[i][1]]._values[it + self.max_order] for i in range(len(equations))] # print('===========') def r(x, equations=equations[:]): # Writing variables values proposed by optimizer for i, xi in enumerate(x): equations[i][0].outputs[equations[i][1] ]._values[it+self.max_order+1] = xi # Computing regrets r = [] # s=0 for ieq, (block, neq) in enumerate(equations): # print(block,it) # print(block.Evaluate(it+self.max_order+1,self.ts).shape) # print(block.Evaluate(it+self.max_order+1,self.ts),block) r.append(x[ieq]-block.Evaluate(it + self.max_order+1, self.ts)[neq]) # print(block) # print('xproposed:',x[ieq]) # print('block eval',block.Evaluate(it+self.max_order+1,self.ts)[neq]) # print('value', x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq]) # s+=abs(x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq]) # print(x[ieq],block.Evaluate(it+self.max_order+1,self.ts)[neq]) return r def f(x, equations=equations[:]): # Writing variables values proposed by optimizer for i, xi in enumerate(x): equations[i][0].outputs[equations[i][1] ]._values[it+self.max_order+1] = xi # Computing regrets # r=[] s = 0 for ieq, (block, neq) in enumerate(equations): # print(block,it) # print(block.Evaluate(it+self.max_order+1,self.ts).shape) # print(block.Evaluate(it+self.max_order+1,self.ts),block) # r.append(x[ieq]-block.Evaluate(it+self.max_order+1,self.ts)[neq]) # print(block) s += abs(x[ieq]-block.Evaluate(it + self.max_order+1, self.ts)[neq]) # print(x[ieq],block.Evaluate(it+self.max_order+1,self.ts)[neq]) # return r # print(s) return s x, d, i, m = fsolve(r, x0, full_output=True)
[docs] def VariablesValues(self, variables, t): """ Returns the value of given variables at time t. Linear interpolation is performed between two time steps. :param variables: one variable or a list of variables :param t: time of evaluation """ # TODO: put interpolation in variables if (t < self.te) | (t > 0): i = t//self.ts # time step ti = self.ts*i if type(variables) == list: values = [] for variable in variables: # interpolation values.append( variables.values[i]*((ti-t)/self.ts+1)+variables.values[i+1]*(t-ti)/self.ts) return values else: # interpolation i = int(i)# Fixing bug #31 return variables.values[i]*((ti-t)/self.ts+1)+variables.values[i+1]*(t-ti)/self.ts else: raise ValueError
[docs] def PlotVariables(self, subplots_variables=None): if subplots_variables == None: subplots_variables = [self.signals+self.variables] subplots_variables = [ [variable for variable in self.signals+self.variables if not variable.hidden]] # plt.figure() fig, axs = plt.subplots(len(subplots_variables), sharex=True) if len(subplots_variables) == 1: axs = [axs] for isub, subplot in enumerate(subplots_variables): legend = [] for variable in subplot: axs[isub].plot(self.t, variable.values) legend.append( axs[isub].legend(legend, loc='best') axs[isub].margins(0.08) axs[isub].grid() plt.xlabel('Time')
[docs] def DrawModel(self): from .interface import ModelDrawer ModelDrawer(self)
[docs] def Save(self, name_file): """ name_file: name of the file without extension. The extension .bms is added by function """ with open(name_file+'.bms', 'wb') as file: model = dill.dump(self, file)
def __getstate__(self): dic = self.__dict__.copy() return dic def __setstate__(self, dic): self.__dict__ = dic
[docs]def Load(file): """ Loads a model from specified file """ with open(file, 'rb') as file: model = dill.load(file) return model
[docs]class PhysicalNode: """ Abstract class """ def __init__(self, cl_solves_potential, cl_solves_fluxes, node_name, potential_variable_name, flux_variable_name): self.cl_solves_potential = cl_solves_potential self.cl_solves_fluxes = cl_solves_fluxes = node_name self.potential_variable_name = potential_variable_name self.flux_variable_name = flux_variable_name self.variable = Variable(potential_variable_name+' '+node_name)
[docs]class PhysicalBlock: """ Abstract class to inherit when coding a physical block """ def __init__(self, physical_nodes, nodes_with_fluxes, occurence_matrix, commands, name): self.physical_nodes = physical_nodes = name self.nodes_with_fluxes = nodes_with_fluxes self.occurence_matrix = occurence_matrix self.commands = commands self.variables = [Variable(physical_nodes[inode].flux_variable_name+' from ' + physical_nodes[inode].name+' to ' for inode in nodes_with_fluxes]
[docs]class PhysicalSystem: """ Defines a physical system """ def __init__(self, te, ns, physical_blocks, command_blocks): self.te = te self.ns = ns self.physical_blocks = [] self.physical_nodes = [] self.variables = [] self.command_blocks = [] for block in physical_blocks: self.AddPhysicalBlock(block) for block in command_blocks: self.AddCommandBlock(block) self._utd_ds = False
[docs] def AddPhysicalBlock(self, block): if isinstance(block, PhysicalBlock): self.physical_blocks.append(block) for node in block.physical_nodes: self._AddPhysicalNode(node) else: raise TypeError self._utd_ds = False
def _AddPhysicalNode(self, node): if isinstance(node, PhysicalNode): if not node in self.physical_nodes: self.physical_nodes.append(node) self._utd_ds = False
[docs] def AddCommandBlock(self, block): if isinstance(block, Block): self.command_blocks.append(block) for variable in block.inputs+block.outputs: self._AddVariable(variable) else: raise TypeError self._utd_ds = False
def _AddVariable(self, variable): if isinstance(variable, Variable): if not variable in self.variables: self.variables.append(variable) self._utd_ds = False
[docs] def GenerateDynamicSystem(self): # from bms.blocks.continuous import WeightedSum G = nx.Graph() # variables={} # Adding node variables for node in self.physical_nodes: G.add_node(node.variable, bipartite=0) for block in self.physical_blocks: # print(block.variables) for variable in block.variables: # add variable realted to connection G.add_node(node.variable, bipartite=0) ne, nv = block.occurence_matrix.shape # print(block,block.occurence_matrix) # Add equations of blocs for ie in range(ne): G.add_node((block, ie), bipartite=1) for iv in range(nv): # print(iv) if block.occurence_matrix[ie, iv] == 1: if iv % 2 == 0: G.add_edge( (block, ie), block.physical_nodes[iv//2].variable) else: G.add_edge((block, ie), block.variables[iv//2]) # Adding equation of physical nodes: conservative law of node from its occurence matrix # Restricted to nodes to which are brought fluxes for node in self.physical_nodes: # linking conservative equation of flux to potential if if node.cl_solves_potential: G.add_edge(node, node.variable) if node.cl_solves_fluxes: # Linking fluxes of node G.add_node(node, bipartite=1) for block in self.physical_blocks: for inb in block.nodes_with_fluxes: # print(block,block.physical_nodes[inb].name) node_block = block.physical_nodes[inb] if node == node_block: G.add_edge( node, block.variables[block.nodes_with_fluxes.index(inb)]) # print(node_block,block.variables[inb].name) # # Draw graph for debug # pos=nx.spring_layout(G) # nx.draw(G,pos) # names={} # for node in G.nodes(): # if type(node)==tuple: # names[node]=(node[0].name,node[1]) # else: # names[node] # nx.draw_networkx_labels(G,pos,names) # G2 = nx.DiGraph() G2.add_nodes_from(G) eq_out_var = {} for e in nx.bipartite.maximum_matching(G).items(): # print(e[0].__class__.__name__) # eq -> variable if e[0].__class__.__name__ == 'Variable': G2.add_edge(e[1], e[0]) eq_out_var[e[1]] = e[0] else: G2.add_edge(e[0], e[1]) eq_out_var[e[0]] = e[1] for e in G.edges(): if e[0].__class__.__name__ == 'Variable': G2.add_edge(e[0], e[1]) else: G2.add_edge(e[1], e[0]) # print('@@@@@@@@@@@@@@@@@@@@@@@@') sinks = [] sources = [] for node in G2.nodes(): if G2.out_degree(node) == 0: sinks.append(node) elif G2.in_degree(node) == 0: sources.append(node) # print(sinks,sources) if sinks != []: print(sinks) raise ModelError if sources != []: print(sources) raise ModelError # Model is solvable: it must say to equations of blocks which is their # output variable # print(eq_out_var) model_blocks = [] for block_node, variable in eq_out_var.items(): # print(block_node,variable) if type(block_node) == tuple: # Blocks writes an equation model_blocks.extend( block_node[0].PartialDynamicSystem(block_node[1], variable)) else: # Sum of incomming variables at nodes # searching attached nodes variables = [] for block in self.physical_blocks: try: ibn = block.physical_nodes.index(block_node) if ibn in block.nodes_with_fluxes: variable2 = block.variables[ibn] if variable2 != variable: variables.append(variable2) except ValueError: pass # model_blocks.append(WeightedSum(variables,variable,[-1]*len(variables))) model_blocks.extend( block_node.ConservativeLaw(variables, variable)) # model_blocks.append(Gain(v1,variable,-1)) model_blocks.extend(self.command_blocks) return DynamicSystem(self.te, self.ns, model_blocks)
def _get_ds(self): if not self._utd_ds: self._dynamic_system = self.GenerateDynamicSystem() self._utd_ds = True return self._dynamic_system dynamic_system = property(_get_ds)
[docs] def Simulate(self): self.dynamic_system.Simulate()