Source code for src.geneticalgorithm

from __future__ import division
import numpy as np
from numpy.linalg import inv
import scipy
from scipy import linalg
from numpy import transpose as t
from scipy.special import gamma
from collections import Counter
import pandas as pd
from neurodesign import msequence, generate, report
import itertools
import scipy.linalg
import os
import sys
import numpy as np
import zipfile
import StringIO
import shutil
import copy
import sklearn.cluster
import time
import progressbar


[docs]class design(object): ''' This class represents an experimental design for an fMRI experiment. :param order: The stimulus order. :type order: list of integers :param ITI: The ITI's between all stimuli. :type ITI: list of floats :param experiment: The experimental setup. :type experiment: experiment object :param onsets: The onsets of all stimuli. :type onsets: list of floats ''' def __init__(self, order, ITI, experiment, onsets=None): self.order = order self.ITI = ITI self.onsets = onsets self.Fe = 0 self.Fd = 0 self.experiment = experiment # assert whether design is valid if not len(self.ITI) == experiment.n_trials: raise ValueError( "length of design (ITI's) does not comply with experiment") if not len(self.order) == experiment.n_trials: raise ValueError( "length of design (orders) does not comply with experiment")
[docs] def check_maxrep(self, maxrep): ''' Function to check whether design does not exceed maximum repeats within design. :param maxrep: How many times should a stimulus maximally be repeated. :type maxrep: integer :returns repcheck: Boolean indicating maximum repeats are respected ''' for stim in range(self.experiment.n_stimuli): repcheck = not ''.join( str(e) for e in [stim] * maxrep) in ''.join(str(e) for e in self.order) if repcheck == False: break return repcheck
[docs] def check_hardprob(self): ''' Function to check whether frequencies of stimuli are **exactly** the prespecified frequencies. :returns probcheck: Boolean indicating probabilities are respected ''' obscnt = Counter(self.order).values() obsprob = np.round(obscnt / np.sum(obscnt), decimals=2) if not len(self.experiment.P) == len(obsprob): probcheck = False else: close = np.isclose(np.array(self.experiment.P), np.array(obsprob), atol=0.05) if not np.sum(close) == len(obsprob): probcheck = False else: probcheck = True return probcheck
[docs] def crossover(self, other, seed=1234): """ Function to crossover design with other design and create offspring. :param other: The design with which the design will be mixed :type other: design object :param seed: The seed with which the change point will be sampled. :type seed: integer or None :returns offspring: List of two offspring designs. """ # check whether designs are compatible assert len(self.order) == len(other.order) np.random.seed(seed) changepoint = np.random.choice(len(self.order), 1)[0] offspringorder1 = list(self.order)[ :changepoint] + list(other.order)[changepoint:] offspringorder2 = list(other.order)[ :changepoint] + list(self.order)[changepoint:] offspring1 = design(order=offspringorder1, ITI=self.ITI, experiment=self.experiment) offspring2 = design(order=offspringorder2, ITI=other.ITI, experiment=self.experiment) return [offspring1, offspring2]
[docs] def mutation(self, q, seed=1234): ''' Function to mutate q% of the stimuli with another stimulus. :param q: The percentage of stimuli that should be mutated :type q: float :param seed: The seed with which the mutation points are sampled. :type seed: integer or None :returns mutated: Mutated design ''' np.random.seed(seed) mut_ind = np.random.choice(len(self.order), int( len(self.order) * q), replace=False) mutated = copy.copy(self.order) for mut in mut_ind: np.random.seed(seed) mut_stim = np.random.choice( self.experiment.n_stimuli, 1, replace=True)[0] mutated[mut] = mut_stim offspring = design(order=mutated, ITI=self.ITI, experiment=self.experiment) return offspring
[docs] def designmatrix(self): ''' Expand from order of stimuli to a fMRI timeseries. ''' # ITIs to onsets if self.experiment.restnum > 0: orderli = list(self.order) ITIli = list(self.ITI) for x in np.arange(0, self.experiment.n_trials, self.experiment.restnum)[1:][::-1]: orderli.insert(x, "R") ITIli.insert(x, self.experiment.restdur) ITIli = [y+self.experiment.trial_duration if not x == "R" else y for x, y in zip(orderli, ITIli)] onsets = np.cumsum(ITIli) - ITIli[0] self.onsets = [y for x, y in zip(orderli, onsets) if not x == "R"] else: ITIli = np.array(self.ITI) + self.experiment.trial_duration self.onsets = np.cumsum(ITIli) - ITIli[0] stimonsets = [x + self.experiment.t_pre for x in self.onsets] # round onsets to resolution self.ITI = [np.floor(x / self.experiment.resolution) * self.experiment.resolution for x in self.ITI] onsetX = [np.floor(x / self.experiment.resolution) * self.experiment.resolution for x in stimonsets] stimonsets = onsetX # find indices in resolution scale of stimuli if np.max(onsetX)>np.max(self.experiment.r_tp): print("WARNING: the latest onset is later than the duration of the experiment. Cannot compute efficiency.") return False XindStim = [int(np.where(self.experiment.r_tp == y)[0]) for y in onsetX] # create design matrix in resolution scale (=deltasM in Kao toolbox) X_X = np.zeros([self.experiment.n_tp, self.experiment.n_stimuli]) stim_duration_tp = int( self.experiment.stim_duration / self.experiment.resolution) for stimulus in xrange(self.experiment.n_stimuli): for dur in xrange(stim_duration_tp): X_X[np.array(XindStim) + dur, int(stimulus) ] = [1 if z == stimulus else 0 for z in self.order] # deconvolved matrix in resolution units deconvM = np.zeros([self.experiment.n_tp, int( self.experiment.laghrf * self.experiment.n_stimuli)]) for stim in xrange(self.experiment.n_stimuli): for j in xrange(int(self.experiment.laghrf)): deconvM[j:, self.experiment.laghrf * stim + j] = X_X[:(self.experiment.n_tp - j), stim] # downsample to TR idx = [int(x) for x in np.arange(0, self.experiment.n_tp, self.experiment.TR / self.experiment.resolution)] deconvMdown = deconvM[idx, :] Xwhite = np.dot( np.dot(t(deconvMdown), self.experiment.white), deconvMdown) # convolve design matrix X_Z = np.zeros([self.experiment.n_tp, self.experiment.n_stimuli]) for stim in range(self.experiment.n_stimuli): X_Z[:, stim] = deconvM[:, (stim * self.experiment.laghrf):( (stim + 1) * self.experiment.laghrf)].dot(self.experiment.basishrf) # downsample to TR idx = [int(x) for x in np.arange(0, self.experiment.n_tp, self.experiment.TR / self.experiment.resolution)] X_Z = X_Z[idx, :] X_X = X_X[idx, :] Zwhite = t(X_Z) * self.experiment.white * X_Z self.X = Xwhite self.Z = Zwhite self.Xconv = X_Z self.Xnonconv = X_X self.CX = self.experiment.CX self.C = self.experiment.C return self
[docs] def FeCalc(self, Aoptimality=True): ''' Compute estimation efficiency. :param Aoptimality: Kind of optimality to optimize, A- or D-optimality :type Aoptimality: boolean ''' try: invM = scipy.linalg.inv(self.X) except scipy.linalg.LinAlgError: try: invM = scipy.linalg.pinv(self.X) except numpy.linalg.linalg.LinAlgError: invM = np.nan sys.exc_clear() invM = np.array(invM) st1 = np.dot(self.CX, invM) CMC = np.dot(st1, t(self.CX)) if Aoptimality == True: self.Fe = float(self.CX.shape[0] / np.matrix.trace(CMC)) else: self.Fe = float(np.linalg.det(CMC)**(-1 / len(self.C))) self.Fe = self.Fe / self.experiment.FeMax return self
[docs] def FdCalc(self, Aoptimality=True): ''' Compute detection power. :param Aoptimality: Kind of optimality to optimize: A- or D-optimality :type Aoptimality: boolean ''' try: invM = scipy.linalg.inv(self.Z) except scipy.linalg.LinAlgError: try: invM = scipy.linalg.pinv(self.Z) except numpy.linalg.linalg.LinAlgError: invM = np.nan sys.exc_clear() invM = np.array(invM) CMC = np.matrix(self.C) * invM * np.matrix(t(self.C)) if Aoptimality == True: self.Fd = float(len(self.C) / np.matrix.trace(CMC)) else: self.Fd = float(np.linalg.det(CMC)**(-1 / len(self.C))) self.Fd = self.Fd / self.experiment.FdMax return self
[docs] def FcCalc(self, confoundorder=3): ''' Compute confounding efficiency. :param confoundorder: To what order should confounding be protected :type confoundorder: integer ''' Q = np.zeros([self.experiment.n_stimuli, self.experiment.n_stimuli, confoundorder]) for n in xrange(len(self.order)): for r in np.arange(1, confoundorder + 1): if n > (r - 1): Q[self.order[n], self.order[n - r], r - 1] += 1 Qexp = np.zeros([self.experiment.n_stimuli, self.experiment.n_stimuli, confoundorder]) for si in xrange(self.experiment.n_stimuli): for sj in xrange(self.experiment.n_stimuli): for r in np.arange(1, confoundorder + 1): Qexp[si, sj, r - 1] = self.experiment.P[si] * \ self.experiment.P[sj] * (self.experiment.n_trials + 1) Qmatch = np.sum(abs(Q - Qexp)) self.Fc = Qmatch self.Fc = 1 - self.Fc / self.experiment.FcMax return self
[docs] def FfCalc(self): ''' Compute efficiency of frequencies. ''' trialcount = Counter(self.order) Pobs = [trialcount[x] for x in xrange(self.experiment.n_stimuli)] self.Ff = np.sum(abs(np.array( Pobs) - np.array(self.experiment.n_trials * np.array(self.experiment.P)))) self.Ff = 1 - self.Ff / self.experiment.FfMax return self
[docs] def FCalc(self, weights,Aoptimality=True,confoundorder=3): ''' Compute weighted average of efficiencies. :param weights: Weights given to each of the efficiency metrics in this order: Estimation, Detection, Frequencies, Confounders. :type weights: list of floats ''' if weights[0]>0: self.FeCalc(Aoptimality) if weights[1]>0: self.FdCalc(Aoptimality) self.FfCalc() self.FcCalc(confoundorder) matr = np.array([self.Fe, self.Fd, self.Ff, self.Fc]) self.F = np.sum(weights * matr) return self
[docs]class experiment(object): ''' This class represents an fMRI experiment. :param TR: The repetition time. :type TR: float :param P: The probabilities of each trialtype. :type P: ndarray :param C: The contrast matrix. Example: np.array([[1,-1,0],[0,1,-1]]) :type C: ndarray :param rho: AR(1) correlation coefficient :type rho: float :param n_stimuli: The number of stimuli (or conditions) in the experiment. :type n_stimuli: integer :param n_trials: The number of trials in the experiment. Either specify n_trials **or** duration. :type n_trials: integer :param duration: The total duration (seconds) of the experiment. Either specify n_trials **or** duration. :type duration: float :param resolution: the maximum resolution of design matrix :type resolution: float :param stim_duration: duration (seconds) of stimulus :type stim_duration: float :param t_pre: duration (seconds) of trial part before stimulus presentation (eg. fixation cross) :type t_pre: float :param t_post: duration (seconds) of trial part after stimulus presentation :type t_post: float :param maxrep: maximum number of repetitions :type maxrep: integer or None :param hardprob: can the probabilities differ from the nominal value? :type hardprob: boolean :param confoundorder: The order to which confounding is controlled. :type confoundorder: integer :param restnum: Number of trials between restblocks :type restnum: integer :param restdur: duration (seconds) of the rest blocks :type restdur: float :param ITImodel: Which model to sample from. Possibilities: "fixed","uniform","exponential" :type ITImodel: string :param ITImin: The minimum ITI (required with "uniform" or "exponential") :type ITImin: float :param ITImean: The mean ITI (required with "fixed" or "exponential") :type ITImean: float :param ITImax: The max ITI (required with "uniform" or "exponential") :type ITImax: float ''' def __init__(self, TR, P, C, rho, stim_duration, n_stimuli, ITImodel=None, ITImin=None, ITImax=None, ITImean=None, restnum=0, restdur=0, t_pre=0, t_post=0, n_trials=None, duration=None, resolution=0.1, FeMax=1, FdMax=1, FcMax=1, FfMax=1, maxrep=None, hardprob=False, confoundorder=3): self.TR = TR self.P = P self.C = C self.rho = rho self.n_stimuli = n_stimuli self.t_pre = t_pre self.t_post = t_post self.n_trials = n_trials self.duration = duration self.resolution = resolution self.stim_duration = stim_duration self.maxrep = maxrep self.hardprob = hardprob self.confoundorder = confoundorder self.ITImodel = ITImodel self.ITImin = ITImin self.ITImean = ITImean self.ITImax = ITImax self.ITIlam = None self.restnum = restnum self.restdur = restdur self.FeMax = FeMax self.FdMax = FdMax self.FcMax = FcMax self.FfMax = FfMax self.countstim() self.CreateTsComp() self.CreateLmComp() self.max_eff()
[docs] def max_eff(self): ''' Function to compute maximum efficiency for Confounding and Frequency efficiency. ''' NulDesign = design( order=[np.argmin(self.P)] * self.n_trials, ITI=[self.ITImean] * self.n_trials, experiment=self ) NulDesign.designmatrix() NulDesign.FcCalc(self.confoundorder) self.FcMax = 1 - NulDesign.Fc NulDesign.FfCalc() self.FfMax = 1 - NulDesign.Ff return self
[docs] def countstim(self): ''' Function to compute some arguments depending on other arguments. ''' self.trial_duration = self.stim_duration + self.t_pre + self.t_post if self.ITImodel == "uniform": self.ITImean = (self.ITImax + self.ITImin) / 2 if self.duration: if not self.restnum == 0: # duration of block between rest blockdurNR = self.restnum * \ (self.ITImean + self.trial_duration) blockdurWR = blockdurNR + self.restdur # duration of block including rest # number of blocks blocknum = np.floor(self.duration / blockdurWR) n_trials = blocknum * self.restnum remain = self.duration - (blocknum * blockdurWR) if remain >= blockdurNR: n_trials = n_trials + self.restnum else: extratrials = np.floor( remain / (self.ITImean + self.trial_duration)) n_trials = n_trials + extratrials self.n_trials = int(n_trials) else: self.n_trials = int( self.duration / (self.ITImean + self.trial_duration)) else: ITIdur = self.n_trials * self.ITImean TRIALdur = self.n_trials * self.trial_duration duration = ITIdur + TRIALdur if self.restnum > 0: duration = duration + \ (np.floor(self.n_trials / self.restnum) * self.restdur) self.duration = duration
[docs] def CreateTsComp(self): ''' This function computes the number of scans and timpoints (in seconds and resolution units) ''' self.n_scans = int(np.ceil(self.duration / self.TR)) # number of scans # number of timepoints (in resolution) self.n_tp = int(np.ceil(self.duration / self.resolution)) self.r_scans = np.arange(0, self.duration, self.TR) self.r_tp = np.arange(0, self.duration, self.resolution) return self
[docs] def CreateLmComp(self): ''' This function generates components for the linear model: hrf, whitening matrix, autocorrelation matrix and CX ''' # hrf self.canonical(0.1) # contrasts # expand contrasts to resolution of HRF (?) self.CX = np.kron(self.C, np.eye(self.laghrf)) # drift self.S = self.drift(np.arange(0, self.n_scans)) # [tp x 1] self.S = np.matrix(self.S) # square of the whitening matrix base = [1 + self.rho**2, -1 * self.rho] + [0] * (self.n_scans - 2) self.V2 = scipy.linalg.toeplitz(base) self.V2[0, 0] = 1 self.V2 = np.matrix(self.V2) self.V2[self.n_scans - 1, self.n_scans - 1] = 1 self.white = self.V2 - self.V2 * \ t(self.S) * np.linalg.pinv(self.S * self.V2 * t(self.S)) * self.S * self.V2 return self
[docs] def canonical(self, resolution): ''' This function generates the canonical hrf :param resolution: resolution to sample the canonical hrf :type resolution: float ''' # translated from spm_hrf p = [6, 16, 1, 1, 6, 0, 32] dt = resolution / 16. s = np.array(xrange(int(p[6] / dt + 1))) # HRF sampled at 0.1 s hrf = self.spm_Gpdf(s, p[0] / p[2], dt / p[2]) - \ self.spm_Gpdf(s, p[1] / p[3], dt / p[3]) / p[4] hrf = hrf[[int(x) for x in np.array( xrange(int(p[6] / resolution + 1))) * 16.]] self.hrf = hrf / np.sum(hrf) # HRF sampled at resolution self.basishrf = self.hrf[[int(x) for x in np.arange( 0, len(self.hrf) - 1, self.resolution * 10)]] # duration of the HRF self.durhrf = 32.0 # length of the HRF parameters in resolution scale self.laghrf = int(np.ceil(self.durhrf / self.resolution)) return self
@staticmethod
[docs] def drift(s, deg=3): ''' Function to compute a drift component ''' S = np.ones([deg, len(s)]) s = np.array(s) tmpt = np.array(2. * s / float(len(s) - 1) - 1) S[1] = tmpt for k in np.arange(2, deg): S[k] = ((2. * k - 1.) / k) * tmpt * S[k - 1] - \ ((k - 1) / float(k)) * S[k - 2] return S
@staticmethod
[docs] def spm_Gpdf(s, h, l): ''' Function to generate gamma pdf ''' s = np.array(s) res = (h - 1) * np.log(s) + h * np.log(l) - l * s - np.log(gamma(h)) return np.exp(res)
[docs]class population(object): ''' This class represents the population of experimental designs for fMRI. :param experiment: The experimental setup of the fMRI experiment. :type experiment: experiment :param G: The size of the generation :type G: integer :param R: with which rate are the orders generated from ['blocked','random','mseq'] :type R: list :param q: percentage of mutations :type q: float :param weights: weights attached to Fe, Fd, Ff, Fc :type weights: list :param I: number of immigrants :type I: integer :param preruncycles: number of prerun cycles (to find maximum Fe and Fd) :type preruncycles: integer :param cycles: number of cycles :type cycles: integer :param seed: seed :type seed: integer :param Aoptimality: optimises A-optimality if true, else D-optimality :type Aoptimality: boolean :param convergence: after how many stable iterations is there convergence :type convergence: integer :param folder: folder to save output :type folder: string :param outdes: number of designs to be saved :type outdes: integer ''' def __init__(self, experiment, weights, preruncycles, cycles, seed=None, I=4, G=20, R=[0.4, 0.4, 0.2], q=0.01, Aoptimality=True, folder=None, outdes=3, convergence=1000): self.exp = experiment self.G = G self.R = R self.q = q self.weights = weights self.I = I self.preruncycles = preruncycles self.cycles = cycles self.convergence = convergence self.Aoptimality = Aoptimality self.outdes = outdes self.folder = folder if seed: self.seed = seed else: self.seed = np.random.randint(10000) self.designs = [] self.optima = [] self.bestdesign = None self.cov = None
[docs] def change_seed(self): ''' Function to change the seed. ''' if self.seed < 4 * 10**9: self.seed = self.seed + 1000 else: self.seed = 1 return self
[docs] def check_develop(self, design, weights=None): ''' Function to check and develop a design to the population. Function will check design against strict options and develop the design if valid. :param design: Design to be added to population. :type design: design object :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 ''' # weights if weights == None: weights = self.weights # check maxrep, hardprob, every stimulus at least once if not self.exp.maxrep == None: if not design.check_maxrep(self.exp.maxrep): return False if self.exp.hardprob: if not design.check_hardprob(): return False if len(np.unique(design.order)) < self.exp.n_stimuli: return False # develop out = design.designmatrix() if out == False: return False design.FCalc(weights,confoundorder=self.exp.confoundorder,Aoptimality=self.Aoptimality) if np.isnan(design.F): return False return design
[docs] def add_new_designs(self, weights=None, R=None): ''' This function generates the population. :param experiment: The experimental setup of the fMRI experiment. :type experiment: experiment :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 :param seed: The seed for ramdom processes. :type seed: integer or None ''' # weights if weights == None: weights = self.weights if not R: R = np.round(np.array(self.R) * self.G).tolist() if self.exp.n_stimuli in [6, 10] and R[2] > 0: print("warning: for this number of conditions/stimuli, there are no msequences possible. Replaced by random designs.") R[1] = R[1] + R[2] R[2] = 0 NDes = 0 self.change_seed() k = 0 while NDes < np.sum(R): self.change_seed() ind = np.sum(NDes >= np.cumsum(R)) ordertype = ['blocked', 'random', 'msequence'][ind] order = generate.order(self.exp.n_stimuli, self.exp.n_trials, self.exp.P, ordertype=ordertype, seed=self.seed) ITI, ITIlam = generate.iti(ntrials=self.exp.n_trials, model=self.exp.ITImodel, min=self.exp.ITImin, max=self.exp.ITImax, mean=self.exp.ITImean, lam=self.exp.ITIlam, seed=self.seed) if ITIlam: self.exp.ITIlam = ITIlam des = design(order=order, ITI=ITI, experiment=self.exp) fulldes = self.check_develop(des, weights) if fulldes == False: continue else: self.designs.append(fulldes) NDes = NDes + 1 return self
[docs] def to_next_generation(self, weights=None, seed=1234): ''' This function goes from one generation to the next. :param weights: weights for efficiency calculation. :type weights: list of floats, summing to 1 :param seed: The seed for random processes. :type seed: integer or None ''' # remove duplicates and replace by random designs n = 0 rm = 0 while n == 0: orders = [x.order for x in self.designs] cors = np.corrcoef(orders) isone = np.isclose(cors, 1.) if len(isone) == 1: n = 1 else: np.fill_diagonal(isone, 0) if np.sum(isone) == 0: n = 1 else: ind = np.where(isone) remove = ind[1][ind[0] == ind[0][0]] self.designs = [des for ind, des in enumerate( self.designs) if not ind in remove] rm = rm + len(remove) self.add_new_designs(R=[0, rm, 0], weights=weights) # Mutation: # if: Best design: stay untouched # elif Correlation between all is > 0.8: mutate with 20% mutations # else: mutate with 5% mutations # for all: if conditions are not fulfilled: not mutated signals = [x.Xconv for x in self.designs] efficiencies = [x.F for x in self.designs] cors = self.pearsonr(signals, self.exp.n_stimuli) mncor = np.mean(cors) for idx in range(len(self.designs)): design = self.designs[idx] if design.F == np.max(efficiencies): offspring = design elif mncor > 0.6: offspring = design.mutation(0.2, seed=seed) offspring = self.check_develop(offspring, weights) else: offspring = design.mutation(self.q, seed=seed) offspring = self.check_develop(offspring, weights) if offspring == False: continue else: self.designs[idx] = offspring # weights if weights == None: weights = self.weights # crossover # select designs with F>median(F): efficiencies = [x.F for x in self.designs] #crossind = [ind for ind,val in enumerate(efficiencies) if val >= np.median(efficiencies)] crossind = range(len(self.designs)) nparents = int(len(crossind)) npairs = int(nparents / 2.) np.random.seed(seed) CouplingRnd = np.random.choice( nparents, size=(npairs * 2), replace=False) CouplingRnd = [crossind[x] for x in CouplingRnd] CouplingRnd = [[CouplingRnd[i], CouplingRnd[i + 1]] for i in np.arange(0, npairs * 2, 2)] count = 0 for couple in CouplingRnd: baby1, baby2 = self.designs[couple[0]].crossover( self.designs[couple[1]], seed=seed) for baby in [baby1, baby2]: baby = self.check_develop(baby, weights) if baby == False: continue else: self.designs.append(baby) count = count + 1 # immigration noim = self.I R = np.ceil(np.array(self.R) * noim).tolist() self.add_new_designs(R=R, weights=weights) # inspect efficiencies efficiencies = [x.F for x in self.designs] maximum = np.max(efficiencies) self.optima.append(maximum) bestind = [ind for ind, val in enumerate( efficiencies) if val == maximum][0] self.bestdesign = self.designs[bestind] # append best designs to lists # check convergence gen = len(self.optima) if gen > 1000: if self.optima[-1] > self.optima[gen - 1000]: self.finished = True # select best G cutoff = np.sort(efficiencies)[::-1][self.G] self.designs = [des for ind, des in enumerate( self.designs) if des.F >= cutoff] return self
[docs] def clear(self): ''' Function to clear results between optimalisations (maximum Fe, Fd or opt) ''' self.designs = [] self.optima = [] self.finished = False self.change_seed() if self.bestdesign: bestdes = design(order=self.bestdesign.order, ITI=self.bestdesign.ITI, experiment=self.exp) bestdes = self.check_develop(bestdes) if not bestdes == False: self.designs.append(bestdes) self.bestdesign = None return self
[docs] def naturalselection(self): ''' Function to run natural selection for design optimization ''' if (self.exp.FcMax == 1 and self.exp.FfMax == 1): self.exp.max_eff() if self.weights[0] > 0: # add new designs self.clear() self.add_new_designs(weights=[1, 0, 0, 0]) # loop bar = progressbar.ProgressBar() for generation in bar(range(self.preruncycles)): self.to_next_generation(seed=self.seed, weights=[1, 0, 0, 0]) if self.finished: continue self.exp.FeMax = np.max(self.bestdesign.F) if self.weights[1] > 0: self.clear() self.add_new_designs(weights=[0, 1, 0, 0]) # loop bar = progressbar.ProgressBar() for generation in bar(range(self.preruncycles)): self.to_next_generation(seed=self.seed, weights=[0, 1, 0, 0]) if self.finished: continue self.exp.FdMax = np.max(self.bestdesign.F) # clear all attributes self.clear() self.add_new_designs() # loop bar = progressbar.ProgressBar() for generation in bar(range(self.cycles)): self.to_next_generation(seed=self.seed) if self.finished: continue return self
def print_cmd(self): cm1 = "EXP = geneticalgorithm.experiment( \n" \ " TR = {0}, \n" \ " P = {1}, \n" \ " C = {2}, \n" \ " rho = {3}, \n" \ " n_stimuli = {4}, \n" \ " n_trials = {5}, \n" \ " duration = {6}, \n" \ " resolution = {7}, \n" \ " stim_duration = {8}, \n" \ " t_pre = {9}, \n" \ " t_post = {10}, \n" \ " maxrep = {11}, \n" \ " hardprob = {12}, \n" \ " confoundorder = {13}, \n" \ " ITImodel = '{14}', \n" \ " ITImin = {15}, \n" \ " ITImean = {16}, \n" \ " ITImax = {17}, \n" \ " restnum = {18}, \n" \ " restdur = {19}) \n".format( self.exp.TR, self.exp.P if type( self.exp.P) == list else self.exp.P.tolist(), self.exp.C if type( self.exp.C) == list else self.exp.C.tolist(), self.exp.rho, self.exp.n_stimuli, self.exp.n_trials, self.exp.duration, self.exp.resolution, self.exp.stim_duration, self.exp.t_pre, self.exp.t_post, self.exp.maxrep if self.exp.maxrep else 'None', self.exp.hardprob, self.exp.confoundorder, self.exp.ITImodel, self.exp.ITImin, self.exp.ITImean, self.exp.ITImax, self.exp.restnum, self.exp.restdur) cm2 = "POP = geneticalgorithm.population( \n" \ " experiment = EXP, \n" \ " G = {0}, \n" \ " R = {1}, \n" \ " q = {2}, \n" \ " weights = {3}, \n" \ " I = {4}, \n" \ " preruncycles = {5}, \n" \ " cycles = {6}, \n" \ " convergence = {7}, \n" \ " seed = {8}, \n" \ " folder = '{9}') \n".format( self.G, self.R, self.q, self.weights if type( self.weights) == list else self.weights.tolist(), self.I, self.preruncycles, self.cycles, self.convergence, self.seed, "/local/") self.cmd = cm1 + "\n\n" + cm2 + "\n" return self def evaluate(self): # select designs: best from k-means clusters shape = self.bestdesign.Xconv.shape xdim = np.zeros(np.product(shape)) des = np.zeros([np.product(shape), len(self.designs)]) efficiencies = np.array([x.F for x in self.designs]) for d in range(len(self.designs)): hrf = [] for stim in range(shape[1]): hrf = hrf + self.designs[d].Xconv[:, stim].tolist() des[:, d] = hrf clus = sklearn.cluster.k_means(des.T, self.outdes,random_state=self.seed)[1] out = [] des = [] cl = [] first = 0 for c in range(self.outdes): ids = np.where(clus==c)[0] id_ordered = ids[np.flipud(np.argsort(efficiencies[ids]))] out.append(first) for d in id_ordered: cl.append(c) des.append(self.designs[d]) first = first+1 self.designs = des self.out = out self.clus = cl signals = [x.Xconv for x in self.designs] co = self.pearsonr(signals,3) self.cov = co return self def download(self): if not self.folder: raise ValueError('No folder defined to download output.') else: if self.cov==None: self.evaluate() # empty folder if os.path.exists(self.folder): files = os.listdir(self.folder) for f in files: if 'design_' in f: shutil.rmtree(os.path.join(self.folder, f)) else: os.mkdir(self.folder) reportfile = "report.pdf" report.make_report(self, os.path.join(self.folder, reportfile)) files = [] for des in range(self.outdes): os.mkdir(os.path.join(self.folder, "design_" + str(des))) design = self.designs[self.out[des]] for stim in range(self.exp.n_stimuli): onsetsfile = os.path.join( "design_" + str(des), "stimulus_" + str(stim) + ".txt") onsubsets = [str(x) for x in np.array(design.onsets)[ np.array(design.order) == stim]] f = open(os.path.join(self.folder, onsetsfile), 'w+') for line in onsubsets: f.write(line) f.write("\n") f.close() files.append(onsetsfile) itifile = os.path.join("design_" + str(des), "ITIs.txt") f = open(os.path.join(self.folder, itifile), 'w+') for line in design.ITI: f.write(str(line)) f.write("\n") f.close() files.append(itifile) files.append(reportfile) # zip up zip_subdir = "OptimalDesign" self.zip_filename = "%s.zip" % zip_subdir self.file = StringIO.StringIO() zf = zipfile.ZipFile(self.file, "w") for fpath in files: zf.write(os.path.join(self.folder, fpath), os.path.join(zip_subdir, fpath)) zf.close() return self @staticmethod def pearsonr(signals, nstim): cor = [] varcov = np.zeros([len(signals), len(signals)]) for sig1 in range(len(signals)): for sig2 in range(sig1, len(signals)): cors = np.diag(np.corrcoef( t(signals[sig1]), t(signals[sig2]))[nstim:, :nstim]) varcov[sig1, sig2] = np.mean(cors) varcov[sig2, sig1] = np.mean(cors) return varcov