# -*-python-*- __package__ = "s60_twitteradio.py" __version__ = "1.0" __author__ = "Aaron Straup Cope" __url__ = "http://www.aaronland.info/python/twitteradio/" __cvsversion__ = "$Revision: 1.10 $" __date__ = "$Date: 2007/04/01 05:54:14 $" __copyright__ = "Copyright (c) 2006-2007 Aaron Straup Cope. Perl Artistic License." # # extlib : s60-simplejson.py # from __future__ import generators import sre_parse, sre_compile, sre_constants from sre_constants import BRANCH, SUBPATTERN from sre import VERBOSE, MULTILINE, DOTALL import re FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL WHITESPACE = re.compile(r'\s*', FLAGS) def pattern(pattern, flags=FLAGS): def decorator(fn): fn.pattern = pattern fn.regex = re.compile(pattern, flags) return fn return decorator def enumerate (list) : lot = [] for i in list : lot.append((len(lot), i)) return lot class Scanner(object): def __init__(self, lexicon, flags=FLAGS): self.actions = [None] s = sre_parse.Pattern() s.flags = flags p = [] for idx, token in enumerate(lexicon): phrase = token.pattern try: subpattern = sre_parse.SubPattern(s, [(SUBPATTERN, (idx + 1, sre_parse.parse(phrase, flags)))]) except sre_constants.error: raise p.append(subpattern) self.actions.append(token) p = sre_parse.SubPattern(s, [(BRANCH, (None, p))]) self.scanner = sre_compile.compile(p) def iterscan(self, string, idx=0, context=None): match = self.scanner.scanner(string, idx).match actions = self.actions lastend = idx end = len(string) while True: m = match() if m is None: break matchbegin, matchend = m.span() if lastend == matchend: break action = actions[m.lastindex] if action is not None: rval, next_pos = action(m, context) if next_pos is not None and next_pos != matchend: matchend = next_pos match = self.scanner.scanner(string, matchend).match yield rval, matchend lastend = matchend def _floatconstants(): import struct import sys _BYTES = '7FF80000000000007FF0000000000000'.decode('hex') nan, inf = struct.unpack('dd', _BYTES) return nan, inf, -inf NaN, PosInf, NegInf = _floatconstants() def linecol(doc, pos): lineno = doc.count('\n', 0, pos) + 1 if lineno == 1: colno = pos else: colno = pos - doc.rindex('\n', 0, pos) return lineno, colno def errmsg(msg, doc, pos, end=None): lineno, colno = linecol(doc, pos) if end is None: return '%s: line %d column %d (char %d)' % (msg, lineno, colno, pos) endlineno, endcolno = linecol(doc, end) return '%s: line %d column %d - line %d column %d (char %d - %d)' % ( msg, lineno, colno, endlineno, endcolno, pos, end) _CONSTANTS = { '-Infinity': NegInf, 'Infinity': PosInf, 'NaN': NaN, 'true': True, 'false': False, 'null': None, } def JSONConstant(match, context, c=_CONSTANTS): return c[match.group(0)], None pattern('(-?Infinity|NaN|true|false|null)')(JSONConstant) def JSONNumber(match, context): match = JSONNumber.regex.match(match.string, *match.span()) integer, frac, exp = match.groups() if frac or exp: res = float(integer + (frac or '') + (exp or '')) else: res = int(integer) return res, None pattern(r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?')(JSONNumber) STRINGCHUNK = re.compile(r'(.*?)(["\\])', FLAGS) BACKSLASH = { '"': u'"', '\\': u'\\', '/': u'/', 'b': u'\b', 'f': u'\f', 'n': u'\n', 'r': u'\r', 't': u'\t', } DEFAULT_ENCODING = "utf-8" def scanstring(s, end, encoding=None, _b=BACKSLASH, _m=STRINGCHUNK.match): if encoding is None: encoding = DEFAULT_ENCODING chunks = [] _append = chunks.append begin = end - 1 while 1: chunk = _m(s, end) if chunk is None: raise ValueError( errmsg("Unterminated string starting at", s, begin)) end = chunk.end() content, terminator = chunk.groups() if content: if not isinstance(content, unicode): content = unicode(content, encoding) _append(content) if terminator == '"': break try: esc = s[end] except IndexError: raise ValueError( errmsg("Unterminated string starting at", s, begin)) if esc != 'u': try: m = _b[esc] except KeyError: raise ValueError( errmsg("Invalid \\escape: %r" % (esc,), s, end)) end += 1 else: esc = s[end + 1:end + 5] try: m = unichr(int(esc, 16)) if len(esc) != 4 or not esc.isalnum(): raise ValueError except ValueError: raise ValueError(errmsg("Invalid \\uXXXX escape", s, end)) end += 5 _append(m) return u''.join(chunks), end def JSONString(match, context): encoding = getattr(context, 'encoding', None) return scanstring(match.string, match.end(), encoding) pattern(r'"')(JSONString) def JSONObject(match, context, _w=WHITESPACE.match): pairs = {} s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == '}': return pairs, end + 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end)) end += 1 encoding = getattr(context, 'encoding', None) iterscan = JSONScanner.iterscan while True: key, end = scanstring(s, end, encoding) end = _w(s, end).end() if s[end:end + 1] != ':': raise ValueError(errmsg("Expecting : delimiter", s, end)) end = _w(s, end + 1).end() try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) pairs[key] = value end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == '}': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end - 1)) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar != '"': raise ValueError(errmsg("Expecting property name", s, end - 1)) object_hook = getattr(context, 'object_hook', None) if object_hook is not None: pairs = object_hook(pairs) return pairs, end pattern(r'{')(JSONObject) def JSONArray(match, context, _w=WHITESPACE.match): values = [] s = match.string end = _w(s, match.end()).end() nextchar = s[end:end + 1] if nextchar == ']': return values, end + 1 iterscan = JSONScanner.iterscan while True: try: value, end = iterscan(s, idx=end, context=context).next() except StopIteration: raise ValueError(errmsg("Expecting object", s, end)) values.append(value) end = _w(s, end).end() nextchar = s[end:end + 1] end += 1 if nextchar == ']': break if nextchar != ',': raise ValueError(errmsg("Expecting , delimiter", s, end)) end = _w(s, end).end() return values, end pattern(r'\[')(JSONArray) ANYTHING = [ JSONObject, JSONArray, JSONString, JSONConstant, JSONNumber, ] JSONScanner = Scanner(ANYTHING) class JSONDecoder(object): _scanner = Scanner(ANYTHING) __all__ = ['__init__', 'decode', 'raw_decode'] def __init__(self, encoding=None, object_hook=None): self.encoding = encoding self.object_hook = object_hook def decode(self, s, _w=WHITESPACE.match): obj, end = self.raw_decode(s, idx=_w(s, 0).end()) end = _w(s, end).end() if end != len(s): raise ValueError(errmsg("Extra data", s, end, len(s))) return obj def raw_decode(self, s, **kw): kw.setdefault('context', self) try: obj, end = self._scanner.iterscan(s, **kw).next() except StopIteration: raise ValueError("No JSON object could be decoded") return obj, end class simplejson : def __init__ (self) : pass def dump(self, obj, fp, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder iterable = cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).iterencode(obj) for chunk in iterable: fp.write(chunk) def dumps(self, obj, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, **kw): if cls is None: cls = JSONEncoder return cls(skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, **kw).encode(obj) def load(self, fp, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(fp.read()) def loads(self, s, encoding=None, cls=None, object_hook=None, **kw): if cls is None: cls = JSONDecoder if object_hook is not None: kw['object_hook'] = object_hook return cls(encoding=encoding, **kw).decode(s) # # extlib : s60-simplehttp.py # import httplib import base64 class simplehttp : def __init__ (self, creds=None) : self.__auth__ = None if not creds == None : user = creds[0] pswd = creds[1] self.__auth__ = base64.encodestring(user + ":" + pswd) def get (self, host, path, headers={}) : return self.execute_http_request('GET', host, path, headers) def post (self, host, path, params=None, headers={}) : return self.execute_http_request('POST', host, path, params) def execute_http_request(self, meth, host, path, params=None, headers={}) : if self.__auth__ : headers['Authorization'] = "Basic %s" % self.__auth__ c = httplib.HTTPConnection(host) c.request(meth, path, params, headers) r = c.getresponse() return r.read() # # extlib : s60-simplestore.py # import e32 class simplestore : def __init__ (self, path) : self.__path__ = path self.__db__ = None self.open() def open (self) : if e32.in_emulator() : import anydbm self.__db__ = anydbm.open(self.__path__, 'c') else : import e32dbm self.__db__ = e32dbm.open(self.__path__, 'c') def close (self) : self.__db__.close() self.__db__ = None def read (self, key) : key = str(key) if self.__db__.has_key(key) : return self.__db__[key] return None def write (self, key, value) : key = str(key) value = str(value) self.__db__[key] = value self.save() def save (self) : self.close() self.open() # # extlib : s60-simpleapp.py # import e32 import appuifw class simpleapp : def __init__ (self) : self.__log__ = u"" self.__kill__ = False self.__poll__ = False self.__current__ = u"" self.__activity__ = [] if self.use_lock() : self.__lock__ = e32.Ao_lock() appuifw.app.exit_key_handler = self.abort def abort (self) : self.__kill__ = True self.__lock__.signal() def loop (self) : if self.setup() : self.run() if self.use_lock() : self.__lock__.wait() def setup (self) : self.log("I am running") def run (self) : self.poll() def poll (self) : if self.__poll__ : self.error("Already polling") return False self.__poll__ = True self.do_poll() self.__poll__ = False self.sleep() def do_poll (self) : pass def sleep (self, secs=600) : self.record_activity("Sleeping for %s seconds" % secs) if e32.in_emulator() : return time.sleep(secs) self.poll() else : timer = e32.Ao_timer() timer.after(secs, self.poll) self.__timer__ = timer def use_lock (self) : if e32.in_emulator() : return True return False def log (self, msg, reset=False) : if reset : self.__log__ = unicode("") self.__log__ += "%s\n" % msg self.write(unicode(self.__log__)) def write(self, txt) : if e32.in_emulator() : t = appuifw.Text() t.write(txt) appuifw.app.body = t else : appuifw.app.body = appuifw.Text(txt) def announce (self, msg, type='info') : self.note(msg, type, True) def notice (self, msg) : self.note(msg, 'info') def error (self, msg) : self.note(msg, 'error') def note (self, msg, type, gbl=False) : if not e32.in_emulator and gbl : appuifw.note(unicode(msg), type, gbl) else : appuifw.note(unicode(msg), type) def prompt (self, query, type="text") : if e32.in_emulator() : return raw_input("%s : " % query) return appuifw.query(unicode(query), type) def record_activity (self, msg, display=True) : self.__activity__.append(unicode("* %s" % msg)) if display : self.display_activity() def display_activity (self) : msg = "\n".join(self.__activity__) reset = True self.log(msg, reset) def switch_to_app (self) : self.display_app() self.setup_app_menu() def display_app (self) : reset = True self.log(self.__current__, reset) def switch_to_activity (self) : self.display_activity() self.setup_activity_menu() def setup_activity_menu (self) : pass def setup_app_menu (self) : pass def setup_simplestore(self, storename) : storepath = storename if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\%s' % storename else : storepath = 'c:\\%s' % storename try : self.__store__ = simplestore(storepath) except Exception, e : self.error("Failed to open local datastore, %s" % e) return False return True def vibrate (self) : try : import misty misty.vibrate(500, 100) except Exception, e : return False import e32 import socket if not e32.in_emulator() : import audio class twitteradio (simpleapp) : def __init__ (self) : simpleapp.__init__(self) self.__store__ = None self.__posts__ = [] self.__ptimer__ = None self.__stime__ = None self.__pdelay__ = 90 self.__sdelay__ = 2 self.__running__ = True self.__polling__ = False self.__speaking__ = False # # # def setup (self) : storepath = 'test' if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\twitteradio' else : storepath = 'c:\\twitteradio' # try : self.__store__ = simplestore(storepath) except Exception, e : self.error("Failed to open local datastore, %s" % e) return False # try : self.setup_access_point() except Exception, e : self.error("Failed to define access point : %s" % e) return False self.setup_menu() self.record_activity("Setup complete") return True # # # def setup_access_point (self) : if e32.in_emulator() : return True apid = socket.select_access_point() apnt = socket.access_point(apid) socket.set_default_access_point(apnt) # # # def reconfigure_access_point (self) : try : self.setup_access_point() except Exception, e : self.error("Failed to define access point : %s" % e) return False # # # def setup_menu(self) : items = self.menu_items() appuifw.app.menu = items # # # def menu_items (self) : items =[] if self.__running__ : items.append((u"Pause", self.pause)) else : items.append((u"Resume", self.resume)) items.append((u"Set delay between polling", self.set_polling_delay)) items.append((u"Set delay between reading", self.set_speaking_delay)) items.append((u"Reconfigure access point", self.reconfigure_access_point)) if e32.in_emulator() : items.append((u"Poll", self.poll)) items.append((u"Speak", self.speak)) return items # # # def pause (self) : self.__running__ = False self.setup_menu() # # # def resume (self) : self.__running__ = True self.setup_menu() self.speak() self.poll() # # # def run (self) : self.poll() # # # def poll(self) : if not self.__running__ : return False # posts = len(self.__posts__) if posts > 0 : sdelay = self.get_speaking_delay() pdelay = self.get_polling_delay() if (posts * sdelay) > pdelay : self.record_activity("%s > %s" % ((posts * sdelay), pdelay)) self.record_activity("Too many pending posts to make it worth polling Twitter right now.") return True # self.record_activity("Polling Twitter for new posts") if self.__polling__ : self.record_activity("Polling already in progress") return True self.__polling__ = True # json = None try : http = simplehttp() json = http.get('twitter.com', '/statuses/public_timeline.json') except Exception, e : self.error("HTTP error, %s" % e) if json : self.record_activity("Parsing Twitter response") self.parse(json) self.__polling__ = False # self.speak() if not e32.in_emulator() : delay = self.get_polling_delay() timer = e32.Ao_timer() timer.after(delay, self.poll) self.__ptimer__ = timer else : self.record_activity("TO DO : use threads or some sort of non-blocking sleep when polling in emulator mode...") # # # def speak (self) : if not self.__running__ : return False if len(self.__posts__) == 0 : return self.poll() if not self.__speaking__ : self.__speaking__ = True txt = self.__posts__.pop(0) self.record_activity(txt) if e32.in_emulator() : import sys import os if sys.platform == 'darwin' : cmd = "/usr/bin/say \"%s\"" % txt try : os.popen(cmd) except Exception, e : self.record_activity("Failed to speak text : %s" % e) else : pass else : try : # Yes, really # PyS60 unicode objects have # no decode method... txt = str(txt) txt = txt.encode("ascii", "ignore") audio.say(txt) except Exception, e : self.record_activity("Failed to speak text : %s" % e) self.__speaking__ = False else : self.record_activity("Already reading a Twitter post") if not e32.in_emulator() : delay = self.get_speaking_delay() timer = e32.Ao_timer() timer.after(delay, self.speak) self.__stimer__ = timer else : self.record_activity("TO DO : use threads or some sort of non-blocking sleep when speaking in emulator mode...") # # # def parse (self, json) : self.record_activity("Parsing JSON message") posts = [] s = simplejson() try : posts = s.loads(json) except Exception, e : self.error("JSON parse error, %s" % e) return False count = len(posts) new = 0 self.record_activity("%s posts to check" % count) for tw in posts : if tw['text'] in self.__posts__ : continue self.__posts__.append(tw['text']) count = len(self.__posts__) self.record_activity("%s pending posts to read" % count) return True # # # def set_polling_delay (self) : n = self.prompt(u"Seconds between polls", "text") if not n or not int(n) : self.error("'%s' is not a valid delay" % n) return False if int(n) < 30 : self.error("Minimum delay is 30 seconds. Why do you want to make Blaine cry?") return False self.__store__.write('pdelay', n) n = self.__store__.read('pdelay') self.record_activity("Set delay to %s seconds" % n) return True # # # def get_polling_delay (self) : n = self.__store__.read('pdelay') if not n or not int(n) : n = self.__pdelay__ return int(n) # # # def set_speaking_delay (self) : n = self.prompt(u"Seconds between reads", "text") if not n or not int(n) : self.error("'%s' is not a valid delay" % n) return False self.__store__.write('sdelay', n) n = self.__store__.read('sdelay') self.record_activity("Set delay to %s seconds" % n) return True # # # def get_speaking_delay (self) : n = self.__store__.read('sdelay') if not n or not int(n) : n = self.__sdelay__ return int(n) if __name__ == "__main__" : app = twitteradio() app.loop()