diff --git a/lastfm/album.py b/lastfm/album.py new file mode 100644 index 0000000..8182d09 --- /dev/null +++ b/lastfm/album.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +class Album(object): + """A class representing an album.""" + def __init__(self, + api, + name = None, + artist = None, + id = None, + mbid = None, + url = None, + releaseDate = None, + image = None, + listeners = None, + playcount = None, + topTags = None): + self.__api = api + self.__name = name + self.__artist = artist + self.__id = id + self.__mbid = mbid + self.__url = url + self.__releaseDate = releaseDate + self.__image = image + self.__listeners = listeners + self.__playcount = playcount + self.__topTags = topTags + + def getName(self): + return self.__name + + def getArtist(self): + return self.__artist + + def getId(self): + return self.__id + + def getMbid(self): + return self.__mbid + + def getUrl(self): + return self.__url + + def getReleaseDate(self): + return self.__releaseDate + + def getImage(self): + return self.__image + + def getListeners(self): + return self.__listeners + + def getPlaycount(self): + return self.__playcount + + def getTopTags(self): + return self.__topTags + + name = property(getName, None, None, "Name's Docstring") + + artist = property(getArtist, None, None, "Artist's Docstring") + + id = property(getId, None, None, "Id's Docstring") + + mbid = property(getMbid, None, None, "Mbid's Docstring") + + url = property(getUrl, None, None, "Url's Docstring") + + releaseDate = property(getReleaseDate, None, None, "ReleaseDate's Docstring") + + image = property(getImage, None, None, "Image's Docstring") + + listeners = property(getListeners, None, None, "Listeners's Docstring") + + playcount = property(getPlaycount, None, None, "Playcount's Docstring") + + topTags = property(getTopTags, None, None, "TopTags's Docstring") + + @staticmethod + def getInfo(api, + artist = None, + album = None, + mbid = None): + apiKey = api.getApiKey() + params = {'method': 'album.getinfo', 'api_key': apiKey} + if not ((artist and album) or mbid): + raise LastfmError("either (artist and album) or mbid has to be given as argument.") + if artist and album: + params.update({'artist': artist, 'album': album}) + elif mbid: + params.update({'mbid': mbid}) + xml = api.fetchUrl(Api.API_ROOT_URL, params) + data = Xml2Dict(ElementTree.XML(xml)) + if data['@status'] != "ok": + raise LastfmError("Error code: %s (%s)" % (data['error']['@code'], data['error']['text'])) + + return Album( + api, + name = data['album']['name'], + artist = Artist( + api, + name = data['album']['artist'] + ), + id = int(data['album']['id']), + mbid = data['album']['mbid'], + url = data['album']['url'], + releaseDate = data['album']['releasedate'] and + datetime(*(time.strptime(data['album']['releasedate'], '%d %b %Y, 00:00')[0:6])), + image = dict([(i['@size'],i['text']) for i in data['album']['image']]), + listeners = int(data['album']['listeners']), + playcount = int(data['album']['playcount']), + topTags = [Tag(api, name = t['name'], url = t['url']) for t in data['album']['toptags']['tag']] + ) + + +import cElementTree as ElementTree +from datetime import datetime +import time + +from xml2dict import Xml2Dict +from error import LastfmError +from api import Api +from tag import Tag +from artist import Artist \ No newline at end of file diff --git a/lastfm/api.py b/lastfm/api.py new file mode 100644 index 0000000..36fae2b --- /dev/null +++ b/lastfm/api.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +class Api(object): + """The class representing the last.fm web services API.""" + + DEFAULT_CACHE_TIMEOUT = 3600 # cache for 1 hour + API_ROOT_URL = "http://ws.audioscrobbler.com/2.0/" + + def __init__(self, + apiKey = '23caa86333d2cb2055fa82129802780a', + input_encoding=None, + request_headers=None): + self.__apiKey = apiKey + self._cache = FileCache() + self._urllib = urllib2 + self._cache_timeout = Api.DEFAULT_CACHE_TIMEOUT + self._InitializeRequestHeaders(request_headers) + self._InitializeUserAgent() + self._input_encoding = input_encoding + + def getApiKey(self): + return self.__apiKey + + def setCache(self, cache): + '''Override the default cache. Set to None to prevent caching. + + Args: + cache: an instance that supports the same API as the audioscrobblerws.FileCache + ''' + self._cache = cache + + def setUrllib(self, urllib): + '''Override the default urllib implementation. + + Args: + urllib: an instance that supports the same API as the urllib2 module + ''' + self._urllib = urllib + + def setCacheTimeout(self, cache_timeout): + '''Override the default cache timeout. + + Args: + cache_timeout: time, in seconds, that responses should be reused. + ''' + self._cache_timeout = cache_timeout + + def setUserAgent(self, user_agent): + '''Override the default user agent + + Args: + user_agent: a string that should be send to the server as the User-agent + ''' + self._request_headers['User-Agent'] = user_agent + + def _BuildUrl(self, url, path_elements=None, extra_params=None): + # Break url into consituent parts + (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url) + path = path.replace(' ', '+') + + # Add any additional path elements to the path + if path_elements: + # Filter out the path elements that have a value of None + p = [i for i in path_elements if i] + if not path.endswith('/'): + path += '/' + path += '/'.join(p) + + # Add any additional query parameters to the query string + if extra_params and len(extra_params) > 0: + extra_query = self._EncodeParameters(extra_params) + # Add it to the existing query + if query: + query += '&' + extra_query + else: + query = extra_query + + # Return the rebuilt URL + return urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) + + def _InitializeRequestHeaders(self, request_headers): + if request_headers: + self._request_headers = request_headers + else: + self._request_headers = {} + + def _InitializeUserAgent(self): + user_agent = 'Python-urllib/%s (python-audioscrobblerws/%s)' % \ + (self._urllib.__version__, __version__) + self.setUserAgent(user_agent) + + def _GetOpener(self, url): + opener = self._urllib.build_opener() + opener.addheaders = self._request_headers.items() + return opener + + def _Encode(self, s): + if self._input_encoding: + return unicode(s, self._input_encoding).encode('utf-8') + else: + return unicode(s).encode('utf-8') + + def _EncodeParameters(self, parameters): + '''Return a string in key=value&key=value form + + Values of None are not included in the output string. + + Args: + parameters: + A dict of (key, value) tuples, where value is encoded as + specified by self._encoding + Returns: + A URL-encoded string in "key=value&key=value" form + ''' + if parameters is None: + return None + else: + return urllib.urlencode(dict([(k, self._Encode(v)) for k, v in parameters.items() if v is not None])) + + def getAlbum(self, + artist = None, + album = None, + mbid = None): + return Album.getInfo(self, artist, album, mbid) + + def fetchUrl(self, + url, + parameters = None, + no_cache = False): + '''Fetch a URL, optionally caching for a specified time. + + Args: + url: The URL to retrieve + parameters: A dict of key/value pairs that should added to + the query string. [OPTIONAL] + no_cache: If true, overrides the cache on the current request + + Returns: + A string containing the body of the response. + ''' + # Add key/value parameters to the query string of the url + url = self._BuildUrl(url, extra_params=parameters) + + # Get a url opener that can handle basic auth + opener = self._GetOpener(url) + + # Open and return the URL immediately if we're not going to cache + if no_cache or not self._cache or not self._cache_timeout: + try: + url_data = opener.open(url).read() + except urllib2.HTTPError, e: + url_data = e.read() + else: + # Unique keys are a combination of the url and the username + key = url.encode('utf-8') + + # See if it has been cached before + last_cached = self._cache.GetCachedTime(key) + + # If the cached version is outdated then fetch another and store it + if not last_cached or time.time() >= last_cached + self._cache_timeout: + try: + url_data = opener.open(url).read() + except urllib2.HTTPError, e: + url_data = e.read() + self._cache.Set(key, url_data) + else: + url_data = self._cache.Get(key) + + # Always return the latest version + return url_data + +import urllib +import urllib2 +import urlparse +import cElementTree as ElementTree +import time +from datetime import datetime + +from album import Album +from artist import Artist +from geo import Geo +from event import Event +from filecache import FileCache +#from group import Group +#from tag import Tag +#from track import Track +#from user import User +from xml2dict import Xml2Dict +from error import LastfmError \ No newline at end of file diff --git a/lastfm/artist.py b/lastfm/artist.py new file mode 100644 index 0000000..6456c7b --- /dev/null +++ b/lastfm/artist.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +class Artist(object): + """A class representing an artist.""" + def __init__(self, + api, + name = None, + mbid = None, + url = None, + image = None, + streamable = None, + stats = None, + similar = None, + tags = None, + bio = None): + self.__api = api + self.__name = name + self.__mbid = mbid + self.___url = url + self.__image = image + self.__streamable = streamable + self.__stats = stats + self.__similar = similar + self.__tags = tags + self.__bio = bio + + def getName(self): + return self.__name + + def getMbid(self): + return self.__mbid + + def getImage(self): + return self.__image + + def getStreamable(self): + return self.__streamable + + def getStats(self): + return self.__stats + + def getSimilar(self): + if self.__similar: + return self.__similar + else: + pass + + def getTags(self): + return self.__tags + + def getBio(self): + return self.__bio + + name = property(getName, None, None, "Name's Docstring") + + mbid = property(getMbid, None, None, "Mbid's Docstring") + + image = property(getImage, None, None, "Image's Docstring") + + streamable = property(getStreamable, None, None, "Streamable's Docstring") + + stats = property(getStats, None, None, "Stats's Docstring") + + similar = property(getSimilar, None, None, "Similar's Docstring") + + tags = property(getTags, None, None, "Tags's Docstring") + + bio = property(getBio, None, None, "Bio's Docstring") + + @staticmethod + def getInfo(api, + artist = None, + mbid = None): + pass + + +class Stats(object): + """A class representing the stats of an artist.""" + def __init__(self, + artist, + listeners = None, + plays = None): + self.__artist = artist + self.__listeners = listeners + self.__plays = plays + + def getArtist(self): + return self.__artist + + def getListeners(self): + return self.__listeners + + def getPlays(self): + return self.__plays + + listeners = property(getListeners, None, None, "Listeners's Docstring") + + plays = property(getPlays, None, None, "Plays's Docstring") + + artist = property(getArtist, None, None, "Artist's Docstring") + +class Bio(object): + """A class representing the biography of an artist.""" + def __init__(self, + artist, + published = None, + summary = None, + content = None): + self.__artist = artist + self.__published = published + self.__summary = summary + self.__content = content + + def getArtist(self): + return self.__artist + + def getPublished(self): + return self.__published + + def getSummary(self): + return self.__summary + + def getContent(self): + return self.__content + + published = property(getPublished, None, None, "Published's Docstring") + + summary = property(getSummary, None, None, "Summary's Docstring") + + content = property(getContent, None, None, "Content's Docstring") + + artist = property(getArtist, None, None, "Artist's Docstring") diff --git a/lastfm/error.py b/lastfm/error.py new file mode 100644 index 0000000..4678a30 --- /dev/null +++ b/lastfm/error.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +class LastfmError(Exception): + """Base class for Lastfm errors""" \ No newline at end of file diff --git a/lastfm/filecache.py b/lastfm/filecache.py new file mode 100644 index 0000000..38c1f83 --- /dev/null +++ b/lastfm/filecache.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +import tempfile +import os +import md5 + +class _FileCacheError(Exception): + '''Base exception class for FileCache related errors''' + +class FileCache(object): + + DEPTH = 3 + + def __init__(self,root_directory=None): + self._InitializeRootDirectory(root_directory) + + def Get(self,key): + path = self._GetPath(key) + if os.path.exists(path): + return open(path).read() + else: + return None + + def Set(self,key,data): + path = self._GetPath(key) + directory = os.path.dirname(path) + if not os.path.exists(directory): + os.makedirs(directory) + if not os.path.isdir(directory): + raise _FileCacheError('%s exists but is not a directory' % directory) + temp_fd, temp_path = tempfile.mkstemp() + temp_fp = os.fdopen(temp_fd, 'w') + temp_fp.write(data) + temp_fp.close() + if not path.startswith(self._root_directory): + raise _FileCacheError('%s does not appear to live under %s' % + (path, self._root_directory)) + if os.path.exists(path): + os.remove(path) + os.rename(temp_path, path) + + def Remove(self,key): + path = self._GetPath(key) + if not path.startswith(self._root_directory): + raise _FileCacheError('%s does not appear to live under %s' % + (path, self._root_directory )) + if os.path.exists(path): + os.remove(path) + + def GetCachedTime(self,key): + path = self._GetPath(key) + if os.path.exists(path): + return os.path.getmtime(path) + else: + return None + + def _GetUsername(self): + '''Attempt to find the username in a cross-platform fashion.''' + return os.getenv('USER') or \ + os.getenv('LOGNAME') or \ + os.getenv('USERNAME') or \ + os.getlogin() or \ + 'nobody' + + def _GetTmpCachePath(self): + username = self._GetUsername() + cache_directory = 'python.cache_' + username + return os.path.join(tempfile.gettempdir(), cache_directory) + + def _InitializeRootDirectory(self, root_directory): + if not root_directory: + root_directory = self._GetTmpCachePath() + root_directory = os.path.abspath(root_directory) + if not os.path.exists(root_directory): + os.mkdir(root_directory) + if not os.path.isdir(root_directory): + raise _FileCacheError('%s exists but is not a directory' % + root_directory) + self._root_directory = root_directory + + def _GetPath(self,key): + hashed_key = md5.new(key).hexdigest() + return os.path.join(self._root_directory, + self._GetPrefix(hashed_key), + hashed_key) + + def _GetPrefix(self,hashed_key): + return os.path.sep.join(hashed_key[0:FileCache.DEPTH]) diff --git a/lastfm/tag.py b/lastfm/tag.py new file mode 100644 index 0000000..6018da2 --- /dev/null +++ b/lastfm/tag.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +class Tag(object): + """"A class representing a tag.""" + def __init__(self, + api, + name = None, + url = None): + self.__api = api + self.__name = name + self.__url = url + + def getName(self): + return self.__name + + def getUrl(self): + return self.__url + + name = property(getName, None, None, "Name's Docstring") + + url = property(getUrl, None, None, "Url's Docstring") + \ No newline at end of file diff --git a/lastfm/xml2dict.py b/lastfm/xml2dict.py new file mode 100644 index 0000000..d9be0c4 --- /dev/null +++ b/lastfm/xml2dict.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +__author__ = "Abhinav Sarkar" +__version__ = "0.1" +__license__ = "GNU Lesser General Public License" + +import types + +class Xml2Dict (dict): + + def __init__(self, parent): + self.update(dict([('@' + item[0], item[1]) for item in parent.items()])) + + for child in parent: + # print child.tag + if child: + if not child.tag in self: + self[child.tag] = Xml2Dict(child) + else: + if type(self[child.tag]) != types.ListType: + self[child.tag] = [self[child.tag]] + self[child.tag].append(Xml2Dict(child)) + + if not child and child.items(): + if not child.tag in self: + self[child.tag] = dict([('@' + item[0], item[1]) for item in child.items()]) + else: + if type(self[child.tag]) != types.ListType: + self[child.tag] = [self[child.tag]] + self[child.tag].append( + dict([('@' + item[0], item[1]) for item in child.items()]) + ) + + + if child.text: + if not child.tag in self: + self[child.tag] = child.text.strip() or None + elif child.text.strip(): + if type(self[child.tag]) != types.ListType: + flag = False + for e in self[child.tag].keys(): + if e.startswith('@'): + flag = True + if not flag: + self[child.tag] = [self[child.tag]] + self[child.tag][0].update({'text': child.text.strip()}) + else: + self[child.tag].update({'text': child.text.strip()}) + else: + self[child.tag][-1].update({'text': child.text.strip()}) + if not child.tag in self: + self[child.tag] = None