Source code for cyclum.core

"""
This module implements the scae core.
Current version is a fast implementation
"""

#sphinx-build -b html rst html
#sphinx-apidoc -o . .

import numpy as np
import tensorflow as tf
import time
from sklearn.decomposition import PCA


class _BaseCyclum:
    """
    The base class for all the realizations.
    All this class knows is math.
    """
    __slots__ = ["Y_value", "N", "P", "Q", "Y", "X", "Y2", "isbuilt"]

    def __init__(self, Y, Q, ispreload=True):
        """
        :type Y: numpy matrix
        :param Y:
        :type q: int
        :param q: dimension of embedding (pseudotime)
        """
        self.Y_value = Y
        self.N, self.P = Y.shape
        self.Q = Q
        if ispreload:
            self.Y = tf.constant(Y, dtype=tf.float32)
        else:
            self.Y = tf.placeholder([None, self.P])
        self.isbuilt = False

    def linear_encoder(qs):
        """
        declare a linear encoder

        :param qs: the index of rows in embedding to generate
        :return: an dict in a list; the sum of these list is the required encoder configuration for build()
        """
        return [dict(type='linear', qs=qs)]

    def nonlinear_encoder(qs, hidden_qs):
        """
        declare a nonlinear encoder

        :param qs: the index of rows in embedding to generate
        :return: an dict in a list; the sum of these list is the required decoder configuration for build()
        """
        return [dict(type='nonlinear', qs=qs, hidden_qs=hidden_qs)]

    def linear_decoder(qs):
        """
        declare a linear decoder

        :param qs: the index of rows in embedding to be used
        :return: an dict in a list; the sum of these list is the required input of build()
        """
        return [dict(type='linear', qs=qs)]

    def circular_decoder(qs):
        """
        declare a circular decoder

        :param qs: the index of rows in embedding to be used
        :return: an dict in a list; the sum of these list is the required input of build()
        """
        return [dict(type='circular', qs=qs)]

    def _make_linear_layer(Z, Q):
        """
        make a linear layer

        :param Z: the input tensor
        :param Q: dimension of the output
        :return: the output tensor of this layer
        """
        W = tf.Variable(tf.random_normal([int(Z.shape[-1]), Q]) / 4, name='W')
        b = tf.Variable(tf.zeros([1, Q]), name='b')
        print(W)
        return tf.add(Z @ W, b, name='Z')

    def _make_nonlinear_layer(Z, Q):
        """
        make a nonlinear layer

        :param Z: the input tensor
        :param Q: dimension of the output
        :return: the output tensor of this layer
        """
        W = tf.Variable(tf.random_normal([int(Z.shape[-1]), Q]), name='W')
        b = tf.Variable(tf.zeros([1, Q]), name='b')
        print(W)
        return tf.tanh(Z @ W + b, name='Z')

    def _make_circular_layer(Z, Q):
        """
        make a circular layer

        :param Z: the input tensor
        :param Q: dimension of the output
        :return: the output tensor of this layer
        """
        assert Z.shape[1] == 1
        W = tf.Variable(tf.random_normal([3, Q]), name='W')
        temp = tf.concat([tf.cos(Z + i * 2 * np.pi / 3) for i in range(3)], 1)
        return tf.matmul(temp, W, name='Z')

    def _make_nonlinear_encoder(Z, Q, hidden_qs):
        """
        sum up the nonlinear layers to make a nonlinear encoder

        :param Z: the input tensor
        :param Q: the final output dimension
        :param hidden_qs: the hidden layer dimensions
        :return: the output tensor of this encoder
        """
        temp = Z
        for i, q in enumerate(hidden_qs):
            with tf.name_scope('layer' + str(i)):
                temp = _BaseCyclum._make_nonlinear_layer(temp, q)
        with tf.name_scope('output'):
            return _BaseCyclum._make_linear_layer(temp, Q)

    def _make_linear_encoder(Z, Q):
        """
        Use one linear layer as a linear encoder.
        No need to have hidden layers due to the property of linear transformation.

        :param Z: the input tensor
        :param Q: the output dimension
        :return: the output tensor of this encoder
        """
        with tf.name_scope('output'):
            return _BaseCyclum._make_linear_layer(Z, Q)

    def _make_linear_decoder(Z, P):
        """
        Use one linear layer as a linear decoder.
        No need to have hidden layers due to the property of linear transformation.

        :param Z: the input tensor
        :param P: the output dimension
        :return: the output tensor of this encoder
        """
        with tf.name_scope('output'):
            return _BaseCyclum._make_linear_layer(Z, P)

    def _make_circular_decoder(Z, P):
        """
        Use one circular layer as a linear decoder.

        :param Z: the input tensor
        :param P: the output dimension
        :return: the output tensor of this encoder
        """
        with tf.name_scope('output'):
            return _BaseCyclum._make_circular_layer(Z, P)

    def _make_encoder(Y, Q, encoder):
        """
        make the full encoder

        :param Y: the input tensor
        :param Q: the output dimension
        :param encoder: the encoder configuration
        :return: the output tensor of the full encoder
        """
        temp = [tf.zeros([tf.shape(Y)[0]]) for i in range(Q)]
        for i, component in enumerate(encoder):
            with tf.name_scope('encoder' + str(i)):
                if component['type'] == 'linear':
                    res = _BaseCyclum._make_linear_encoder(Y, len(component['qs']))
                    for j, q in enumerate(component['qs']):
                        temp[q] += res[:, j]
                elif component['type'] == 'nonlinear':
                    res = _BaseCyclum._make_nonlinear_encoder(Y, len(component['qs']), component['hidden_qs'])
                    for j, q in enumerate(component['qs']):
                        temp[q] += res[:, j]
                else:
                    assert False  # You should never get here
        return tf.stack(temp, axis=1, name='neck')

    def _make_decoder(X, P, decoder):
        """
        make the full decoder

        :param X: the input tensor
        :param P: the output dimension
        :param decoder: the decoder configuration
        :return: the output tensor of the full decoder
        """
        temp = []
        for i, component in enumerate(decoder):
            with tf.name_scope('decoder' + str(i)):
                if component['type'] == 'linear':
                    temp.append(_BaseCyclum._make_linear_decoder(tf.gather(X, component['qs'], axis=1), P))
                elif component['type'] == 'circular':
                    temp.append(_BaseCyclum._make_circular_decoder(tf.gather(X, component['qs'], axis=1), P))
                else:
                    assert False
        return tf.add_n(temp)

    def build(self, encoder, decoder):
        """
        build the model

        :param encoder: encoder configuration
        :param decoder: decoder configuration
        :return: None
        """
        self.X = _BaseCyclum._make_encoder(self.Y, self.Q, encoder)
        self.Y2 = _BaseCyclum._make_decoder(self.X, self.P, decoder)

    def train(self):
        """
        Train the model. To be implemented in derived classes.
        """
        raise NotImplementedError

    def predict(self):
        raise NotImplementedError

    def generate(self):
        raise NotImplementedError

