359 lines
16 KiB
Python
359 lines
16 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Module metadata (author, release version, licence).
__author__ = "Abhinav Sarkar <abhinav@abhinavsarkar.net>"
__version__ = "0.2"
__license__ = "GNU Lesser General Public License"
|
|
|
|
from lastfm.base import LastfmBase
|
|
from lastfm.mixins import Cacheable
|
|
|
|
class WeeklyChart(LastfmBase, Cacheable):
    """Base weekly chart: a subject plus the time window the chart covers.

    State is set up in ``init`` rather than ``__init__``.
    NOTE(review): presumably the ``Cacheable`` mixin calls ``init`` after
    resolving the instance through ``_hash_func`` -- confirm in
    ``lastfm.mixins``.
    """

    def init(self, subject, start, end, stats=None):
        """Record the chart's subject, window bounds and optional stats."""
        self._subject, self._start, self._end = subject, start, end
        self._stats = stats

    @property
    def subject(self):
        """The object this chart was generated for."""
        return self._subject

    @property
    def start(self):
        """Beginning of the chart's time window."""
        return self._start

    @property
    def end(self):
        """End of the chart's time window."""
        return self._end

    @property
    def stats(self):
        """Statistics attached to the chart, or None when not supplied."""
        return self._stats

    @staticmethod
    def create_from_data(api, subject, data):
        """Build a chart from ``data``'s 'from' and 'to' attributes.

        Both attributes are read as integer UNIX timestamps and converted
        to naive UTC datetimes. ``api`` is not used by this factory.
        """
        attributes = data.attrib
        window_start = datetime.utcfromtimestamp(int(attributes['from']))
        window_end = datetime.utcfromtimestamp(int(attributes['to']))
        return WeeklyChart(
            subject=subject,
            start=window_start,
            end=window_end,
        )

    @staticmethod
    def _check_weekly_chart_params(params, start=None, end=None):
        """Validate a chart window and add it to ``params`` as 'from'/'to'.

        Returns ``params`` untouched when neither bound is given. Raises
        InvalidParametersError when only one bound is given, or when the
        bounds are not both ``datetime.datetime`` instances.
        """
        # Exactly one bound missing is an error.
        if (start is None) != (end is None):
            raise InvalidParametersError("both start and end have to be provided.")

        if start is None:
            # Neither bound supplied: nothing to add.
            return params

        if not (isinstance(start, datetime) and isinstance(end, datetime)):
            raise InvalidParametersError("start and end must be datetime.datetime instances")

        # timegm() interprets the time tuple as UTC.
        lower = int(calendar.timegm(start.timetuple()))
        upper = int(calendar.timegm(end.timetuple()))
        params.update({'from': lower, 'to': upper})
        return params

    @staticmethod
    def _hash_func(*args, **kwds):
        """Hash the subject's class name, subject's name, start and end.

        All three of ``subject``, ``start`` and ``end`` must be passed as
        keywords; a missing one raises InvalidParametersError.
        """
        try:
            parts = (
                kwds['subject'].__class__.__name__,
                kwds['subject'].name,
                kwds['start'],
                kwds['end'],
            )
            return hash("%s%s%s%s" % parts)
        except KeyError:
            raise InvalidParametersError("subject, start and end have to be provided for hashing")

    def __hash__(self):
        """Hash this chart with the same function used for construction keys."""
        hasher = self.__class__._hash_func
        return hasher(subject=self.subject, start=self.start, end=self.end)

    def __eq__(self, other):
        """Charts are equal when subject, start and end all compare equal."""
        return (
            self.subject == other.subject
            and self.start == other.start
            and self.end == other.end
        )

    def __lt__(self, other):
        """Order charts by subject, then by start, then by end."""
        # Only `==` and `<` are used, exactly as the attributes are
        # compared elsewhere in this class.
        if not (self.subject == other.subject):
            return self.subject < other.subject
        if not (self.start == other.start):
            return self.start < other.start
        return self.end < other.end

    def __repr__(self):
        """Describe the chart by class, subject and locale-formatted dates."""
        fields = (
            self.__class__.__name__,
            self.subject.__class__.__name__,
            self.subject.name,
            self.start.strftime("%x"),
            self.end.strftime("%x"),
        )
        return "<lastfm.%s: for %s:%s from %s to %s>" % fields
