Source code for pygeons.db

"""Implements low-level database structures and functions.

Expects you to call :func:`connect` before you do anything with the DB.

Expects the database to be initialized.  If it is not, see :mod:`pygeons.initialize`.

By default, the database lives under ``$HOME/.pygeons``.
You can modify this behavior using the ``PYGEONS_HOME`` environment variable.
You can also specify the subdirectory explicitly when you call :func:`connect`.
import collections
import io
import os
import os.path as P
import sqlite3

from typing import (

import marisa_trie  # type: ignore

CONN = None
TRIE = None

ENCODING = 'utf-8'

DEFAULT_SUBDIR = os.environ.get('PYGEONS_HOME', P.expanduser('~/.pygeons'))

Geoname = collections.namedtuple(

CountryInfo = collections.namedtuple(

def _load_country_info() -> List[CountryInfo]:
    assert CONN
    c = CONN.cursor()
    result = [
        for result in c.execute('SELECT * FROM countryinfo')
    return result

[docs]def connect(subdir: str = DEFAULT_SUBDIR) -> None: global CONN global TRIE global COUNTRYINFO if CONN is None: CONN = sqlite3.connect(P.join(subdir, 'db.sqlite3')) if os.environ.get('PYGEONS_ECHO'): CONN.set_trace_callback(print) COUNTRYINFO = _load_country_info() if TRIE is None: TRIE = marisa_trie.RecordTrie(MARISA_FORMAT).load(P.join(subdir, MARISA_FILENAME))
[docs]def select_geonames(subcommand: str, params: Iterable[Any]) -> List[Geoname]: assert CONN c = CONN.cursor() command = 'SELECT * FROM geoname %s' % subcommand result = [Geoname(*r) for r in c.execute(command, params)] c.close() return result
[docs]def select_geonames_ids( ids: Iterable[int], country_code: Optional[str] = None, ) -> List[Geoname]: if not ids: return [] params: List[Any] = list(ids) assert CONN c = CONN.cursor() buf = io.StringIO() buf.write('SELECT * FROM geoname WHERE') buf.write(' geonameid IN (%s)' % ','.join('?' for _ in params)) if country_code: buf.write(' AND country_code = ?') params.append(country_code) buf.write(' ORDER BY population DESC') command = buf.getvalue() result = [Geoname(*r) for r in c.execute(command, params)] c.close() return result
[docs]def select_geonames_name(name: str) -> List[Geoname]: def g(): try: matches = TRIE[name.lower()] except KeyError: pass else: for m in matches: if m[0] in (b'A', b'P'): yield m[2] geoname_ids = set(g()) return select_geonames_ids(geoname_ids)
[docs]def country_info(name: str) -> CountryInfo: """ >>> connect() >>> i = country_info('ru') >>> (, i.population, i.currency_name) ('Russia', 144478050, 'Ruble') """ assert TRIE assert COUNTRYINFO try: ids = {m[2] for m in TRIE[name.lower()]} except KeyError: ids = set() candidates = [ ci for ci in COUNTRYINFO if ci.geonameid in ids or name.lower() in (ci.iso.lower(), ci.iso3.lower()) ] if not candidates: raise ValueError('no such country: %r' % name) elif len(candidates) == 1: return candidates[0] else: raise ValueError('ambiguous country name: %r' % name)
[docs]def get_alternatenames(geonameid: str) -> List[Tuple[str, str]]: assert CONN c = CONN.cursor() command = ( 'SELECT isolanguage, alternate_name FROM alternatename' ' WHERE geonameid = ?' ) def g(): for isolanguage, alternate_name in c.execute(command, (geonameid, )): yield isolanguage, alternate_name return list(g())