[docs]class PreloadCyclum2(_BaseCyclum): def __init__(self, Y): super().__init__(Y, 2) encoder = _BaseCyclum.nonlinear_encoder([0], [30, 20]) + _BaseCyclum.linear_encoder([1]) decoder = _BaseCyclum.circular_decoder([0]) + _BaseCyclum.linear_decoder([1]) self.build(encoder, decoder) def _get_initial_value(self, n_candidate=5): """ Get initial value by running on the first few PCs. :param n_candidate: number of PCs. :return: proposed initial value. """ pc = PCA(n_components=10, copy=True, whiten=False, svd_solver="auto", tol=0.0, iterated_power="auto", random_state=None).fit_transform(self.Y_value) spc = pc / np.std(pc, axis=0) unit_score_1d = [] uniform_score_1d = [] ind_1d = [(i, j) for i in range(n_candidate) for j in range(i)] for i, j in ind_1d: temp = np.sqrt(spc[:, i] ** 2 + spc[:, j] ** 2) temp = temp / np.mean(temp) unit_score_1d.append(np.mean(np.abs(temp - 1))) temp = np.angle(spc[:, i] + spc[:, j] * 1j) temp.sort() diff = np.append(np.diff(temp), temp[0] + 2 * np.pi - temp[-1]) uniform_score_1d.append(np.std(diff)) min_max_normalize = lambda x: (x - np.min(x)) / (np.max(x) - np.min(x)) unit_score_1d = min_max_normalize(np.array(unit_score_1d)) uniform_score_1d = min_max_normalize(np.array(uniform_score_1d)) final_score_1d = unit_score_1d + uniform_score_1d ind = ind_1d[np.argmin(final_score_1d).item()] temp = np.sqrt(spc[:, ind[0]] ** 2 + spc[:, ind[1]] ** 2) return spc[:, ind] / np.mean(temp)
[docs] def train(self): """ train the model :return: """ graph = tf.get_default_graph() paragon = self._get_initial_value() cossin = lambda x: tf.concat([tf.cos(x), tf.sin(x)], 1) Z_circular_pre_train = cossin(tf.gather(self.X, [0], axis=1)) pretrain_loss = tf.nn.l2_loss(Z_circular_pre_train - paragon) / self.N pretrain_var_names = ['encoder0/layer0/W', 'encoder0/layer0/b', 'encoder0/layer1/W', 'encoder0/layer1/b', 'encoder0/output/W', 'encoder0/output/b'] pretrain_var_list = [tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, name)[0] for name in pretrain_var_names] #self.pretrain_var_list = [self.tf_W_circular1, self.tf_b_circular1, self.tf_W_circular2, self.tf_b_circular2, #self.tf_W_circular3, self.tf_b_circular3] midtrain_var_names = ['decoder0/output/W', 'decoder1/output/W', 'decoder1/output/b'] midtrain_var_list = [tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, name)[0] for name in midtrain_var_names] #self.midtrain_var_list = [self.tf_V0, self.tf_V1, self.tf_c] R = ((tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'encoder0/layer0/W')[0]) + tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'encoder0/layer1/W')[0]) + tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'encoder0/output/W')[0]) ) / 10 + (tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'encoder1/output/W')[0]) + tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'decoder0/output/W')[0]) + tf.nn.l2_loss(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'decoder1/output/W')[0]) ) ) L = tf.reduce_sum((self.Y - self.Y2) ** 2) / (2 * self.N) pretrain_burnin = tf.train.AdamOptimizer(50e-3).minimize(pretrain_loss, var_list=pretrain_var_list) pretrain_train = tf.train.AdamOptimizer(1e-3).minimize(pretrain_loss, var_list=pretrain_var_list) midtrain_burnin = tf.train.AdamOptimizer(50e-3).minimize(L + R * 1.0, var_list=midtrain_var_list) midtrain_train = tf.train.AdamOptimizer(1e-3).minimize(L + R * 1.0, var_list=midtrain_var_list) #tf_burnin = tf.train.AdamOptimizer(50e-3).minimize(self.tf_L + self.tf_R * 0.2) final_train = tf.train.AdamOptimizer(2e-4).minimize(L + R * 1.0) final_refine = tf.train.AdamOptimizer(5e-5).minimize(L + R * 1.0) t0 = time.time() #saver = tf.train.Saver() self.sess = tf.Session() self.sess.run(tf.global_variables_initializer()) def unit_train(opt, n_step, losses, sess): t1 = time.time() for i in range(n_step): sess.run(opt) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run(losses), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("pretrain burnin") unit_train(pretrain_burnin, 2000, [pretrain_loss, L, R], self.sess) print("pretrain train") unit_train(pretrain_train, 4000, [pretrain_loss, L, R], self.sess) print("midtrain burnin") unit_train(midtrain_burnin, 2000, [pretrain_loss, L, R], self.sess) print("midtrain train") unit_train(midtrain_train, 4000, [pretrain_loss, L, R], self.sess) print("finaltrain train") unit_train(final_train, 6000, [pretrain_loss, L, R], self.sess) print("finaltrain refine") unit_train(final_refine, 10000, [pretrain_loss, L, R], self.sess) self.pseudotime = self.sess.run(self.X) self.rotation = self.sess.run(tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'decoder0/output/W')[0]) print('Full time %.2f' % (time.time() - t0)) return self.pseudotime, self.rotation
[docs]class PreloadCyclum: """ The core of Cyclum. The data is preloaded into graphic memory to run it as fast as possible. Before fit() :type Y: numpy matrix :param Y: each row is a cell and each column is a gene. :param q_circular: number of :param q_linear: :param seed: random seed :type verbose: bool :param verbose: if True, show the training progress """ def __init__(self, Y, q_circular=3, q_linear=0, seed=0, verbose=True): """ Initialize the model. """ self.verbose = verbose tf.set_random_seed(seed) self.q_circular = q_circular self.q_linear = q_linear self.q = q_linear + q_circular n, d = Y.shape self.Y = Y self.tf_X = tf.constant(Y, dtype=tf.float32) # Linear Part self.tf_W = tf.Variable(tf.random_normal([d, self.q_linear]) / 4) self.tf_b = tf.Variable(tf.zeros([1, self.q_linear])) self.tf_E = (self.tf_X @ self.tf_W + self.tf_b) # Circular part self.tf_W_circular1 = tf.Variable(tf.random_normal([d, 30])) self.tf_b_circular1 = tf.Variable(tf.zeros([1, 30])) self.tf_E_circular1 = tf.tanh(self.tf_X @ self.tf_W_circular1 + self.tf_b_circular1) self.tf_W_circular2 = tf.Variable(tf.random_normal([30, 20])) self.tf_b_circular2 = tf.Variable(tf.zeros([1, 20])) self.tf_E_circular2 = tf.tanh(self.tf_E_circular1 @ self.tf_W_circular2 + self.tf_b_circular2) self.tf_W_circular3 = tf.Variable(tf.random_normal([20, 1])) self.tf_b_circular3 = tf.Variable(tf.zeros([1, 1])) self.tf_E_circular = self.tf_E_circular2 @ self.tf_W_circular3 + self.tf_b_circular3 # slice self.tf_E_linear = self.tf_E tf_phase = lambda x, num: tf.concat([tf.cos(x + i * 2 * np.pi / num) for i in range(num)], 1) tf_cossin = lambda x: tf.concat([tf.cos(x), tf.sin(x)], 1) if q_circular == 2: self.tf_Z_circular = tf_cossin(self.tf_E_circular) else: self.tf_Z_circular = tf_phase(self.tf_E_circular, 3) self.tf_Z_linear = self.tf_E_linear # decode self.tf_D_circular = self.tf_Z_circular self.tf_D_linear = self.tf_Z_linear self.tf_V0 = tf.Variable(tf.random_normal([self.q_circular, d])) self.tf_V1 = tf.Variable(tf.random_normal([self.q_linear, d])) self.tf_c = tf.Variable(tf.zeros([1, d])) self.tf_X_r_circular = self.tf_D_circular @ self.tf_V0 self.tf_X_r = self.tf_X_r_circular + self.tf_D_linear @ self.tf_V1 + self.tf_c self.tf_R = (tf.nn.l2_loss(self.tf_W_circular1) + tf.nn.l2_loss(self.tf_W_circular2) + tf.nn.l2_loss(self.tf_W_circular3)) / 10 + tf.nn.l2_loss(self.tf_W) + tf.nn.l2_loss(self.tf_V1) + tf.nn.l2_loss(self.tf_V0) self.tf_L = tf.reduce_sum((self.tf_X - self.tf_X_r) ** 2) / (2 * n) # pretrain part paragon = self._get_initial_value() self.tf_Z_circular_pre_train = tf_cossin(self.tf_E_circular) self.tf_pretrain_loss = tf.nn.l2_loss(self.tf_Z_circular_pre_train - paragon) / n self.pretrain_var_list = [self.tf_W_circular1, self.tf_b_circular1, self.tf_W_circular2, self.tf_b_circular2, self.tf_W_circular3, self.tf_b_circular3] self.midtrain_var_list = [self.tf_V0, self.tf_V1, self.tf_c] def _get_initial_value(self, n_candidate=5): """ Get initial value by running on the first few PCs. :param n_candidate: number of PCs. :return: proposed initial value. """ pc = PCA(n_components=10, copy=True, whiten=False, svd_solver="auto", tol=0.0, iterated_power="auto", random_state=None).fit_transform(self.Y) spc = pc / np.std(pc, axis=0) unit_score_1d = [] uniform_score_1d = [] ind_1d = [(i, j) for i in range(n_candidate) for j in range(i)] for i, j in ind_1d: temp = np.sqrt(spc[:, i] ** 2 + spc[:, j] ** 2) temp = temp / np.mean(temp) unit_score_1d.append(np.mean(np.abs(temp - 1))) temp = np.angle(spc[:, i] + spc[:, j] * 1j) temp.sort() diff = np.append(np.diff(temp), temp[0] + 2 * np.pi - temp[-1]) uniform_score_1d.append(np.std(diff)) min_max_normalize = lambda x: (x - np.min(x)) / (np.max(x) - np.min(x)) unit_score_1d = min_max_normalize(np.array(unit_score_1d)) uniform_score_1d = min_max_normalize(np.array(uniform_score_1d)) final_score_1d = unit_score_1d + uniform_score_1d ind = ind_1d[np.argmin(final_score_1d).item()] #if self.verbose: # print(ind) # print(ind_1d) # print(unit_score_1d) # print(uniform_score_1d) temp = np.sqrt(spc[:, ind[0]] ** 2 + spc[:, ind[1]] ** 2) return spc[:, ind] / np.mean(temp)
[docs] def fit(self): """ Fits the model and give the inferred pseudotime, and also its relationship to the gene expression. These outputs are essential for downstream analysis. :return: pseudotime: the pseudo-time for each cell, in the same order as the input, in a [0, 2\pi] scale. rotation: the rotation matrix """ tf_pretrain_burnin = tf.train.AdamOptimizer(50e-3).minimize(self.tf_pretrain_loss, var_list=self.pretrain_var_list) tf_pretrain_train = tf.train.AdamOptimizer(1e-3).minimize(self.tf_pretrain_loss, var_list=self.pretrain_var_list) tf_midtrain_burnin = tf.train.AdamOptimizer(50e-3).minimize(self.tf_L + self.tf_R * 1.0, var_list=self.midtrain_var_list) tf_midtrain_train = tf.train.AdamOptimizer(1e-3).minimize(self.tf_L + self.tf_R * 1.0, var_list=self.midtrain_var_list) #tf_burnin = tf.train.AdamOptimizer(50e-3).minimize(self.tf_L + self.tf_R * 0.2) tf_train = tf.train.AdamOptimizer(2e-4).minimize(self.tf_L + self.tf_R * 1.0) tf_refine = tf.train.AdamOptimizer(5e-5).minimize(self.tf_L + self.tf_R * 1.0) t1 = t0 = time.time() #saver = tf.train.Saver() self.sess = tf.Session() self.sess.run(tf.global_variables_initializer()) print("pretrain burnin") for i in range(2000): self.sess.run(tf_pretrain_burnin) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("pretrain train") for i in range(4000): self.sess.run(tf_pretrain_train) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("midtrain burnin") for i in range(2000): self.sess.run(tf_midtrain_burnin) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("midtrain train") for i in range(4000): self.sess.run(tf_midtrain_train) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("finaltrain train") for i in range(6000): self.sess.run(tf_train) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() print("finaltrain refine") for i in range(10000): self.sess.run(tf_refine) if (i + 1) % 2000 == 0: print('step %5d: loss ' % (i + 1), end="") print(self.sess.run([self.tf_pretrain_loss, self.tf_L, self.tf_R]), end="") print(' time %.2f' % (time.time() - t1)) t1 = time.time() self.pseudotime = self.sess.run(self.tf_E_circular) self.rotation = self.sess.run(self.tf_V0) print('Full time %.2f' % (time.time() - t0)) #saver.save(self.sess, './' + rec_file_name) return self.pseudotime, self.rotation
[docs] def close(self): """ Close the TensorFlow Session. All information about the model will be **deleted**. :return: None """ self.sess.close()
[docs] def save(self): """ Save the Tensorflow Session for future use :return: """ pass
[docs] def generate(self, pseudotime=None): """ Given a pseudo-time, generate a "ideal cycling cell" :type pseudotime: float or numpy.matrix :param pseudotime: the pseudo-time. If not specified it will use the whole inferred pseudotime. :return: the gene expression of a "ideal cycling cell" """ if self.q_circular == 2: tf.concat([tf.cos(pseudotime), tf.sin(pseudotime)], 1) @ self.rotation else: np.concatenate([np.cos(pseudotime + i * 2 * np.pi / self.q_circular) for i in range(3)], axis=1) @ self.rotation
[docs] def correct(self): """ Correct the input expression matrix wrt cell cycle. :return: """ return self.Y - self.generate()
[docs] def predict(self): """ Given a cell, predict its pseudo-time, given a fitted model. The model will not be fitted again. :return: """ pass
[docs]class PreloadNaiveCyclum: def __init__(self, Y, q_circular=3, q_linear=0, seed=0): tf.set_random_seed(seed) self.q_circular = q_circular self.q_linear = q_linear self.q = q_linear + q_circular n, d = Y.shape self.tf_X = tf.constant(Y, dtype=tf.float32) # Linear Part self.tf_W = tf.Variable(tf.random_normal([d, self.q_linear]) / 4) self.tf_b = tf.Variable(tf.zeros([1, self.q_linear])) self.tf_E = (self.tf_X @ self.tf_W + self.tf_b) # Circular part self.tf_W_circular1 = tf.Variable(tf.random_normal([d, 30])) self.tf_b_circular1 = tf.Variable(tf.zeros([1, 30])) self.tf_E_circular1 = tf.tanh(self.tf_X @ self.tf_W_circular1 + self.tf_b_circular1) self.tf_W_circular2 = tf.Variable(tf.random_normal([30, 20])) self.tf_b_circular2 = tf.Variable(tf.zeros([1, 20])) self.tf_E_circular2 = tf.tanh(self.tf_E_circular1 @ self.tf_W_circular2 + self.tf_b_circular2) self.tf_W_circular3 = tf.Variable(tf.random_normal([20, 1])) self.tf_b_circular3 = tf.Variable(tf.zeros([1, 1])) self.tf_E_circular = self.tf_E_circular2 @ self.tf_W_circular3 + self.tf_b_circular3 # slice self.tf_E_linear = self.tf_E tf_phase = lambda x, num: tf.concat([tf.cos(x + i * 2 * np.pi / num) for i in range(num)], 1) tf_cossin = lambda x: tf.concat([tf.cos(x), tf.sin(x)], 1) self.tf_Z_circular = tf_phase(self.tf_E_circular, 3) self.tf_Z_linear = self.tf_E_linear # decode self.tf_D_circular = self.tf_Z_circular self.tf_D_linear = self.tf_Z_linear self.tf_V0 = tf.Variable(tf.random_normal([self.q_circular, d])) self.tf_V1 = tf.Variable(tf.random_normal([self.q_linear, d])) self.tf_c = tf.Variable(tf.zeros([1, d])) self.tf_X_r_circular = self.tf_D_circular @ self.tf_V0 self.tf_X_r = self.tf_X_r_circular + self.tf_D_linear @ self.tf_V1 + self.tf_c self.tf_R = tf.nn.l2_loss(self.tf_W) + tf.nn.l2_loss(self.tf_V1) + tf.nn.l2_loss(self.tf_V0) self.tf_L = tf.reduce_sum((self.tf_X - self.tf_X_r) ** 2)
[docs] def fit(self): tf_burnin = tf.train.AdamOptimizer(50e-3).minimize(self.tf_L + self.tf_R * 50) tf_train = tf.train.AdamOptimizer(5e-3).minimize(self.tf_L + self.tf_R * 50) tf_refine = tf.train.AdamOptimizer(1e-3).minimize(self.tf_L + self.tf_R * 50) t1 = t0 = time.time() saver = tf.train.Saver() sess = tf.Session() sess.run(tf.global_variables_initializer()) for i in range(10000): sess.run(tf_burnin) if (i + 1) % 5000 == 0: print('step %5d: loss ' % (i + 1), end="") print(sess.run([self.tf_L, self.tf_R]), end="") print('time %.2f' % (time.time() - t1)) t1 = time.time() for i in range(10000): sess.run(tf_train) if (i + 1) % 5000 == 0: print('step %5d: loss ' % (i + 1), end="") print(sess.run([self.tf_L, self.tf_R]), end="") print('time %.2f' % (time.time() - t1)) t1 = time.time() for i in range(10000): sess.run(tf_refine) if (i + 1) % 5000 == 0: print('step %5d: loss ' % (i + 1), end="") print(sess.run([self.tf_L, self.tf_R]), end="") print('time %.2f' % (time.time() - t1)) t1 = time.time() pseudotime = sess.run(self.tf_E_circular) rotation = sess.run(self.tf_V0) print(time.time() - t0) sess.close() return pseudotime, rotation
[docs]class Cyclum: """ Wraps the mathematical method up and provide more functions. """