#!/usr/bin/env python __author__ = "Abhinav Sarkar " __version__ = "0.2" __license__ = "GNU Lesser General Public License" __package__ = "lastfm" from lastfm.base import LastfmBase from lastfm.mixins import Cacheable from operator import xor class WeeklyChart(LastfmBase, Cacheable): """A class for representing the weekly charts""" def init(self, subject, start, end, stats = None): self._subject = subject self._start = start self._end = end self._stats = stats @property def subject(self): return self._subject @property def start(self): return self._start @property def end(self): return self._end @property def stats(self): return self._stats @staticmethod def create_from_data(api, subject, data): return WeeklyChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])) ) @staticmethod def _check_weekly_chart_params(params, start = None, end = None): if xor(start is None, end is None): raise InvalidParametersError("both start and end have to be provided.") if start is not None and end is not None: if isinstance(start, datetime) and isinstance(end, datetime): params.update({ 'from': int(calendar.timegm(start.timetuple())), 'to': int(calendar.timegm(end.timetuple())) }) else: raise InvalidParametersError("start and end must be datetime.datetime instances") return params @staticmethod def _hash_func(*args, **kwds): try: return hash("%s%s%s%s" % ( kwds['subject'].__class__.__name__, kwds['subject'].name, kwds['start'], kwds['end'] )) except KeyError: raise InvalidParametersError("subject, start and end have to be provided for hashing") def __hash__(self): return self.__class__._hash_func( subject = self.subject, start = self.start, end = self.end ) def __eq__(self, other): return self.subject == other.subject and \ self.start == other.start and \ self.end == other.end def __lt__(self, other): if self.subject == other.subject: if self.start == other.start: return self.end < other.end else: return self.start < other.start else: return self.subject < other.subject def __repr__(self): return "" % \ ( self.__class__.__name__, self.subject.__class__.__name__, self.subject.name, self.start.strftime("%x"), self.end.strftime("%x"), ) class WeeklyAlbumChart(WeeklyChart): """A class for representing the weekly album charts""" def init(self, subject, start, end, stats, albums): super(WeeklyAlbumChart, self).init(subject, start, end, stats) self._albums = albums @property def albums(self): return self._albums @staticmethod def create_from_data(api, subject, data): w = WeeklyChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), ) return WeeklyAlbumChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), stats = Stats( subject = subject, playcount = reduce( lambda x,y:( x + int(y.findtext('playcount')) ), data.findall('album'), 0 ) ), albums = [ Album( api, subject = w, name = a.findtext('name'), mbid = a.findtext('mbid'), artist = Artist( api, subject = w, name = a.findtext('artist'), mbid = a.find('artist').attrib['mbid'], ), stats = Stats( subject = a.findtext('name'), rank = int(a.attrib['rank']), playcount = int(a.findtext('playcount')), ), url = a.findtext('url'), ) for a in data.findall('album') ] ) class WeeklyArtistChart(WeeklyChart): """A class for representing the weekly artist charts""" def init(self, subject, start, end, stats, artists): super(WeeklyArtistChart, self).init(subject, start, end, stats) self._artists = artists @property def artists(self): return self._artists @staticmethod def create_from_data(api, subject, data): w = WeeklyChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), ) count_attribute = data.find('artist').findtext('playcount') and 'playcount' or 'weight' def get_count_attribute(artist): return {count_attribute: int(eval(artist.findtext(count_attribute)))} def get_count_attribute_sum(artists): return {count_attribute: reduce( lambda x, y:(x + int(eval(y.findtext(count_attribute)))), artists, 0 )} return WeeklyArtistChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), stats = Stats( subject = subject, **get_count_attribute_sum(data.findall('artist')) ), artists = [ Artist( api, subject = w, name = a.findtext('name'), mbid = a.findtext('mbid'), stats = Stats( subject = a.findtext('name'), rank = int(a.attrib['rank']), **get_count_attribute(a) ), url = a.findtext('url'), ) for a in data.findall('artist') ] ) class WeeklyTrackChart(WeeklyChart): """A class for representing the weekly track charts""" def init(self, subject, start, end, tracks, stats): super(WeeklyTrackChart, self).init(subject, start, end, stats) self._tracks = tracks @property def tracks(self): return self._tracks @staticmethod def create_from_data(api, subject, data): w = WeeklyChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), ) return WeeklyTrackChart( subject = subject, start = datetime.utcfromtimestamp(int(data.attrib['from'])), end = datetime.utcfromtimestamp(int(data.attrib['to'])), stats = Stats( subject = subject, playcount = reduce( lambda x,y:( x + int(y.findtext('playcount')) ), data.findall('track'), 0 ) ), tracks = [ Track( api, subject = w, name = t.findtext('name'), mbid = t.findtext('mbid'), artist = Artist( api, name = t.findtext('artist'), mbid = t.find('artist').attrib['mbid'], ), stats = Stats( subject = t.findtext('name'), rank = int(t.attrib['rank']), playcount = int(t.findtext('playcount')), ), url = t.findtext('url'), ) for t in data.findall('track') ] ) class WeeklyTagChart(WeeklyChart): """A class for representing the weekly tag charts""" def init(self, subject, start, end, tags, stats): super(WeeklyTagChart, self).init(subject, start, end, stats) self._tags = tags @property def tags(self): return self._tags @staticmethod def create_from_data(api, subject, start, end): w = WeeklyChart( subject = subject, start = start, end = end, ) max_tag_count = 3 global_top_tags = api.get_global_top_tags() from collections import defaultdict wac = subject.get_weekly_artist_chart(start, end) all_tags = defaultdict(lambda:0) tag_weights = defaultdict(lambda:0) total_playcount = 0 artist_count = 0 for artist in wac.artists: artist_count += 1 total_playcount += artist.stats.playcount tag_count = 0 for tag in artist.top_tags: if tag not in global_top_tags: continue if tag_count >= max_tag_count: break all_tags[tag] += 1 tag_count += 1 artist_pp = artist.stats.playcount/float(wac.stats.playcount) cumulative_pp = total_playcount/float(wac.stats.playcount) if (cumulative_pp > 0.75 or artist_pp < 0.01) and artist_count > 10: break for artist in wac.artists[:artist_count]: artist_pp = artist.stats.playcount/float(wac.stats.playcount) tf = 1/float(max_tag_count) tag_count = 0 weighted_tfidfs = {} for tag in artist.top_tags: if tag not in global_top_tags: continue if tag_count >= max_tag_count: break df = all_tags[tag]/float(artist_count) tfidf = tf/df weighted_tfidf = float(max_tag_count - tag_count)*tfidf weighted_tfidfs[tag.name] = weighted_tfidf tag_count += 1 sum_weighted_tfidfs = sum(weighted_tfidfs.values()) for tag in weighted_tfidfs: tag_weights[tag] += weighted_tfidfs[tag]/sum_weighted_tfidfs*artist_pp artist_pp = artist.stats.playcount/float(wac.stats.playcount) tag_weights_sum = sum(tag_weights.values()) tag_weights = tag_weights.items() tag_weights.sort(key=lambda x:x[1], reverse=True) for i in xrange(len(tag_weights)): tag, weight = tag_weights[i] tag_weights[i] = (tag, weight, i+1) wtc = WeeklyTagChart( subject = subject, start = wac.start, end = wac.end, stats = Stats( subject = subject, playcount = 1000 ), tags = [ Tag( api, subject = w, name = tag, stats = Stats( subject = tag, rank = rank, count = int(round(1000*weight/tag_weights_sum)), ) ) for (tag, weight, rank) in tag_weights ] ) wtc._artist_spectrum_analyzed = 100*total_playcount/float(wac.stats.playcount) return wtc from datetime import datetime import calendar from lastfm.album import Album from lastfm.artist import Artist from lastfm.error import InvalidParametersError from lastfm.stats import Stats from lastfm.track import Track from lastfm.tag import Tag