|
|
|
|
class WeeklyAlbumChart(WeeklyChart):
    """A class for representing the weekly album charts"""

    def init(self, subject, start, end, stats, albums):
        """Store the base chart fields plus the list of charted albums."""
        super(WeeklyAlbumChart, self).init(subject, start, end, stats)
        self._albums = albums

    @property
    def albums(self):
        """The albums of this chart, in the order they appear in the data."""
        return self._albums

    @staticmethod
    def create_from_data(api, subject, data):
        """Build an album chart from a chart element.

        ``data`` must carry integer 'from'/'to' timestamp attributes and
        contain 'album' children, each with a 'rank' attribute and 'name',
        'mbid', 'artist' (with its own 'mbid' attribute), 'playcount' and
        'url' children. The chart's playcount is the sum of the albums'
        playcounts.
        """
        # Parse the window once; the same values serve both the plain
        # chart used as the items' subject and the album chart itself.
        start = datetime.utcfromtimestamp(int(data.attrib['from']))
        end = datetime.utcfromtimestamp(int(data.attrib['to']))
        album_nodes = data.findall('album')

        w = WeeklyChart(
            subject=subject,
            start=start,
            end=end,
        )
        return WeeklyAlbumChart(
            subject=subject,
            start=start,
            end=end,
            stats=Stats(
                subject=subject,
                # sum() avoids relying on reduce() being a builtin.
                playcount=sum(
                    int(node.findtext('playcount')) for node in album_nodes
                ),
            ),
            albums=[
                Album(
                    api,
                    subject=w,
                    name=a.findtext('name'),
                    mbid=a.findtext('mbid'),
                    artist=Artist(
                        api,
                        subject=w,
                        name=a.findtext('artist'),
                        mbid=a.find('artist').attrib['mbid'],
                    ),
                    stats=Stats(
                        subject=a.findtext('name'),
                        rank=int(a.attrib['rank']),
                        playcount=int(a.findtext('playcount')),
                    ),
                    url=a.findtext('url'),
                )
                for a in album_nodes
            ],
        )
|
|
|
|
class WeeklyArtistChart(WeeklyChart):
    """A class for representing the weekly artist charts"""

    def init(self, subject, start, end, stats, artists):
        """Store the base chart fields plus the list of charted artists."""
        super(WeeklyArtistChart, self).init(subject, start, end, stats)
        self._artists = artists

    @property
    def artists(self):
        """The artists of this chart, in the order they appear in the data."""
        return self._artists

    @staticmethod
    def create_from_data(api, subject, data):
        """Build an artist chart from a chart element.

        ``data`` must carry integer 'from'/'to' timestamp attributes and
        contain 'artist' children, each with a 'rank' attribute and
        'name', 'mbid', 'url' and either 'playcount' or 'weight' children.
        Whichever of 'playcount'/'weight' the first artist provides is
        used as the Stats keyword for every artist and for the chart
        total.
        """
        start = datetime.utcfromtimestamp(int(data.attrib['from']))
        end = datetime.utcfromtimestamp(int(data.attrib['to']))
        artist_nodes = data.findall('artist')

        w = WeeklyChart(
            subject=subject,
            start=start,
            end=end,
        )

        # NOTE(review): data.find('artist') is None for a chart without
        # any artist, so this line raises AttributeError on empty charts.
        # Left as is because the right default is not knowable from here.
        if data.find('artist').findtext('playcount'):
            count_attribute = 'playcount'
        else:
            count_attribute = 'weight'

        def read_count(node):
            """Return the node's count as an int, truncating fractions."""
            # SECURITY: this text comes from a remote response, so it is
            # parsed numerically and never passed to eval().
            text = node.findtext(count_attribute)
            try:
                return int(text)
            except ValueError:
                # Fractional values such as "12.5" truncate to 12.
                return int(float(text))

        return WeeklyArtistChart(
            subject=subject,
            start=start,
            end=end,
            stats=Stats(
                subject=subject,
                **{count_attribute: sum(read_count(node) for node in artist_nodes)}
            ),
            artists=[
                Artist(
                    api,
                    subject=w,
                    name=a.findtext('name'),
                    mbid=a.findtext('mbid'),
                    stats=Stats(
                        subject=a.findtext('name'),
                        rank=int(a.attrib['rank']),
                        **{count_attribute: read_count(a)}
                    ),
                    url=a.findtext('url'),
                )
                for a in artist_nodes
            ],
        )
|
|
|
|
class WeeklyTrackChart(WeeklyChart):
    """A class for representing the weekly track charts"""

    def init(self, subject, start, end, tracks, stats):
        """Store the base chart fields plus the list of charted tracks."""
        super(WeeklyTrackChart, self).init(subject, start, end, stats)
        self._tracks = tracks

    @property
    def tracks(self):
        """The tracks of this chart, in the order they appear in the data."""
        return self._tracks

    @staticmethod
    def create_from_data(api, subject, data):
        """Build a track chart from a chart element.

        ``data`` must carry integer 'from'/'to' timestamp attributes and
        contain 'track' children, each with a 'rank' attribute and 'name',
        'mbid', 'artist' (with its own 'mbid' attribute), 'playcount' and
        'url' children. The chart's playcount is the sum of the tracks'
        playcounts.
        """
        # Parse the window once; the same values serve both the plain
        # chart used as the tracks' subject and the track chart itself.
        start = datetime.utcfromtimestamp(int(data.attrib['from']))
        end = datetime.utcfromtimestamp(int(data.attrib['to']))
        track_nodes = data.findall('track')

        w = WeeklyChart(
            subject=subject,
            start=start,
            end=end,
        )
        return WeeklyTrackChart(
            subject=subject,
            start=start,
            end=end,
            stats=Stats(
                subject=subject,
                # sum() avoids relying on reduce() being a builtin.
                playcount=sum(
                    int(node.findtext('playcount')) for node in track_nodes
                ),
            ),
            tracks=[
                Track(
                    api,
                    subject=w,
                    name=t.findtext('name'),
                    mbid=t.findtext('mbid'),
                    # NOTE(review): unlike WeeklyAlbumChart, this Artist
                    # is created without a `subject` -- confirm whether
                    # that difference is intentional.
                    artist=Artist(
                        api,
                        name=t.findtext('artist'),
                        mbid=t.find('artist').attrib['mbid'],
                    ),
                    stats=Stats(
                        subject=t.findtext('name'),
                        rank=int(t.attrib['rank']),
                        playcount=int(t.findtext('playcount')),
                    ),
                    url=t.findtext('url'),
                )
                for t in track_nodes
            ],
        )
