import logging
import os
import sys
import time
from copy import deepcopy
import numpy as np
from scipy.integrate import solve_ivp
from scipy.signal import StateSpace
from cpsim.formal.gaussian_distribution import GaussianDistribution
from cpsim.utils.linearizer import Linearizer
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("debug.log"),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
rseed_str = os.getenv('RANDOM_SEED')
if rseed_str is None:
rseed = np.uint32(int(time.time()))
else:
try:
rseed = np.uint32(rseed_str)
except Exception as e:
print('rseed read error:', e)
rseed = np.uint32(int(time.time()))
logger.info(f'simulator: numpy random seed is set to {rseed}.')
np.random.seed(rseed)
[docs]
class Simulator:
"""
states, utils inputs/outputs are instance of np.array with shape (n,) (m,) (p,)
"""
def __init__(self, name, Ts, max_index):
self.name = name
self.model_type = None
self.dt = Ts
self.sysc = None
self.sysd = None
self.max_index = max_index
self.C = None
self.D = None
self.n = None # number of states
self.m = None # number of utils inputs
self.p = None # number of sensor measurements
self.ode = None # ode function
self.init_state = None
# values under self.cur_index
self.cur_x = None
self.cur_y = None
self.cur_u = None
self.cur_feedback = None
self.cur_ref = None
self.cur_index = 0
self.noise_setting = None
self.m_noise = None
self.p_noise = None
self.m_noise_dist = None
self.p_noise_dist = None
[docs]
def data_init(self):
self.inputs = np.empty((self.max_index + 2, self.m), dtype=float)
self.outputs = np.empty((self.max_index + 2, self.p), dtype=float)
self.states = np.empty((self.max_index + 2, self.n), dtype=float)
if self.feedback_type == 'output':
self.feedbacks = np.empty((self.max_index + 2, self.p), dtype=float)
self.refs = np.empty((self.max_index + 2, self.p), dtype=float)
elif self.feedback_type == 'state':
self.feedbacks = np.empty((self.max_index + 2, self.n), dtype=float)
self.refs = np.empty((self.max_index + 2, self.n), dtype=float) # reference value
[docs]
def reset(self):
# the noise will not reset!
self.data_init()
self.cur_index = 0
self.set_init_state(self.init_state)
self.controller.clear()
[docs]
def linear(self, A, B, C=None, D=None):
self.model_type = 'linear'
if C is None:
C = np.eye(len(A))
if D is None:
D = np.zeros((len(C), len(B[0])))
self.sysc = StateSpace(A, B, C, D)
self.sysd = self.sysc.to_discrete(self.dt)
self.n = self.sysc.A.shape[1]
self.m = self.sysc.B.shape[1]
self.p = self.sysc.C.shape[0]
self.C = self.sysc.C
self.D = self.sysc.D
def fprime(t, x, u):
return self.sysc.A @ x + self.sysc.B @ u
self.ode = fprime
[docs]
def nonlinear(self, ode, n, m, p, C=None, D=None):
self.model_type = 'nonlinear'
self.C = np.eye(n) if C is None else np.array(C)
self.D = np.zeros((p, m)) if D is None else np.array(D)
self.n = n
self.m = m
self.p = p
self.ode = ode
[docs]
def linearize_at(self, x_0: np.ndarray, u_0: np.ndarray):
"""
self.sysc = Ax + Bu + c
"""
assert self.model_type == 'nonlinear'
linearize = Linearizer(self.ode, self.n, self.m)
Ac, Bc, Cc = linearize.at(x_0, u_0)
self.sysc = StateSpace(Ac, Bc, self.C, self.D)
self.sysc.c = Cc
self.sysd = self.sysc.to_discrete(self.dt)
self.sysd.c = self.dt * Cc
[docs]
def sim_init(self, settings: dict):
"""
keys:
'feedback_type': 'state', 'output', None
'init_state': np.ndarray (n,)
'controller': object with update method
"""
self.set_feedback_type(settings['feedback_type'])
self.data_init()
if 'noise' in settings:
self.noise_init(settings['noise'])
self.set_init_state(settings['init_state'])
self.set_controller(settings['controller'])
[docs]
def noise_init(self, noise):
"""
Only implement the white noise
keys:
'process'/'measurement':
'type': 'white' todo: 'white_bounded', 'box_uniform', 'ball_uniform'
'param':
'C': linear transformation matrix from standard normal distribution scale for 'white'
"""
if 'process' in noise:
if noise['process']['type'] == 'white':
miu = np.zeros((self.n,))
C = noise['process']['param']['C']
self.p_noise_dist = GaussianDistribution.from_standard(miu, C)
self.p_noise = self.p_noise_dist.random(self.max_index + 2).T
elif noise['process']['type'] == 'box_uniform':
lo = noise['process']['param']['lo']
up = noise['process']['param']['up']
assert up.shape == (self.n,) and lo.shape == (self.n,)
noise_base = np.random.random_sample((self.max_index + 2, self.n))
self.p_noise = lo + noise_base * (up-lo)
if 'measurement' in noise:
if noise['measurement']['type'] == 'white':
miu = np.zeros((self.p,))
C = noise['measurement']['param']['C']
self.m_noise_dist = GaussianDistribution.from_standard(miu, C)
self.m_noise = self.m_noise_dist.random(self.max_index + 2).T
elif noise['measurement']['type'] == 'box_uniform':
lo = noise['process']['param']['lo']
up = noise['process']['param']['up']
assert up.shape == (self.p,) and lo.shape == (self.p,)
noise_base = np.random.random_sample((self.max_index + 2, self.p))
self.p_noise = lo + noise_base * (up - lo)
[docs]
def set_init_state(self, x):
self.init_state = x
self.cur_x = x
self.cur_y = self.C @ self.cur_x
if self.m_noise is not None:
self.cur_y += self.m_noise[self.cur_index]
self.outputs[0] = self.cur_y
self.states[0] = self.cur_x
if self.feedback_type:
self.cur_feedback = self.cur_x if self.feedback_type == 'state' else self.cur_y
[docs]
def set_feedback_type(self, feedback_type):
"""
'state', 'output', None
"""
self.feedback_type = feedback_type
[docs]
def set_controller(self, controller):
"""
please implement update method to get utils input
"""
self.controller = controller
[docs]
def update_current_ref(self, ref):
self.cur_ref = ref
[docs]
def evolve(self, u=None):
# record data
self.feedbacks[self.cur_index] = deepcopy(self.cur_feedback)
self.refs[self.cur_index] = deepcopy(self.cur_ref)
# compute utils input
if self.feedback_type:
self.cur_u = self.controller.update(self.cur_ref, self.cur_feedback, self.dt * self.cur_index)
else:
self.cur_u = u
# override utils input
if not (u is None):
self.cur_u = u
assert self.cur_u.shape == (self.m,)
self.inputs[self.cur_index] = deepcopy(self.cur_u)
# implement utils input
ts = (self.cur_index * self.dt, (self.cur_index + 1) * self.dt)
res = solve_ivp(self.ode, ts, self.cur_x, args=(self.cur_u,))
self.cur_index += 1
self.cur_x = res.y[:, -1]
# if self.model_type == 'linear':
# self.cur_x = self.sysd.A @ self.cur_x + self.sysd.B @ self.cur_u
if self.p_noise is not None: # process noise
self.cur_x += self.p_noise[self.cur_index]
self.cur_y = self.C @ self.cur_x + self.D @ self.cur_u
if self.m_noise is not None: # measurement noise
self.cur_y += self.m_noise[self.cur_index]
assert self.cur_x.shape == (self.n,)
assert self.cur_y.shape == (self.p,)
self.states[self.cur_index] = deepcopy(self.cur_x)
self.outputs[self.cur_index] = deepcopy(self.cur_y)
# prepare feedback
if self.feedback_type:
self.cur_feedback = self.cur_x if self.feedback_type == 'state' else self.cur_y
# self.cur_feedback may be attacked before implement
else:
self.cur_feedback = None
return self.cur_index