# -*-python-*- __package__ = "s60_gos_recorder.py" __version__ = "1.0" __author__ = "Aaron Straup Cope" __url__ = "http://www.aaronland.info/python/gps_recorder/" __cvsversion__ = "$Revision: 1.12 $" __date__ = "$Date: 2007/03/29 05:48:44 $" __copyright__ = "Copyright (c) 2006-2007 Aaron Straup Cope. Perl Artistic License." # # extlib : s60-simpleapp.py # import e32 import appuifw class simpleapp : def __init__ (self) : self.__log__ = u"" self.__kill__ = False self.__poll__ = False self.__current__ = u"" self.__activity__ = [] if self.use_lock() : self.__lock__ = e32.Ao_lock() appuifw.app.exit_key_handler = self.abort def abort (self) : self.__kill__ = True self.__lock__.signal() def loop (self) : if self.setup() : self.run() if self.use_lock() : self.__lock__.wait() def setup (self) : self.log("I am running") def run (self) : self.poll() def poll (self) : if self.__poll__ : self.error("Already polling") return False self.__poll__ = True self.do_poll() self.__poll__ = False self.sleep() def do_poll (self) : pass def sleep (self, secs=600) : self.record_activity("Sleeping for %s seconds" % secs) if e32.in_emulator() : return time.sleep(secs) self.poll() else : timer = e32.Ao_timer() timer.after(secs, self.poll) self.__timer__ = timer def use_lock (self) : if e32.in_emulator() : return True return False def log (self, msg, reset=False) : if reset : self.__log__ = unicode("") self.__log__ += "%s\n" % msg self.write(unicode(self.__log__)) def write(self, txt) : if e32.in_emulator() : t = appuifw.Text() t.write(txt) appuifw.app.body = t else : appuifw.app.body = appuifw.Text(txt) def announce (self, msg, type='info') : self.note(msg, type, True) def notice (self, msg) : self.note(msg, 'info') def error (self, msg) : self.note(msg, 'error') def note (self, msg, type, gbl=False) : if not e32.in_emulator and gbl : appuifw.note(unicode(msg), type, gbl) else : appuifw.note(unicode(msg), type) def prompt (self, query, type="text") : if e32.in_emulator() : return raw_input("%s : " % query) return appuifw.query(unicode(query), type) def record_activity (self, msg, display=True) : self.__activity__.append(unicode("* %s" % msg)) if display : self.display_activity() def display_activity (self) : msg = "\n".join(self.__activity__) reset = True self.log(msg, reset) def switch_to_app (self) : self.display_app() self.setup_app_menu() def display_app (self) : reset = True self.log(self.__current__, reset) def switch_to_activity (self) : self.display_activity() self.setup_activity_menu() def setup_activity_menu (self) : pass def setup_app_menu (self) : pass def setup_simplestore(self, storename) : storepath = storename if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\%s' % storename else : storepath = 'c:\\%s' % storename try : self.__store__ = simplestore(storepath) except Exception, e : self.error("Failed to open local datastore, %s" % e) return False return True def vibrate (self) : try : import misty misty.vibrate(500, 100) except Exception, e : return False # # extlib : s60-simplestore.py # import e32 class simplestore : def __init__ (self, path) : self.__path__ = path self.__db__ = None self.open() def open (self) : if e32.in_emulator() : import anydbm self.__db__ = anydbm.open(self.__path__, 'c') else : import e32dbm self.__db__ = e32dbm.open(self.__path__, 'c') def close (self) : self.__db__.close() self.__db__ = None def read (self, key) : key = str(key) if self.__db__.has_key(key) : return self.__db__[key] return None def write (self, key, value) : key = str(key) value = str(value) self.__db__[key] = value self.save() def save (self) : self.close() self.open() # # extlib : s60-simplegps.py # import socket class simplegps : def __init__ (self, addr, service) : self.__addr = addr self.__service = service self.__target = (self.__addr, self.__service) self.__sock = None def connect (self) : try : sock = socket.socket(socket.AF_BT, socket.SOCK_STREAM) sock.connect(self.__target) self.__sock = sock except Exception, e : raise Exception("Connect error : %s" % e) def disconnect (self) : if not self.__sock : return self.__sock.close() self.__sock = None def plot (self) : try : ln = self.read() except Exception, e : raise Exception("Read error : %s" % e) try : return self.parse(ln) except Exception, e : raise Exception("Parse error : %s" % e) def read (self, type="$GPRMC") : ln = "" while 1: char = self.__sock.recv(1) if not char: break ln += char if char == "\n": if ln.startswith(type) : break else : ln = '' return ln def parse (self, gps_data) : lat = '' lon = '' if (gps_data.count(",") == 11) : (GPRMC,hhmmss,status,lat,l1,lon,l2,speed_knots,course,ddmmyy,mvd,mdr_cs)=gps_data.split(",") elif (gps_data.count(",") == 12) : (GPRMC,hhmmss,status,lat,l1,lon,l2,speed_knots,course,ddmmyy,mvd,mdr,fix_cs)=gps_data.split(",") else: pass if (status is 'A' and lat is not "" and lon is not ""): lat, lon = self.parse_latlong(lat, l1, lon, l2) dt = self.parse_dt(ddmmyy, hhmmss) return (dt, lat, lon) def parse_dt (self, ddmmyy, hhmmss) : d = (ddmmyy[4:6], ddmmyy[2:4], ddmmyy[0:2]) t = (hhmmss[0:2], hhmmss[2:4], hhmmss[4:6]) d = "-".join(d) t = ":".join(t) return "20%sT%s" % (d, t) def parse_latlong (self, lat, l1, lon, l2): try: latitude = float(lat[0:2]) + (float(lat[2:4] + "." + lat[5:9])/60) longitude = float(lon[0:3]) + (float(lon[3:5] + "." + lon[6:10])/60) if l1 is "S": latitude = -latitude if l2 is "W": longitude = -longitude latitude = float("%.6f" % latitude) longitude = float("%.6f" % longitude) return latitude, longitude except: return (None, None) # # extlib : s60-simplelogger.py # import os import os.path class simplelogger : def __init (self) : self.__queue = [] self.__logroot = '' self.__logfile = '' def set_logroot (self, root) : self.__logroot = root def set_logfile (self, fname) : self.__logfile = fname def log (self, msg) : try : self.log_to_disk(msg) except Exception, e: raise Exception(e) return True def log_to_disk (self, msg) : if not os.path.isdir(self.__logroot) : os.mkdir(self.__logroot) path = "%s\%s" % (self.__logroot, self.__logfile) try : fh = open(path, "a") fh.write(msg) fh.close() except Exception, e : raise Exception("Failed to open %s : %s" % (path, e)) return True class gps_recorder (simpleapp) : def __init__ (self) : simpleapp.__init__(self) self.__cfg = None self.__gps = None self.__log = None self.__poll = True self.__delay = 60 # # # def setup (self) : if not self.setup_cfg() : self.record_activity("Setup failed : config") return False if not self.setup_gps() : self.record_activity("Setup failed : GPS") return False if not self.setup_logger() : self.record_activity("Setup failed : logging") return False self.setup_menu() self.record_activity("Setup complete") return True # # # def setup_cfg (self) : storepath = 'test' if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\gpsr.cfg' else : storepath = 'c:\\gpsr.cfg' try : self.__cfg = simplestore(storepath) except Exception, e : self.error("Failed to load configs : %s" % e) return False return True # # # def setup_gps (self, force=False) : try : (addr, service) = self.get_device_config(force) self.__gps = simplegps(addr, service) except Exception, e : self.error("Failed to load device glue : %s" % e) return False return True # # # def setup_menu (self) : items = self.menu_items() appuifw.app.menu = items # # # def menu_items (self) : items = [] if self.__poll : items.append((unicode("Pause"), self.pause)) else : items.append((unicode("Resume"), self.resume)) items.append((unicode("Set delay"), self.set_delay)) items.append((unicode("Configure GPS device"), self.configure_gps_device)) items.append((unicode("Transfer GPS logs"), self.transfer_logs)) return items # # # def configure_gps_device (self) : already_paused = self.__poll if not already_paused : self.pause() force = True res = self.setup_gps(force) if not already_paused : poll = False self.resume(poll) if res : self.record_activity("GPS device settings saved") return res # # # def get_device_config (self, force=False) : addr = self.__cfg.read('bt_addr') service = self.__cfg.read('bt_service') if force or not addr or not service : try : addr, services = socket.bt_discover() service = services.values()[0] except Exception, e : raise Exception("Discovery error : %s" % e) try : self.__cfg.write('bt_addr', addr) self.__cfg.write('bt_service', service) except Exception, e : self.error("Failed to record device information : %s" % e) return (addr, int(service)) # # # def define_logroot (self) : storepath = './gpsr' if not e32.in_emulator() : import sysinfo if 'E:' in sysinfo.free_drivespace().keys() : storepath = 'e:\\gpsr' else : storepath = 'c:\\gpsr' return storepath # # # def setup_logger (self) : self.__log = simplelogger() # storepath = self.define_logroot() self.__log.set_logroot(storepath) return True # # # def do_poll (self) : if not self.__poll : return self.record_activity("Polling GPS device") try : self.__gps.connect() except Exception, e : self.error("SNFU : %s" % e) return False try : (dt, lat, lon) = self.__gps.plot() self.store_data(dt, lat, lon) except Exception, e : self.error("Failed to generate location information : %s" % e) self.__gps.disconnect() # # # def store_data (self, dt, lat, lon) : if not lat or not lon : self.record_activity("missing lat/long data, skipping") return msg = "%s\t%s\t%s" % (dt, lat, lon) self.record_activity(msg) ymd = dt[0:10] try : self.__log.set_logfile(ymd) self.__log.log(msg + "\n") except Exception, e : self.error("Log error : %s" % e) return False return True # # # def sleep (self) : delay = self.get_delay() simpleapp.sleep(self, delay) # # # def resume (self, poll=True) : self.__poll = True self.setup_menu() if poll : self.poll() self.record_activity("GPS polling restarted") # # # def pause (self) : self.__poll = False self.setup_menu() self.record_activity("GPS polling paused") # # # def set_delay (self) : delay = self.prompt(u"Seconds between polling", "text") self.__cfg.write('delay', delay) n = self.__cfg.read('delay') self.record_activity("Set delay to %s seconds" % n) # # # def get_delay (self) : n = self.__cfg.read('delay') if not n or not int(n) : n = self.__delay return int(n) # # # def transfer_logs (self) : addr = None service = None try : addr, services = socket.bt_obex_discover() service = services.values()[0] except Exception, e : self.error("Discovery error : %s" % e) return False import os import os.path logroot = self.define_logroot() filelist = os.listdir(logroot) for fname in filelist : path = os.path.join(logroot, fname) self.record_activity("send %s" % path) self.send_log(addr, service, unicode(path)) # # # def send_log (self, addr, service, path) : try : socket.bt_obex_send_file(addr, service, path) except Exception, e : self.error("Transfer error : %s" % e) return False return True # # # if __name__ == "__main__" : gps = gps_recorder() gps.loop()