added album, api, artist, filecache, tag and xml2dict module.

This commit is contained in:
Abhinav Sarkar 2008-07-03 19:03:41 +00:00
parent a481228895
commit 63470dc254
7 changed files with 636 additions and 0 deletions

128
lastfm/album.py Normal file
View File

@ -0,0 +1,128 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
class Album(object):
"""A class representing an album."""
def __init__(self,
api,
name = None,
artist = None,
id = None,
mbid = None,
url = None,
releaseDate = None,
image = None,
listeners = None,
playcount = None,
topTags = None):
self.__api = api
self.__name = name
self.__artist = artist
self.__id = id
self.__mbid = mbid
self.__url = url
self.__releaseDate = releaseDate
self.__image = image
self.__listeners = listeners
self.__playcount = playcount
self.__topTags = topTags
def getName(self):
return self.__name
def getArtist(self):
return self.__artist
def getId(self):
return self.__id
def getMbid(self):
return self.__mbid
def getUrl(self):
return self.__url
def getReleaseDate(self):
return self.__releaseDate
def getImage(self):
return self.__image
def getListeners(self):
return self.__listeners
def getPlaycount(self):
return self.__playcount
def getTopTags(self):
return self.__topTags
name = property(getName, None, None, "Name's Docstring")
artist = property(getArtist, None, None, "Artist's Docstring")
id = property(getId, None, None, "Id's Docstring")
mbid = property(getMbid, None, None, "Mbid's Docstring")
url = property(getUrl, None, None, "Url's Docstring")
releaseDate = property(getReleaseDate, None, None, "ReleaseDate's Docstring")
image = property(getImage, None, None, "Image's Docstring")
listeners = property(getListeners, None, None, "Listeners's Docstring")
playcount = property(getPlaycount, None, None, "Playcount's Docstring")
topTags = property(getTopTags, None, None, "TopTags's Docstring")
@staticmethod
def getInfo(api,
artist = None,
album = None,
mbid = None):
apiKey = api.getApiKey()
params = {'method': 'album.getinfo', 'api_key': apiKey}
if not ((artist and album) or mbid):
raise LastfmError("either (artist and album) or mbid has to be given as argument.")
if artist and album:
params.update({'artist': artist, 'album': album})
elif mbid:
params.update({'mbid': mbid})
xml = api.fetchUrl(Api.API_ROOT_URL, params)
data = Xml2Dict(ElementTree.XML(xml))
if data['@status'] != "ok":
raise LastfmError("Error code: %s (%s)" % (data['error']['@code'], data['error']['text']))
return Album(
api,
name = data['album']['name'],
artist = Artist(
api,
name = data['album']['artist']
),
id = int(data['album']['id']),
mbid = data['album']['mbid'],
url = data['album']['url'],
releaseDate = data['album']['releasedate'] and
datetime(*(time.strptime(data['album']['releasedate'], '%d %b %Y, 00:00')[0:6])),
image = dict([(i['@size'],i['text']) for i in data['album']['image']]),
listeners = int(data['album']['listeners']),
playcount = int(data['album']['playcount']),
topTags = [Tag(api, name = t['name'], url = t['url']) for t in data['album']['toptags']['tag']]
)
import cElementTree as ElementTree
from datetime import datetime
import time
from xml2dict import Xml2Dict
from error import LastfmError
from api import Api
from tag import Tag
from artist import Artist

194
lastfm/api.py Normal file
View File

