This notebook is an exercise in text preprocessing using Apache Spark's RDD API and functional programming. The goal is simple: ingest the text of Shakespeare's The Tragedy of Hamlet, Prince of Denmark and transform it into a dataset that could feed a bag-of-words model.
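For context, a bag-of-words representation simply counts how often each vocabulary term appears in each document. A minimal sketch with scikit-learn's CountVectorizer, using made-up toy sentences rather than the Hamlet data prepared below:
from sklearn.feature_extraction.text import CountVectorizer
toy_docs = ['to be or not to be', 'what a piece of work is a man']   # illustration only
toy_vec = CountVectorizer()
toy_counts = toy_vec.fit_transform(toy_docs)      # sparse document-term count matrix
toy_vec.get_feature_names_out()                   # learned vocabulary, one column per term
toy_counts.toarray()                              # raw counts per toy document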
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
import re
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet, stopwords
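# Note: word_tokenize, pos_tag, WordNetLemmatizer, and stopwords rely on NLTK data packages
# ('punkt', 'averaged_perceptron_tagger', 'wordnet', 'stopwords'); if any are missing,
# download each once with nltk.download('<package>').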
from sklearn.feature_extraction.text import TfidfVectorizer
import functools
from functools import partial
raw_hamlet = sc.textFile('Desktop/hamlet.txt')
raw_hamlet.take(25)
def get_line_id(line):
    """Strip the 'hamlet@' prefix from the first field and cast it to an int."""
    line_id = line[0].replace('hamlet@', '')
    line[0] = int(line_id)
    return line
def no_pipes(line):
    """Drop fields that are just a pipe and strip stray '|' characters from the rest."""
    line_out = [line[0]]
    for l in line[1:]:
        if l == '|':
            continue
        line_out.append(l.replace('|', ''))
    return line_out
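Before wiring these helpers into the RDD pipeline, here is roughly what they do to one record. The field values below are made up for illustration; the real file uses tab-separated fields with a 'hamlet@<offset>' id in front:
sample = 'hamlet@31782\tHORATIO\t|A piece of him.'   # hypothetical record
fields = sample.split('\t')       # ['hamlet@31782', 'HORATIO', '|A piece of him.']
fields = get_line_id(fields)      # [31782, 'HORATIO', '|A piece of him.']
fields = no_pipes(fields)         # [31782, 'HORATIO', 'A piece of him.']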
hamlet_data = (raw_hamlet.map(lambda x: x.split('\t'))            # split on the tab delimiter
               .map(lambda line: get_line_id(line))               # extract line ids, convert to int
               .map(lambda x: no_pipes(x))                        # get rid of pipe characters
               .map(lambda x: [l for l in x if l != ''])          # drop empty fields
               .filter(lambda x: len(x) > 1)                      # drop lines with no text left
               .filter(lambda x: x[1][0] != '[')                  # get rid of stage directions
               .filter(lambda x: x[0] >= 1014)                    # skip the introduction text
               )
hamlet_ls = hamlet_data.collect()
hamlet_ls[:20]
act_lines = [x for x in hamlet_ls if re.search(r'ACT [I]*V?[I]*', x[1])]
act_lines
acts_dict = {}
for act in act_lines:
    if act[1] in acts_dict:
        acts_dict[act[1]].append(act[0])
    else:
        acts_dict[act[1]] = [act[0]]
acts_dict
for k in acts_dict.keys():
    acts_dict[k] = min(acts_dict[k])
acts_dict
def add_act(line):
    """Tag a line with the act it belongs to, based on each act's starting line id."""
    act = ''
    # acts_dict preserves insertion order, so the last act whose start id
    # is <= this line's id is the act the line falls in
    for k in acts_dict.keys():
        if line[0] >= acts_dict[k]:
            act = k
    out_line = [line[0], act] + line[1:]
    return out_line
hamlet_data = (hamlet_data.map(lambda x: add_act(x))                       # add the act to each line
               .filter(lambda x: not re.search(r'ACT [I]*V?[I]*', x[2]))   # drop the act heading lines themselves
               )
hamlet_data.take(10)
no_act_lines_ls = hamlet_data.collect()
scene_lines = [x for x in no_act_lines_ls if re.search(r'SCENE [I]*V?[I]*', x[2])]
scene_lines
scenes_dict = {}
for s in scene_lines:
    if s[1] in scenes_dict:
        scenes_dict[s[1]][s[2]] = s[0]
    else:
        scenes_dict[s[1]] = {s[2]: s[0]}
scenes_dict
def add_scene(line):
    """Tag a line with its scene, using the scene start positions in scenes_dict."""
    scene = ''
    # scene start ids increase through the play, so the last scene whose
    # start id is <= this line's id is the scene the line falls in
    for a in scenes_dict.keys():
        for s in scenes_dict[a].keys():
            if line[0] >= scenes_dict[a][s]:
                scene = s
    out_line = line[0:2] + [scene] + line[2:]
    return out_line
hamlet_data = (hamlet_data.map(lambda x: add_scene(x))                       # add the scene to each line
               .filter(lambda x: not re.search(r'SCENE [I]*V?[I]*', x[3]))   # drop the scene heading lines themselves
               )
hamlet_lol = hamlet_data.collect()
hamlet_lol = [x[:3] + [''] + x[3:] if len(x) == 4 else x for x in hamlet_lol]  # lines with no speaker get an empty speaker field
wrong_len = [x for x in hamlet_lol if len(x) != 5]
wrong_len
hamlet_lol = [x[:4] + [x[5]] if x[4] == '[Within]' else x for x in hamlet_lol]  # drop the '[Within]' stage cue, keeping speaker and text
cols = ['line_no', 'act', 'scene', 'speaker', 'text']
df = pd.DataFrame(hamlet_lol, columns=cols)
df.head(20)
def fill_with_previous(ls):
    """Replace empty strings with the most recent non-empty value (forward fill)."""
    out = []
    for i in range(len(ls)):
        if ls[i] == '':
            out.append(out[i - 1])
        else:
            out.append(ls[i])
    return out
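A quick sanity check of the forward-fill helper on a toy list (the first entry must be non-empty, as it is for the speaker column here):
fill_with_previous(['HAMLET', '', '', 'HORATIO', ''])
# ['HAMLET', 'HAMLET', 'HAMLET', 'HORATIO', 'HORATIO']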
names_ls = list(df['speaker'])
names_ls[:10]
names_ls = fill_with_previous(names_ls)
df['speaker'] = names_ls
df.head(20)
df['speaker'].unique()
prologue = df.loc[df['speaker']=='Prologue', :]
prologue
df = df.loc[df['speaker']!='Prologue', :]
df.info()
df.text[:10]
df['q_marks'] = df['text'].str.count(r'\?')
df['ex_marks'] = df['text'].str.count(r'\!')
df['words_only'] = df['text'].str.replace(r'\W+', ' ', regex=True)
df.words_only
def compose(*functions):
    """Compose single-argument functions, applying them left to right."""
    return functools.reduce(lambda f, g: lambda x: g(f(x)), functions, lambda x: x)
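Note the composition order: compose applies its arguments left to right, so compose(f, g)(x) is g(f(x)). A quick check with built-ins:
compose(str.strip, str.upper)('  to be or not to be  ')
# 'TO BE OR NOT TO BE'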
def get_tokens(line):
    return nltk.word_tokenize(line)
def get_wordnet_pos(word):
    """Map the first letter of the NLTK POS tag to the corresponding WordNet tag."""
    pos_dict = {"J": wordnet.ADJ,
                "N": wordnet.NOUN,
                "V": wordnet.VERB,
                "R": wordnet.ADV}
    tag = nltk.pos_tag([word])[0][1][0]
    return pos_dict.get(tag, wordnet.NOUN)
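The lookup only uses the first character of the Penn Treebank tag returned by pos_tag, and anything outside J/N/V/R falls back to NOUN; tagging a single word out of context is approximate, so the lemmatizer occasionally gets the wrong part of speech. For example:
# Penn tag             first letter   WordNet POS passed to the lemmatizer
# 'VBD' (verb)         'V'            wordnet.VERB
# 'NNS' (noun)         'N'            wordnet.NOUN
# 'IN'  (preposition)  'I'            wordnet.NOUN (fallback)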
def lemma_gen(lemmatizer, token_ls):
    """Yield the lemma of each token, using its inferred WordNet POS tag."""
    for word in token_ls:
        yield lemmatizer().lemmatize(word, get_wordnet_pos(word))

# bind the lemmatizer class so the pipeline only needs to pass the token list
lemma_gen = partial(lemma_gen, WordNetLemmatizer)
def extract_lem(lem_gen):
    """Materialize the lemma generator, dropping stray 's' tokens left over from possessives."""
    return [l for l in lem_gen if l != 's']
def no_stopwords(ls):
    """Lowercase the tokens, drop English stopwords, and join them back into one string."""
    return ' '.join([w.lower() for w in ls if w.lower() not in stopwords.words('english')])
pipeline = compose(get_tokens, lemma_gen, extract_lem, no_stopwords)
lems = pipeline('I have seen you at the store.')
lems
df['lemmatized'] = df['words_only'].apply(pipeline)
df.head(20)
df = df.loc[df['lemmatized'].str.len() > 0]
df.head()
vectorizer = TfidfVectorizer()
vectorized = vectorizer.fit_transform(df['lemmatized'])
X_train = pd.DataFrame.sparse.from_spmatrix(vectorized)
col_map = {v: k for k, v in vectorizer.vocabulary_.items()}   # column index -> vocabulary term
for col in X_train.columns:
    X_train.rename(columns={col: col_map[col]}, inplace=True)
X_train.head()
len(X_train)
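As an aside, on scikit-learn 1.0+ the rename loop above can be replaced by a single assignment, since get_feature_names_out() returns the vocabulary terms in matrix column order (an equivalent alternative, not what was run above):
X_train.columns = vectorizer.get_feature_names_out()   # same mapping as the rename loop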