'''
Example modified from Keras's script to generate text from Nietzsche's
writings.

At least 20 epochs are required before the generated text starts sounding
coherent. It is recommended to run this script on GPU, as recurrent networks
are quite computationally intensive.

If you try this script on new data, make sure your corpus has at least
~100k characters. ~1M is better.
'''
from __future__ import print_function

import random
import sys
import types

import numpy as np
import theano
import theano.tensor as T

from keras import backend as K
from keras.callbacks import Callback
from keras.datasets.data_utils import get_file
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import Recurrent, GRU
from keras.models import Sequential


class ResetRNNState(Callback):
    """Reset the state of a stateful RNN, such as
    seya.layers.recurrent.StatefulGRU (inlined below as StatefulGRU).

    h: the shared variable holding the RNN state
    func: a function of (batch, logs) that returns True when the state
        should be reset to zero
    """
    def __init__(self, h, func):
        self.h = h
        self.func = func

    def on_batch_end(self, batch, logs={}):
        if self.func(batch, logs):
            self.h.set_value(self.h.get_value() * 0)


class RenormalizeWeight(Callback):
    """Rescale the rows of a weight matrix to unit L2 norm before each batch.

    4-D convolution kernels are flattened to 2-D first; pass transpose=True
    to normalize along the other axis.
    """
    def __init__(self, W, transpose=False):
        Callback.__init__(self)
        self.W = W
        self.W_shape = K.get_value(self.W).shape
        self.transpose = transpose

    def on_batch_begin(self, batch, logs={}):
        W = self.W.get_value()
        if len(self.W_shape) == 4:
            if self.transpose:
                W = W.transpose(1, 0, 2, 3)
            this_shape = W.shape
            W = W.reshape((this_shape[0], -1))
        elif self.transpose:
            W = W.T
        norm = np.sqrt((W ** 2).sum(axis=-1))
        W /= norm[:, None]
        # restore the original layout before writing the weights back
        if len(self.W_shape) == 4:
            W = W.reshape(this_shape)
            if self.transpose:
                W = W.transpose(1, 0, 2, 3)
        elif self.transpose:
            W = W.T
        self.W.set_value(W)
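
# Illustrative sketch, not used by this script: the row renormalization that
# RenormalizeWeight applies to a 2-D weight matrix, shown on a plain numpy
# array. Defined for reference only and never called; the shape is made up.
def _demo_row_renormalization():
    demo_W = np.random.randn(4, 3)                # hypothetical 2-D weight
    norm = np.sqrt((demo_W ** 2).sum(axis=-1))    # L2 norm of each row
    demo_W /= norm[:, None]                       # scale every row to unit norm
    assert np.allclose(np.sqrt((demo_W ** 2).sum(axis=-1)), 1.0)
    return demo_W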
def _get_reversed_input(self, train=False):
    if hasattr(self, 'previous'):
        X = self.previous.get_output(train=train)
    else:
        X = self.input
    return X[::-1]


class Bidirectional(Recurrent):
    """Combine a forward and a backward RNN over the same input.

    The backward RNN reads the input reversed; its output is reversed back
    before being concatenated with the forward output.
    """
    def __init__(self, forward=None, backward=None, return_sequences=False,
                 forward_conf=None, backward_conf=None):
        assert forward is not None or forward_conf is not None, \
            "Must provide a forward RNN or a forward configuration"
        assert backward is not None or backward_conf is not None, \
            "Must provide a backward RNN or a backward configuration"
        super(Bidirectional, self).__init__()
        if forward is not None:
            self.forward = forward
        else:
            # Must import inside the function: to support loading,
            # layer_utils imports this module... ugly.
            from keras.utils.layer_utils import container_from_config
            self.forward = container_from_config(forward_conf)
        if backward is not None:
            self.backward = backward
        else:
            from keras.utils.layer_utils import container_from_config
            self.backward = container_from_config(backward_conf)
        self.return_sequences = return_sequences
        self.output_dim = self.forward.output_dim + self.backward.output_dim
        if not (self.return_sequences == self.forward.return_sequences ==
                self.backward.return_sequences):
            raise ValueError("Make sure 'return_sequences' is equal for self,"
                             " forward and backward.")

    def build(self):
        self.input = T.tensor3()
        self.forward.input = self.input
        self.backward.input = self.input
        self.forward.build()
        self.backward.build()
        self.trainable_weights = (self.forward.trainable_weights +
                                  self.backward.trainable_weights)

    def set_previous(self, layer):
        assert self.nb_input == layer.nb_output == 1, \
            "Cannot connect layers: input count and output count should be 1."
        if hasattr(self, 'input_ndim'):
            assert self.input_ndim == len(layer.output_shape), \
                ("Incompatible shapes: layer expected input with ndim=" +
                 str(self.input_ndim) + " but previous layer has output_shape " +
                 str(layer.output_shape))
        self.forward.set_previous(layer)
        self.backward.set_previous(layer)
        # the backward RNN reads its input reversed
        self.backward.get_input = types.MethodType(_get_reversed_input,
                                                   self.backward)
        self.previous = layer
        self.build()

    @property
    def output_shape(self):
        input_shape = self.input_shape
        output_dim = self.output_dim
        if self.return_sequences:
            return (input_shape[0], input_shape[1], output_dim)
        else:
            return (input_shape[0], output_dim)

    def get_output(self, train=False):
        Xf = self.forward.get_output(train)
        Xb = self.backward.get_output(train)
        Xb = Xb[::-1]  # undo the input reversal
        return T.concatenate([Xf, Xb], axis=-1)

    def get_config(self):
        return {'name': self.__class__.__name__,
                'forward_conf': self.forward.get_config(),
                'backward_conf': self.backward.get_config(),
                'return_sequences': self.return_sequences}


class StatefulGRU(GRU):
    """GRU whose initial state for a batch is the final state of the
    previous batch. The state lives in the shared variable self.h;
    use ResetRNNState to zero it during training.
    """
    def __init__(self, batch_size, output_dim=128,
                 init='glorot_uniform', inner_init='orthogonal',
                 activation='sigmoid', inner_activation='hard_sigmoid',
                 weights=None, return_sequences=False,
                 input_dim=None, input_length=None, **kwargs):
        self.batch_size = batch_size
        self.input_dim = input_dim
        self.input_length = input_length
        if self.input_dim:
            kwargs['input_shape'] = (self.input_length, self.input_dim)
        super(StatefulGRU, self).__init__(
            output_dim, init=init, inner_init=inner_init,
            activation=activation, inner_activation=inner_activation,
            weights=weights, return_sequences=return_sequences,
            input_dim=input_dim, input_length=input_length, **kwargs)

    def build(self):
        super(StatefulGRU, self).build()
        self.h = K.zeros((self.batch_size, self.output_dim))  # here is the state

    def get_output(self, train=False):
        X = self.get_input(train)
        padded_mask = self.get_padded_shuffled_mask(train, X, pad=1)
        X = X.dimshuffle((1, 0, 2))

        x_z = T.dot(X, self.W_z) + self.b_z
        x_r = T.dot(X, self.W_r) + self.b_r
        x_h = T.dot(X, self.W_h) + self.b_h
        outputs, updates = theano.scan(
            self._step,
            sequences=[x_z, x_r, x_h, padded_mask],
            outputs_info=self.h[:X.shape[1]],
            non_sequences=[self.U_z, self.U_r, self.U_h])

        # the initial state of the next batch is the last state of this batch
        self.updates = ((self.h, outputs[-1]),)

        if self.return_sequences:
            return outputs.dimshuffle((1, 0, 2))
        return outputs[-1]

    def init_updates(self):
        # building the train-time graph populates self.updates
        self.get_output(train=True)

    def get_config(self):
        return {"name": self.__class__.__name__,
                "input_dim": self.input_dim,
                "output_dim": self.output_dim,
                "init": self.init.__name__,
                "inner_init": self.inner_init.__name__,
                "activation": self.activation.__name__,
                "inner_activation": self.inner_activation.__name__,
                "return_sequences": self.return_sequences}


# The script originally did `from recurrent import StatefulGRU as SGRU`;
# since the class is defined above, alias it directly instead.
SGRU = StatefulGRU
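
# Illustrative sketch, not used by this script: how Bidirectional could wrap a
# forward and a backward GRU. Both wrapped RNNs must agree on return_sequences;
# the output dimension is their sum. The sizes here are made up.
def _example_bidirectional_gru():
    fwd = GRU(output_dim=32, return_sequences=True)
    bwd = GRU(output_dim=32, return_sequences=True)
    # yields a layer with output_dim == 64 (forward and backward concatenated)
    return Bidirectional(forward=fwd, backward=bwd, return_sequences=True)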
# path = get_file('nietzsche.txt',
#                 origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
text = open("input.txt").read().lower()
print('corpus length:', len(text))

chars = sorted(set(text))  # sort for a deterministic char <-> index mapping
print('total chars:', len(chars))
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))

# cut the text in semi-redundant sequences of maxlen characters
batch_size = 128
maxlen = 20
step = 3
sentences = []
next_chars = []
for i in range(0, len(text) - maxlen, step):
    sentences.append(text[i:i + maxlen])
    next_chars.append(text[i + maxlen])
print('nb sequences:', len(sentences))

print('Vectorization...')
X = np.zeros((len(sentences), maxlen, len(chars)), dtype=np.bool)
y = np.zeros((len(sentences), len(chars)), dtype=np.bool)
for i, sentence in enumerate(sentences):
    for t, char in enumerate(sentence):
        X[i, t, char_indices[char]] = 1
    y[i, char_indices[next_chars[i]]] = 1

# build the model: 2 stacked stateful GRUs
print('Build model...')
model = Sequential()
model.add(SGRU(input_shape=(None, len(chars)), batch_size=batch_size,
               output_dim=512, return_sequences=True))
model.add(Dropout(0.2))
model.add(SGRU(batch_size=batch_size, output_dim=512,
               return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(len(chars)))
model.add(Activation('softmax'))

# callbacks that reset the GRU states every 4 batches
h1_reset = ResetRNNState(model.layers[0].h,
                         lambda batches, logs: batches % 4 == 0)
h2_reset = ResetRNNState(model.layers[2].h,
                         lambda batches, logs: batches % 4 == 0)

model.compile(loss='categorical_crossentropy', optimizer='rmsprop')


# helper function to sample an index from a probability array
def sample(a, temperature=1.0):
    a = np.log(a) / temperature
    a = np.exp(a) / np.sum(np.exp(a))
    return np.argmax(np.random.multinomial(1, a, 1))


# train the model, output generated text after each iteration
for iteration in range(1, 60):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(X, y, batch_size=batch_size, nb_epoch=1,
              callbacks=[h1_reset, h2_reset])

    start_index = random.randint(0, len(text) - maxlen - 1)

    for diversity in [0.2, 0.5, 1.0, 1.2]:
        print()
        print('----- diversity:', diversity)

        generated = ''
        sentence = text[start_index:start_index + maxlen]
        generated += sentence
        print('----- Generating with seed: "' + sentence + '"')
        sys.stdout.write(generated)

        for i in range(400):  # generate 400 characters from the seed
            x = np.zeros((1, maxlen, len(chars)))
            for t, char in enumerate(sentence):
                x[0, t, char_indices[char]] = 1.

            preds = model.predict(x, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_char = indices_char[next_index]

            generated += next_char
            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()
        print()
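
# Illustrative sketch, not called above: the effect of the `temperature`
# argument of sample(). Rescaling log-probabilities by 1/temperature sharpens
# the distribution for temperature < 1 and flattens it for temperature > 1.
# The toy distribution is made up.
def _demo_temperature_effect():
    p = np.array([0.7, 0.2, 0.1])
    for temp in (0.2, 1.0, 1.2):
        q = np.exp(np.log(p) / temp)
        q /= q.sum()
        print('temperature %.1f ->' % temp, q)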