@ -0,0 +1,194 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
class Api(object):
"""The class representing the last.fm web services API."""
DEFAULT_CACHE_TIMEOUT = 3600 # cache for 1 hour
API_ROOT_URL = "http://ws.audioscrobbler.com/2.0/"
def __init__(self,
apiKey = '23caa86333d2cb2055fa82129802780a',
input_encoding=None,
request_headers=None):
self.__apiKey = apiKey
self._cache = FileCache()
self._urllib = urllib2
self._cache_timeout = Api.DEFAULT_CACHE_TIMEOUT
self._InitializeRequestHeaders(request_headers)
self._InitializeUserAgent()
self._input_encoding = input_encoding
def getApiKey(self):
return self.__apiKey
def setCache(self, cache):
'''Override the default cache. Set to None to prevent caching.
Args:
cache: an instance that supports the same API as the audioscrobblerws.FileCache
'''
self._cache = cache
def setUrllib(self, urllib):
'''Override the default urllib implementation.
Args:
urllib: an instance that supports the same API as the urllib2 module
'''
self._urllib = urllib
def setCacheTimeout(self, cache_timeout):
'''Override the default cache timeout.
Args:
cache_timeout: time, in seconds, that responses should be reused.
'''
self._cache_timeout = cache_timeout
def setUserAgent(self, user_agent):
'''Override the default user agent
Args:
user_agent: a string that should be send to the server as the User-agent
'''
self._request_headers['User-Agent'] = user_agent
def _BuildUrl(self, url, path_elements=None, extra_params=None):
# Break url into consituent parts
(scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
path = path.replace(' ', '+')
# Add any additional path elements to the path
if path_elements:
# Filter out the path elements that have a value of None
p = [i for i in path_elements if i]
if not path.endswith('/'):
path += '/'
path += '/'.join(p)
# Add any additional query parameters to the query string
if extra_params and len(extra_params) > 0:
extra_query = self._EncodeParameters(extra_params)
# Add it to the existing query
if query:
query += '&' + extra_query
else:
query = extra_query
# Return the rebuilt URL
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
def _InitializeRequestHeaders(self, request_headers):
if request_headers:
self._request_headers = request_headers
else:
self._request_headers = {}
def _InitializeUserAgent(self):
user_agent = 'Python-urllib/%s (python-audioscrobblerws/%s)' % \
(self._urllib.__version__, __version__)
self.setUserAgent(user_agent)
def _GetOpener(self, url):
opener = self._urllib.build_opener()
opener.addheaders = self._request_headers.items()
return opener
def _Encode(self, s):
if self._input_encoding:
return unicode(s, self._input_encoding).encode('utf-8')
else:
return unicode(s).encode('utf-8')
def _EncodeParameters(self, parameters):
'''Return a string in key=value&key=value form
Values of None are not included in the output string.
Args:
parameters:
A dict of (key, value) tuples, where value is encoded as
specified by self._encoding
Returns:
A URL-encoded string in "key=value&key=value" form
'''
if parameters is None:
return None
else:
return urllib.urlencode(dict([(k, self._Encode(v)) for k, v in parameters.items() if v is not None]))
def getAlbum(self,
artist = None,
album = None,
mbid = None):
return Album.getInfo(self, artist, album, mbid)
def fetchUrl(self,
url,
parameters = None,
no_cache = False):
'''Fetch a URL, optionally caching for a specified time.
Args:
url: The URL to retrieve
parameters: A dict of key/value pairs that should added to
the query string. [OPTIONAL]
no_cache: If true, overrides the cache on the current request
Returns:
A string containing the body of the response.
'''
# Add key/value parameters to the query string of the url
url = self._BuildUrl(url, extra_params=parameters)
# Get a url opener that can handle basic auth
opener = self._GetOpener(url)
# Open and return the URL immediately if we're not going to cache
if no_cache or not self._cache or not self._cache_timeout:
try:
url_data = opener.open(url).read()
except urllib2.HTTPError, e:
url_data = e.read()
else:
# Unique keys are a combination of the url and the username
key = url.encode('utf-8')
# See if it has been cached before
last_cached = self._cache.GetCachedTime(key)
# If the cached version is outdated then fetch another and store it
if not last_cached or time.time() >= last_cached + self._cache_timeout:
try:
url_data = opener.open(url).read()
except urllib2.HTTPError, e:
url_data = e.read()
self._cache.Set(key, url_data)
else:
url_data = self._cache.Get(key)
# Always return the latest version
return url_data
import urllib
import urllib2
import urlparse
import cElementTree as ElementTree
import time
from datetime import datetime
from album import Album
from artist import Artist
from geo import Geo
from event import Event
from filecache import FileCache
#from group import Group
#from tag import Tag
#from track import Track
#from user import User
from xml2dict import Xml2Dict
from error import LastfmError

136
lastfm/artist.py Normal file
View File

@ -0,0 +1,136 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
class Artist(object):
"""A class representing an artist."""
def __init__(self,
api,
name = None,
mbid = None,
url = None,
image = None,
streamable = None,
stats = None,
similar = None,
tags = None,
bio = None):
self.__api = api
self.__name = name
self.__mbid = mbid
self.___url = url
self.__image = image
self.__streamable = streamable
self.__stats = stats
self.__similar = similar
self.__tags = tags
self.__bio = bio
def getName(self):
return self.__name
def getMbid(self):
return self.__mbid
def getImage(self):
return self.__image
def getStreamable(self):
return self.__streamable
def getStats(self):
return self.__stats
def getSimilar(self):
if self.__similar:
return self.__similar
else:
pass
def getTags(self):
return self.__tags
def getBio(self):
return self.__bio
name = property(getName, None, None, "Name's Docstring")
mbid = property(getMbid, None, None, "Mbid's Docstring")
image = property(getImage, None, None, "Image's Docstring")
streamable = property(getStreamable, None, None, "Streamable's Docstring")
stats = property(getStats, None, None, "Stats's Docstring")
similar = property(getSimilar, None, None, "Similar's Docstring")
tags = property(getTags, None, None, "Tags's Docstring")
bio = property(getBio, None, None, "Bio's Docstring")
@staticmethod
def getInfo(api,
artist = None,
mbid = None):
pass
class Stats(object):
"""A class representing the stats of an artist."""
def __init__(self,
artist,
listeners = None,
plays = None):
self.__artist = artist
self.__listeners = listeners
self.__plays = plays
def getArtist(self):
return self.__artist
def getListeners(self):
return self.__listeners
def getPlays(self):
return self.__plays
listeners = property(getListeners, None, None, "Listeners's Docstring")
plays = property(getPlays, None, None, "Plays's Docstring")
artist = property(getArtist, None, None, "Artist's Docstring")
class Bio(object):
"""A class representing the biography of an artist."""
def __init__(self,
artist,
published = None,
summary = None,
content = None):
self.__artist = artist
self.__published = published
self.__summary = summary
self.__content = content
def getArtist(self):
return self.__artist
def getPublished(self):
return self.__published
def getSummary(self):
return self.__summary
def getContent(self):
return self.__content
published = property(getPublished, None, None, "Published's Docstring")
summary = property(getSummary, None, None, "Summary's Docstring")
content = property(getContent, None, None, "Content's Docstring")
artist = property(getArtist, None, None, "Artist's Docstring")

8
lastfm/error.py Normal file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
class LastfmError(Exception):
"""Base class for Lastfm errors"""

92
lastfm/filecache.py Normal file
View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
import tempfile
import os
import md5
class _FileCacheError(Exception):
'''Base exception class for FileCache related errors'''
class FileCache(object):
DEPTH = 3
def __init__(self,root_directory=None):
self._InitializeRootDirectory(root_directory)
def Get(self,key):
path = self._GetPath(key)
if os.path.exists(path):
return open(path).read()
else:
return None
def Set(self,key,data):
path = self._GetPath(key)
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
if not os.path.isdir(directory):
raise _FileCacheError('%s exists but is not a directory' % directory)
temp_fd, temp_path = tempfile.mkstemp()
temp_fp = os.fdopen(temp_fd, 'w')
temp_fp.write(data)
temp_fp.close()
if not path.startswith(self._root_directory):
raise _FileCacheError('%s does not appear to live under %s' %
(path, self._root_directory))
if os.path.exists(path):
os.remove(path)
os.rename(temp_path, path)
def Remove(self,key):
path = self._GetPath(key)
if not path.startswith(self._root_directory):
raise _FileCacheError('%s does not appear to live under %s' %
(path, self._root_directory ))
if os.path.exists(path):
os.remove(path)
def GetCachedTime(self,key):
path = self._GetPath(key)
if os.path.exists(path):
return os.path.getmtime(path)
else:
return None
def _GetUsername(self):
'''Attempt to find the username in a cross-platform fashion.'''
return os.getenv('USER') or \
os.getenv('LOGNAME') or \
os.getenv('USERNAME') or \
os.getlogin() or \
'nobody'
def _GetTmpCachePath(self):
username = self._GetUsername()
cache_directory = 'python.cache_' + username
return os.path.join(tempfile.gettempdir(), cache_directory)
def _InitializeRootDirectory(self, root_directory):
if not root_directory:
root_directory = self._GetTmpCachePath()
root_directory = os.path.abspath(root_directory)
if not os.path.exists(root_directory):
os.mkdir(root_directory)
if not os.path.isdir(root_directory):
raise _FileCacheError('%s exists but is not a directory' %
root_directory)
self._root_directory = root_directory
def _GetPath(self,key):
hashed_key = md5.new(key).hexdigest()
return os.path.join(self._root_directory,
self._GetPrefix(hashed_key),
hashed_key)
def _GetPrefix(self,hashed_key):
return os.path.sep.join(hashed_key[0:FileCache.DEPTH])

26
lastfm/tag.py Normal file
View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
class Tag(object):
""""A class representing a tag."""
def __init__(self,
api,
name = None,
url = None):
self.__api = api
self.__name = name
self.__url = url
def getName(self):
return self.__name
def getUrl(self):
return self.__url
name = property(getName, None, None, "Name's Docstring")
url = property(getUrl, None, None, "Url's Docstring")

52
lastfm/xml2dict.py Normal file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env python
__author__ = "Abhinav Sarkar"
__version__ = "0.1"
__license__ = "GNU Lesser General Public License"
import types
class Xml2Dict (dict):
def __init__(self, parent):
self.update(dict([('@' + item[0], item[1]) for item in parent.items()]))
for child in parent:
# print child.tag
if child:
if not child.tag in self:
self[child.tag] = Xml2Dict(child)
else:
if type(self[child.tag]) != types.ListType:
self[child.tag] = [self[child.tag]]
self[child.tag].append(Xml2Dict(child))
if not child and child.items():
if not child.tag in self:
self[child.tag] = dict([('@' + item[0], item[1]) for item in child.items()])
else:
if type(self[child.tag]) != types.ListType:
self[child.tag] = [self[child.tag]]
self[child.tag].append(
dict([('@' + item[0], item[1]) for item in child.items()])
)
if child.text:
if not child.tag in self:
self[child.tag] = child.text.strip() or None
elif child.text.strip():
if type(self[child.tag]) != types.ListType:
flag = False
for e in self[child.tag].keys():
if e.startswith('@'):
flag = True
if not flag:
self[child.tag] = [self[child.tag]]
self[child.tag][0].update({'text': child.text.strip()})
else:
self[child.tag].update({'text': child.text.strip()})
else:
self[child.tag][-1].update({'text': child.text.strip()})
if not child.tag in self:
self[child.tag] = None