From b2d38f17e56bf3b391fe20b77532df23e5721413 Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Thu, 12 Aug 2010 16:20:23 +0000 Subject: Add copyright notes to python files --- (limited to 'aiml/Kernel.py') diff --git a/aiml/Kernel.py b/aiml/Kernel.py new file mode 100644 index 0000000..1014ade --- /dev/null +++ b/aiml/Kernel.py @@ -0,0 +1,1183 @@ +# -*- coding=ISO-8859-1 -*- +"""This file contains the public interface to the aiml module.""" +import AimlParser +import DefaultSubs +import Utils +from PatternMgr import PatternMgr +from WordSub import WordSub + +from ConfigParser import ConfigParser +import copy +import glob +import os +import random +import re +import string +import sys +import time +import threading +import xml.sax + + +class Kernel: + # module constants + _globalSessionID = "_global" # key of the global session (duh) + _maxHistorySize = 10 # maximum length of the _inputs and _responses lists + _maxRecursionDepth = 100 # maximum number of recursive / tags before the response is aborted. + # special predicate keys + _inputHistory = "_inputHistory" # keys to a queue (list) of recent user input + _outputHistory = "_outputHistory" # keys to a queue (list) of recent responses. + _inputStack = "_inputStack" # Should always be empty in between calls to respond() + + def __init__(self): + self._verboseMode = True + self._version = "PyAIML 0.8.5" + self._brain = PatternMgr() + self._respondLock = threading.RLock() + self._textEncoding = "utf-8" + + # set up the sessions + self._sessions = {} + self._addSession(self._globalSessionID) + + # Set up the bot predicates + self._botPredicates = {} + self.setBotPredicate("name", "Nameless") + + # set up the word substitutors (subbers): + self._subbers = {} + self._subbers['gender'] = WordSub(DefaultSubs.defaultGender) + self._subbers['person'] = WordSub(DefaultSubs.defaultPerson) + self._subbers['person2'] = WordSub(DefaultSubs.defaultPerson2) + self._subbers['normal'] = WordSub(DefaultSubs.defaultNormal) + + # set up the element processors + self._elementProcessors = { + "bot": self._processBot, + "condition": self._processCondition, + "date": self._processDate, + "formal": self._processFormal, + "gender": self._processGender, + "get": self._processGet, + "gossip": self._processGossip, + "id": self._processId, + "input": self._processInput, + "javascript": self._processJavascript, + "learn": self._processLearn, + "li": self._processLi, + "lowercase": self._processLowercase, + "person": self._processPerson, + "person2": self._processPerson2, + "random": self._processRandom, + "text": self._processText, + "sentence": self._processSentence, + "set": self._processSet, + "size": self._processSize, + "sr": self._processSr, + "srai": self._processSrai, + "star": self._processStar, + "system": self._processSystem, + "template": self._processTemplate, + "that": self._processThat, + "thatstar": self._processThatstar, + "think": self._processThink, + "topicstar": self._processTopicstar, + "uppercase": self._processUppercase, + "version": self._processVersion, + } + + def bootstrap(self, brainFile = None, learnFiles = [], commands = []): + """Prepare a Kernel object for use. + + If a brainFile argument is provided, the Kernel attempts to + load the brain at the specified filename. + + If learnFiles is provided, the Kernel attempts to load the + specified AIML files. + + Finally, each of the input strings in the commands list is + passed to respond(). + + """ + start = time.clock() + if brainFile: + self.loadBrain(brainFile) + + # learnFiles might be a string, in which case it should be + # turned into a single-element list. + learns = learnFiles + try: learns = [ learnFiles + "" ] + except: pass + for file in learns: + self.learn(file) + + # ditto for commands + cmds = commands + try: cmds = [ commands + "" ] + except: pass + for cmd in cmds: + print self._respond(cmd, self._globalSessionID) + + if self._verboseMode: + print "Kernel bootstrap completed in %.2f seconds" % (time.clock() - start) + + def verbose(self, isVerbose = True): + """Enable/disable verbose output mode.""" + self._verboseMode = isVerbose + + def version(self): + """Return the Kernel's version string.""" + return self._version + + def numCategories(self): + """Return the number of categories the Kernel has learned.""" + # there's a one-to-one mapping between templates and categories + return self._brain.numTemplates() + + def resetBrain(self): + """Reset the brain to its initial state. + + This is essentially equivilant to: + del(kern) + kern = aiml.Kernel() + + """ + del(self._brain) + self.__init__() + + def loadBrain(self, filename): + """Attempt to load a previously-saved 'brain' from the + specified filename. + + NOTE: the current contents of the 'brain' will be discarded! + + """ + if self._verboseMode: print "Loading brain from %s..." % filename, + start = time.clock() + self._brain.restore(filename) + if self._verboseMode: + end = time.clock() - start + print "done (%d categories in %.2f seconds)" % (self._brain.numTemplates(), end) + + def saveBrain(self, filename): + """Dump the contents of the bot's brain to a file on disk.""" + if self._verboseMode: print "Saving brain to %s..." % filename, + start = time.clock() + self._brain.save(filename) + if self._verboseMode: + print "done (%.2f seconds)" % (time.clock() - start) + + def getPredicate(self, name, sessionID = _globalSessionID): + """Retrieve the current value of the predicate 'name' from the + specified session. + + If name is not a valid predicate in the session, the empty + string is returned. + + """ + try: return self._sessions[sessionID][name] + except KeyError: return "" + + def setPredicate(self, name, value, sessionID = _globalSessionID): + """Set the value of the predicate 'name' in the specified + session. + + If sessionID is not a valid session, it will be created. If + name is not a valid predicate in the session, it will be + created. + + """ + self._addSession(sessionID) # add the session, if it doesn't already exist. + self._sessions[sessionID][name] = value + + def getBotPredicate(self, name): + """Retrieve the value of the specified bot predicate. + + If name is not a valid bot predicate, the empty string is returned. + + """ + try: return self._botPredicates[name] + except KeyError: return "" + + def setBotPredicate(self, name, value): + """Set the value of the specified bot predicate. + + If name is not a valid bot predicate, it will be created. + + """ + self._botPredicates[name] = value + # Clumsy hack: if updating the bot name, we must update the + # name in the brain as well + if name == "name": + self._brain.setBotName(self.getBotPredicate("name")) + + def setTextEncoding(self, encoding): + """Set the text encoding used when loading AIML files (Latin-1, UTF-8, etc.).""" + self._textEncoding = encoding + + def loadSubs(self, filename): + """Load a substitutions file. + + The file must be in the Windows-style INI format (see the + standard ConfigParser module docs for information on this + format). Each section of the file is loaded into its own + substituter. + + """ + inFile = file(filename) + parser = ConfigParser() + parser.readfp(inFile, filename) + inFile.close() + for s in parser.sections(): + # Add a new WordSub instance for this section. If one already + # exists, delete it. + if self._subbers.has_key(s): + del(self._subbers[s]) + self._subbers[s] = WordSub() + # iterate over the key,value pairs and add them to the subber + for k,v in parser.items(s): + self._subbers[s][k] = v + + def _addSession(self, sessionID): + """Create a new session with the specified ID string.""" + if self._sessions.has_key(sessionID): + return + # Create the session. + self._sessions[sessionID] = { + # Initialize the special reserved predicates + self._inputHistory: [], + self._outputHistory: [], + self._inputStack: [] + } + + def _deleteSession(self, sessionID): + """Delete the specified session.""" + if self._sessions.has_key(sessionID): + _sessions.pop(sessionID) + + def getSessionData(self, sessionID = None): + """Return a copy of the session data dictionary for the + specified session. + + If no sessionID is specified, return a dictionary containing + *all* of the individual session dictionaries. + + """ + s = None + if sessionID is not None: + try: s = self._sessions[sessionID] + except KeyError: s = {} + else: + s = self._sessions + return copy.deepcopy(s) + + def learn(self, filename): + """Load and learn the contents of the specified AIML file. + + If filename includes wildcard characters, all matching files + will be loaded and learned. + + """ + for f in glob.glob(filename): + if self._verboseMode: print "Loading %s..." % f, + start = time.clock() + # Load and parse the AIML file. + parser = AimlParser.create_parser() + handler = parser.getContentHandler() + handler.setEncoding(self._textEncoding) + try: parser.parse(f) + except xml.sax.SAXParseException, msg: + err = "\nFATAL PARSE ERROR in file %s:\n%s\n" % (f,msg) + sys.stderr.write(err) + continue + # store the pattern/template pairs in the PatternMgr. + for key,tem in handler.categories.items(): + self._brain.add(key,tem) + # Parsing was successful. + if self._verboseMode: + print "done (%.2f seconds)" % (time.clock() - start) + + def respond(self, input, sessionID = _globalSessionID): + """Return the Kernel's response to the input string.""" + if len(input) == 0: + return "" + + #ensure that input is a unicode string + try: input = input.decode(self._textEncoding, 'replace') + except UnicodeError: pass + except AttributeError: pass + + # prevent other threads from stomping all over us. + self._respondLock.acquire() + + # Add the session, if it doesn't already exist + self._addSession(sessionID) + + # split the input into discrete sentences + sentences = Utils.sentences(input) + finalResponse = "" + for s in sentences: + # Add the input to the history list before fetching the + # response, so that tags work properly. + inputHistory = self.getPredicate(self._inputHistory, sessionID) + inputHistory.append(s) + while len(inputHistory) > self._maxHistorySize: + inputHistory.pop(0) + self.setPredicate(self._inputHistory, inputHistory, sessionID) + + # Fetch the response + response = self._respond(s, sessionID) + + # add the data from this exchange to the history lists + outputHistory = self.getPredicate(self._outputHistory, sessionID) + outputHistory.append(response) + while len(outputHistory) > self._maxHistorySize: + outputHistory.pop(0) + self.setPredicate(self._outputHistory, outputHistory, sessionID) + + # append this response to the final response. + finalResponse += (response + " ") + finalResponse = finalResponse.strip() + + assert(len(self.getPredicate(self._inputStack, sessionID)) == 0) + + # release the lock and return + self._respondLock.release() + try: return finalResponse.encode(self._textEncoding) + except UnicodeError: return finalResponse + + # This version of _respond() just fetches the response for some input. + # It does not mess with the input and output histories. Recursive calls + # to respond() spawned from tags like should call this function + # instead of respond(). + def _respond(self, input, sessionID): + """Private version of respond(), does the real work.""" + if len(input) == 0: + return "" + + # guard against infinite recursion + inputStack = self.getPredicate(self._inputStack, sessionID) + if len(inputStack) > self._maxRecursionDepth: + if self._verboseMode: + err = "WARNING: maximum recursion depth exceeded (input='%s')" % input.encode(self._textEncoding, 'replace') + sys.stderr.write(err) + return "" + + # push the input onto the input stack + inputStack = self.getPredicate(self._inputStack, sessionID) + inputStack.append(input) + self.setPredicate(self._inputStack, inputStack, sessionID) + + # run the input through the 'normal' subber + subbedInput = self._subbers['normal'].sub(input) + + # fetch the bot's previous response, to pass to the match() + # function as 'that'. + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = outputHistory[-1] + except IndexError: that = "" + subbedThat = self._subbers['normal'].sub(that) + + # fetch the current topic + topic = self.getPredicate("topic", sessionID) + subbedTopic = self._subbers['normal'].sub(topic) + + # Determine the final response. + response = "" + elem = self._brain.match(subbedInput, subbedThat, subbedTopic) + if elem is None: + if self._verboseMode: + err = "WARNING: No match found for input: %s\n" % input.encode(self._textEncoding) + sys.stderr.write(err) + else: + # Process the element into a response string. + response += self._processElement(elem, sessionID).strip() + response += " " + response = response.strip() + + # pop the top entry off the input stack. + inputStack = self.getPredicate(self._inputStack, sessionID) + inputStack.pop() + self.setPredicate(self._inputStack, inputStack, sessionID) + + return response + + def _processElement(self,elem, sessionID): + """Process an AIML element. + + The first item of the elem list is the name of the element's + XML tag. The second item is a dictionary containing any + attributes passed to that tag, and their values. Any further + items in the list are the elements enclosed by the current + element's begin and end tags; they are handled by each + element's handler function. + + """ + try: + handlerFunc = self._elementProcessors[elem[0]] + except: + # Oops -- there's no handler function for this element + # type! + if self._verboseMode: + err = "WARNING: No handler found for <%s> element\n" % elem[0].encode(self._textEncoding, 'replace') + sys.stderr.write(err) + return "" + return handlerFunc(elem, sessionID) + + + ###################################################### + ### Individual element-processing functions follow ### + ###################################################### + + # + def _processBot(self, elem, sessionID): + """Process a AIML element. + + Required element attributes: + name: The name of the bot predicate to retrieve. + + elements are used to fetch the value of global, + read-only "bot predicates." These predicates cannot be set + from within AIML; you must use the setBotPredicate() function. + + """ + attrName = elem[1]['name'] + return self.getBotPredicate(attrName) + + # + def _processCondition(self, elem, sessionID): + """Process a AIML element. + + Optional element attributes: + name: The name of a predicate to test. + value: The value to test the predicate for. + + elements come in three flavors. Each has different + attributes, and each handles their contents differently. + + The simplest case is when the tag has both a 'name' + and a 'value' attribute. In this case, if the predicate + 'name' has the value 'value', then the contents of the element + are processed and returned. + + If the element has only a 'name' attribute, then + its contents are a series of
  • elements, each of which has + a 'value' attribute. The list is scanned from top to bottom + until a match is found. Optionally, the last
  • element can + have no 'value' attribute, in which case it is processed and + returned if no other match is found. + + If the element has neither a 'name' nor a 'value' + attribute, then it behaves almost exactly like the previous + case, except that each
  • subelement (except the optional + last entry) must now include both 'name' and 'value' + attributes. + + """ + attr = None + response = "" + attr = elem[1] + + # Case #1: test the value of a specific predicate for a + # specific value. + if attr.has_key('name') and attr.has_key('value'): + val = self.getPredicate(attr['name'], sessionID) + if val == attr['value']: + for e in elem[2:]: + response += self._processElement(e,sessionID) + return response + else: + # Case #2 and #3: Cycle through
  • contents, testing a + # name and value pair for each one. + try: + name = None + if attr.has_key('name'): + name = attr['name'] + # Get the list of
  • elemnents + listitems = [] + for e in elem[2:]: + if e[0] == 'li': + listitems.append(e) + # if listitems is empty, return the empty string + if len(listitems) == 0: + return "" + # iterate through the list looking for a condition that + # matches. + foundMatch = False + for li in listitems: + try: + liAttr = li[1] + # if this is the last list item, it's allowed + # to have no attributes. We just skip it for now. + if len(liAttr.keys()) == 0 and li == listitems[-1]: + continue + # get the name of the predicate to test + liName = name + if liName == None: + liName = liAttr['name'] + # get the value to check against + liValue = liAttr['value'] + # do the test + if self.getPredicate(liName, sessionID) == liValue: + foundMatch = True + response += self._processElement(li,sessionID) + break + except: + # No attributes, no name/value attributes, no + # such predicate/session, or processing error. + if self._verboseMode: print "Something amiss -- skipping listitem", li + raise + if not foundMatch: + # Check the last element of listitems. If it has + # no 'name' or 'value' attribute, process it. + try: + li = listitems[-1] + liAttr = li[1] + if not (liAttr.has_key('name') or liAttr.has_key('value')): + response += self._processElement(li, sessionID) + except: + # listitems was empty, no attributes, missing + # name/value attributes, or processing error. + if self._verboseMode: print "error in default listitem" + raise + except: + # Some other catastrophic cataclysm + if self._verboseMode: print "catastrophic condition failure" + raise + return response + + # + def _processDate(self, elem, sessionID): + """Process a AIML element. + + elements resolve to the current date and time. The + AIML specification doesn't require any particular format for + this information, so I go with whatever's simplest. + + """ + return time.asctime() + + # + def _processFormal(self, elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and then + capitalize the first letter of each word of the result. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return string.capwords(response) + + # + def _processGender(self,elem, sessionID): + """Process a AIML element. + + elements process their contents, and then swap the + gender of any third-person singular pronouns in the result. + This subsitution is handled by the aiml.WordSub module. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return self._subbers['gender'].sub(response) + + # + def _processGet(self, elem, sessionID): + """Process a AIML element. + + Required element attributes: + name: The name of the predicate whose value should be + retrieved from the specified session and returned. If the + predicate doesn't exist, the empty string is returned. + + elements return the value of a predicate from the + specified session. + + """ + return self.getPredicate(elem[1]['name'], sessionID) + + # + def _processGossip(self, elem, sessionID): + """Process a AIML element. + + elements are used to capture and store user input in + an implementation-defined manner, theoretically allowing the + bot to learn from the people it chats with. I haven't + descided how to define my implementation, so right now + behaves identically to . + + """ + return self._processThink(elem, sessionID) + + # + def _processId(self, elem, sessionID): + """ Process an AIML element. + + elements return a unique "user id" for a specific + conversation. In PyAIML, the user id is the name of the + current session. + + """ + return sessionID + + # + def _processInput(self, elem, sessionID): + """Process an AIML element. + + Optional attribute elements: + index: The index of the element from the history list to + return. 1 means the most recent item, 2 means the one + before that, and so on. + + elements return an entry from the input history for + the current session. + + """ + inputHistory = self.getPredicate(self._inputHistory, sessionID) + try: index = int(elem[1]['index']) + except: index = 1 + try: return inputHistory[-index] + except IndexError: + if self._verboseMode: + err = "No such index %d while processing element.\n" % index + sys.stderr.write(err) + return "" + + # + def _processJavascript(self, elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and + then run the results through a server-side Javascript + interpreter to compute the final response. Implementations + are not required to provide an actual Javascript interpreter, + and right now PyAIML doesn't; elements are behave + exactly like elements. + + """ + return self._processThink(elem, sessionID) + + # + def _processLearn(self, elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and then + treat the result as an AIML file to open and learn. + + """ + filename = "" + for e in elem[2:]: + filename += self._processElement(e, sessionID) + self.learn(filename) + return "" + + #
  • + def _processLi(self,elem, sessionID): + """Process an
  • AIML element. + + Optional attribute elements: + name: the name of a predicate to query. + value: the value to check that predicate for. + +
  • elements process their contents recursively and return + the results. They can only appear inside and + elements. See _processCondition() and + _processRandom() for details of their usage. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return response + + # + def _processLowercase(self,elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and + then convert the results to all-lowercase. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return string.lower(response) + + # + def _processPerson(self,elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and then + convert all pronouns in the results from 1st person to 2nd + person, and vice versa. This subsitution is handled by the + aiml.WordSub module. + + If the tag is used atomically (e.g. ), it is + a shortcut for . + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + if len(elem[2:]) == 0: # atomic = + response = self._processElement(['star',{}], sessionID) + return self._subbers['person'].sub(response) + + # + def _processPerson2(self,elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and then + convert all pronouns in the results from 1st person to 3rd + person, and vice versa. This subsitution is handled by the + aiml.WordSub module. + + If the tag is used atomically (e.g. ), it is + a shortcut for . + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + if len(elem[2:]) == 0: # atomic = + response = self._processElement(['star',{}], sessionID) + return self._subbers['person2'].sub(response) + + # + def _processRandom(self, elem, sessionID): + """Process a AIML element. + + elements contain zero or more
  • elements. If + none, the empty string is returned. If one or more
  • + elements are present, one of them is selected randomly to be + processed recursively and have its results returned. Only the + chosen
  • element's contents are processed. Any non-
  • contents are + ignored. + + """ + listitems = [] + for e in elem[2:]: + if e[0] == 'li': + listitems.append(e) + if len(listitems) == 0: + return "" + + # select and process a random listitem. + random.shuffle(listitems) + return self._processElement(listitems[0], sessionID) + + # + def _processSentence(self,elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and + then capitalize the first letter of the results. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + try: + response = response.strip() + words = string.split(response, " ", 1) + words[0] = string.capitalize(words[0]) + response = string.join(words) + return response + except IndexError: # response was empty + return "" + + # + def _processSet(self, elem, sessionID): + """Process a AIML element. + + Required element attributes: + name: The name of the predicate to set. + + elements process their contents recursively, and assign the results to a predicate + (given by their 'name' attribute) in the current session. The contents of the element + are also returned. + + """ + value = "" + for e in elem[2:]: + value += self._processElement(e, sessionID) + self.setPredicate(elem[1]['name'], value, sessionID) + return value + + # + def _processSize(self,elem, sessionID): + """Process a AIML element. + + elements return the number of AIML categories currently + in the bot's brain. + + """ + return str(self.numCategories()) + + # + def _processSr(self,elem,sessionID): + """Process an AIML element. + + elements are shortcuts for . + + """ + star = self._processElement(['star',{}], sessionID) + response = self._respond(star, sessionID) + return response + + # + def _processSrai(self,elem, sessionID): + """Process a AIML element. + + elements recursively process their contents, and then + pass the results right back into the AIML interpreter as a new + piece of input. The results of this new input string are + returned. + + """ + newInput = "" + for e in elem[2:]: + newInput += self._processElement(e, sessionID) + return self._respond(newInput, sessionID) + + # + def _processStar(self, elem, sessionID): + """Process a AIML element. + + Optional attribute elements: + index: Which "*" character in the current pattern should + be matched? + + elements return the text fragment matched by the "*" + character in the current input pattern. For example, if the + input "Hello Tom Smith, how are you?" matched the pattern + "HELLO * HOW ARE YOU", then a element in the template + would evaluate to "Tom Smith". + + """ + try: index = int(elem[1]['index']) + except KeyError: index = 1 + # fetch the user's last input + inputStack = self.getPredicate(self._inputStack, sessionID) + input = self._subbers['normal'].sub(inputStack[-1]) + # fetch the Kernel's last response (for 'that' context) + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = self._subbers['normal'].sub(outputHistory[-1]) + except: that = "" # there might not be any output yet + topic = self.getPredicate("topic", sessionID) + response = self._brain.star("star", input, that, topic, index) + return response + + # + def _processSystem(self,elem, sessionID): + """Process a AIML element. + + elements process their contents recursively, and then + attempt to execute the results as a shell command on the + server. The AIML interpreter blocks until the command is + complete, and then returns the command's output. + + For cross-platform compatibility, any file paths inside + tags should use Unix-style forward slashes ("/") as a + directory separator. + + """ + # build up the command string + command = "" + for e in elem[2:]: + command += self._processElement(e, sessionID) + + # normalize the path to the command. Under Windows, this + # switches forward-slashes to back-slashes; all system + # elements should use unix-style paths for cross-platform + # compatibility. + #executable,args = command.split(" ", 1) + #executable = os.path.normpath(executable) + #command = executable + " " + args + command = os.path.normpath(command) + + # execute the command. + response = "" + try: + out = os.popen(command) + except RuntimeError, msg: + if self._verboseMode: + err = "WARNING: RuntimeError while processing \"system\" element:\n%s\n" % msg.encode(self._textEncoding, 'replace') + sys.stderr.write(err) + return "There was an error while computing my response. Please inform my botmaster." + for line in out: + response += line + "\n" + response = string.join(response.splitlines()).strip() + return response + + #