Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/bot/aiml/Kernel.py
diff options
context:
space:
mode:
Diffstat (limited to 'bot/aiml/Kernel.py')
-rw-r--r--bot/aiml/Kernel.py1183
1 files changed, 0 insertions, 1183 deletions
diff --git a/bot/aiml/Kernel.py b/bot/aiml/Kernel.py
deleted file mode 100644
index 1014ade..0000000
--- a/bot/aiml/Kernel.py
+++ /dev/null
@@ -1,1183 +0,0 @@
-# -*- coding=ISO-8859-1 -*-
-"""This file contains the public interface to the aiml module."""
-import AimlParser
-import DefaultSubs
-import Utils
-from PatternMgr import PatternMgr
-from WordSub import WordSub
-
-from ConfigParser import ConfigParser
-import copy
-import glob
-import os
-import random
-import re
-import string
-import sys
-import time
-import threading
-import xml.sax
-
-
-class Kernel:
- # module constants
- _globalSessionID = "_global" # key of the global session (duh)
- _maxHistorySize = 10 # maximum length of the _inputs and _responses lists
- _maxRecursionDepth = 100 # maximum number of recursive <srai>/<sr> tags before the response is aborted.
- # special predicate keys
- _inputHistory = "_inputHistory" # keys to a queue (list) of recent user input
- _outputHistory = "_outputHistory" # keys to a queue (list) of recent responses.
- _inputStack = "_inputStack" # Should always be empty in between calls to respond()
-
- def __init__(self):
- self._verboseMode = True
- self._version = "PyAIML 0.8.5"
- self._brain = PatternMgr()
- self._respondLock = threading.RLock()
- self._textEncoding = "utf-8"
-
- # set up the sessions
- self._sessions = {}
- self._addSession(self._globalSessionID)
-
- # Set up the bot predicates
- self._botPredicates = {}
- self.setBotPredicate("name", "Nameless")
-
- # set up the word substitutors (subbers):
- self._subbers = {}
- self._subbers['gender'] = WordSub(DefaultSubs.defaultGender)
- self._subbers['person'] = WordSub(DefaultSubs.defaultPerson)
- self._subbers['person2'] = WordSub(DefaultSubs.defaultPerson2)
- self._subbers['normal'] = WordSub(DefaultSubs.defaultNormal)
-
- # set up the element processors
- self._elementProcessors = {
- "bot": self._processBot,
- "condition": self._processCondition,
- "date": self._processDate,
- "formal": self._processFormal,
- "gender": self._processGender,
- "get": self._processGet,
- "gossip": self._processGossip,
- "id": self._processId,
- "input": self._processInput,
- "javascript": self._processJavascript,
- "learn": self._processLearn,
- "li": self._processLi,
- "lowercase": self._processLowercase,
- "person": self._processPerson,
- "person2": self._processPerson2,
- "random": self._processRandom,
- "text": self._processText,
- "sentence": self._processSentence,
- "set": self._processSet,
- "size": self._processSize,
- "sr": self._processSr,
- "srai": self._processSrai,
- "star": self._processStar,
- "system": self._processSystem,
- "template": self._processTemplate,
- "that": self._processThat,
- "thatstar": self._processThatstar,
- "think": self._processThink,
- "topicstar": self._processTopicstar,
- "uppercase": self._processUppercase,
- "version": self._processVersion,
- }
-
- def bootstrap(self, brainFile = None, learnFiles = [], commands = []):
- """Prepare a Kernel object for use.
-
- If a brainFile argument is provided, the Kernel attempts to
- load the brain at the specified filename.
-
- If learnFiles is provided, the Kernel attempts to load the
- specified AIML files.
-
- Finally, each of the input strings in the commands list is
- passed to respond().
-
- """
- start = time.clock()
- if brainFile:
- self.loadBrain(brainFile)
-
- # learnFiles might be a string, in which case it should be
- # turned into a single-element list.
- learns = learnFiles
- try: learns = [ learnFiles + "" ]
- except: pass
- for file in learns:
- self.learn(file)
-
- # ditto for commands
- cmds = commands
- try: cmds = [ commands + "" ]
- except: pass
- for cmd in cmds:
- print self._respond(cmd, self._globalSessionID)
-
- if self._verboseMode:
- print "Kernel bootstrap completed in %.2f seconds" % (time.clock() - start)
-
- def verbose(self, isVerbose = True):
- """Enable/disable verbose output mode."""
- self._verboseMode = isVerbose
-
- def version(self):
- """Return the Kernel's version string."""
- return self._version
-
- def numCategories(self):
- """Return the number of categories the Kernel has learned."""
- # there's a one-to-one mapping between templates and categories
- return self._brain.numTemplates()
-
- def resetBrain(self):
- """Reset the brain to its initial state.
-
- This is essentially equivilant to:
- del(kern)
- kern = aiml.Kernel()
-
- """
- del(self._brain)
- self.__init__()
-
- def loadBrain(self, filename):
- """Attempt to load a previously-saved 'brain' from the
- specified filename.
-
- NOTE: the current contents of the 'brain' will be discarded!
-
- """
- if self._verboseMode: print "Loading brain from %s..." % filename,
- start = time.clock()
- self._brain.restore(filename)
- if self._verboseMode:
- end = time.clock() - start
- print "done (%d categories in %.2f seconds)" % (self._brain.numTemplates(), end)
-
- def saveBrain(self, filename):
- """Dump the contents of the bot's brain to a file on disk."""
- if self._verboseMode: print "Saving brain to %s..." % filename,
- start = time.clock()
- self._brain.save(filename)
- if self._verboseMode:
- print "done (%.2f seconds)" % (time.clock() - start)
-
- def getPredicate(self, name, sessionID = _globalSessionID):
- """Retrieve the current value of the predicate 'name' from the
- specified session.
-
- If name is not a valid predicate in the session, the empty
- string is returned.
-
- """
- try: return self._sessions[sessionID][name]
- except KeyError: return ""
-
- def setPredicate(self, name, value, sessionID = _globalSessionID):
- """Set the value of the predicate 'name' in the specified
- session.
-
- If sessionID is not a valid session, it will be created. If
- name is not a valid predicate in the session, it will be
- created.
-
- """
- self._addSession(sessionID) # add the session, if it doesn't already exist.
- self._sessions[sessionID][name] = value
-
- def getBotPredicate(self, name):
- """Retrieve the value of the specified bot predicate.
-
- If name is not a valid bot predicate, the empty string is returned.
-
- """
- try: return self._botPredicates[name]
- except KeyError: return ""
-
- def setBotPredicate(self, name, value):
- """Set the value of the specified bot predicate.
-
- If name is not a valid bot predicate, it will be created.
-
- """
- self._botPredicates[name] = value
- # Clumsy hack: if updating the bot name, we must update the
- # name in the brain as well
- if name == "name":
- self._brain.setBotName(self.getBotPredicate("name"))
-
- def setTextEncoding(self, encoding):
- """Set the text encoding used when loading AIML files (Latin-1, UTF-8, etc.)."""
- self._textEncoding = encoding
-
- def loadSubs(self, filename):
- """Load a substitutions file.
-
- The file must be in the Windows-style INI format (see the
- standard ConfigParser module docs for information on this
- format). Each section of the file is loaded into its own
- substituter.
-
- """
- inFile = file(filename)
- parser = ConfigParser()
- parser.readfp(inFile, filename)
- inFile.close()
- for s in parser.sections():
- # Add a new WordSub instance for this section. If one already
- # exists, delete it.
- if self._subbers.has_key(s):
- del(self._subbers[s])
- self._subbers[s] = WordSub()
- # iterate over the key,value pairs and add them to the subber
- for k,v in parser.items(s):
- self._subbers[s][k] = v
-
- def _addSession(self, sessionID):
- """Create a new session with the specified ID string."""
- if self._sessions.has_key(sessionID):
- return
- # Create the session.
- self._sessions[sessionID] = {
- # Initialize the special reserved predicates
- self._inputHistory: [],
- self._outputHistory: [],
- self._inputStack: []
- }
-
- def _deleteSession(self, sessionID):
- """Delete the specified session."""
- if self._sessions.has_key(sessionID):
- _sessions.pop(sessionID)
-
- def getSessionData(self, sessionID = None):
- """Return a copy of the session data dictionary for the
- specified session.
-
- If no sessionID is specified, return a dictionary containing
- *all* of the individual session dictionaries.
-
- """
- s = None
- if sessionID is not None:
- try: s = self._sessions[sessionID]
- except KeyError: s = {}
- else:
- s = self._sessions
- return copy.deepcopy(s)
-
- def learn(self, filename):
- """Load and learn the contents of the specified AIML file.
-
- If filename includes wildcard characters, all matching files
- will be loaded and learned.
-
- """
- for f in glob.glob(filename):
- if self._verboseMode: print "Loading %s..." % f,
- start = time.clock()
- # Load and parse the AIML file.
- parser = AimlParser.create_parser()
- handler = parser.getContentHandler()
- handler.setEncoding(self._textEncoding)
- try: parser.parse(f)
- except xml.sax.SAXParseException, msg:
- err = "\nFATAL PARSE ERROR in file %s:\n%s\n" % (f,msg)
- sys.stderr.write(err)
- continue
- # store the pattern/template pairs in the PatternMgr.
- for key,tem in handler.categories.items():
- self._brain.add(key,tem)
- # Parsing was successful.
- if self._verboseMode:
- print "done (%.2f seconds)" % (time.clock() - start)
-
- def respond(self, input, sessionID = _globalSessionID):
- """Return the Kernel's response to the input string."""
- if len(input) == 0:
- return ""
-
- #ensure that input is a unicode string
- try: input = input.decode(self._textEncoding, 'replace')
- except UnicodeError: pass
- except AttributeError: pass
-
- # prevent other threads from stomping all over us.
- self._respondLock.acquire()
-
- # Add the session, if it doesn't already exist
- self._addSession(sessionID)
-
- # split the input into discrete sentences
- sentences = Utils.sentences(input)
- finalResponse = ""
- for s in sentences:
- # Add the input to the history list before fetching the
- # response, so that <input/> tags work properly.
- inputHistory = self.getPredicate(self._inputHistory, sessionID)
- inputHistory.append(s)
- while len(inputHistory) > self._maxHistorySize:
- inputHistory.pop(0)
- self.setPredicate(self._inputHistory, inputHistory, sessionID)
-
- # Fetch the response
- response = self._respond(s, sessionID)
-
- # add the data from this exchange to the history lists
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- outputHistory.append(response)
- while len(outputHistory) > self._maxHistorySize:
- outputHistory.pop(0)
- self.setPredicate(self._outputHistory, outputHistory, sessionID)
-
- # append this response to the final response.
- finalResponse += (response + " ")
- finalResponse = finalResponse.strip()
-
- assert(len(self.getPredicate(self._inputStack, sessionID)) == 0)
-
- # release the lock and return
- self._respondLock.release()
- try: return finalResponse.encode(self._textEncoding)
- except UnicodeError: return finalResponse
-
- # This version of _respond() just fetches the response for some input.
- # It does not mess with the input and output histories. Recursive calls
- # to respond() spawned from tags like <srai> should call this function
- # instead of respond().
- def _respond(self, input, sessionID):
- """Private version of respond(), does the real work."""
- if len(input) == 0:
- return ""
-
- # guard against infinite recursion
- inputStack = self.getPredicate(self._inputStack, sessionID)
- if len(inputStack) > self._maxRecursionDepth:
- if self._verboseMode:
- err = "WARNING: maximum recursion depth exceeded (input='%s')" % input.encode(self._textEncoding, 'replace')
- sys.stderr.write(err)
- return ""
-
- # push the input onto the input stack
- inputStack = self.getPredicate(self._inputStack, sessionID)
- inputStack.append(input)
- self.setPredicate(self._inputStack, inputStack, sessionID)
-
- # run the input through the 'normal' subber
- subbedInput = self._subbers['normal'].sub(input)
-
- # fetch the bot's previous response, to pass to the match()
- # function as 'that'.
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- try: that = outputHistory[-1]
- except IndexError: that = ""
- subbedThat = self._subbers['normal'].sub(that)
-
- # fetch the current topic
- topic = self.getPredicate("topic", sessionID)
- subbedTopic = self._subbers['normal'].sub(topic)
-
- # Determine the final response.
- response = ""
- elem = self._brain.match(subbedInput, subbedThat, subbedTopic)
- if elem is None:
- if self._verboseMode:
- err = "WARNING: No match found for input: %s\n" % input.encode(self._textEncoding)
- sys.stderr.write(err)
- else:
- # Process the element into a response string.
- response += self._processElement(elem, sessionID).strip()
- response += " "
- response = response.strip()
-
- # pop the top entry off the input stack.
- inputStack = self.getPredicate(self._inputStack, sessionID)
- inputStack.pop()
- self.setPredicate(self._inputStack, inputStack, sessionID)
-
- return response
-
- def _processElement(self,elem, sessionID):
- """Process an AIML element.
-
- The first item of the elem list is the name of the element's
- XML tag. The second item is a dictionary containing any
- attributes passed to that tag, and their values. Any further
- items in the list are the elements enclosed by the current
- element's begin and end tags; they are handled by each
- element's handler function.
-
- """
- try:
- handlerFunc = self._elementProcessors[elem[0]]
- except:
- # Oops -- there's no handler function for this element
- # type!
- if self._verboseMode:
- err = "WARNING: No handler found for <%s> element\n" % elem[0].encode(self._textEncoding, 'replace')
- sys.stderr.write(err)
- return ""
- return handlerFunc(elem, sessionID)
-
-
- ######################################################
- ### Individual element-processing functions follow ###
- ######################################################
-
- # <bot>
- def _processBot(self, elem, sessionID):
- """Process a <bot> AIML element.
-
- Required element attributes:
- name: The name of the bot predicate to retrieve.
-
- <bot> elements are used to fetch the value of global,
- read-only "bot predicates." These predicates cannot be set
- from within AIML; you must use the setBotPredicate() function.
-
- """
- attrName = elem[1]['name']
- return self.getBotPredicate(attrName)
-
- # <condition>
- def _processCondition(self, elem, sessionID):
- """Process a <condition> AIML element.
-
- Optional element attributes:
- name: The name of a predicate to test.
- value: The value to test the predicate for.
-
- <condition> elements come in three flavors. Each has different
- attributes, and each handles their contents differently.
-
- The simplest case is when the <condition> tag has both a 'name'
- and a 'value' attribute. In this case, if the predicate
- 'name' has the value 'value', then the contents of the element
- are processed and returned.
-
- If the <condition> element has only a 'name' attribute, then
- its contents are a series of <li> elements, each of which has
- a 'value' attribute. The list is scanned from top to bottom
- until a match is found. Optionally, the last <li> element can
- have no 'value' attribute, in which case it is processed and
- returned if no other match is found.
-
- If the <condition> element has neither a 'name' nor a 'value'
- attribute, then it behaves almost exactly like the previous
- case, except that each <li> subelement (except the optional
- last entry) must now include both 'name' and 'value'
- attributes.
-
- """
- attr = None
- response = ""
- attr = elem[1]
-
- # Case #1: test the value of a specific predicate for a
- # specific value.
- if attr.has_key('name') and attr.has_key('value'):
- val = self.getPredicate(attr['name'], sessionID)
- if val == attr['value']:
- for e in elem[2:]:
- response += self._processElement(e,sessionID)
- return response
- else:
- # Case #2 and #3: Cycle through <li> contents, testing a
- # name and value pair for each one.
- try:
- name = None
- if attr.has_key('name'):
- name = attr['name']
- # Get the list of <li> elemnents
- listitems = []
- for e in elem[2:]:
- if e[0] == 'li':
- listitems.append(e)
- # if listitems is empty, return the empty string
- if len(listitems) == 0:
- return ""
- # iterate through the list looking for a condition that
- # matches.
- foundMatch = False
- for li in listitems:
- try:
- liAttr = li[1]
- # if this is the last list item, it's allowed
- # to have no attributes. We just skip it for now.
- if len(liAttr.keys()) == 0 and li == listitems[-1]:
- continue
- # get the name of the predicate to test
- liName = name
- if liName == None:
- liName = liAttr['name']
- # get the value to check against
- liValue = liAttr['value']
- # do the test
- if self.getPredicate(liName, sessionID) == liValue:
- foundMatch = True
- response += self._processElement(li,sessionID)
- break
- except:
- # No attributes, no name/value attributes, no
- # such predicate/session, or processing error.
- if self._verboseMode: print "Something amiss -- skipping listitem", li
- raise
- if not foundMatch:
- # Check the last element of listitems. If it has
- # no 'name' or 'value' attribute, process it.
- try:
- li = listitems[-1]
- liAttr = li[1]
- if not (liAttr.has_key('name') or liAttr.has_key('value')):
- response += self._processElement(li, sessionID)
- except:
- # listitems was empty, no attributes, missing
- # name/value attributes, or processing error.
- if self._verboseMode: print "error in default listitem"
- raise
- except:
- # Some other catastrophic cataclysm
- if self._verboseMode: print "catastrophic condition failure"
- raise
- return response
-
- # <date>
- def _processDate(self, elem, sessionID):
- """Process a <date> AIML element.
-
- <date> elements resolve to the current date and time. The
- AIML specification doesn't require any particular format for
- this information, so I go with whatever's simplest.
-
- """
- return time.asctime()
-
- # <formal>
- def _processFormal(self, elem, sessionID):
- """Process a <formal> AIML element.
-
- <formal> elements process their contents recursively, and then
- capitalize the first letter of each word of the result.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return string.capwords(response)
-
- # <gender>
- def _processGender(self,elem, sessionID):
- """Process a <gender> AIML element.
-
- <gender> elements process their contents, and then swap the
- gender of any third-person singular pronouns in the result.
- This subsitution is handled by the aiml.WordSub module.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return self._subbers['gender'].sub(response)
-
- # <get>
- def _processGet(self, elem, sessionID):
- """Process a <get> AIML element.
-
- Required element attributes:
- name: The name of the predicate whose value should be
- retrieved from the specified session and returned. If the
- predicate doesn't exist, the empty string is returned.
-
- <get> elements return the value of a predicate from the
- specified session.
-
- """
- return self.getPredicate(elem[1]['name'], sessionID)
-
- # <gossip>
- def _processGossip(self, elem, sessionID):
- """Process a <gossip> AIML element.
-
- <gossip> elements are used to capture and store user input in
- an implementation-defined manner, theoretically allowing the
- bot to learn from the people it chats with. I haven't
- descided how to define my implementation, so right now
- <gossip> behaves identically to <think>.
-
- """
- return self._processThink(elem, sessionID)
-
- # <id>
- def _processId(self, elem, sessionID):
- """ Process an <id> AIML element.
-
- <id> elements return a unique "user id" for a specific
- conversation. In PyAIML, the user id is the name of the
- current session.
-
- """
- return sessionID
-
- # <input>
- def _processInput(self, elem, sessionID):
- """Process an <input> AIML element.
-
- Optional attribute elements:
- index: The index of the element from the history list to
- return. 1 means the most recent item, 2 means the one
- before that, and so on.
-
- <input> elements return an entry from the input history for
- the current session.
-
- """
- inputHistory = self.getPredicate(self._inputHistory, sessionID)
- try: index = int(elem[1]['index'])
- except: index = 1
- try: return inputHistory[-index]
- except IndexError:
- if self._verboseMode:
- err = "No such index %d while processing <input> element.\n" % index
- sys.stderr.write(err)
- return ""
-
- # <javascript>
- def _processJavascript(self, elem, sessionID):
- """Process a <javascript> AIML element.
-
- <javascript> elements process their contents recursively, and
- then run the results through a server-side Javascript
- interpreter to compute the final response. Implementations
- are not required to provide an actual Javascript interpreter,
- and right now PyAIML doesn't; <javascript> elements are behave
- exactly like <think> elements.
-
- """
- return self._processThink(elem, sessionID)
-
- # <learn>
- def _processLearn(self, elem, sessionID):
- """Process a <learn> AIML element.
-
- <learn> elements process their contents recursively, and then
- treat the result as an AIML file to open and learn.
-
- """
- filename = ""
- for e in elem[2:]:
- filename += self._processElement(e, sessionID)
- self.learn(filename)
- return ""
-
- # <li>
- def _processLi(self,elem, sessionID):
- """Process an <li> AIML element.
-
- Optional attribute elements:
- name: the name of a predicate to query.
- value: the value to check that predicate for.
-
- <li> elements process their contents recursively and return
- the results. They can only appear inside <condition> and
- <random> elements. See _processCondition() and
- _processRandom() for details of their usage.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return response
-
- # <lowercase>
- def _processLowercase(self,elem, sessionID):
- """Process a <lowercase> AIML element.
-
- <lowercase> elements process their contents recursively, and
- then convert the results to all-lowercase.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return string.lower(response)
-
- # <person>
- def _processPerson(self,elem, sessionID):
- """Process a <person> AIML element.
-
- <person> elements process their contents recursively, and then
- convert all pronouns in the results from 1st person to 2nd
- person, and vice versa. This subsitution is handled by the
- aiml.WordSub module.
-
- If the <person> tag is used atomically (e.g. <person/>), it is
- a shortcut for <person><star/></person>.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- if len(elem[2:]) == 0: # atomic <person/> = <person><star/></person>
- response = self._processElement(['star',{}], sessionID)
- return self._subbers['person'].sub(response)
-
- # <person2>
- def _processPerson2(self,elem, sessionID):
- """Process a <person2> AIML element.
-
- <person2> elements process their contents recursively, and then
- convert all pronouns in the results from 1st person to 3rd
- person, and vice versa. This subsitution is handled by the
- aiml.WordSub module.
-
- If the <person2> tag is used atomically (e.g. <person2/>), it is
- a shortcut for <person2><star/></person2>.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- if len(elem[2:]) == 0: # atomic <person2/> = <person2><star/></person2>
- response = self._processElement(['star',{}], sessionID)
- return self._subbers['person2'].sub(response)
-
- # <random>
- def _processRandom(self, elem, sessionID):
- """Process a <random> AIML element.
-
- <random> elements contain zero or more <li> elements. If
- none, the empty string is returned. If one or more <li>
- elements are present, one of them is selected randomly to be
- processed recursively and have its results returned. Only the
- chosen <li> element's contents are processed. Any non-<li> contents are
- ignored.
-
- """
- listitems = []
- for e in elem[2:]:
- if e[0] == 'li':
- listitems.append(e)
- if len(listitems) == 0:
- return ""
-
- # select and process a random listitem.
- random.shuffle(listitems)
- return self._processElement(listitems[0], sessionID)
-
- # <sentence>
- def _processSentence(self,elem, sessionID):
- """Process a <sentence> AIML element.
-
- <sentence> elements process their contents recursively, and
- then capitalize the first letter of the results.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- try:
- response = response.strip()
- words = string.split(response, " ", 1)
- words[0] = string.capitalize(words[0])
- response = string.join(words)
- return response
- except IndexError: # response was empty
- return ""
-
- # <set>
- def _processSet(self, elem, sessionID):
- """Process a <set> AIML element.
-
- Required element attributes:
- name: The name of the predicate to set.
-
- <set> elements process their contents recursively, and assign the results to a predicate
- (given by their 'name' attribute) in the current session. The contents of the element
- are also returned.
-
- """
- value = ""
- for e in elem[2:]:
- value += self._processElement(e, sessionID)
- self.setPredicate(elem[1]['name'], value, sessionID)
- return value
-
- # <size>
- def _processSize(self,elem, sessionID):
- """Process a <size> AIML element.
-
- <size> elements return the number of AIML categories currently
- in the bot's brain.
-
- """
- return str(self.numCategories())
-
- # <sr>
- def _processSr(self,elem,sessionID):
- """Process an <sr> AIML element.
-
- <sr> elements are shortcuts for <srai><star/></srai>.
-
- """
- star = self._processElement(['star',{}], sessionID)
- response = self._respond(star, sessionID)
- return response
-
- # <srai>
- def _processSrai(self,elem, sessionID):
- """Process a <srai> AIML element.
-
- <srai> elements recursively process their contents, and then
- pass the results right back into the AIML interpreter as a new
- piece of input. The results of this new input string are
- returned.
-
- """
- newInput = ""
- for e in elem[2:]:
- newInput += self._processElement(e, sessionID)
- return self._respond(newInput, sessionID)
-
- # <star>
- def _processStar(self, elem, sessionID):
- """Process a <star> AIML element.
-
- Optional attribute elements:
- index: Which "*" character in the current pattern should
- be matched?
-
- <star> elements return the text fragment matched by the "*"
- character in the current input pattern. For example, if the
- input "Hello Tom Smith, how are you?" matched the pattern
- "HELLO * HOW ARE YOU", then a <star> element in the template
- would evaluate to "Tom Smith".
-
- """
- try: index = int(elem[1]['index'])
- except KeyError: index = 1
- # fetch the user's last input
- inputStack = self.getPredicate(self._inputStack, sessionID)
- input = self._subbers['normal'].sub(inputStack[-1])
- # fetch the Kernel's last response (for 'that' context)
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- try: that = self._subbers['normal'].sub(outputHistory[-1])
- except: that = "" # there might not be any output yet
- topic = self.getPredicate("topic", sessionID)
- response = self._brain.star("star", input, that, topic, index)
- return response
-
- # <system>
- def _processSystem(self,elem, sessionID):
- """Process a <system> AIML element.
-
- <system> elements process their contents recursively, and then
- attempt to execute the results as a shell command on the
- server. The AIML interpreter blocks until the command is
- complete, and then returns the command's output.
-
- For cross-platform compatibility, any file paths inside
- <system> tags should use Unix-style forward slashes ("/") as a
- directory separator.
-
- """
- # build up the command string
- command = ""
- for e in elem[2:]:
- command += self._processElement(e, sessionID)
-
- # normalize the path to the command. Under Windows, this
- # switches forward-slashes to back-slashes; all system
- # elements should use unix-style paths for cross-platform
- # compatibility.
- #executable,args = command.split(" ", 1)
- #executable = os.path.normpath(executable)
- #command = executable + " " + args
- command = os.path.normpath(command)
-
- # execute the command.
- response = ""
- try:
- out = os.popen(command)
- except RuntimeError, msg:
- if self._verboseMode:
- err = "WARNING: RuntimeError while processing \"system\" element:\n%s\n" % msg.encode(self._textEncoding, 'replace')
- sys.stderr.write(err)
- return "There was an error while computing my response. Please inform my botmaster."
- for line in out:
- response += line + "\n"
- response = string.join(response.splitlines()).strip()
- return response
-
- # <template>
- def _processTemplate(self,elem, sessionID):
- """Process a <template> AIML element.
-
- <template> elements recursively process their contents, and
- return the results. <template> is the root node of any AIML
- response tree.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return response
-
- # text
- def _processText(self,elem, sessionID):
- """Process a raw text element.
-
- Raw text elements aren't really AIML tags. Text elements cannot contain
- other elements; instead, the third item of the 'elem' list is a text
- string, which is immediately returned. They have a single attribute,
- automatically inserted by the parser, which indicates whether whitespace
- in the text should be preserved or not.
-
- """
- try: elem[2] + ""
- except TypeError: raise TypeError, "Text element contents are not text"
-
- # If the the whitespace behavior for this element is "default",
- # we reduce all stretches of >1 whitespace characters to a single
- # space. To improve performance, we do this only once for each
- # text element encountered, and save the results for the future.
- if elem[1]["xml:space"] == "default":
- elem[2] = re.sub("\s+", " ", elem[2])
- elem[1]["xml:space"] = "preserve"
- return elem[2]
-
- # <that>
- def _processThat(self,elem, sessionID):
- """Process a <that> AIML element.
-
- Optional element attributes:
- index: Specifies which element from the output history to
- return. 1 is the most recent response, 2 is the next most
- recent, and so on.
-
- <that> elements (when they appear inside <template> elements)
- are the output equivilant of <input> elements; they return one
- of the Kernel's previous responses.
-
- """
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- index = 1
- try:
- # According to the AIML spec, the optional index attribute
- # can either have the form "x" or "x,y". x refers to how
- # far back in the output history to go. y refers to which
- # sentence of the specified response to return.
- index = int(elem[1]['index'].split(',')[0])
- except:
- pass
- try: return outputHistory[-index]
- except IndexError:
- if self._verboseMode:
- err = "No such index %d while processing <that> element.\n" % index
- sys.stderr.write(err)
- return ""
-
- # <thatstar>
- def _processThatstar(self, elem, sessionID):
- """Process a <thatstar> AIML element.
-
- Optional element attributes:
- index: Specifies which "*" in the <that> pattern to match.
-
- <thatstar> elements are similar to <star> elements, except
- that where <star/> returns the portion of the input string
- matched by a "*" character in the pattern, <thatstar/> returns
- the portion of the previous input string that was matched by a
- "*" in the current category's <that> pattern.
-
- """
- try: index = int(elem[1]['index'])
- except KeyError: index = 1
- # fetch the user's last input
- inputStack = self.getPredicate(self._inputStack, sessionID)
- input = self._subbers['normal'].sub(inputStack[-1])
- # fetch the Kernel's last response (for 'that' context)
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- try: that = self._subbers['normal'].sub(outputHistory[-1])
- except: that = "" # there might not be any output yet
- topic = self.getPredicate("topic", sessionID)
- response = self._brain.star("thatstar", input, that, topic, index)
- return response
-
- # <think>
- def _processThink(self,elem, sessionID):
- """Process a <think> AIML element.
-
- <think> elements process their contents recursively, and then
- discard the results and return the empty string. They're
- useful for setting predicates and learning AIML files without
- generating any output.
-
- """
- for e in elem[2:]:
- self._processElement(e, sessionID)
- return ""
-
- # <topicstar>
- def _processTopicstar(self, elem, sessionID):
- """Process a <topicstar> AIML element.
-
- Optional element attributes:
- index: Specifies which "*" in the <topic> pattern to match.
-
- <topicstar> elements are similar to <star> elements, except
- that where <star/> returns the portion of the input string
- matched by a "*" character in the pattern, <topicstar/>
- returns the portion of current topic string that was matched
- by a "*" in the current category's <topic> pattern.
-
- """
- try: index = int(elem[1]['index'])
- except KeyError: index = 1
- # fetch the user's last input
- inputStack = self.getPredicate(self._inputStack, sessionID)
- input = self._subbers['normal'].sub(inputStack[-1])
- # fetch the Kernel's last response (for 'that' context)
- outputHistory = self.getPredicate(self._outputHistory, sessionID)
- try: that = self._subbers['normal'].sub(outputHistory[-1])
- except: that = "" # there might not be any output yet
- topic = self.getPredicate("topic", sessionID)
- response = self._brain.star("topicstar", input, that, topic, index)
- return response
-
- # <uppercase>
- def _processUppercase(self,elem, sessionID):
- """Process an <uppercase> AIML element.
-
- <uppercase> elements process their contents recursively, and
- return the results with all lower-case characters converted to
- upper-case.
-
- """
- response = ""
- for e in elem[2:]:
- response += self._processElement(e, sessionID)
- return string.upper(response)
-
- # <version>
- def _processVersion(self,elem, sessionID):
- """Process a <version> AIML element.
-
- <version> elements return the version number of the AIML
- interpreter.
-
- """
- return self.version()
-
-
-##################################################
-### Self-test functions follow ###
-##################################################
-def _testTag(kern, tag, input, outputList):
- """Tests 'tag' by feeding the Kernel 'input'. If the result
- matches any of the strings in 'outputList', the test passes.
-
- """
- global _numTests, _numPassed
- _numTests += 1
- print "Testing <" + tag + ">:",
- response = kern.respond(input).decode(kern._textEncoding)
- if response in outputList:
- print "PASSED"
- _numPassed += 1
- return True
- else:
- print "FAILED (response: '%s')" % response.encode(kern._textEncoding, 'replace')
- return False
-
-if __name__ == "__main__":
- # Run some self-tests
- k = Kernel()
- k.bootstrap(learnFiles="self-test.aiml")
-
- global _numTests, _numPassed
- _numTests = 0
- _numPassed = 0
-
- _testTag(k, 'bot', 'test bot', ["My name is Nameless"])
-
- k.setPredicate('gender', 'male')
- _testTag(k, 'condition test #1', 'test condition name value', ['You are handsome'])
- k.setPredicate('gender', 'female')
- _testTag(k, 'condition test #2', 'test condition name value', [''])
- _testTag(k, 'condition test #3', 'test condition name', ['You are beautiful'])
- k.setPredicate('gender', 'robot')
- _testTag(k, 'condition test #4', 'test condition name', ['You are genderless'])
- _testTag(k, 'condition test #5', 'test condition', ['You are genderless'])
- k.setPredicate('gender', 'male')
- _testTag(k, 'condition test #6', 'test condition', ['You are handsome'])
-
- # the date test will occasionally fail if the original and "test"
- # times cross a second boundary. There's no good way to avoid
- # this problem and still do a meaningful test, so we simply
- # provide a friendly message to be printed if the test fails.
- date_warning = """
- NOTE: the <date> test will occasionally report failure even if it
- succeeds. So long as the response looks like a date/time string,
- there's nothing to worry about.
- """
- if not _testTag(k, 'date', 'test date', ["The date is %s" % time.asctime()]):
- print date_warning
-
- _testTag(k, 'formal', 'test formal', ["Formal Test Passed"])
- _testTag(k, 'gender', 'test gender', ["He'd told her he heard that her hernia is history"])
- _testTag(k, 'get/set', 'test get and set', ["I like cheese. My favorite food is cheese"])
- _testTag(k, 'gossip', 'test gossip', ["Gossip is not yet implemented"])
- _testTag(k, 'id', 'test id', ["Your id is _global"])
- _testTag(k, 'input', 'test input', ['You just said: test input'])
- _testTag(k, 'javascript', 'test javascript', ["Javascript is not yet implemented"])
- _testTag(k, 'lowercase', 'test lowercase', ["The Last Word Should Be lowercase"])
- _testTag(k, 'person', 'test person', ['HE think i knows that my actions threaten him and his.'])
- _testTag(k, 'person2', 'test person2', ['YOU think me know that my actions threaten you and yours.'])
- _testTag(k, 'person2 (no contents)', 'test person2 I Love Lucy', ['YOU Love Lucy'])
- _testTag(k, 'random', 'test random', ["response #1", "response #2", "response #3"])
- _testTag(k, 'random empty', 'test random empty', ["Nothing here!"])
- _testTag(k, 'sentence', "test sentence", ["My first letter should be capitalized."])
- _testTag(k, 'size', "test size", ["I've learned %d categories" % k.numCategories()])
- _testTag(k, 'sr', "test sr test srai", ["srai results: srai test passed"])
- _testTag(k, 'sr nested', "test nested sr test srai", ["srai results: srai test passed"])
- _testTag(k, 'srai', "test srai", ["srai test passed"])
- _testTag(k, 'srai infinite', "test srai infinite", [""])
- _testTag(k, 'star test #1', 'You should test star begin', ['Begin star matched: You should'])
- _testTag(k, 'star test #2', 'test star creamy goodness middle', ['Middle star matched: creamy goodness'])
- _testTag(k, 'star test #3', 'test star end the credits roll', ['End star matched: the credits roll'])
- _testTag(k, 'star test #4', 'test star having multiple stars in a pattern makes me extremely happy',
- ['Multiple stars matched: having, stars in a pattern, extremely happy'])
- _testTag(k, 'system', "test system", ["The system says hello!"])
- _testTag(k, 'that test #1', "test that", ["I just said: The system says hello!"])
- _testTag(k, 'that test #2', "test that", ["I have already answered this question"])
- _testTag(k, 'thatstar test #1', "test thatstar", ["I say beans"])
- _testTag(k, 'thatstar test #2', "test thatstar", ["I just said \"beans\""])
- _testTag(k, 'thatstar test #3', "test thatstar multiple", ['I say beans and franks for everybody'])
- _testTag(k, 'thatstar test #4', "test thatstar multiple", ['Yes, beans and franks for all!'])
- _testTag(k, 'think', "test think", [""])
- k.setPredicate("topic", "fruit")
- _testTag(k, 'topic', "test topic", ["We were discussing apples and oranges"])
- k.setPredicate("topic", "Soylent Green")
- _testTag(k, 'topicstar test #1', 'test topicstar', ["Solyent Green is made of people!"])
- k.setPredicate("topic", "Soylent Ham and Cheese")
- _testTag(k, 'topicstar test #2', 'test topicstar multiple', ["Both Soylents Ham and Cheese are made of people!"])
- _testTag(k, 'unicode support', u"ΤΗΙΟΊΓ", [u"Hey, you speak Chinese! ΤΗΙΟΊΓ"])
- _testTag(k, 'uppercase', 'test uppercase', ["The Last Word Should Be UPPERCASE"])
- _testTag(k, 'version', 'test version', ["PyAIML is version %s" % k.version()])
- _testTag(k, 'whitespace preservation', 'test whitespace', ["Extra Spaces\n Rule! (but not in here!) But Here They Do!"])
-
- # Report test results
- print "--------------------"
- if _numTests == _numPassed:
- print "%d of %d tests passed!" % (_numPassed, _numTests)
- else:
- print "%d of %d tests passed (see above for detailed errors)" % (_numPassed, _numTests)
-
- # Run an interactive interpreter
- #print "\nEntering interactive mode (ctrl-c to exit)"
- #while True: print k.respond(raw_input("> "))