|
|
|
|
class WeeklyTagChart(WeeklyChart):
    """A class for representing the weekly tag charts"""

    def init(self, subject, start, end, tags, stats):
        """Store the base chart fields plus the list of charted tags."""
        super(WeeklyTagChart, self).init(subject, start, end, stats)
        self._tags = tags

    @property
    def tags(self):
        """The tags of this chart, ordered by descending weight."""
        return self._tags

    @staticmethod
    def create_from_data(api, subject, start, end):
        """Derive a tag chart from the subject's weekly artist chart.

        Walks the artist chart for ``start``..``end``, keeps up to three
        of each artist's top tags that are also global top tags, weights
        them with a rank-weighted tf-idf scaled by the artist's share of
        the chart playcount, and emits the tags ranked by total weight.
        Each tag's ``count`` is its weight normalised so that all counts
        sum to roughly 1000.

        The returned chart also gets an ``_artist_spectrum_analyzed``
        attribute: the percentage of the chart playcount covered by the
        artists that were examined.
        """
        from collections import defaultdict

        max_tag_count = 3        # tags considered per artist
        coverage_cutoff = 0.75   # stop once this playcount share is covered
        min_artist_share = 0.01  # ... or once an artist falls below this share
        min_artist_count = 10    # but only after more than this many artists
        count_scale = 1000       # tag counts are normalised to this total

        w = WeeklyChart(
            subject=subject,
            start=start,
            end=end,
        )
        global_top_tags = api.get_global_top_tags()
        wac = subject.get_weekly_artist_chart(start, end)
        chart_playcount = float(wac.stats.playcount)

        # Pass 1: pick the artists to analyse and count, for every tag,
        # how many of those artists carry it.
        all_tags = defaultdict(int)
        total_playcount = 0
        artist_count = 0
        for artist_count, artist in enumerate(wac.artists, 1):
            total_playcount += artist.stats.playcount
            tag_count = 0
            for tag in artist.top_tags:
                if tag not in global_top_tags:
                    continue
                if tag_count >= max_tag_count:
                    break
                all_tags[tag] += 1
                tag_count += 1

            artist_pp = artist.stats.playcount / chart_playcount
            cumulative_pp = total_playcount / chart_playcount
            if ((cumulative_pp > coverage_cutoff or artist_pp < min_artist_share)
                    and artist_count > min_artist_count):
                break

        # Pass 2: accumulate per-tag weights over the selected artists.
        tag_weights = defaultdict(float)
        tf = 1 / float(max_tag_count)
        for artist in wac.artists[:artist_count]:
            artist_pp = artist.stats.playcount / chart_playcount
            tag_count = 0
            weighted_tfidfs = {}
            for tag in artist.top_tags:
                if tag not in global_top_tags:
                    continue
                if tag_count >= max_tag_count:
                    break
                df = all_tags[tag] / float(artist_count)
                tfidf = tf / df
                # Earlier tags of an artist get a larger multiplier.
                weighted_tfidfs[tag.name] = float(max_tag_count - tag_count) * tfidf
                tag_count += 1

            # Normalise within the artist, then scale by the artist's
            # share of the chart. The body is skipped when no tag
            # qualified, so the zero sum is never divided by.
            sum_weighted_tfidfs = sum(weighted_tfidfs.values())
            for name, weighted in weighted_tfidfs.items():
                tag_weights[name] += weighted / sum_weighted_tfidfs * artist_pp

        tag_weights_sum = sum(tag_weights.values())
        ranked = sorted(
            tag_weights.items(),
            key=lambda item: item[1],
            reverse=True,
        )

        wtc = WeeklyTagChart(
            subject=subject,
            start=wac.start,
            end=wac.end,
            stats=Stats(
                subject=subject,
                playcount=1000,
            ),
            tags=[
                Tag(
                    api,
                    subject=w,
                    name=name,
                    stats=Stats(
                        subject=name,
                        rank=rank,
                        count=int(round(count_scale * weight / tag_weights_sum)),
                    ),
                )
                for rank, (name, weight) in enumerate(ranked, 1)
            ],
        )
        wtc._artist_spectrum_analyzed = 100 * total_playcount / chart_playcount
        return wtc
|
|
|
|
from datetime import datetime
|
|
import calendar
|
|
|
|
from lastfm.album import Album
|
|
from lastfm.artist import Artist
|
|
from lastfm.error import InvalidParametersError
|
|
from lastfm.stats import Stats
|
|
from lastfm.track import Track
|
|
from lastfm.tag import Tag |