{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Serial Version and Spark Tuning (word-level correction)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# word_level_documentation.ipynb\n", "\n", "######################\n", "#\n", "# Submission by Kendrick Lo (Harvard ID: 70984997) for\n", "# CS 205 - Computing Foundations for Computational Science (Prof. R. Jones)\n", "# Advisor: A. Peleg\n", "# \n", "# This is part of a joint project with Gioia Dominedo that includes a separate\n", "# component for context checking. This notebook outlines algorithms for\n", "# word-level correction, and includes a serial Python algorithm based on a third\n", "# party algorithm (namely SymSpell, see below), as well as a Spark/Python\n", "# algorithm. A number of optimizations/compromises were attempted with varying\n", "# levels of success -- these attempts have been documented in this notebook.\n", "#\n", "######################" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# contents\n", "\n", "* [Performance Summary](#Performance-Summary)

\n", "\n", "* [1a. word/document - original serial implementation](#1a.-serial-code-performance)\n", "* [1b. single word - optimized serial version (SymSpell python port)](#1b.-optimized-serial-version)\n", "* [2. single word - SPARK implementation (slow)](#2.-Original-SPARK-version:-SLOW)\n", "* [3. single word - SPARK implementation (faster)](#3.-Optimized-SPARK-version:-FASTER)\n", "* [4. single word - SPARK implementation (also fast)](#4.-Optimized-SPARK-version:-ALSO-FAST)\n", "* [5. document - SPARK implementation](#5.-SPARK-version-document-check)

\n", "\n", "* [AWS Experiments](#AWS-experiments)\n", "* [Appendix: code snippets from optimization experiments](#Appendix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Performance Summary" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1a. serial code performance" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

\n", "single word correction - corresponds to `serial_listsugg.py`
\n", "document correction - corresponds to `serial_document.py`\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "'''\n", "v 1.0 last revised 22 Nov 2015\n", "\n", "This program is a Python version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "Future improvements may entail allowing for less verbose options, \n", "including the output of a single recommended correction. Note also that we\n", "have generally kept to the form of the original program, and have not\n", "introduced any major optimizations or structural changes in this Python port.\n", "\n", "To execute program:\n", "1. Ensure \"big.txt\" is in the current working directory. This is the corpus\n", " from which the dictionary for the spellchecker will be built.\n", "2a. Check recommended single word corrections by executing get_suggestions(\"word\") \n", " in the corresponding marked box below; or\n", "2b. 
Check single word corrections for a document by executing\n", " correct_document(\"\") in the corresponding marked box below.\n", "\n", "################\n", "\n", "Example input/output:\n", "\n", "################\n", "\n", "get_suggestions(\"there\")\n", "\n", "number of possible corrections: 604\n", " edit distance for deletions: 3\n", " \n", "[('there', (2972, 0)),\n", " ('these', (1231, 1)),\n", " ('where', (977, 1)),\n", " ('here', (691, 1)),\n", " ('three', (584, 1)),\n", " ('thee', (26, 1)),\n", " ('chere', (9, 1)),\n", " ('theme', (8, 1)),\n", " ('the', (80030, 2)), ...\n", "\n", "####\n", "\n", "correct_document(\"OCRsample.txt\")\n", "\n", "Finding misspelled words in your document...\n", "In line 3, taiths: suggested correction is < faith >\n", "In line 11, the word < oonipiittee > was not found (no suggested correction)\n", "In line 13, tj: suggested correction is < to >\n", "In line 13, mnnff: suggested correction is < snuff >\n", "[...]\n", "\n", "total words checked: 700\n", "total unknown words: 3\n", "total potential errors found: 94\n", "\n", "'''\n", "\n", "import re\n", "\n", "max_edit_distance = 3 \n", "\n", "dictionary = {}\n", "longest_word_length = 0\n", "\n", "def get_deletes_list(w):\n", " '''given a word, derive strings with up to max_edit_distance characters deleted'''\n", " deletes = []\n", " queue = [w]\n", " for d in range(max_edit_distance):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", "\n", "def create_dictionary_entry(w):\n", " '''add word and its derived deletions to dictionary'''\n", " # check if word is already in dictionary\n", " # dictionary entries are in the form: (list of suggested corrections, frequency of word in corpus)\n", " global longest_word_length\n", " new_real_word_added = False\n", " if w in dictionary:\n", " dictionary[w] = (dictionary[w][0], dictionary[w][1] + 1) # increment count of word in corpus\n", " else:\n", " dictionary[w] = ([], 1) \n", " longest_word_length = max(longest_word_length, len(w))\n", " \n", " if dictionary[w][1]==1:\n", " # first appearance of word in corpus\n", " # n.b. 
word may already be in dictionary as a derived word (deleting character from a real word)\n", " # but counter of frequency of word in corpus is not incremented in those cases)\n", " new_real_word_added = True\n", " deletes = get_deletes_list(w)\n", " for item in deletes:\n", " if item in dictionary:\n", " # add (correct) word to delete's suggested correction list if not already there\n", " if item not in dictionary[item][0]:\n", " dictionary[item][0].append(w)\n", " else:\n", " dictionary[item] = ([w], 0) # note frequency of word in corpus is not incremented\n", " \n", " return new_real_word_added\n", "\n", "def create_dictionary(fname):\n", "\n", " total_word_count = 0\n", " unique_word_count = 0\n", " print \"Creating dictionary...\" \n", " \n", " with open(fname) as file: \n", " for line in file:\n", " words = re.findall('[a-z]+', line.lower()) # separate by words by non-alphabetical characters \n", " for word in words:\n", " total_word_count += 1\n", " if create_dictionary_entry(word):\n", " unique_word_count += 1\n", " \n", " print \"total words processed: %i\" % total_word_count\n", " print \"total unique words in corpus: %i\" % unique_word_count\n", " print \"total items in dictionary (corpus words and deletions): %i\" % len(dictionary)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", " \n", " return dictionary\n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance between sequences.\n", "\n", " This method has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. 
This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def get_suggestions(string, silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word'''\n", " if (len(string) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", " \n", " suggest_dict = {}\n", " \n", " queue = [string]\n", " q_dictionary = {} # items other than string that we've checked\n", " \n", " while len(queue)>0:\n", " q_item = queue[0] # pop\n", " queue = queue[1:]\n", " \n", " # process queue item\n", " if (q_item in dictionary) and (q_item not in suggest_dict):\n", " if (dictionary[q_item][1]>0):\n", " # word is in dictionary, and is a word from the corpus, and not already in suggestion list\n", " # so add to suggestion dictionary, indexed by the word with value (frequency in corpus, edit distance)\n", " # note q_items that are not the input string are shorter than input string \n", " # since only deletes are added (unless manual dictionary corrections are added)\n", " assert len(string)>=len(q_item)\n", " suggest_dict[q_item] = (dictionary[q_item][1], len(string) - len(q_item))\n", " \n", " ## the suggested corrections for q_item as stored in dictionary (whether or not\n", " ## q_item itself is a valid word or merely a delete) can be valid corrections\n", " for sc_item in dictionary[q_item][0]:\n", " if (sc_item not in suggest_dict):\n", " \n", " # compute edit distance\n", " # suggested items should always be longer (unless manual corrections are added)\n", " assert len(sc_item)>len(q_item)\n", " # q_items that are not input should be shorter than original string \n", " # (unless manual corrections added)\n", " assert len(q_item)<=len(string)\n", " if len(q_item)==len(string):\n", " assert q_item==string\n", " item_dist = len(sc_item) - len(q_item)\n", "\n", " # item in suggestions list should not be the same as the string itself\n", " assert sc_item!=string \n", " # calculate edit distance using, for example, Damerau-Levenshtein distance\n", " item_dist = dameraulevenshtein(sc_item, string)\n", " \n", " if item_dist<=max_edit_distance:\n", " assert sc_item in dictionary # should already be in dictionary if in suggestion list\n", " suggest_dict[sc_item] = (dictionary[sc_item][1], item_dist)\n", " \n", " # now generate deletes (e.g. 
a substring of string or of a delete) from the queue item\n", " # as additional items to check -- add to end of queue\n", " assert len(string)>=len(q_item)\n", " if (len(string)-len(q_item))<max_edit_distance and len(q_item)>1:\n", " for c in range(len(q_item)): # character index \n", " word_minus_c = q_item[:c] + q_item[c+1:]\n", " if word_minus_c not in q_dictionary:\n", " queue.append(word_minus_c)\n", " q_dictionary[word_minus_c] = None # arbitrary value, just to identify we checked this\n", " \n", " # queue is now empty: convert suggestions in dictionary to list for output\n", " if not silent:\n", " print \"number of possible corrections: %i\" %len(suggest_dict)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested word corrections only:\n", " # return sorted(suggest_dict, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " as_list = suggest_dict.items()\n", " return sorted(as_list, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", " '''\n", " Option 1:\n", " get_suggestions(\"file\")\n", " ['file', 'five', 'fire', 'fine', ...]\n", " \n", " Option 2:\n", " get_suggestions(\"file\")\n", " [('file', (5, 0)),\n", " ('five', (67, 1)),\n", " ('fire', (54, 1)),\n", " ('fine', (17, 1))...] \n", " '''\n", "\n", "def best_word(s, silent=False):\n", " try:\n", " return get_suggestions(s, silent)[0]\n", " except:\n", " return None\n", " \n", "def correct_document(fname, printlist=True):\n", " # correct an entire document\n", " with open(fname) as file:\n", " doc_word_count = 0\n", " corrected_word_count = 0\n", " unknown_word_count = 0\n", " print \"Finding misspelled words in your document...\" \n", " \n", " for i, line in enumerate(file):\n", " doc_words = re.findall('[a-z]+', line.lower()) # split into words on non-alphabetical characters \n", " for doc_word in doc_words:\n", " doc_word_count += 1\n", " suggestion = best_word(doc_word, silent=True)\n", " if suggestion is None:\n", " if printlist:\n", " print \"In line %i, the word < %s > was not found (no suggested correction)\" % (i, doc_word)\n", " unknown_word_count += 1\n", " elif suggestion[0]!=doc_word:\n", " if printlist:\n", " print \"In line %i, %s: suggested correction is < %s >\" % (i, doc_word, suggestion[0])\n", " corrected_word_count += 1\n", " \n", " print \"-----\"\n", " print \"total words checked: %i\" % doc_word_count\n", " print \"total unknown words: %i\" % unknown_word_count\n", " print \"total potential errors found: %i\" % corrected_word_count\n", "\n", " return" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Run the cell below only once to build the dictionary.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "d = create_dictionary(\"testdata/big.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 26.2 s, sys: 556 ms, total: 26.8 s\n",
    "Wall time: 26.8 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter word to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"there\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 60.6 ms, sys: 11.5 ms, total: 72.1 ms\n",
    "Wall time: 64 ms\n",
    "Out[8]:\n",
    "[('there', (2972, 0)),\n",
    " ('these', (1231, 1)),\n",
    " ('where', (977, 1)),\n",
    " ('here', (691, 1)),\n",
    " ('three', (584, 1)),\n",
    " ('thee', (26, 1)),\n",
    " ('chere', (9, 1)),\n",
    " ('theme', (8, 1)),\n",
    " ('the', (80030, 2)), ...\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"zzffttt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "number of possible corrections: 0\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 191 µs, sys: 81 µs, total: 272 µs\n",
    "Wall time: 208 µs\n",
    "Out[4]:\n",
    "[]\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "best_word(\"there\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 55.3 ms, sys: 2.97 ms, total: 58.2 ms\n",
    "Wall time: 57.2 ms\n",
    "Out[4]:\n",
    "('there', (2972, 0))\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter file name of document to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "correct_document(\"testdata/OCRsample.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Finding misspelled words in your document...\n",
    "In line 3, taiths: suggested correction is < faith >\n",
    "In line 11, the word < oonipiittee > was not found (no suggested correction)\n",
    "In line 13, tj: suggested correction is < to >\n",
    "In line 13, mnnff: suggested correction is < snuff >\n",
    "In line 13, gjpt: suggested correction is < get >\n",
    "In line 15, bh: suggested correction is < by >\n",
    "In line 15, snc: suggested correction is < sac >\n",
    "In line 15, uth: suggested correction is < th >\n",
    "In line 15, unuer: suggested correction is < under >\n",
    "In line 20, mthiitt: suggested correction is < thirty >\n",
    "In line 21, cas: suggested correction is < was >\n",
    "In line 22, pythian: suggested correction is < scythian >\n",
    "In line 26, brainin: suggested correction is < brain >\n",
    "In line 27, jfl: suggested correction is < of >\n",
    "In line 28, ji: suggested correction is < i >\n",
    "In line 28, stice: suggested correction is < stick >\n",
    "In line 28, blaci: suggested correction is < black >\n",
    "In line 28, eug: suggested correction is < dug >\n",
    "In line 28, debbs: suggested correction is < debts >\n",
    "In line 29, nericans: suggested correction is < americans >\n",
    "In line 30, ainin: suggested correction is < again >\n",
    "In line 30, ergs: suggested correction is < eggs >\n",
    "In line 31, trumped: suggested correction is < trumpet >\n",
    "In line 32, erican: suggested correction is < american >\n",
    "In line 33, unorthodox: suggested correction is < orthodox >\n",
    "In line 33, nenance: suggested correction is < penance >\n",
    "In line 33, thg: suggested correction is < the >\n",
    "In line 34, sln: suggested correction is < son >\n",
    "In line 34, rgs: suggested correction is < rags >\n",
    "In line 38, williaij: suggested correction is < william >\n",
    "In line 38, eu: suggested correction is < e >\n",
    "In line 40, fcsf: suggested correction is < ff >\n",
    "In line 40, ber: suggested correction is < be >\n",
    "In line 42, unorthodoxy: suggested correction is < orthodox >\n",
    "In line 42, thpt: suggested correction is < that >\n",
    "In line 42, the word < senbrnrgs > was not found (no suggested correction)\n",
    "In line 44, fascism: suggested correction is < fascia >\n",
    "In line 62, loo: suggested correction is < look >\n",
    "In line 65, ththn: suggested correction is < then >\n",
    "In line 65, scbell: suggested correction is < bell >\n",
    "In line 65, ife: suggested correction is < if >\n",
    "In line 65, yktcn: suggested correction is < skin >\n",
    "In line 65, thl: suggested correction is < the >\n",
    "In line 66, thi: suggested correction is < the >\n",
    "In line 68, saij: suggested correction is < said >\n",
    "In line 69, defendants: suggested correction is < defendant >\n",
    "In line 69, cornr: suggested correction is < corner >\n",
    "In line 69, nists: suggested correction is < fists >\n",
    "In line 72, ro: suggested correction is < to >\n",
    "In line 74, ath: suggested correction is < at >\n",
    "In line 75, tti: suggested correction is < ti >\n",
    "In line 75, rg: suggested correction is < re >\n",
    "In line 75, acrific: suggested correction is < pacific >\n",
    "In line 77, korea: suggested correction is < more >\n",
    "In line 78, ro: suggested correction is < to >\n",
    "In line 78, doatli: suggested correction is < death >\n",
    "In line 81, ith: suggested correction is < it >\n",
    "In line 81, ry: suggested correction is < by >\n",
    "In line 81, kl: suggested correction is < ll >\n",
    "In line 81, ech: suggested correction is < each >\n",
    "In line 82, rb: suggested correction is < re >\n",
    "In line 82, the word < ghmhvestigat > was not found (no suggested correction)\n",
    "In line 82, nb: suggested correction is < no >\n",
    "In line 82, rg: suggested correction is < re >\n",
    "In line 83, rosenbt: suggested correction is < rodent >\n",
    "In line 83, rgs: suggested correction is < rags >\n",
    "In line 84, coriritted: suggested correction is < committed >\n",
    "In line 86, fighti: suggested correction is < fight >\n",
    "In line 88, bths: suggested correction is < baths >\n",
    "In line 88, tchf: suggested correction is < the >\n",
    "In line 91, ro: suggested correction is < to >\n",
    "In line 91, ijb: suggested correction is < in >\n",
    "In line 92, telegrnm: suggested correction is < telegram >\n",
    "In line 92, jillia: suggested correction is < william >\n",
    "In line 92, patt: suggested correction is < part >\n",
    "In line 92, rson: suggested correction is < son >\n",
    "In line 93, ecretdry: suggested correction is < secretary >\n",
    "In line 95, purview: suggested correction is < purves >\n",
    "In line 95, rder: suggested correction is < order >\n",
    "In line 99, gor: suggested correction is < for >\n",
    "In line 99, dthethg: suggested correction is < teeth >\n",
    "In line 99, ared: suggested correction is < are >\n",
    "In line 99, ro: suggested correction is < to >\n",
    "In line 99, enb: suggested correction is < end >\n",
    "In line 99, rg: suggested correction is < re >\n",
    "In line 100, sacc: suggested correction is < sac >\n",
    "In line 100, vthnz: suggested correction is < the >\n",
    "In line 100, dri: suggested correction is < dry >\n",
    "In line 100, yfu: suggested correction is < you >\n",
    "In line 101, ile: suggested correction is < ill >\n",
    "In line 101, rosi: suggested correction is < rose >\n",
    "In line 101, rg: suggested correction is < re >\n",
    "In line 102, fnir: suggested correction is < fair >\n",
    "In line 102, jhy: suggested correction is < why >\n",
    "In line 102, azi: suggested correction is < ami >\n",
    "In line 103, fascist: suggested correction is < fascia >\n",
    "In line 104, nb: suggested correction is < no >\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 14.5 s, sys: 147 ms, total: 14.6 s\n",
    "Wall time: 15 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1b. optimized serial version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "The above version did not incorporate all of the functionality of the original SymSpell code. For example, the above version did not terminate early if the user only wanted to find a single, best word. It also did not allow for interactive input (e.g. from the console). We added these features to the final version below for completeness.

\n", "\n", "corresponds to `serial_single.py`\n", "

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "'''\n", "v 1.1 last revised 28 Nov 2015\n", "\n", "This program is a Python version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "Changes in this version (1.1):\n", "We implement allowing for less verbose options: e.g. when only a single recommended\n", "correction is required, the search may terminate early, thereby enhancing performance. \n", "\n", "Sample output:\n", "\n", "Please wait...\n", "Creating dictionary...\n", "total words processed: 1105285\n", "total unique words in corpus: 29157\n", "total items in dictionary (corpus words and deletions): 2151998\n", " edit distance for deletions: 3\n", " length of longest word in corpus: 18\n", " \n", "Word correction\n", "---------------\n", "Enter your input (or enter to exit): there\n", "('there', (2972, 0))\n", " \n", "Enter your input (or enter to exit): hellot\n", "('hello', (1, 1))\n", " \n", "Enter your input (or enter to exit): accomodation\n", "('accommodation', (5, 1))\n", " \n", "Enter your input (or enter to exit): \n", "goodbye\n", "\n", "'''\n", "\n", "import re\n", "\n", "max_edit_distance = 3 \n", "verbose = 0\n", "# 0: top suggestion\n", "# 1: all suggestions of smallest edit distance\n", "# 2: all suggestions <= max_edit_distance (slower, no early termination)\n", "\n", "dictionary = {}\n", "longest_word_length = 0\n", "\n", "def get_deletes_list(w):\n", " '''given a word, derive strings with up to max_edit_distance characters deleted'''\n", " deletes = []\n", " queue = [w]\n", " for d in range(max_edit_distance):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", "\n", "def create_dictionary_entry(w):\n", " '''add word and its derived deletions to dictionary'''\n", " # check if word is already in dictionary\n", " # dictionary entries are in the form: (list of suggested corrections, frequency of word in corpus)\n", " global longest_word_length\n", " new_real_word_added = False\n", " if w in 
dictionary:\n", " dictionary[w] = (dictionary[w][0], dictionary[w][1] + 1) # increment count of word in corpus\n", " else:\n", " dictionary[w] = ([], 1) \n", " longest_word_length = max(longest_word_length, len(w))\n", " \n", " if dictionary[w][1]==1:\n", " # first appearance of word in corpus\n", " # n.b. word may already be in dictionary as a derived word (deleting character from a real word)\n", " # but counter of frequency of word in corpus is not incremented in those cases)\n", " new_real_word_added = True\n", " deletes = get_deletes_list(w)\n", " for item in deletes:\n", " if item in dictionary:\n", " # add (correct) word to delete's suggested correction list if not already there\n", " if item not in dictionary[item][0]:\n", " dictionary[item][0].append(w)\n", " else:\n", " dictionary[item] = ([w], 0) # note frequency of word in corpus is not incremented\n", " \n", " return new_real_word_added\n", "\n", "def create_dictionary(fname):\n", "\n", " total_word_count = 0\n", " unique_word_count = 0\n", " print \"Creating dictionary...\" \n", " \n", " with open(fname) as file: \n", " for line in file:\n", " words = re.findall('[a-z]+', line.lower()) # separate by words by non-alphabetical characters \n", " for word in words:\n", " total_word_count += 1\n", " if create_dictionary_entry(word):\n", " unique_word_count += 1\n", " \n", " print \"total words processed: %i\" % total_word_count\n", " print \"total unique words in corpus: %i\" % unique_word_count\n", " print \"total items in dictionary (corpus words and deletions): %i\" % len(dictionary)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", " \n", " return dictionary\n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance between sequences.\n", "\n", " This method has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. 
This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def get_suggestions(string, silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word'''\n", " if (len(string) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", " \n", " global verbose\n", " suggest_dict = {}\n", " min_suggest_len = float('inf')\n", " \n", " queue = [string]\n", " q_dictionary = {} # items other than string that we've checked\n", " \n", " while len(queue)>0:\n", " q_item = queue[0] # pop\n", " queue = queue[1:]\n", " \n", " # early exit\n", " if ((verbose<2) and (len(suggest_dict)>0) and ((len(string)-len(q_item))>min_suggest_len)):\n", " break\n", " \n", " # process queue item\n", " if (q_item in dictionary) and (q_item not in suggest_dict):\n", " if (dictionary[q_item][1]>0):\n", " # word is in dictionary, and is a word from the corpus, and not already in suggestion list\n", " # so add to suggestion dictionary, indexed by the word with value (frequency in corpus, edit distance)\n", " # note q_items that are not the input string are shorter than input string \n", " # since only deletes are added (unless manual dictionary corrections are added)\n", " assert len(string)>=len(q_item)\n", " suggest_dict[q_item] = (dictionary[q_item][1], len(string) - len(q_item))\n", " # early exit\n", " if ((verbose<2) and (len(string)==len(q_item))):\n", " break\n", " elif (len(string) - len(q_item)) < min_suggest_len:\n", " min_suggest_len = len(string) - len(q_item)\n", " \n", " ## the suggested corrections for q_item as stored in dictionary (whether or not\n", " ## q_item itself is a valid word or merely a delete) can be valid corrections\n", " for sc_item in dictionary[q_item][0]:\n", " if (sc_item not in suggest_dict):\n", " \n", " # compute edit distance\n", " # suggested items should always be longer (unless manual corrections are added)\n", " assert len(sc_item)>len(q_item)\n", " # q_items that are not input should be shorter than original string \n", " # (unless manual corrections added)\n", " assert len(q_item)<=len(string)\n", " if len(q_item)==len(string):\n", " assert q_item==string\n", " item_dist = len(sc_item) - len(q_item)\n", "\n", " # item in suggestions list should not be the same as the string itself\n", " assert sc_item!=string \n", " # calculate edit distance using, for example, Damerau-Levenshtein distance\n", " item_dist = dameraulevenshtein(sc_item, string)\n", " \n", " # do not add words with greater edit distance if verbose setting not on\n", " if ((verbose<2) and (item_dist>min_suggest_len)):\n", " pass\n", " elif item_dist<=max_edit_distance:\n", " assert sc_item in dictionary # should already be in dictionary if in suggestion list\n", " suggest_dict[sc_item] = (dictionary[sc_item][1], item_dist)\n", " if item_dist < min_suggest_len:\n", " min_suggest_len = item_dist\n", " \n", " # depending on order words are 
processed, some words with different edit distances\n", " # may be entered into suggestions; trim suggestion dictionary if verbose setting not on\n", " if verbose<2:\n", " suggest_dict = {k:v for k, v in suggest_dict.items() if v[1]<=min_suggest_len}\n", " \n", " # now generate deletes (e.g. a substring of string or of a delete) from the queue item\n", " # as additional items to check -- add to end of queue\n", " assert len(string)>=len(q_item)\n", " \n", " # do not add words with greater edit distance if verbose setting not on\n", " if ((verbose<2) and ((len(string)-len(q_item))>min_suggest_len)):\n", " pass\n", " elif (len(string)-len(q_item))<max_edit_distance and len(q_item)>1:\n", " for c in range(len(q_item)): # character index \n", " word_minus_c = q_item[:c] + q_item[c+1:]\n", " if word_minus_c not in q_dictionary:\n", " queue.append(word_minus_c)\n", " q_dictionary[word_minus_c] = None # arbitrary value, just to identify we checked this\n", " \n", " # queue is now empty: convert suggestions in dictionary to list for output\n", " if not silent and verbose!=0:\n", " print \"number of possible corrections: %i\" %len(suggest_dict)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested word corrections only:\n", " # return sorted(suggest_dict, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " as_list = suggest_dict.items()\n", " outlist = sorted(as_list, key = lambda (term, (freq, dist)): (dist, -freq))\n", " \n", " if verbose==0:\n", " return outlist[0]\n", " else:\n", " return outlist\n", "\n", " '''\n", " Option 1:\n", " get_suggestions(\"file\")\n", " ['file', 'five', 'fire', 'fine', ...]\n", " \n", " Option 2:\n", " get_suggestions(\"file\")\n", " [('file', (5, 0)),\n", " ('five', (67, 1)),\n", " ('fire', (54, 1)),\n", " ('fine', (17, 1))...] 
\n", " '''\n", "\n", "def best_word(s, silent=False):\n", " try:\n", " return get_suggestions(s, silent)[0]\n", " except:\n", " return None\n", " \n", "def correct_document(fname, printlist=True):\n", " # correct an entire document\n", " with open(fname) as file:\n", " doc_word_count = 0\n", " corrected_word_count = 0\n", " unknown_word_count = 0\n", " print \"Finding misspelled words in your document...\" \n", " \n", " for i, line in enumerate(file):\n", " doc_words = re.findall('[a-z]+', line.lower()) # separate by words by non-alphabetical characters \n", " for doc_word in doc_words:\n", " doc_word_count += 1\n", " suggestion = best_word(doc_word, silent=True)\n", " if suggestion is None:\n", " if printlist:\n", " print \"In line %i, the word < %s > was not found (no suggested correction)\" % (i, doc_word)\n", " unknown_word_count += 1\n", " elif suggestion[0]!=doc_word:\n", " if printlist:\n", " print \"In line %i, %s: suggested correction is < %s >\" % (i, doc_word, suggestion[0])\n", " corrected_word_count += 1\n", " \n", " print \"-----\"\n", " print \"total words checked: %i\" % doc_word_count\n", " print \"total unknown words: %i\" % unknown_word_count\n", " print \"total potential errors found: %i\" % corrected_word_count\n", "\n", " return\n", "\n", "## main\n", "\n", "import time\n", "\n", "if __name__ == \"__main__\":\n", " \n", " print \"Please wait...\"\n", " time.sleep(2)\n", " create_dictionary(\"testdata/big.txt\")\n", "\n", " print \" \"\n", " print \"Word correction\"\n", " print \"---------------\"\n", " \n", " while True:\n", " word_in = raw_input('Enter your input (or enter to exit): ')\n", " if len(word_in)==0:\n", " print \"goodbye\"\n", " break\n", " print get_suggestions(word_in)\n", " print \" \"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Simply run the code block above and follow the interactive instructions.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    " \n",
    "Word correction\n",
    "---------------\n",
    "Enter your input (or enter to exit): hello\n",
    "('hello', (1, 0))\n",
    " \n",
    "Enter your input (or enter to exit): there\n",
    "('there', (2972, 0))\n",
    " \n",
    "Enter your input (or enter to exit): thinkl\n",
    "('think', (557, 1))\n",
    " \n",
    "Enter your input (or enter to exit): prest\n",
    "('rest', (209, 1))\n",
    " \n",
    "Enter your input (or enter to exit): \n",
    "goodbye\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "------" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Original SPARK version: SLOW" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " To run this program, restart notebook, and start executing the cells of this section starting here.

\n", " WARNING: This version is extremely slow for even moderately sized files.\n", "

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "corresponds to `spark_1.py`\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "'''\n", "v 1.0 last revised 22 Nov 2015\n", "\n", "This program is a Spark (PySpark) version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "Future improvements may entail allowing for less verbose options, \n", "including the output of a single recommended correction. Note also that we\n", "have generally kept to the form of the original program, and have not\n", "introduced any major optimizations or structural changes in this PySpark port.\n", "\n", "To execute program:\n", "1. Ensure \"big.txt\" is in the current working directory. This is the corpus\n", " from which the dictionary for the spellchecker will be built.\n", "2. 
Check recommended single word corrections by executing get_suggestions(\"word\") \n", " in the corresponding marked box below.\n", "\n", "Note: we did not implement entire document checking given speed of program,\n", " since we are parallelizing the processing of deletions of the input word\n", " (however, see later Spark version).\n", "\n", "################\n", "\n", "Example input/output:\n", "\n", "################\n", "\n", "get_suggestions(\"there\")\n", "\n", "number of possible corrections: 604\n", " edit distance for deletions: 3\n", " \n", "[('there', (2972, 0)),\n", " ('these', (1231, 1)),\n", " ('where', (977, 1)),\n", " ('here', (691, 1)),\n", " ('three', (584, 1)),\n", " ('thee', (26, 1)),\n", " ('chere', (9, 1)),\n", " ('theme', (8, 1)),\n", " ('the', (80030, 2)), ...\n", "\n", "\n", "'''\n", "\n", "import findspark\n", "import os\n", "findspark.init('/Users/K-Lo/spark-1.5.0')\n", "\n", "import pyspark\n", "conf = (pyspark.SparkConf()\n", " .setMaster('local')\n", " .setAppName('pyspark')\n", " .set(\"spark.executor.memory\", \"2g\"))\n", "sc = pyspark.SparkContext(conf=conf)\n", "\n", "import re\n", "\n", "n_partitions = 6 # number of partitions to be used\n", "max_edit_distance = 3\n", "\n", "\n", "def get_deletes_list(word):\n", " '''given a word, derive strings with one character deleted'''\n", " # takes a string as input and returns all 1-deletes in a list\n", " # allows for duplicates to be created, will deal with duplicates later to minimize shuffling\n", " if len(word)>1:\n", " return ([word[:c] + word[c+1:] for c in range(len(word))])\n", " else:\n", " return []\n", " \n", "def copartitioned(RDD1, RDD2):\n", " '''check if two RDDs are copartitioned'''\n", " return RDD1.partitioner == RDD2.partitioner\n", "\n", "def combine_joined_lists(tup):\n", " '''takes as input a tuple in the form (a, b) where each of a, b may be None (but not both) or a list\n", " and returns a concatenated list of unique elements'''\n", " concat_list = []\n", " if tup[1] is None:\n", " concat_list = tup[0]\n", " elif tup[0] is None:\n", " concat_list = tup[1]\n", " else:\n", " concat_list = tup[0] + tup[1]\n", " \n", " return list(set(concat_list))\n", "\n", "def parallel_create_dictionary(fname):\n", " '''Create dictionary using Spark RDDs.'''\n", " # we generate and count all words for the corpus,\n", " # then add deletes to the dictionary\n", " # this is a slightly different approach from the SymSpell algorithm\n", " # that may be more appropriate for Spark processing\n", " \n", " print \"Creating dictionary...\" \n", " \n", " ############\n", " #\n", " # process corpus\n", " #\n", " ############\n", " \n", " # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string\n", " regex = re.compile('[^a-z ]')\n", "\n", " # convert file into one long sequence of words\n", " make_all_lower = sc.textFile(fname).map(lambda line: line.lower())\n", " replace_nonalphs = make_all_lower.map(lambda line: regex.sub(' ', line))\n", " all_words = replace_nonalphs.flatMap(lambda line: line.split())\n", "\n", " # create core corpus dictionary (i.e. 
only words appearing in file, no \"deletes\") and cache it\n", " # output RDD of unique_words_with_count: [(word1, count1), (word2, count2), (word3, count3)...]\n", " count_once = all_words.map(lambda word: (word, 1))\n", " unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", " \n", " # output stats on core corpus\n", " print \"total words processed: %i\" % unique_words_with_count.map(lambda (k, v): v).reduce(lambda a, b: a + b)\n", " print \"total unique words in corpus: %i\" % unique_words_with_count.count()\n", " \n", " ############\n", " #\n", " # generate deletes list\n", " #\n", " ############\n", " \n", " # generate list of n-deletes from words in a corpus of the form: [(word1, count1), (word2, count2), ...]\n", " # we will handle possible duplicates after map/reduce:\n", " # our thinking is the resulting suggestions lists for each delete will be much smaller than the\n", " # list of potential deletes, and it is more efficient to reduce first, then remove duplicates \n", " # from these smaller lists (at each worker node), rather than calling `distinct()` on \n", " # flattened `expand_deletes` which would require a large shuffle\n", "\n", " ##\n", " ## generate 1-deletes\n", " ##\n", " \n", " assert max_edit_distance>0 \n", " \n", " generate_deletes = unique_words_with_count.map(lambda (parent, count): (parent, get_deletes_list(parent)), \n", " preservesPartitioning=True)\n", " expand_deletes = generate_deletes.flatMapValues(lambda x: x)\n", " \n", " # swap and combine, resulting RDD after processing 1-deletes has elements:\n", " # [(delete1, [correct1, correct2...]), (delete2, [correct1, correct2...])...]\n", " swap = expand_deletes.map(lambda (orig, delete): (delete, [orig]))\n", " combine = swap.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions)\n", "\n", " # cache \"master\" deletes RDD, list of (deletes, [unique suggestions]), for use in loop\n", " deletes = combine.mapValues(lambda sl: list(set(sl))).cache()\n", " \n", " ##\n", " ## generate 2+ deletes\n", " ##\n", " \n", " d_remaining = max_edit_distance - 1 # decreasing counter\n", " queue = deletes\n", "\n", " while d_remaining>0:\n", "\n", " # generate further deletes -- we parallelize processing of all deletes in this version\n", " #'expand_new_deletes' will be of the form [(parent \"delete\", [new child \"deletes\"]), ...]\n", " # n.b. this will filter out elements with no new child deletes\n", " gen_new_deletes = queue.map(lambda (x, y): (x, get_deletes_list(x)), preservesPartitioning=True)\n", " expand_new_deletes = gen_new_deletes.flatMapValues(lambda x: x) \n", "\n", " # associate each new child delete with same corpus word suggestions that applied for parent delete\n", " # update queue with [(new child delete, [corpus suggestions]) ...] 
and cache for next iteration\n", " \n", " assert copartitioned(queue, expand_new_deletes) # check partitioning for efficient join\n", " get_sugglist_from_parent = expand_new_deletes.join(queue)\n", " new_deletes = get_sugglist_from_parent.map(lambda (p, (c, sl)): (c, sl))\n", " combine_new = new_deletes.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions)\n", " queue = combine_new.mapValues(lambda sl: list(set(sl))).cache()\n", "\n", " # update \"master\" deletes list with new deletes, and cache for next iteration\n", " \n", " assert copartitioned(deletes, queue) # check partitioning for efficient join\n", " join_delete_lists = deletes.fullOuterJoin(queue)\n", " deletes = join_delete_lists.mapValues(lambda y: combine_joined_lists(y)).cache()\n", "\n", " d_remaining -= 1\n", " \n", " ############\n", " #\n", " # merge deletes with unique corpus words to construct main dictionary\n", " #\n", " ############\n", "\n", " # dictionary entries are in the form: (list of suggested corrections, frequency of word in corpus)\n", " # note frequency of word in corpus is not incremented for deletes\n", " deletes_for_dict = deletes.mapValues(lambda sl: (sl, 0)) \n", " unique_words_for_dict = unique_words_with_count.mapValues(lambda count: ([], count))\n", "\n", " assert copartitioned(unique_words_for_dict, deletes_for_dict) # check partitioning for efficient join\n", " join_deletes = unique_words_for_dict.fullOuterJoin(deletes_for_dict)\n", " '''\n", " entries now in form of (word, ( ([], count), ([suggestions], 0) )) for words in both corpus/deletes\n", " (word, ( ([], count), None )) for (real) words in corpus only\n", " (word, ( None , ([suggestions], 0) )) for (fake) words in deletes only\n", " '''\n", "\n", " # if entry has deletes and is a real word, take suggestion list from deletes and count from corpus\n", " dictionary_RDD = join_deletes.mapValues(lambda (xtup, ytup): \n", " xtup if ytup is None\n", " else ytup if xtup is None\n", " else (ytup[0], xtup[1])).cache()\n", "\n", " print \"total items in dictionary (corpus words and deletions): %i\" % dictionary_RDD.count()\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " longest_word_length = unique_words_with_count.map(lambda (k, v): len(k)).reduce(max)\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", " \n", " return dictionary_RDD, longest_word_length\n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance (an integer) between sequences.\n", "\n", " This code has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. 
Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def get_n_deletes_list(w, n):\n", " '''given a word, derive strings with up to n characters deleted'''\n", " deletes = []\n", " queue = [w]\n", " for d in range(n):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", "\n", "def get_suggestions(s, dictRDD, longest_word_length=float('inf'), silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word.\n", " \n", " s: input string\n", " dictRDD: the main dictionary, which includes deletes\n", " entries are in the form of: [(word, ([suggested corrections], frequency of word in corpus)), ...]\n", " longest_word_length: optional identifier of longest real word in dictRDD\n", " silent: verbose output\n", " '''\n", "\n", " if (len(s) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", "\n", " ##########\n", " #\n", " # initialize suggestions RDD\n", " # suggestRDD entries: (word, (frequency of word in corpus, edit distance))\n", " #\n", " ##########\n", " \n", " if not silent:\n", " print \"looking up suggestions based on input word...\"\n", " \n", " # ensure input RDDs are partitioned\n", " dictRDD = dictRDD.partitionBy(n_partitions).cache()\n", " \n", " # check if input word is in dictionary, and is a word from the corpus (edit distance = 0)\n", " # if so, add input word itself to suggestRDD\n", " exact_match = dictRDD.filter(lambda (w, (sl, freq)): w==s).cache()\n", " suggestRDD = exact_match.mapValues(lambda (sl, freq): (freq, 0)).cache()\n", "\n", " ##########\n", " #\n", " # add 
suggestions for input word\n", " #\n", " ##########\n", " \n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the input string s itself is a valid word or merely a delete) can be valid corrections\n", " sc_items = exact_match.flatMap(lambda (w, (sl, freq)): sl)\n", " calc_dist = sc_items.map(lambda sc: (sc, len(sc)-len(s))).partitionBy(n_partitions).cache()\n", " \n", " assert copartitioned(dictRDD, calc_dist) # check partitioning for efficient join\n", " get_freq = dictRDD.join(calc_dist)\n", " parent_sugg = get_freq.mapValues(lambda ((sl, freq), dist): (freq, dist))\n", " suggestRDD = suggestRDD.union(parent_sugg).cache()\n", " assert copartitioned(parent_sugg, suggestRDD) # check partitioning\n", "\n", " ##########\n", " #\n", " # process deletes on the input string\n", " #\n", " ##########\n", " \n", " assert max_edit_distance>0\n", " \n", " list_deletes_of_s = sc.parallelize(get_n_deletes_list(s, max_edit_distance))\n", " deletes_of_s = list_deletes_of_s.map(lambda k: (k, 0)).partitionBy(n_partitions).cache()\n", " \n", " assert copartitioned(dictRDD, deletes_of_s) # check partitioning for efficient join\n", " check_matches = dictRDD.join(deletes_of_s).cache()\n", " \n", " # if delete is a real word in corpus, add it to suggestion list\n", " del_exact_match = check_matches.filter(lambda (w, ((sl, freq), _)): freq>0)\n", " del_sugg = del_exact_match.map(lambda (w, ((s1, freq), _)): (w, (freq, len(s)-len(w))),\n", " preservesPartitioning=True)\n", " suggestRDD = suggestRDD.union(del_sugg).cache()\n", " \n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the delete itself is a valid word or merely a delete) can be valid corrections \n", " list_sl = check_matches.mapValues(lambda ((sl, freq), _): sl).flatMapValues(lambda x: x)\n", " swap_del = list_sl.map(lambda (w, sc): (sc, 0))\n", " combine_del = swap_del.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", "\n", " # need to recalculate actual Deverau-Levenshtein distance to be within max_edit_distance for all deletes\n", " calc_dist = combine_del.map(lambda (w, _): (w, dameraulevenshtein(s, w)),\n", " preservesPartitioning=True)\n", " filter_by_dist = calc_dist.filter(lambda (w, dist): dist<=max_edit_distance)\n", " \n", " # get frequencies from main dictionary and add to suggestions list\n", " assert copartitioned(dictRDD, filter_by_dist) # check partitioning for efficient join\n", " get_freq = dictRDD.join(filter_by_dist)\n", " del_parent_sugg = get_freq.mapValues(lambda ((sl, freq), dist): (freq, dist))\n", " \n", " suggestRDD = suggestRDD.union(del_parent_sugg).distinct().cache() \n", " \n", " if not silent:\n", " print \"number of possible corrections: %i\" %suggestRDD.count()\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", "\n", " ##########\n", " #\n", " # sort RDD for output\n", " #\n", " ##########\n", " \n", " # suggest_RDD is in the form: [(word, (freq, editdist)), (word, (freq, editdist)), ...]\n", " # there does not seem to be a straightforward way to sort by both primary and secondary keys in Spark\n", " # this is a documented issue: one option is to simply work with a list since there are likely not\n", " # going to be an extremely large number of recommended suggestions\n", " \n", " output = suggestRDD.collect()\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested corrections only:\n", " # return 
sorted(output, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " # return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", " return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", "def best_word(s, d, l, silent=False):\n", " a = get_suggestions(s, d, l, silent)\n", " if len(a)==0:\n", " return (None, (None, None))\n", " else: \n", " return a[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
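\n",
    "*Added example (not in the original program):* a quick check of the `get_n_deletes_list` helper defined above, which enumerates every string reachable from the input word by deleting up to n characters. Assumes the cell above has been run." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Added illustration only: enumerate delete-edits of a word with the helper\n", "# defined in the cell above (run that cell first).\n", "print get_n_deletes_list(\"abc\", 1)   # ['bc', 'ac', 'ab']\n", "print get_n_deletes_list(\"abc\", 2)   # ['bc', 'ac', 'ab', 'c', 'b', 'a']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "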
\n", " Run the cell below only once to build the dictionary.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "d, lwl = parallel_create_dictionary(\"testdata/big.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 113 ms, sys: 29.4 ms, total: 142 ms\n",
    "Wall time: 5min 26s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter word to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 79.9 ms, sys: 20.2 ms, total: 100 ms\n",
    "Wall time: 1min 33s\n",
    "Out[6]:\n",
    "[(u'there', (2972, 0)),\n",
    " (u'these', (1231, 1)),\n",
    " (u'where', (977, 1)),\n",
    " (u'here', (691, 1)),\n",
    " (u'three', (584, 1)),\n",
    " (u'thee', (26, 1)),\n",
    " (u'chere', (9, 1)),\n",
    " (u'theme', (8, 1)), ...\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"zzffttt\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 0\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 76.2 ms, sys: 18.9 ms, total: 95.1 ms\n",
    "Wall time: 2min 7s\n",
    "Out[5]:\n",
    "[]\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "best_word(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 95.8 ms, sys: 22.7 ms, total: 119 ms\n",
    "Wall time: 1min 33s\n",
    "Out[4]:\n",
    "(u'there', (2972, 0))\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Optimized SPARK version: FASTER" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " To run this program, restart notebook, and start executing the cells of this section starting here.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "corresponds to `spark_2.py`\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "'''\n", "v 2.0 last revised 26 Nov 2015\n", "\n", "This program is a Spark (PySpark) version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "Future improvements may entail allowing for less verbose options, \n", "including the output of a single recommended correction. Note also that we\n", "have generally kept to the form of the original program, and have not\n", "introduced any major optimizations or structural changes in this PySpark port.\n", "\n", "To execute program:\n", "1. Ensure \"big.txt\" is in the current working directory. This is the corpus\n", " from which the dictionary for the spellchecker will be built.\n", "2. 
Check recommended single word corrections by executing get_suggestions(\"word\") \n", " in the corresponding marked box below.\n", "\n", "Note: we did not implement entire document checking given speed of program,\n", " since we are parallelizing the processing of deletions of the input word\n", " (however, see later Spark version).\n", "\n", "################\n", "\n", "Example input/output:\n", "\n", "################\n", "\n", "get_suggestions(\"there\")\n", "\n", "number of possible corrections: 604\n", " edit distance for deletions: 3\n", " \n", "[('there', (2972, 0)),\n", " ('these', (1231, 1)),\n", " ('where', (977, 1)),\n", " ('here', (691, 1)),\n", " ('three', (584, 1)),\n", " ('thee', (26, 1)),\n", " ('chere', (9, 1)),\n", " ('theme', (8, 1)),\n", " ('the', (80030, 2)), ...\n", "\n", "\n", "'''\n", "\n", "import findspark\n", "import os\n", "findspark.init('/Users/K-Lo/spark-1.5.0')\n", "\n", "from pyspark import SparkContext\n", "sc = SparkContext()\n", "\n", "import re\n", "\n", "n_partitions = 6 # number of partitions to be used\n", "max_edit_distance = 3\n", "\n", "# helper functions\n", "def get_n_deletes_list(w, n):\n", " '''given a word, derive list of strings with up to n characters deleted'''\n", " # since this list is generally of the same magnitude as the number of \n", " # characters in a word, it may not make sense to parallelize this\n", " # so we use python to create the list\n", " deletes = []\n", " queue = [w]\n", " for d in range(n):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", " \n", "def copartitioned(RDD1, RDD2):\n", " '''check if two RDDs are copartitioned'''\n", " return RDD1.partitioner == RDD2.partitioner\n", "\n", "def combine_joined_lists(tup):\n", " '''takes as input a tuple in the form (a, b) where each of a, b may be None (but not both) or a list\n", " and returns a concatenated list of unique elements'''\n", " concat_list = []\n", " if tup[1] is None:\n", " concat_list = tup[0]\n", " elif tup[0] is None:\n", " concat_list = tup[1]\n", " else:\n", " concat_list = tup[0] + tup[1]\n", " \n", " return list(set(concat_list))\n", "\n", "def parallel_create_dictionary(fname):\n", " '''Create dictionary using Spark RDDs.'''\n", " # we generate and count all words for the corpus,\n", " # then add deletes to the dictionary\n", " # this is a slightly different approach from the SymSpell algorithm\n", " # that may be more appropriate for Spark processing\n", " \n", " print \"Creating dictionary...\" \n", " \n", " ############\n", " #\n", " # process corpus\n", " #\n", " ############\n", " \n", " # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string\n", " regex = re.compile('[^a-z ]')\n", "\n", " # convert file into one long sequence of words\n", " make_all_lower = sc.textFile(fname).map(lambda line: line.lower())\n", " replace_nonalphs = make_all_lower.map(lambda line: regex.sub(' ', line))\n", " all_words = replace_nonalphs.flatMap(lambda line: line.split())\n", "\n", " # create core corpus dictionary (i.e. 
only words appearing in file, no \"deletes\") and cache it\n", " # output RDD of unique_words_with_count: [(word1, count1), (word2, count2), (word3, count3)...]\n", " count_once = all_words.map(lambda word: (word, 1))\n", " unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", " \n", " # output stats on core corpus\n", " print \"total words processed: %i\" % unique_words_with_count.map(lambda (k, v): v).reduce(lambda a, b: a + b)\n", " print \"total unique words in corpus: %i\" % unique_words_with_count.count()\n", " \n", " ############\n", " #\n", " # generate deletes list\n", " #\n", " ############\n", " \n", " # generate list of n-deletes from words in a corpus of the form: [(word1, count1), (word2, count2), ...]\n", " \n", " assert max_edit_distance>0 \n", " \n", " generate_deletes = unique_words_with_count.map(lambda (parent, count): \n", " (parent, get_n_deletes_list(parent, max_edit_distance)))\n", " expand_deletes = generate_deletes.flatMapValues(lambda x: x)\n", " swap = expand_deletes.map(lambda (orig, delete): (delete, ([orig], 0)))\n", " \n", " ############\n", " #\n", " # combine delete elements with main dictionary\n", " #\n", " ############\n", " \n", " corpus = unique_words_with_count.mapValues(lambda count: ([], count))\n", " combine = swap.union(corpus) # combine deletes with main dictionary, eliminate duplicates\n", " new_dict = combine.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])).cache()\n", " \n", " print \"total items in dictionary (corpus words and deletions): %i\" % new_dict.count()\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " longest_word_length = unique_words_with_count.map(lambda (k, v): len(k)).reduce(max)\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", "\n", " return new_dict, longest_word_length \n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance (an integer) between sequences.\n", "\n", " This code has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. 
This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def get_suggestions(s, dictRDD, longest_word_length=float('inf'), silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word.\n", " \n", " s: input string\n", " dictRDD: the main dictionary, which includes deletes\n", " entries are in the form of: [(word, ([suggested corrections], frequency of word in corpus)), ...]\n", " longest_word_length: optional identifier of longest real word in dictRDD\n", " silent: verbose output\n", " '''\n", "\n", " if (len(s) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", "\n", " ##########\n", " #\n", " # initialize suggestions RDD\n", " # suggestRDD entries: (word, (frequency of word in corpus, edit distance))\n", " #\n", " ##########\n", " \n", " if not silent:\n", " print \"looking up suggestions based on input word...\"\n", " \n", " # ensure input RDDs are partitioned\n", " dictRDD = dictRDD.repartitionAndSortWithinPartitions(n_partitions).cache()\n", " \n", " # check if input word is in dictionary, and is a word from the corpus (edit distance = 0)\n", " # if so, add input word itself to suggestRDD\n", " exact_match = dictRDD.filter(lambda (w, (sl, freq)): w==s).cache()\n", " suggestRDD = exact_match.mapValues(lambda (sl, freq): (freq, 0))\n", " \n", " ##########\n", " #\n", " # add suggestions for input word\n", " #\n", " ##########\n", "\n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the input string s itself is a valid word or merely a delete) can be valid corrections\n", " # the suggestions list will likely be short: it is only for one word\n", " # so we choose to collect here and process as a list, rather than parallelizing \n", " # a very short list\n", " sc_items = exact_match.flatMap(lambda (w, (sl, freq)): sl).collect() \n", " get_freq = dictRDD.filter(lambda (w, (sl, freq)): w in sc_items)\n", " parent_sugg = get_freq.map(lambda (w, (sl, freq)): (w, (freq, len(w)-len(s))), \n", " preservesPartitioning=True)\n", " suggestRDD = suggestRDD.union(parent_sugg).cache()\n", " assert copartitioned(parent_sugg, suggestRDD) # check partitioning\n", "\n", " ##########\n", " #\n", " # process deletes of the input string\n", " #\n", " ##########\n", " \n", " assert max_edit_distance>0\n", " \n", " list_deletes_of_s = get_n_deletes_list(s, max_edit_distance) # this list is also short\n", " check_matches = dictRDD.filter(lambda (w, (sl, freq)): w in list_deletes_of_s).cache()\n", "\n", " # identify deletes that match a dictionary entry, and add matches to suggestions\n", " del_exact_match = check_matches.filter(lambda (w, (sl, freq)): freq>0)\n", " del_sugg = del_exact_match.map(lambda (w, (s1, freq)): (w, (freq, len(s)-len(w))),\n", " preservesPartitioning=True)\n", " suggestRDD = suggestRDD.union(del_sugg).cache()\n", " \n", " ##########\n", " #\n", " # now process 
suggestions lists of deletes\n", " #\n", " ##########\n", "\n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the delete itself is a valid word or merely a delete) can be valid corrections \n", " list_sl = check_matches.mapValues(lambda (sl, freq): sl).flatMapValues(lambda x: x)\n", " swap_del = list_sl.map(lambda (w, sc): (sc, 0))\n", " combine_del = swap_del.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", "\n", " # need to recalculate actual Deverau Levenshtein distance to be within max_edit_distance \n", " # for all deletes and check against the threshold value\n", " calc_dist = combine_del.map(lambda (w, _): (w, dameraulevenshtein(s, w)),\n", " preservesPartitioning=True)\n", " filter_by_dist = calc_dist.filter(lambda (w, dist): dist<=max_edit_distance)\n", " \n", " # MERGE: get frequencies from main dictionary and add to suggestions list\n", " assert copartitioned(dictRDD, filter_by_dist) # check partitioning for efficient join\n", " get_freq = dictRDD.join(filter_by_dist)\n", " del_parent_sugg = get_freq.mapValues(lambda ((sl, freq), dist): (freq, dist)).cache()\n", " \n", " suggestRDD = suggestRDD.union(del_parent_sugg).distinct().cache() \n", "\n", " ##########\n", " #\n", " # output suggestions\n", " #\n", " ##########\n", " \n", " if not silent:\n", " print \"number of possible corrections: %i\" % suggestRDD.count()\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", "\n", " output = suggestRDD.collect()\n", " \n", " # suggest_RDD is in the form: [(word, (freq, editdist)), (word, (freq, editdist)), ...]\n", " # there does not seem to be a straightforward way to sort by both primary and secondary keys in Spark\n", " # this is a documented issue: one option is to simply work with a list since there are likely not\n", " # going to be an extremely large number of recommended suggestions\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested corrections only:\n", " # return sorted(output, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " # return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", " return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", "\n", "def best_word(s, d, l, silent=False):\n", " a = get_suggestions(s, d, l, silent)\n", " if len(a)==0:\n", " return (None, (None, None))\n", " else: \n", " return a[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
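\n",
    "*Added note (not in the original program):* for comparison, PySpark's `takeOrdered` can also order by a composite key (ascending edit distance, then descending frequency) directly on the RDD instead of sorting a collected list. A toy sketch, assuming the SparkContext `sc` defined above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Added sketch only: order suggestion-style tuples by (edit distance asc, frequency desc)\n", "# directly on an RDD with takeOrdered, rather than collect() + sorted().\n", "toy = sc.parallelize([('the', (80030, 2)), ('there', (2972, 0)), ('these', (1231, 1))])\n", "print toy.takeOrdered(3, key=lambda (term, (freq, dist)): (dist, -freq))\n", "# expected: [('there', (2972, 0)), ('these', (1231, 1)), ('the', (80030, 2))]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "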
\n", " Run the cell below only once to build the dictionary.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "d, lwl = parallel_create_dictionary(\"testdata/big.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 52 ms, sys: 13 ms, total: 65 ms\n",
    "Wall time: 54.1 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter word to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 56.6 ms, sys: 14.8 ms, total: 71.4 ms\n",
    "Wall time: 53.7 s\n",
    "Out[12]:\n",
    "[(u'there', (2972, 0)),\n",
    " (u'these', (1231, 1)),\n",
    " (u'where', (977, 1)),\n",
    " (u'here', (691, 1)),\n",
    " (u'three', (584, 1)),\n",
    " (u'thee', (26, 1)),\n",
    " (u'chere', (9, 1)),\n",
    " (u'theme', (8, 1)),\n",
    " (u'the', (80030, 2)), ...\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"zzffttt\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 0\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 57.7 ms, sys: 15.6 ms, total: 73.4 ms\n",
    "Wall time: 57.3 s\n",
    "Out[4]:\n",
    "[]\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "best_word(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 80.6 ms, sys: 19.3 ms, total: 99.8 ms\n",
    "Wall time: 59.7 s\n",
    "Out[4]:\n",
    "(u'there', (2972, 0))\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Optimized SPARK version: ALSO FAST" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " To run this program, restart notebook, and start executing the cells of this section starting here.
\n", " The difference between this version and #3 is that it does not keep the corpus in RDD form; the corpus is broadcast to the workers by the driver and used as a dictionary upon which lookups are performed. When scaling, the dictionary itself, while large, can be smaller than the documents being checked and should fit in memory (since the serial version assumes that the dictionary can be stored in memory anyway).\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "corresponds to `spark_3.py`\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "'''\n", "v 3.0 last revised 26 Nov 2015\n", "\n", "This program is a Spark (PySpark) version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "Future improvements may entail allowing for less verbose options, \n", "including the output of a single recommended correction. Note also that we\n", "have generally kept to the form of the original program, and have not\n", "introduced any major optimizations or structural changes in this PySpark port.\n", "\n", "To execute program:\n", "1. Ensure \"big.txt\" is in the current working directory. This is the corpus\n", " from which the dictionary for the spellchecker will be built.\n", "2. 
Check recommended single word corrections by executing get_suggestions(\"word\") \n", " in the corresponding marked box below.\n", "\n", "Note: we did not implement entire document checking given speed of program,\n", " since we are parallelizing the processing of deletions of the input word\n", " (however, see later Spark version).\n", "\n", "################\n", "\n", "Example input/output:\n", "\n", "################\n", "\n", "get_suggestions(\"there\")\n", "\n", "number of possible corrections: 604\n", " edit distance for deletions: 3\n", " \n", "[('there', (2972, 0)),\n", " ('these', (1231, 1)),\n", " ('where', (977, 1)),\n", " ('here', (691, 1)),\n", " ('three', (584, 1)),\n", " ('thee', (26, 1)),\n", " ('chere', (9, 1)),\n", " ('theme', (8, 1)),\n", " ('the', (80030, 2)), ...\n", "\n", "\n", "'''\n", "\n", "import findspark\n", "import os\n", "findspark.init('/Users/K-Lo/spark-1.5.0')\n", "\n", "from pyspark import SparkContext\n", "sc = SparkContext()\n", "\n", "import re\n", "\n", "n_partitions = 6 # number of partitions to be used\n", "max_edit_distance = 3\n", "\n", "# helper functions\n", "def get_n_deletes_list(w, n):\n", " '''given a word, derive list of strings with up to n characters deleted'''\n", " # since this list is generally of the same magnitude as the number of \n", " # characters in a word, it may not make sense to parallelize this\n", " # so we use python to create the list\n", " deletes = []\n", " queue = [w]\n", " for d in range(n):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", " \n", "def copartitioned(RDD1, RDD2):\n", " '''check if two RDDs are copartitioned'''\n", " return RDD1.partitioner == RDD2.partitioner\n", "\n", "def combine_joined_lists(tup):\n", " '''takes as input a tuple in the form (a, b) where each of a, b may be None (but not both) or a list\n", " and returns a concatenated list of unique elements'''\n", " concat_list = []\n", " if tup[1] is None:\n", " concat_list = tup[0]\n", " elif tup[0] is None:\n", " concat_list = tup[1]\n", " else:\n", " concat_list = tup[0] + tup[1]\n", " \n", " return list(set(concat_list))\n", "\n", "def parallel_create_dictionary(fname):\n", " '''Create dictionary using Spark RDDs.'''\n", " # we generate and count all words for the corpus,\n", " # then add deletes to the dictionary\n", " # this is a slightly different approach from the SymSpell algorithm\n", " # that may be more appropriate for Spark processing\n", " \n", " print \"Creating dictionary...\" \n", " \n", " ############\n", " #\n", " # process corpus\n", " #\n", " ############\n", " \n", " # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string\n", " regex = re.compile('[^a-z ]')\n", "\n", " # convert file into one long sequence of words\n", " make_all_lower = sc.textFile(fname).map(lambda line: line.lower())\n", " replace_nonalphs = make_all_lower.map(lambda line: regex.sub(' ', line))\n", " all_words = replace_nonalphs.flatMap(lambda line: line.split())\n", "\n", " # create core corpus dictionary (i.e. 
only words appearing in file, no \"deletes\") and cache it\n", " # output RDD of unique_words_with_count: [(word1, count1), (word2, count2), (word3, count3)...]\n", " count_once = all_words.map(lambda word: (word, 1))\n", " unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", " \n", " # output stats on core corpus\n", " print \"total words processed: %i\" % unique_words_with_count.map(lambda (k, v): v).reduce(lambda a, b: a + b)\n", " print \"total unique words in corpus: %i\" % unique_words_with_count.count()\n", " \n", " ############\n", " #\n", " # generate deletes list\n", " #\n", " ############\n", " \n", " # generate list of n-deletes from words in a corpus of the form: [(word1, count1), (word2, count2), ...]\n", " \n", " assert max_edit_distance>0 \n", " \n", " generate_deletes = unique_words_with_count.map(lambda (parent, count): \n", " (parent, get_n_deletes_list(parent, max_edit_distance)))\n", " expand_deletes = generate_deletes.flatMapValues(lambda x: x)\n", " swap = expand_deletes.map(lambda (orig, delete): (delete, ([orig], 0)))\n", " \n", " ############\n", " #\n", " # combine delete elements with main dictionary\n", " #\n", " ############\n", " \n", " corpus = unique_words_with_count.mapValues(lambda count: ([], count))\n", " combine = swap.union(corpus) # combine deletes with main dictionary, eliminate duplicates\n", " \n", " ## since the dictionary will only be a lookup table once created, we can\n", " ## pass on as a Python dictionary rather than RDD by reducing locally and\n", " ## avoiding an extra shuffle from reduceByKey\n", " new_dict = combine.reduceByKeyLocally(lambda a, b: (a[0]+b[0], a[1]+b[1]))\n", " \n", " print \"total items in dictionary (corpus words and deletions): %i\" % len(new_dict)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " longest_word_length = unique_words_with_count.map(lambda (k, v): len(k)).reduce(max)\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", "\n", " return new_dict, longest_word_length \n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance (an integer) between sequences.\n", "\n", " This code has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. 
Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def get_suggestions(s, masterdict, longest_word_length=float('inf'), silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word.\n", " \n", " s: input string\n", " masterdict: the main dictionary (python dict), which includes deletes\n", " entries, is in the form of: {word: ([suggested corrections], \n", " frequency of word in corpus), ...}\n", " longest_word_length: optional identifier of longest real word in masterdict\n", " silent: verbose output (when False)\n", " '''\n", "\n", " if (len(s) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", "\n", " ##########\n", " #\n", " # initialize suggestions RDD\n", " # suggestRDD entries: (word, (frequency of word in corpus, edit distance))\n", " #\n", " ##########\n", " \n", " ## since the dictionary will only be a lookup table once created, we can\n", " ## keep as Python dictionary rather than converting to RDD \n", " ##### try broadcasting to workers for efficiency\n", " masterdict2 = sc.broadcast(masterdict)\n", " \n", " if not silent:\n", " print \"looking up suggestions based on input word...\"\n", " \n", " got_suggestions = False # initialize flag\n", " init_sugg = []\n", " \n", " # check if input word is in dictionary, and is a word from the corpus (edit distance = 0)\n", " # if so, add input word itself and suggestions to suggestRDD\n", " \n", " if s in masterdict2.value:\n", " got_suggestions = True\n", " # dictionary values are in the form of ([suggestions], freq)\n", " if masterdict2.value[s][1]>0: # frequency>0 -> real corpus word\n", " init_sugg = [(s, (masterdict2.value[s][1], 0))]\n", "\n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the input string s itself is a valid word or merely a delete) can be \n", " # valid corrections -- essentially 
we serialize this portion since\n", " # the list of corrections tends to be very short\n", " \n", " add_sugg = [(sugg, (masterdict2.value[sugg][1], len(sugg)-len(s))) \n", " for sugg in masterdict2.value[s][0]]\n", " \n", " suggestRDD = sc.parallelize(init_sugg + add_sugg, n_partitions).cache()\n", " \n", " ##########\n", " #\n", " # process deletes on the input string \n", " #\n", " ##########\n", " \n", " assert max_edit_distance>0\n", " \n", " list_deletes_of_s = get_n_deletes_list(s, max_edit_distance) # this list is short\n", " \n", " add_sugg_2 = [(sugg, (masterdict2.value[sugg][1], len(s)-len(sugg))) \n", " for sugg in list_deletes_of_s if ((sugg in masterdict2.value) and\n", " (masterdict2.value[sugg][1]>0))]\n", " \n", " add_sugg_2_RDD = sc.parallelize(add_sugg_2, n_partitions)\n", " \n", " if got_suggestions:\n", " suggestRDD = suggestRDD.union(add_sugg_2_RDD).cache()\n", " else:\n", " got_suggestions = True\n", " suggestRDD = add_sugg_2_RDD.cache()\n", " \n", " # check each item of suggestion list of all new-found suggestions \n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the delete itself is a valid word or merely a delete) can be valid corrections \n", " # expand lists of list\n", " \n", " sugg_lists = [masterdict2.value[sugg][0] for sugg in list_deletes_of_s \n", " if sugg in masterdict2.value]\n", " sugg_lists_RDD = sc.parallelize(sugg_lists)\n", " list_sl = sugg_lists_RDD.flatMap(lambda x: x).map(lambda w: (w, 0))\n", " combine_del = list_sl.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", "\n", " # need to recalculate actual Deverau Levenshtein distance to be within max_edit_distance for all deletes\n", " calc_dist = combine_del.map(lambda (w, _): (w, dameraulevenshtein(s, w)),\n", " preservesPartitioning=True)\n", " filter_by_dist = calc_dist.filter(lambda (w, dist): dist<=max_edit_distance)\n", " \n", " # get frequencies from main dictionary and add to suggestions list\n", " get_freq = filter_by_dist.filter(lambda (w, dist): w in masterdict2.value)\n", " del_parent_sugg = get_freq.map(lambda (w, dist): (w, (masterdict2.value[w][1], dist)),\n", " preservesPartitioning=True)\n", " \n", " if got_suggestions:\n", " if len(sugg_lists)>0:\n", " suggestRDD = suggestRDD.union(del_parent_sugg).distinct().cache()\n", " else:\n", " # no additional suggestions, leave suggest RDD intact\n", " pass\n", " \n", " # convert unicode to string (currently input not in unicode)\n", " suggestRDD = suggestRDD.map(lambda (w, _): (str(w), _)).cache()\n", " else:\n", " suggestRDD = sc.EmptyRDD() \n", " \n", " if not silent:\n", " print \"number of possible corrections: %i\" % suggestRDD.count()\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", "\n", " ##########\n", " #\n", " # optionally, sort RDD for output\n", " #\n", " ##########\n", " \n", " # suggest_RDD is in the form: [(word, (freq, editdist)), (word, (freq, editdist)), ...]\n", " # there does not seem to be a straightforward way to sort by both primary and secondary keys in Spark\n", " # this is a documented issue: one option is to simply work with a list since there are likely not\n", " # going to be an extremely large number of recommended suggestions\n", " \n", " output = suggestRDD.collect()\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested corrections only:\n", " # return sorted(output, key = lambda x: (suggest_dict[x][1], 
-suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " # return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", " if len(output)>0:\n", " return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", " else:\n", " return []\n", "\n", "def best_word(s, d, l, silent=False):\n", " a = get_suggestions(s, d, l, silent)\n", " if len(a)==0:\n", " return (None, (None, None))\n", " else: \n", " return a[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
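\n",
    "*Added sketch (not in the original program):* `reduceByKeyLocally`, used in `parallel_create_dictionary` above, merges values per key and returns a plain Python dict on the driver, so the finished dictionary never has to remain an RDD. A toy example, assuming the SparkContext `sc` defined above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Added sketch only: reduceByKeyLocally returns a Python dict on the driver.\n", "pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])\n", "local_counts = pairs.reduceByKeyLocally(lambda a, b: a + b)\n", "print local_counts   # expected (key order may vary): {'a': 2, 'b': 1}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "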
\n", " Run the cell below only once to build the dictionary.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "d, lwl = parallel_create_dictionary(\"testdata/big.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 10.9 s, sys: 1.01 s, total: 11.9 s\n",
    "Wall time: 42.1 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter word to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 25.3 s, sys: 1.65 s, total: 26.9 s\n",
    "Wall time: 1min 3s\n",
    "Out[6]:\n",
    "[('there', (2972, 0)),\n",
    " ('these', (1231, 1)),\n",
    " ('where', (977, 1)),\n",
    " ('here', (691, 1)),\n",
    " ('three', (584, 1)),\n",
    " ('thee', (26, 1)),\n",
    " ('chere', (9, 1)),\n",
    " ('theme', (8, 1)),\n",
    " ('the', (80030, 2)), ...\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "get_suggestions(\"zzffttt\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 0\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 15.9 s, sys: 1.3 s, total: 17.2 s\n",
    "Wall time: 18.5 s\n",
    "Out[6]:\n",
    "[]\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "best_word(\"there\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 23.3 s, sys: 1.3 s, total: 24.6 s\n",
    "Wall time: 1min 2s\n",
    "Out[4]:\n",
    "('there', (2972, 0))\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5. SPARK version document check" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " To run this program, restart notebook, and start executing the cells of this section starting here.
\n", " This version parallelizes the word check for all the words in a document, using word-level correction. Since SPARK does not permit RDD manipulation from within an RDD transformation (i.e. no parallelism within a parallel task), we converted the `get_suggestions` function that acts on an individual word to a serial method. This allows us to then parallelize across multiple words in a document. This is a reasonable trade off when the number of words in a document is much larger compared to the number of suggestions that will likely be found for any given word).
\n", " IMPORTANT NOTE REGARDING PERFORMANCE RESULTS: The (modified) `no_RDD_get_suggestions` function still returns an entire list of all possible suggestions to the calling function (e.g. for context checking), even if only the top match is used or required. Future improvements may be made to `no_RDD_get_suggestions` to terminate early once a \"top\" match (e.g. minimum edit distance) is found; a speedup in that function will in turn lead to a performance improvement of the document checking function as well.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "corresponds to `spark_4.py`\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "'''\n", "v 4.0 last revised 27 Nov 2015\n", "\n", "This program is a Spark (PySpark) version of a spellchecker based on SymSpell, \n", "a Symmetric Delete spelling correction algorithm developed by Wolf Garbe \n", "and originally written in C#.\n", "\n", "From the original SymSpell documentation:\n", "\n", "\"The Symmetric Delete spelling correction algorithm reduces the complexity \n", " of edit candidate generation and dictionary lookup for a given Damerau-\n", " Levenshtein distance. It is six orders of magnitude faster and language \n", " independent. Opposite to other algorithms only deletes are required, \n", " no transposes + replaces + inserts. Transposes + replaces + inserts of the \n", " input term are transformed into deletes of the dictionary term.\n", " Replaces and inserts are expensive and language dependent: \n", " e.g. Chinese has 70,000 Unicode Han characters!\"\n", "\n", "For further information on SymSpell, please consult the original documentation:\n", " URL: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", " Description: blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/\n", "\n", "The current version of this program will output all possible suggestions for\n", "corrections up to an edit distance (configurable) of max_edit_distance = 3. \n", "\n", "To execute program:\n", "1. Ensure \"big.txt\" is in the current working directory. This is the corpus\n", " from which the dictionary for the spellchecker will be built.\n", "2a. Check recommended single word corrections by executing no_RDD_get_suggestions(\"word\") \n", " in the corresponding marked box below; or\n", "2b. 
Check single word corrections for a document by executing\n", " correct_document(\"\") in the corresponding marked box below.\n", "\n", "################\n", "\n", "Example input/output:\n", "\n", "################\n", "\n", "no_RDD_get_suggestions(\"there\")\n", "\n", "number of possible corrections: 604\n", " edit distance for deletions: 3\n", " \n", "[('there', (2972, 0)),\n", " ('these', (1231, 1)),\n", " ('where', (977, 1)),\n", " ('here', (691, 1)),\n", " ('three', (584, 1)),\n", " ('thee', (26, 1)),\n", " ('chere', (9, 1)),\n", " ('theme', (8, 1)),\n", " ('the', (80030, 2)), ...\n", "\n", "####\n", "\n", "correct_document(\"OCRsample.txt\")\n", "\n", "Finding misspelled words in your document...\n", " Unknown words (line number, word in text):\n", "[(11, 'oonipiittee'), (42, 'senbrnrgs'), (82, 'ghmhvestigat')]\n", " Words with suggested corrections (line number, word in text, top match):\n", "[(3, 'taiths --> faith'), (13, 'gjpt --> get'), (13, 'tj --> to'), \n", " (13, 'mnnff --> snuff'), (15, 'bh --> by'), (15, 'uth --> th'), ...\n", " (15, 'unuer --> under'),\n", "\n", "---\n", "total words checked: 700\n", "total unknown words: 3\n", "total potential errors found: 94\n", "\n", "'''\n", "\n", "import findspark\n", "import os\n", "findspark.init('/Users/K-Lo/spark-1.5.0')\n", "\n", "from pyspark import SparkContext\n", "sc = SparkContext()\n", "\n", "import re\n", "\n", "n_partitions = 6 # number of partitions to be used\n", "max_edit_distance = 3\n", "\n", "# helper functions\n", "def get_n_deletes_list(w, n):\n", " '''given a word, derive list of strings with up to n characters deleted'''\n", " # since this list is generally of the same magnitude as the number of \n", " # characters in a word, it may not make sense to parallelize this\n", " # so we use python to create the list\n", " deletes = []\n", " queue = [w]\n", " for d in range(n):\n", " temp_queue = []\n", " for word in queue:\n", " if len(word)>1:\n", " for c in range(len(word)): # character index\n", " word_minus_c = word[:c] + word[c+1:]\n", " if word_minus_c not in deletes:\n", " deletes.append(word_minus_c)\n", " if word_minus_c not in temp_queue:\n", " temp_queue.append(word_minus_c)\n", " queue = temp_queue\n", " \n", " return deletes\n", " \n", "def copartitioned(RDD1, RDD2):\n", " '''check if two RDDs are copartitioned'''\n", " return RDD1.partitioner == RDD2.partitioner\n", "\n", "def combine_joined_lists(tup):\n", " '''takes as input a tuple in the form (a, b) where each of a, b may be None (but not both) or a list\n", " and returns a concatenated list of unique elements'''\n", " concat_list = []\n", " if tup[1] is None:\n", " concat_list = tup[0]\n", " elif tup[0] is None:\n", " concat_list = tup[1]\n", " else:\n", " concat_list = tup[0] + tup[1]\n", " \n", " return list(set(concat_list))\n", "\n", "def parallel_create_dictionary(fname):\n", " '''Create dictionary using Spark RDDs.'''\n", " # we generate and count all words for the corpus,\n", " # then add deletes to the dictionary\n", " # this is a slightly different approach from the SymSpell algorithm\n", " # that may be more appropriate for Spark processing\n", " \n", " print \"Creating dictionary...\" \n", " \n", " ############\n", " #\n", " # process corpus\n", " #\n", " ############\n", " \n", " # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string\n", " regex = re.compile('[^a-z ]')\n", "\n", " # convert file into one long sequence of words\n", " make_all_lower = sc.textFile(fname).map(lambda line: 
line.lower())\n", " replace_nonalphs = make_all_lower.map(lambda line: regex.sub(' ', line))\n", " all_words = replace_nonalphs.flatMap(lambda line: line.split())\n", "\n", " # create core corpus dictionary (i.e. only words appearing in file, no \"deletes\") and cache it\n", " # output RDD of unique_words_with_count: [(word1, count1), (word2, count2), (word3, count3)...]\n", " count_once = all_words.map(lambda word: (word, 1))\n", " unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n", " \n", " # output stats on core corpus\n", " print \"total words processed: %i\" % unique_words_with_count.map(lambda (k, v): v).reduce(lambda a, b: a + b)\n", " print \"total unique words in corpus: %i\" % unique_words_with_count.count()\n", " \n", " ############\n", " #\n", " # generate deletes list\n", " #\n", " ############\n", " \n", " # generate list of n-deletes from words in a corpus of the form: [(word1, count1), (word2, count2), ...]\n", " \n", " assert max_edit_distance>0 \n", " \n", " generate_deletes = unique_words_with_count.map(lambda (parent, count): \n", " (parent, get_n_deletes_list(parent, max_edit_distance)))\n", " expand_deletes = generate_deletes.flatMapValues(lambda x: x)\n", " swap = expand_deletes.map(lambda (orig, delete): (delete, ([orig], 0)))\n", " \n", " ############\n", " #\n", " # combine delete elements with main dictionary\n", " #\n", " ############\n", " \n", " corpus = unique_words_with_count.mapValues(lambda count: ([], count))\n", " combine = swap.union(corpus) # combine deletes with main dictionary, eliminate duplicates\n", " \n", " ## since the dictionary will only be a lookup table once created, we can\n", " ## pass on as a Python dictionary rather than RDD by reducing locally and\n", " ## avoiding an extra shuffle from reduceByKey\n", " new_dict = combine.reduceByKeyLocally(lambda a, b: (a[0]+b[0], a[1]+b[1]))\n", " \n", " print \"total items in dictionary (corpus words and deletions): %i\" % len(new_dict)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", " longest_word_length = unique_words_with_count.map(lambda (k, v): len(k)).reduce(max)\n", " print \" length of longest word in corpus: %i\" % longest_word_length\n", "\n", " return new_dict, longest_word_length \n", "\n", "def dameraulevenshtein(seq1, seq2):\n", " \"\"\"Calculate the Damerau-Levenshtein distance (an integer) between sequences.\n", "\n", " This code has not been modified from the original.\n", " Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/\n", " \n", " This distance is the number of additions, deletions, substitutions,\n", " and transpositions needed to transform the first sequence into the\n", " second. 
Although generally used with strings, any sequences of\n", " comparable objects will work.\n", "\n", " Transpositions are exchanges of *consecutive* characters; all other\n", " operations are self-explanatory.\n", "\n", " This implementation is O(N*M) time and O(M) space, for N and M the\n", " lengths of the two sequences.\n", "\n", " >>> dameraulevenshtein('ba', 'abc')\n", " 2\n", " >>> dameraulevenshtein('fee', 'deed')\n", " 2\n", "\n", " It works with arbitrary sequences too:\n", " >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])\n", " 2\n", " \"\"\"\n", " # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F\n", " # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.\n", " # However, only the current and two previous rows are needed at once,\n", " # so we only store those.\n", " oneago = None\n", " thisrow = range(1, len(seq2) + 1) + [0]\n", " for x in xrange(len(seq1)):\n", " # Python lists wrap around for negative indices, so put the\n", " # leftmost column at the *end* of the list. This matches with\n", " # the zero-indexed strings and saves extra calculation.\n", " twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]\n", " for y in xrange(len(seq2)):\n", " delcost = oneago[y] + 1\n", " addcost = thisrow[y - 1] + 1\n", " subcost = oneago[y - 1] + (seq1[x] != seq2[y])\n", " thisrow[y] = min(delcost, addcost, subcost)\n", " # This block deals with transpositions\n", " if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]\n", " and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):\n", " thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)\n", " return thisrow[len(seq2) - 1]\n", "\n", "def no_RDD_get_suggestions(s, masterdict, longest_word_length=float('inf'), silent=False):\n", " '''return list of suggested corrections for potentially incorrectly spelled word.\n", " \n", " Note: serialized version for Spark document correction.\n", " \n", " s: input string\n", " masterdict: the main dictionary (python dict), which includes deletes\n", " entries, is in the form of: {word: ([suggested corrections], \n", " frequency of word in corpus), ...}\n", " longest_word_length: optional identifier of longest real word in masterdict\n", " silent: verbose output (when False)\n", " '''\n", "\n", " if (len(s) - longest_word_length) > max_edit_distance:\n", " if not silent:\n", " print \"no items in dictionary within maximum edit distance\"\n", " return []\n", "\n", " ##########\n", " #\n", " # initialize suggestions list\n", " # suggestList entries: (word, (frequency of word in corpus, edit distance))\n", " #\n", " ##########\n", " \n", " if not silent:\n", " print \"looking up suggestions based on input word...\"\n", " \n", " suggestList = []\n", " \n", " # check if input word is in dictionary, and is a word from the corpus (edit distance = 0)\n", " # if so, add input word itself and suggestions to suggestRDD\n", " \n", " if s in masterdict:\n", " init_sugg = []\n", " # dictionary values are in the form of ([suggestions], freq)\n", " if masterdict[s][1]>0: # frequency>0 -> real corpus word\n", " init_sugg = [(str(s), (masterdict[s][1], 0))]\n", "\n", " # the suggested corrections for the item in dictionary (whether or not\n", " # the input string s itself is a valid word or merely a delete) can be \n", " # valid corrections -- essentially we serialize this portion since\n", " # the list of corrections tends to be very short\n", " \n", " add_sugg = [(str(sugg), (masterdict[sugg][1], len(sugg)-len(s))) \n", " for sugg in masterdict[s][0]]\n", " \n", " suggestList = init_sugg + 
add_sugg\n", " \n", " ##########\n", " #\n", " # process deletes on the input string \n", " #\n", " ##########\n", " \n", " assert max_edit_distance>0\n", " \n", " list_deletes_of_s = get_n_deletes_list(s, max_edit_distance) # this list is short\n", " \n", " # check suggestions is in dictionary and is a real word\n", " add_sugg_2 = [(str(sugg), (masterdict[sugg][1], len(s)-len(sugg))) \n", " for sugg in list_deletes_of_s if ((sugg in masterdict) and\n", " (masterdict[sugg][1]>0))]\n", " \n", " suggestList += add_sugg_2\n", " \n", " # check each item of suggestion list of all new-found suggestions \n", " # the suggested corrections for any item in dictionary (whether or not\n", " # the delete itself is a valid word or merely a delete) can be valid corrections \n", " # expand lists of list\n", " \n", " sugg_lists = [masterdict[sugg][0] for sugg in list_deletes_of_s if sugg in masterdict]\n", " list_sl = [(val, 0) for sublist in sugg_lists for val in sublist]\n", " combine_del = list(set((list_sl))) \n", "\n", " # need to recalculate actual Deverau Levenshtein distance to be within \n", " # max_edit_distance for all deletes; also check that suggestion is a real word\n", " filter_by_dist = []\n", " for item in combine_del:\n", " calc_dist = dameraulevenshtein(s, item[0])\n", " if (calc_dist<=max_edit_distance) and (item[0] in masterdict):\n", " filter_by_dist += [(item[0], calc_dist)]\n", " \n", " # get frequencies from main dictionary and add new suggestions to suggestions list\n", " suggestList += [(str(item[0]), (masterdict[item[0]][1], item[1]))\n", " for item in filter_by_dist]\n", " \n", " output = list(set(suggestList))\n", " \n", " if not silent:\n", " print \"number of possible corrections: %i\" % len(output)\n", " print \" edit distance for deletions: %i\" % max_edit_distance\n", "\n", " ##########\n", " #\n", " # optionally, sort RDD for output\n", " #\n", " ##########\n", " \n", " # output option 1\n", " # sort results by ascending order of edit distance and descending order of frequency\n", " # and return list of suggested corrections only:\n", " # return sorted(output, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))\n", "\n", " # output option 2\n", " # return list of suggestions with (correction, (frequency in corpus, edit distance)):\n", " # return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", "\n", " if len(output)>0:\n", " return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))\n", " else:\n", " return []\n", " \n", "def correct_document(fname, d, lwl=float('inf'), printlist=True):\n", " '''Correct an entire document using word-level correction.\n", " \n", " Note: Uses a serialized version of an individual word checker. 
\n", " \n", " fname: filename\n", " d: the main dictionary (python dict), which includes deletes\n", " entries, is in the form of: {word: ([suggested corrections], \n", " frequency of word in corpus), ...}\n", " lwl: optional identifier of longest real word in masterdict\n", " printlist: identify unknown words and words with error (default is True)\n", " '''\n", " \n", " # broadcast lookup dictionary to workers\n", " bd = sc.broadcast(d)\n", " \n", " print \"Finding misspelled words in your document...\" \n", " \n", " # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string\n", " regex = re.compile('[^a-z ]')\n", "\n", " # convert file into one long sequence of words with the line index for reference\n", " make_all_lower = sc.textFile(fname).map(lambda line: line.lower()).zipWithIndex()\n", " replace_nonalphs = make_all_lower.map(lambda (line, index): (regex.sub(' ', line), index))\n", " flattened = replace_nonalphs.map(lambda (line, index): \n", " [(i, index) for i in line.split()]).flatMap(list)\n", " \n", " # create RDD with (each word in document, corresponding line index) \n", " # key value pairs and cache it\n", " all_words = flattened.partitionBy(n_partitions).cache()\n", " \n", " # check all words in parallel -- stores whole list of suggestions for each word\n", " get_corrections = all_words.map(lambda (w, index): \n", " (w, (no_RDD_get_suggestions(w, bd.value, lwl, True), index)),\n", " preservesPartitioning=True).cache()\n", " \n", " # UNKNOWN words are words where the suggestion list is empty\n", " unknown_words = get_corrections.filter(lambda (w, (sl, index)): len(sl)==0)\n", " if printlist:\n", " print \" Unknown words (line number, word in text):\"\n", " print unknown_words.map(lambda (w, (sl, index)): (index, str(w))).sortByKey().collect()\n", " \n", " # ERROR words are words where the word does not match the first tuple's word (top match)\n", " error_words = get_corrections.filter(lambda (w, (sl, index)): len(sl)>0 and w!=sl[0][0]) \n", " if printlist:\n", " print \" Words with suggested corrections (line number, word in text, top match):\"\n", " print error_words.map(lambda (w, (sl, index)): \n", " (index, str(w) + \" --> \" +\n", " str(sl[0][0]))).sortByKey().collect()\n", " \n", " gc = sc.accumulator(0)\n", " get_corrections.foreach(lambda x: gc.add(1))\n", " uc = sc.accumulator(0)\n", " unknown_words.foreach(lambda x: uc.add(1))\n", " ew = sc.accumulator(0)\n", " error_words.foreach(lambda x: ew.add(1))\n", " \n", " print \"-----\"\n", " print \"total words checked: %i\" % gc.value\n", " print \"total unknown words: %i\" % uc.value\n", " print \"total potential errors found: %i\" % ew.value\n", "\n", " return" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Run the cell below only once to build the dictionary.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "d, lwl = parallel_create_dictionary(\"testdata/big.txt\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 10.9 s, sys: 976 ms, total: 11.8 s\n",
    "Wall time: 41.2 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter word to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "no_RDD_get_suggestions(\"theref\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 604\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 56.3 ms, sys: 4.17 ms, total: 60.5 ms\n",
    "Wall time: 58.2 ms\n",
    "Out[3]:\n",
    "[('there', (2972, 0)),\n",
    " ('these', (1231, 1)),\n",
    " ('where', (977, 1)),\n",
    " ('here', (691, 1)),\n",
    " ('three', (584, 1)),\n",
    " ('thee', (26, 1)),\n",
    " ('chere', (9, 1)),\n",
    " ('theme', (8, 1)),\n",
    " ('the', (80030, 2)), ...\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "no_RDD_get_suggestions(\"zzffttt\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "looking up suggestions based on input word...\n",
    "number of possible corrections: 0\n",
    "  edit distance for deletions: 3\n",
    "CPU times: user 419 µs, sys: 108 µs, total: 527 µs\n",
    "Wall time: 435 µs\n",
    "Out[3]:\n",
    "[]\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Enter file name of document to correct below.\n", "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "correct_document(\"testdata/OCRsample.txt\", d, lwl)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Finding misspelled words in your document...\n",
    "    Unknown words (line number, word in text):\n",
    "[(11, 'oonipiittee'), (42, 'senbrnrgs'), (82, 'ghmhvestigat')]\n",
    "    Words with suggested corrections (line number, word in text, top match):\n",
    "[(3, 'taiths --> faith'), (13, 'gjpt --> get'), (13, 'tj --> to'), (13, 'mnnff --> snuff'), (15, 'bh --> by'), (15, 'uth --> th'), (15, 'unuer --> under'), (15, 'snc --> sac'), (20, 'mthiitt --> thirty'), (21, 'cas --> was'), (22, 'pythian --> scythian'), (26, 'brainin --> brain'), (27, 'jfl --> of'), (28, 'eug --> dug'), (28, 'stice --> stick'), (28, 'blaci --> black'), (28, 'ji --> i'), (28, 'debbs --> debts'), (29, 'nericans --> americans'), (30, 'ergs --> eggs'), (30, 'ainin --> again'), (31, 'trumped --> trumpet'), (32, 'erican --> american'), (33, 'thg --> the'), (33, 'nenance --> penance'), (33, 'unorthodox --> orthodox'), (34, 'rgs --> rags'), (34, 'sln --> son'), (38, 'eu --> e'), (38, 'williaij --> william'), (40, 'fcsf --> ff'), (40, 'ber --> be'), (42, 'thpt --> that'), (42, 'unorthodoxy --> orthodox'), (44, 'fascism --> fascia'), (62, 'loo --> look'), (65, 'ththn --> then'), (65, 'thl --> the'), (65, 'yktcn --> skin'), (65, 'scbell --> bell'), (65, 'ife --> if'), (66, 'thi --> the'), (68, 'saij --> said'), (69, 'cornr --> corner'), (69, 'defendants --> defendant'), (69, 'nists --> lists'), (72, 'ro --> to'), (74, 'ath --> at'), (75, 'rg --> re'), (75, 'acrific --> pacific'), (75, 'tti --> tit'), (77, 'korea --> more'), (78, 'doatli --> death'), (78, 'ro --> to'), (81, 'ry --> by'), (81, 'ith --> it'), (81, 'kl --> ll'), (81, 'ech --> each'), (82, 'rg --> re'), (82, 'rb --> re'), (82, 'nb --> no'), (83, 'rosenbt --> rodent'), (83, 'rgs --> rags'), (84, 'coriritted --> committed'), (86, 'fighti --> fight'), (88, 'bths --> baths'), (88, 'tchf --> the'), (91, 'ro --> to'), (91, 'ijb --> in'), (92, 'telegrnm --> telegram'), (92, 'rson --> son'), (92, 'jillia --> william'), (92, 'patt --> part'), (93, 'ecretdry --> secretary'), (95, 'purview --> purves'), (95, 'rder --> order'), (99, 'gor --> for'), (99, 'rg --> re'), (99, 'enb --> end'), (99, 'dthethg --> teeth'), (99, 'ro --> to'), (99, 'ared --> are'), (100, 'dri --> dry'), (100, 'yfu --> you'), (100, 'vthnz --> the'), (100, 'sacc --> sac'), (101, 'rosi --> rose'), (101, 'rg --> re'), (101, 'ile --> ill'), (102, 'jhy --> why'), (102, 'fnir --> fair'), (102, 'azi --> ai'), (103, 'fascist --> fascia'), (104, 'nb --> no')]\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 9.9 s, sys: 619 ms, total: 10.5 s\n",
    "Wall time: 1min 3s\n",
    "
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%time\n", "correct_document(\"testdata/OCRsample.txt\", d, lwl, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 9.33 s, sys: 505 ms, total: 9.83 s\n",
    "Wall time: 56.3 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "# additional tests (local machine)\n",
    "\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "266.66 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "2897.28 seconds to run\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# AWS Experiments" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "It took a long time to debug problems with AWS, but in the end the programs ran. The key thing I learned is that when broadcasting the dictionary to the workers, we have to set the executor memory (e.g. to 5G or 8G) explicitly to ensure that the workers can store it (was originally left out). We also include sample output for the document-level Spark version below.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## individual word checks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "# 2 executors, 4 cores, 16 partitions\n",
    "# \"there\"\n",
    "\n",
    "serial: 62 ms\n",
    "spark 1: 96.31 s\n",
    "spark 2: 22.62 s\n",
    "spark 3: 41.35 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "This result was interesting, as the `reduceByKeyLocally` performs much worse with many workers. This is the reverse results compared to those obtained when running the Spark versions 2 and 3 on my local machine. The cost of a potential shuffle between building the dictionary and the word check seems negligble now, compared to the overhead of setting up communication with all the workers and then having to bring everything back to the driver.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## document checks (on AWS)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "\n",
    "# Serial version:\n",
    "\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "19.01 seconds to run\n",
    "-----\n",
    "\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "384.75 seconds to run\n",
    "-----\n",
    "\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "4305.46 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "The serial version now lags behind the Spark versions once the documents size starts getting very big.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2 executors, 4 cores, 16 partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "-----\n",
    "35.37 seconds to run\n",
    "-----\n",
    " \n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "41.82 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "125.45 seconds to run\n",
    "-----\n",
    "\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "995.66 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "The Spark versions begin to show their strength at the document sizes get larger. We began to fiddle with the various parameters (e.g. number of executors, number of cores, number of partitions set in the code). Note that we are still generating all suggestions for EACH words - for the purposes of comparing performance this is fine, but we would likely have to optimize the code (in the manner of the final serial version where we can terminate searches early once suggestions of the lowest edit distances are found) if we were to run this on even larger documents.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2 executors, 4 cores, 32 partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "-----\n",
    "34.84 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "66.17 seconds to run\n",
    "\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "150.06 seconds to run\n",
    "-----\n",
    "\n",
    "*** when I left memory at 5G\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "2357.24 seconds to run\n",
    "-----\n",
    "\n",
    "*** boosted memory to 8G\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "972.82 seconds to run\n",
    "-----\n",
    "\n",
    "*** changed reduceByKeyLocally to reduceByKey + collectAsMap in createDocument\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "992.03 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4 executors, 4 cores, 16 partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "-----\n",
    "66.97 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "148.86 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "240.04 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4 executors, 4 cores, 32 partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "-----\n",
    "71.95 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "163.45 seconds to run\n",
    "\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "246.38 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "1191.01 seconds to run\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4 executors, 4 cores, 64 partitions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "\n",
    "Please wait...\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "-----\n",
    "69.25 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "-----\n",
    "198.92 seconds to run\n",
    "-----\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 12029\n",
    "total unknown words: 19\n",
    "total potential errors found: 719\n",
    "-----\n",
    "286.15 seconds to run\n",
    "\n",
    "Document correction... Please wait...\n",
    "-------------------------------------\n",
    "finding corrections for document\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 131340\n",
    "total unknown words: 325\n",
    "total potential errors found: 8460\n",
    "-----\n",
    "1283.60 seconds to run\n",
    "-----\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Future investigations may entail exploring and tuning these parameters, and experimenting with others, to optimize performance." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Appendix" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " In this section, we document various attempts at optimizing the original Spark (v 1.0) program.

\n", " Note we also examined the output at localhost:4040 to identify the time-consuming stages.\n", "

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### building dictionary" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Removed setting of executor memory decreased time to build dictionary.

\n", " Replace:\n", "

  \n",
    "import pyspark\n",
    "conf = (pyspark.SparkConf()\n",
    "    .setMaster('local')\n",
    "    .setAppName('pyspark')\n",
    "    .set(\"spark.executor.memory\", \"2g\"))\n",
    "sc = pyspark.SparkContext(conf=conf)\n",
    "  

\n", " with:\n", "

  \n",
    "  from pyspark import SparkContext\n",
    "  sc = SparkContext()\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 113 ms, sys: 24.6 ms, total: 138 ms\n",
    "Wall time: 2min 48s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Setting `minPartitions` argument in `textFile` did not improve performance.

\n", " Replace:\n", "

  \n",
    "make_all_lower = sc.textFile(fname).map(lambda line: line.lower())\n",
    "  

\n", " with:\n", "

  \n",
    "make_all_lower = sc.textFile(fname, minPartitions = n_partitions).map(lambda line: line.lower())\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 133 ms, sys: 28.9 ms, total: 162 ms\n",
    "Wall time: 2min 55s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Repartitioning and sorting corpus after processing core words did not improve performance.

\n", " Replace:\n", "

  \n",
    "unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()\n",
    "  

\n", " with:\n", "

  \n",
    "unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b).repartitionAndSortWithinPartitions(numPartitions =n_partitions).cache()\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 150 ms, sys: 31.8 ms, total: 182 ms\n",
    "Wall time: 3min 5s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Explicitly setting use_unicode to False, which is documented as possibly leading to a faster read, did not significantly improve performance.

\n", " Replace:\n", "

  \n",
    "make_all_lower = sc.textFile(fname).map(lambda line: line.lower())\n",
    "  

\n", " with:\n", "

  \n",
    "make_all_lower = sc.textFile(fname, use_unicode=False).map(lambda line: line.lower())\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 106 ms, sys: 24.3 ms, total: 130 ms\n",
    "Wall time: 2min 46s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Removing parallelism when generating individual 1-deletes, 2-deletes, and 3-deletes improved performance. This is likely due to the fact that these lists are pretty short relative to the length of the corpus; parallelizing them introduces overhead.

\n", " Replaced `get_deletes_list` with `get_n_deletes_list` function, and modified `parallel_create_dictionary` accordingly.

See Spark v. 2.0 above. \n", "
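"To illustrate why plain Python is sufficient here, a serial delete generator can be as small as the hypothetical sketch below (the actual `get_n_deletes_list` function is defined earlier in the notebook):\n",
"\n",
"    def n_deletes_sketch(word, max_edit_distance):\n",
"        # breadth-first generation of all strings reachable by 1..n character deletions;\n",
"        # for a single word these sets are tiny, so distributing the work only adds overhead\n",
"        deletes, queue = set(), {word}\n",
"        for _ in range(max_edit_distance):\n",
"            queue = {w[:i] + w[i+1:] for w in queue for i in range(len(w)) if len(w) > 1}\n",
"            deletes |= queue\n",
"        return list(deletes)\n",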

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 52 ms, sys: 13 ms, total: 65 ms\n",
    "Wall time: 54.1 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Replacing a `join` with `union`, and `reduceByKey` with `reduceByKeyLocally` to return dictionary to driver without an extra shuffle, provides comparable performance (however, dictionary is now a python dictionary, and not an RDD).

\n", " Modified `parallel_create_dictionary` accordingly.

See Spark v. 3.0 above. \n", "
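"Schematically, the reworked combination step looks something like the following (the RDD names `real_words_rdd` and `deletes_rdd` are illustrative placeholders; the merge lambda is the one shown later in this appendix):\n",
"\n",
"    # both RDDs hold (key, ([suggested corrections], frequency)) pairs\n",
"    combine = real_words_rdd.union(deletes_rdd)\n",
"\n",
"    # merge suggestion lists and frequencies directly into a Python dict on the driver\n",
"    new_dict = combine.reduceByKeyLocally(lambda a, b: (a[0] + b[0], a[1] + b[1]))\n",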

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 10.9 s, sys: 1.01 s, total: 11.9 s\n",
    "Wall time: 42.1 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Replacing `reduceByKeyLocally` with a `reduceByKey` and `collectAsMap` did not significantly improve performance. This is because an extra shuffle was introduced.

\n", " Replace:\n", "

  \n",
    "new_dict = combine.reduceByKeyLocally(lambda a, b: (a[0]+b[0], a[1]+b[1]))\n",
    "  

\n", " with:\n", "

  \n",
    "new_dict = combine.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])).collectAsMap()\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
 \n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 7.16 s, sys: 921 ms, total: 8.08 s\n",
    "Wall time: 1min 7s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Modifying the initial `reduceByKey` when reading the corpus to remove explicit partitioning (now not needed with the use of `union`) did not significantly improve performance.

\n", " Replace:\n", "

  \n",
    "unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions=n_partitions).cache()\n",
    "  

\n", " with:\n", "

  \n",
    "unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b).cache()\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
 \n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 9.05 s, sys: 824 ms, total: 9.87 s\n",
    "Wall time: 43.4 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Increasing number of partitions from 6 to 8, on our local machine (dual core, 2015 MacBook Pro) did not significantly improve performance.

\n", " Replace:\n", "

  \n",
    "n_partitions = 6 # number of partitions to be used\n",
    "  

\n", " with:\n", "

  \n",
    "n_partitions = 8 # number of partitions to be used\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Creating dictionary...\n",
    "total words processed: 1105285\n",
    "total unique words in corpus: 29157\n",
    "total items in dictionary (corpus words and deletions): 2151998\n",
    "  edit distance for deletions: 3\n",
    "  length of longest word in corpus: 18\n",
    "CPU times: user 11.6 s, sys: 1.08 s, total: 12.7 s\n",
    "Wall time: 45.5 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### implement document checking" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Serializing `get_suggestions` allowed us to parallelize the checking of words within an entire document. Spark does not appear to permit parallelizing a task within a parallel task (transformation).

\n", " Replaced `get_suggestions` with `no_RDD_get_suggestions` function, and modified `correct_document` accordingly. \n", "

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
 \n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 1min 6s, sys: 1.36 s, total: 1min 8s\n",
    "Wall time: 2min 1s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Broadcasting dictionary to workers before checking document improved performance.

\n", " Added:\n", "

  \n",
    "    # broadcast lookup dictionary to workers\n",
    "    bd = sc.broadcast(d)\n",
    "  

\n", " Modified `get_corrections` to:\n", "

  \n",
    "    get_corrections = all_words.map(lambda (w, index): (w, (get_suggestions(w, bd.value, lwl, True), index)), preservesPartitioning=True).cache()\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 10.6 s, sys: 620 ms, total: 11.2 s\n",
    "Wall time: 57.3 s\n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " Converting counters for words in dictionary, unknown words, and error words to accumulators only improved performance nominally (may not be significant due to variations between runs).

These changes have been implemented. See Spark v. 4.0 above.

\n", " Add:\n", "

  \n",
    "    gc = sc.accumulator(0)\n",
    "    get_corrections.foreach(lambda x: gc.add(1))\n",
    "    uc = sc.accumulator(0)\n",
    "    unknown_words.foreach(lambda x: uc.add(1))\n",
    "    ew = sc.accumulator(0)\n",
    "    error_words.foreach(lambda x: ew.add(1))\n",
    "  

\n", " Modified print statements to read:\n", "

  \n",
    "    print \"total words checked: %i\" % gc.value\n",
    "    print \"total unknown words: %i\" % uc.value\n",
    "    print \"total potential errors found: %i\" % ew.value\n",
    "  
\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
 \n",
    "Finding misspelled words in your document...\n",
    "-----\n",
    "total words checked: 700\n",
    "total unknown words: 3\n",
    "total potential errors found: 94\n",
    "CPU times: user 9.33 s, sys: 505 ms, total: 9.83 s\n",
    "Wall time: 56.3 s \n",
    "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", " We tried using the MultiProcessing (Pool) module in Python in conjunction\n", " with our Spark commands for the get_suggestions method. Unfortunately, this did not\n", " seem to work, and the output gave errors causing system instability. Occasionally, it would give key errors, while at other times, it would give an error of the type: \"PythonRDD does not exist in the JVM\". This instability seems to be documented: http://stackoverflow.com/questions/32443362/passing-sparkcontext-to-new-process-python-multiprocessing-module. In any event, this is something that might be subject to further investigation in the future.

\n", " Added:\n", "

  \n",
    "    def f(x):\n",
    "       val, dictionary, longestword, silent = x\n",
    "       return get_suggestions(val, dictionary, longestword, silent)\n",
    "  

\n", " and to `create_dictionary`:\n", "

  \n",
    "    # parallelization via Pool\n",
    "    jobs = [(w, bd.value, lwl, False) for w in doc_words]\n",
    "    suggestion_lists = p.map(f, jobs)\n",
    "  
\n", "
" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }