# -*- coding=ISO-8859-1 -*- """This file contains the public interface to the aiml module.""" import AimlParser import DefaultSubs import Utils from PatternMgr import PatternMgr from WordSub import WordSub from ConfigParser import ConfigParser import copy import glob import os import random import re import string import sys import time import threading import xml.sax class Kernel: # module constants _globalSessionID = "_global" # key of the global session (duh) _maxHistorySize = 10 # maximum length of the _inputs and _responses lists _maxRecursionDepth = 100 # maximum number of recursive / tags before the response is aborted. # special predicate keys _inputHistory = "_inputHistory" # keys to a queue (list) of recent user input _outputHistory = "_outputHistory" # keys to a queue (list) of recent responses. _inputStack = "_inputStack" # Should always be empty in between calls to respond() def __init__(self): self._verboseMode = True self._version = "PyAIML 0.8.5" self._brain = PatternMgr() self._respondLock = threading.RLock() self._textEncoding = "utf-8" # set up the sessions self._sessions = {} self._addSession(self._globalSessionID) # Set up the bot predicates self._botPredicates = {} self.setBotPredicate("name", "Nameless") # set up the word substitutors (subbers): self._subbers = {} self._subbers['gender'] = WordSub(DefaultSubs.defaultGender) self._subbers['person'] = WordSub(DefaultSubs.defaultPerson) self._subbers['person2'] = WordSub(DefaultSubs.defaultPerson2) self._subbers['normal'] = WordSub(DefaultSubs.defaultNormal) # set up the element processors self._elementProcessors = { "bot": self._processBot, "condition": self._processCondition, "date": self._processDate, "formal": self._processFormal, "gender": self._processGender, "get": self._processGet, "gossip": self._processGossip, "id": self._processId, "input": self._processInput, "javascript": self._processJavascript, "learn": self._processLearn, "li": self._processLi, "lowercase": self._processLowercase, "person": self._processPerson, "person2": self._processPerson2, "random": self._processRandom, "text": self._processText, "sentence": self._processSentence, "set": self._processSet, "size": self._processSize, "sr": self._processSr, "srai": self._processSrai, "star": self._processStar, "system": self._processSystem, "template": self._processTemplate, "that": self._processThat, "thatstar": self._processThatstar, "think": self._processThink, "topicstar": self._processTopicstar, "uppercase": self._processUppercase, "version": self._processVersion, } def bootstrap(self, brainFile = None, learnFiles = [], commands = []): """Prepare a Kernel object for use. If a brainFile argument is provided, the Kernel attempts to load the brain at the specified filename. If learnFiles is provided, the Kernel attempts to load the specified AIML files. Finally, each of the input strings in the commands list is passed to respond(). """ start = time.clock() if brainFile: self.loadBrain(brainFile) # learnFiles might be a string, in which case it should be # turned into a single-element list. learns = learnFiles try: learns = [ learnFiles + "" ] except: pass for file in learns: self.learn(file) # ditto for commands cmds = commands try: cmds = [ commands + "" ] except: pass for cmd in cmds: print self._respond(cmd, self._globalSessionID) if self._verboseMode: print "Kernel bootstrap completed in %.2f seconds" % (time.clock() - start) def verbose(self, isVerbose = True): """Enable/disable verbose output mode.""" self._verboseMode = isVerbose def version(self): """Return the Kernel's version string.""" return self._version def numCategories(self): """Return the number of categories the Kernel has learned.""" # there's a one-to-one mapping between templates and categories return self._brain.numTemplates() def resetBrain(self): """Reset the brain to its initial state. This is essentially equivilant to: del(kern) kern = aiml.Kernel() """ del(self._brain) self.__init__() def loadBrain(self, filename): """Attempt to load a previously-saved 'brain' from the specified filename. NOTE: the current contents of the 'brain' will be discarded! """ if self._verboseMode: print "Loading brain from %s..." % filename, start = time.clock() self._brain.restore(filename) if self._verboseMode: end = time.clock() - start print "done (%d categories in %.2f seconds)" % (self._brain.numTemplates(), end) def saveBrain(self, filename): """Dump the contents of the bot's brain to a file on disk.""" if self._verboseMode: print "Saving brain to %s..." % filename, start = time.clock() self._brain.save(filename) if self._verboseMode: print "done (%.2f seconds)" % (time.clock() - start) def getPredicate(self, name, sessionID = _globalSessionID): """Retrieve the current value of the predicate 'name' from the specified session. If name is not a valid predicate in the session, the empty string is returned. """ try: return self._sessions[sessionID][name] except KeyError: return "" def setPredicate(self, name, value, sessionID = _globalSessionID): """Set the value of the predicate 'name' in the specified session. If sessionID is not a valid session, it will be created. If name is not a valid predicate in the session, it will be created. """ self._addSession(sessionID) # add the session, if it doesn't already exist. self._sessions[sessionID][name] = value def getBotPredicate(self, name): """Retrieve the value of the specified bot predicate. If name is not a valid bot predicate, the empty string is returned. """ try: return self._botPredicates[name] except KeyError: return "" def setBotPredicate(self, name, value): """Set the value of the specified bot predicate. If name is not a valid bot predicate, it will be created. """ self._botPredicates[name] = value # Clumsy hack: if updating the bot name, we must update the # name in the brain as well if name == "name": self._brain.setBotName(self.getBotPredicate("name")) def setTextEncoding(self, encoding): """Set the text encoding used when loading AIML files (Latin-1, UTF-8, etc.).""" self._textEncoding = encoding def loadSubs(self, filename): """Load a substitutions file. The file must be in the Windows-style INI format (see the standard ConfigParser module docs for information on this format). Each section of the file is loaded into its own substituter. """ inFile = file(filename) parser = ConfigParser() parser.readfp(inFile, filename) inFile.close() for s in parser.sections(): # Add a new WordSub instance for this section. If one already # exists, delete it. if self._subbers.has_key(s): del(self._subbers[s]) self._subbers[s] = WordSub() # iterate over the key,value pairs and add them to the subber for k,v in parser.items(s): self._subbers[s][k] = v def _addSession(self, sessionID): """Create a new session with the specified ID string.""" if self._sessions.has_key(sessionID): return # Create the session. self._sessions[sessionID] = { # Initialize the special reserved predicates self._inputHistory: [], self._outputHistory: [], self._inputStack: [] } def _deleteSession(self, sessionID): """Delete the specified session.""" if self._sessions.has_key(sessionID): _sessions.pop(sessionID) def getSessionData(self, sessionID = None): """Return a copy of the session data dictionary for the specified session. If no sessionID is specified, return a dictionary containing *all* of the individual session dictionaries. """ s = None if sessionID is not None: try: s = self._sessions[sessionID] except KeyError: s = {} else: s = self._sessions return copy.deepcopy(s) def learn(self, filename): """Load and learn the contents of the specified AIML file. If filename includes wildcard characters, all matching files will be loaded and learned. """ for f in glob.glob(filename): if self._verboseMode: print "Loading %s..." % f, start = time.clock() # Load and parse the AIML file. parser = AimlParser.create_parser() handler = parser.getContentHandler() handler.setEncoding(self._textEncoding) try: parser.parse(f) except xml.sax.SAXParseException, msg: err = "\nFATAL PARSE ERROR in file %s:\n%s\n" % (f,msg) sys.stderr.write(err) continue # store the pattern/template pairs in the PatternMgr. for key,tem in handler.categories.items(): self._brain.add(key,tem) # Parsing was successful. if self._verboseMode: print "done (%.2f seconds)" % (time.clock() - start) def respond(self, input, sessionID = _globalSessionID): """Return the Kernel's response to the input string.""" if len(input) == 0: return "" #ensure that input is a unicode string try: input = input.decode(self._textEncoding, 'replace') except UnicodeError: pass except AttributeError: pass # prevent other threads from stomping all over us. self._respondLock.acquire() # Add the session, if it doesn't already exist self._addSession(sessionID) # split the input into discrete sentences sentences = Utils.sentences(input) finalResponse = "" for s in sentences: # Add the input to the history list before fetching the # response, so that tags work properly. inputHistory = self.getPredicate(self._inputHistory, sessionID) inputHistory.append(s) while len(inputHistory) > self._maxHistorySize: inputHistory.pop(0) self.setPredicate(self._inputHistory, inputHistory, sessionID) # Fetch the response response = self._respond(s, sessionID) # add the data from this exchange to the history lists outputHistory = self.getPredicate(self._outputHistory, sessionID) outputHistory.append(response) while len(outputHistory) > self._maxHistorySize: outputHistory.pop(0) self.setPredicate(self._outputHistory, outputHistory, sessionID) # append this response to the final response. finalResponse += (response + " ") finalResponse = finalResponse.strip() assert(len(self.getPredicate(self._inputStack, sessionID)) == 0) # release the lock and return self._respondLock.release() try: return finalResponse.encode(self._textEncoding) except UnicodeError: return finalResponse # This version of _respond() just fetches the response for some input. # It does not mess with the input and output histories. Recursive calls # to respond() spawned from tags like should call this function # instead of respond(). def _respond(self, input, sessionID): """Private version of respond(), does the real work.""" if len(input) == 0: return "" # guard against infinite recursion inputStack = self.getPredicate(self._inputStack, sessionID) if len(inputStack) > self._maxRecursionDepth: if self._verboseMode: err = "WARNING: maximum recursion depth exceeded (input='%s')" % input.encode(self._textEncoding, 'replace') sys.stderr.write(err) return "" # push the input onto the input stack inputStack = self.getPredicate(self._inputStack, sessionID) inputStack.append(input) self.setPredicate(self._inputStack, inputStack, sessionID) # run the input through the 'normal' subber subbedInput = self._subbers['normal'].sub(input) # fetch the bot's previous response, to pass to the match() # function as 'that'. outputHistory = self.getPredicate(self._outputHistory, sessionID) try: that = outputHistory[-1] except IndexError: that = "" subbedThat = self._subbers['normal'].sub(that) # fetch the current topic topic = self.getPredicate("topic", sessionID) subbedTopic = self._subbers['normal'].sub(topic) # Determine the final response. response = "" elem = self._brain.match(subbedInput, subbedThat, subbedTopic) if elem is None: if self._verboseMode: err = "WARNING: No match found for input: %s\n" % input.encode(self._textEncoding) sys.stderr.write(err) else: # Process the element into a response string. response += self._processElement(elem, sessionID).strip() response += " " response = response.strip() # pop the top entry off the input stack. inputStack = self.getPredicate(self._inputStack, sessionID) inputStack.pop() self.setPredicate(self._inputStack, inputStack, sessionID) return response def _processElement(self,elem, sessionID): """Process an AIML element. The first item of the elem list is the name of the element's XML tag. The second item is a dictionary containing any attributes passed to that tag, and their values. Any further items in the list are the elements enclosed by the current element's begin and end tags; they are handled by each element's handler function. """ try: handlerFunc = self._elementProcessors[elem[0]] except: # Oops -- there's no handler function for this element # type! if self._verboseMode: err = "WARNING: No handler found for <%s> element\n" % elem[0].encode(self._textEncoding, 'replace') sys.stderr.write(err) return "" return handlerFunc(elem, sessionID) ###################################################### ### Individual element-processing functions follow ### ###################################################### # def _processBot(self, elem, sessionID): """Process a AIML element. Required element attributes: name: The name of the bot predicate to retrieve. elements are used to fetch the value of global, read-only "bot predicates." These predicates cannot be set from within AIML; you must use the setBotPredicate() function. """ attrName = elem[1]['name'] return self.getBotPredicate(attrName) # def _processCondition(self, elem, sessionID): """Process a AIML element. Optional element attributes: name: The name of a predicate to test. value: The value to test the predicate for. elements come in three flavors. Each has different attributes, and each handles their contents differently. The simplest case is when the tag has both a 'name' and a 'value' attribute. In this case, if the predicate 'name' has the value 'value', then the contents of the element are processed and returned. If the element has only a 'name' attribute, then its contents are a series of
  • elements, each of which has a 'value' attribute. The list is scanned from top to bottom until a match is found. Optionally, the last
  • element can have no 'value' attribute, in which case it is processed and returned if no other match is found. If the element has neither a 'name' nor a 'value' attribute, then it behaves almost exactly like the previous case, except that each
  • subelement (except the optional last entry) must now include both 'name' and 'value' attributes. """ attr = None response = "" attr = elem[1] # Case #1: test the value of a specific predicate for a # specific value. if attr.has_key('name') and attr.has_key('value'): val = self.getPredicate(attr['name'], sessionID) if val == attr['value']: for e in elem[2:]: response += self._processElement(e,sessionID) return response else: # Case #2 and #3: Cycle through
  • contents, testing a # name and value pair for each one. try: name = None if attr.has_key('name'): name = attr['name'] # Get the list of
  • elemnents listitems = [] for e in elem[2:]: if e[0] == 'li': listitems.append(e) # if listitems is empty, return the empty string if len(listitems) == 0: return "" # iterate through the list looking for a condition that # matches. foundMatch = False for li in listitems: try: liAttr = li[1] # if this is the last list item, it's allowed # to have no attributes. We just skip it for now. if len(liAttr.keys()) == 0 and li == listitems[-1]: continue # get the name of the predicate to test liName = name if liName == None: liName = liAttr['name'] # get the value to check against liValue = liAttr['value'] # do the test if self.getPredicate(liName, sessionID) == liValue: foundMatch = True response += self._processElement(li,sessionID) break except: # No attributes, no name/value attributes, no # such predicate/session, or processing error. if self._verboseMode: print "Something amiss -- skipping listitem", li raise if not foundMatch: # Check the last element of listitems. If it has # no 'name' or 'value' attribute, process it. try: li = listitems[-1] liAttr = li[1] if not (liAttr.has_key('name') or liAttr.has_key('value')): response += self._processElement(li, sessionID) except: # listitems was empty, no attributes, missing # name/value attributes, or processing error. if self._verboseMode: print "error in default listitem" raise except: # Some other catastrophic cataclysm if self._verboseMode: print "catastrophic condition failure" raise return response # def _processDate(self, elem, sessionID): """Process a AIML element. elements resolve to the current date and time. The AIML specification doesn't require any particular format for this information, so I go with whatever's simplest. """ return time.asctime() # def _processFormal(self, elem, sessionID): """Process a AIML element. elements process their contents recursively, and then capitalize the first letter of each word of the result. """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) return string.capwords(response) # def _processGender(self,elem, sessionID): """Process a AIML element. elements process their contents, and then swap the gender of any third-person singular pronouns in the result. This subsitution is handled by the aiml.WordSub module. """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) return self._subbers['gender'].sub(response) # def _processGet(self, elem, sessionID): """Process a AIML element. Required element attributes: name: The name of the predicate whose value should be retrieved from the specified session and returned. If the predicate doesn't exist, the empty string is returned. elements return the value of a predicate from the specified session. """ return self.getPredicate(elem[1]['name'], sessionID) # def _processGossip(self, elem, sessionID): """Process a AIML element. elements are used to capture and store user input in an implementation-defined manner, theoretically allowing the bot to learn from the people it chats with. I haven't descided how to define my implementation, so right now behaves identically to . """ return self._processThink(elem, sessionID) # def _processId(self, elem, sessionID): """ Process an AIML element. elements return a unique "user id" for a specific conversation. In PyAIML, the user id is the name of the current session. """ return sessionID # def _processInput(self, elem, sessionID): """Process an AIML element. Optional attribute elements: index: The index of the element from the history list to return. 1 means the most recent item, 2 means the one before that, and so on. elements return an entry from the input history for the current session. """ inputHistory = self.getPredicate(self._inputHistory, sessionID) try: index = int(elem[1]['index']) except: index = 1 try: return inputHistory[-index] except IndexError: if self._verboseMode: err = "No such index %d while processing element.\n" % index sys.stderr.write(err) return "" # def _processJavascript(self, elem, sessionID): """Process a AIML element. elements process their contents recursively, and then run the results through a server-side Javascript interpreter to compute the final response. Implementations are not required to provide an actual Javascript interpreter, and right now PyAIML doesn't; elements are behave exactly like elements. """ return self._processThink(elem, sessionID) # def _processLearn(self, elem, sessionID): """Process a AIML element. elements process their contents recursively, and then treat the result as an AIML file to open and learn. """ filename = "" for e in elem[2:]: filename += self._processElement(e, sessionID) self.learn(filename) return "" #
  • def _processLi(self,elem, sessionID): """Process an
  • AIML element. Optional attribute elements: name: the name of a predicate to query. value: the value to check that predicate for.
  • elements process their contents recursively and return the results. They can only appear inside and elements. See _processCondition() and _processRandom() for details of their usage. """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) return response # def _processLowercase(self,elem, sessionID): """Process a AIML element. elements process their contents recursively, and then convert the results to all-lowercase. """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) return string.lower(response) # def _processPerson(self,elem, sessionID): """Process a AIML element. elements process their contents recursively, and then convert all pronouns in the results from 1st person to 2nd person, and vice versa. This subsitution is handled by the aiml.WordSub module. If the tag is used atomically (e.g. ), it is a shortcut for . """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) if len(elem[2:]) == 0: # atomic = response = self._processElement(['star',{}], sessionID) return self._subbers['person'].sub(response) # def _processPerson2(self,elem, sessionID): """Process a AIML element. elements process their contents recursively, and then convert all pronouns in the results from 1st person to 3rd person, and vice versa. This subsitution is handled by the aiml.WordSub module. If the tag is used atomically (e.g. ), it is a shortcut for . """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) if len(elem[2:]) == 0: # atomic = response = self._processElement(['star',{}], sessionID) return self._subbers['person2'].sub(response) # def _processRandom(self, elem, sessionID): """Process a AIML element. elements contain zero or more
  • elements. If none, the empty string is returned. If one or more
  • elements are present, one of them is selected randomly to be processed recursively and have its results returned. Only the chosen
  • element's contents are processed. Any non-
  • contents are ignored. """ listitems = [] for e in elem[2:]: if e[0] == 'li': listitems.append(e) if len(listitems) == 0: return "" # select and process a random listitem. random.shuffle(listitems) return self._processElement(listitems[0], sessionID) # def _processSentence(self,elem, sessionID): """Process a AIML element. elements process their contents recursively, and then capitalize the first letter of the results. """ response = "" for e in elem[2:]: response += self._processElement(e, sessionID) try: response = response.strip() words = string.split(response, " ", 1) words[0] = string.capitalize(words[0]) response = string.join(words) return response except IndexError: # response was empty return "" # def _processSet(self, elem, sessionID): """Process a AIML element. Required element attributes: name: The name of the predicate to set. elements process their contents recursively, and assign the results to a predicate (given by their 'name' attribute) in the current session. The contents of the element are also returned. """ value = "" for e in elem[2:]: value += self._processElement(e, sessionID) self.setPredicate(elem[1]['name'], value, sessionID) return value # def _processSize(self,elem, sessionID): """Process a AIML element. elements return the number of AIML categories currently in the bot's brain. """ return str(self.numCategories()) # def _processSr(self,elem,sessionID): """Process an AIML element. elements are shortcuts for . """ star = self._processElement(['star',{}], sessionID) response = self._respond(star, sessionID) return response # def _processSrai(self,elem, sessionID): """Process a AIML element. elements recursively process their contents, and then pass the results right back into the AIML interpreter as a new piece of input. The results of this new input string are returned. """ newInput = "" for e in elem[2:]: newInput += self._processElement(e, sessionID) return self._respond(newInput, sessionID) # def _processStar(self, elem, sessionID): """Process a AIML element. Optional attribute elements: index: Which "*" character in the current pattern should be matched? elements return the text fragment matched by the "*" character in the current input pattern. For example, if the input "Hello Tom Smith, how are you?" matched the pattern "HELLO * HOW ARE YOU", then a element in the template would evaluate to "Tom Smith". """ try: index = int(elem[1]['index']) except KeyError: index = 1 # fetch the user's last input inputStack = self.getPredicate(self._inputStack, sessionID) input = self._subbers['normal'].sub(inputStack[-1]) # fetch the Kernel's last response (for 'that' context) outputHistory = self.getPredicate(self._outputHistory, sessionID) try: that = self._subbers['normal'].sub(outputHistory[-1]) except: that = "" # there might not be any output yet topic = self.getPredicate("topic", sessionID) response = self._brain.star("star", input, that, topic, index) return response # def _processSystem(self,elem, sessionID): """Process a AIML element. elements process their contents recursively, and then attempt to execute the results as a shell command on the server. The AIML interpreter blocks until the command is complete, and then returns the command's output. For cross-platform compatibility, any file paths inside tags should use Unix-style forward slashes ("/") as a directory separator. """ # build up the command string command = "" for e in elem[2:]: command += self._processElement(e, sessionID) # normalize the path to the command. Under Windows, this # switches forward-slashes to back-slashes; all system # elements should use unix-style paths for cross-platform # compatibility. #executable,args = command.split(" ", 1) #executable = os.path.normpath(executable) #command = executable + " " + args command = os.path.normpath(command) # execute the command. response = "" try: out = os.popen(command) except RuntimeError, msg: if self._verboseMode: err = "WARNING: RuntimeError while processing \"system\" element:\n%s\n" % msg.encode(self._textEncoding, 'replace') sys.stderr.write(err) return "There was an error while computing my response. Please inform my botmaster." for line in out: response += line + "\n" response = string.join(response.splitlines()).strip() return response #