Youtube Watched History Analyzer - Printable Version +- Python Forum (https://python-forum.io) +-- Forum: General (https://python-forum.io/forum-1.html) +--- Forum: Code sharing (https://python-forum.io/forum-5.html) +--- Thread: Youtube Watched History Analyzer (/thread-4672.html) |
Youtube Watched History Analyzer - Aerosmite - Sep-02-2017 Hey again! I often watch YouTube, so I made a simple script that collects interesting data from my watch history. Even though it was a bit scary, I'm sharing it with you, in case you want to beat my data. Features
Installing
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Youtube Watched History Analyzer (first posted version).

Reads one or more ``watch-history.json`` files, queries the YouTube Data
API (v3) for every watched video and writes JSON statistics (clean
history, averages, channels, topics, tags, days, months, years) into
``results_dir``.

Only constructs available in both Python 2.7 and Python 3 are used.
"""
import os
import re
import json
from datetime import datetime, timedelta
from apiclient import discovery
from apiclient.discovery import build

# Init
API_key = "REPLACE_ME"
# List of your watch-history files (allow several accounts)
files = ['/Path/to/watch-history.json','/Path/to/watch-history.json']
results_dir = '/Path/to/Results_dir'
# Load only videos after this date
min_date = "01/01/00" # DD/MM/YY
# List of allowed results files
results = {'clean_history': True, 'average': True, 'channels': True, 'topics': True, 'tags': True, 'days': True, 'months': True, 'years': True}
# Average percentage of videos watched
watch_percentage = 60 # %

service = build("youtube", "v3",developerKey=API_key)
Day = ['Monday','Tuesday','Friday','Wednesday','Thursday','Sunday','Saturday']
# ISO 8601 duration as returned in contentDetails.duration, e.g. "PT1H2M3S"
# or "P1DT2H" for videos of one day or more.  The hour designator is "H"
# (the previous pattern looked for "hr" and therefore never parsed hours).
PT_format = re.compile(r'P((?P<days>\d+)D)?(T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+)S)?)?')
# Timestamp layout used for 'publishedAt' values throughout this script.
API_date_format = '%Y-%m-%dT%H:%M:%S.000Z'
min_date = datetime.strptime(min_date, '%d/%m/%y')
# NOTE: watch_percentage is kept as a percentage (0-100).  Dividing the int
# by 100 up front yields 0 under Python 2, which zeroed every duration.

date_sorter = []
videos = []
average_li = []
sorted_li = []
channel_hm = {}
topic_hm = {}
tag_hm = {}
duration_hm = {}
day_name_hm = {}
day_hm = {}
month_hm = {}
year_hm = {}


# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    """Return a dict holding only the keyword arguments with a truthy value."""
    return {key: value for key, value in kwargs.items() if value}


# sample python code for videos.list
def videos_list_by_id(service, **kwargs):
    """Call ``service.videos().list`` with the non-empty kwargs and execute it."""
    kwargs = remove_empty_kwargs(**kwargs)
    return service.videos().list(**kwargs).execute()


def parse_time(time_str):
    """Convert an ISO 8601 duration string into a ``timedelta``.

    Returns ``None`` when ``time_str`` does not match ``PT_format``.
    """
    parts = PT_format.match(time_str)
    if not parts:
        return None
    time_params = {name: int(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)


def clear_videos_vars():
    """Reset the per-video module globals to ``None``.

    The names must be declared ``global``: plain assignments only create
    locals, which left the data of the previous video (or no data at all)
    in place when a lookup failed.
    """
    global video_title, video_description, video_duration, video_thumbnails
    global video_location, video_topics, video_tags, video_date
    global channel_id, channel_title
    video_title = video_description = video_duration = video_thumbnails = None
    video_location = video_topics = video_tags = video_date = None
    channel_id = channel_title = None


def count_data(hm_name, key):
    """Increment the ``'nb'`` counter stored under ``key`` in ``hm_name``."""
    try:
        hm_name[key]['nb'] += 1
    except KeyError:
        hm_name[key] = {'nb': 1}


def average(dict_name):
    """Return the mean ``'nb'`` value of ``dict_name``, or ``None`` if it is empty."""
    if not dict_name:
        return None
    total = sum(entry['nb'] for entry in dict_name.values())
    return float(total) / len(dict_name)


def sorted_list(hm_name):
    """Return ``[{'name': key, 'nb': count}, ...]`` sorted by descending count."""
    entries = [{'name': key, 'nb': value['nb']} for key, value in hm_name.items()]
    return sorted(entries, key=lambda entry: entry['nb'], reverse=True)


def load_history(path):
    """Load one watch-history JSON file and close it again."""
    with open(path) as history_file:
        return json.load(history_file)


def watch_date(entry):
    """Return the ``snippet.publishedAt`` value of a history entry as a datetime."""
    return datetime.strptime(entry['snippet']['publishedAt'], API_date_format)


def create_file(name, data):
    """Dump ``data`` as indented JSON to ``<results_dir>/<name>.json``."""
    with open(os.path.join(results_dir, name + ".json"), 'w') as outfile:
        json.dump(data, outfile, indent=4)


# create new path if necessary
if not os.path.isdir(results_dir):
    os.makedirs(results_dir)

# read every history file once (they were previously re-opened three times)
histories = [load_history(f) for f in files]

# save watching date of videos
for history in histories:
    for x in history:
        date = watch_date(x)
        if date > min_date:
            date_sorter.append(date)

# sort videos per watching date
print(str(len(date_sorter)) + " videos detected" )
date_sorter.sort(reverse=True)

# index of the first occurrence of each date in date_sorter; replaces a
# linear scan of date_sorter for every single video
position_of = {}
for index, sorted_date in enumerate(date_sorter):
    position_of.setdefault(sorted_date, index)

for history in histories:
    for x in history:
        date = watch_date(x)
        if date < min_date:
            continue

        # define video position (len(date_sorter) when the date is not listed)
        pos = position_of.get(date, len(date_sorter))

        # access to video data
        if x['snippet']['title'] != "Deleted video" and x['snippet']['title'] != "Private video":
            try:
                video_data = videos_list_by_id(service,part='snippet,contentDetails,topicDetails,recordingDetails',id=x['contentDetails']['videoId'])
                item = video_data['items'][0]
                snippet = item['snippet']
                try:
                    location = item['recordingDetails']['location']
                    video_location = (location['latitude'], location['longitude'])
                except KeyError:
                    video_location = None
                video_topics = item.get('topicDetails', {}).get('topicCategories', [])
                video_tags = snippet.get('tags', [])
                video_title = snippet['title']
                video_description = snippet['description']
                video_duration = parse_time(item['contentDetails']['duration'])
                if video_duration is None:
                    # unparseable duration: handled as missing information
                    raise KeyError('duration')
                video_thumbnails = snippet['thumbnails']['default']['url']
                video_date = datetime.strptime(snippet['publishedAt'], API_date_format).strftime('%d/%m/%y %H:%M')
                channel_id = snippet['channelId']
                channel_title = snippet['channelTitle']

                # save video data in dicts
                for topic in video_topics:
                    count_data(topic_hm,topic)
                for tag in video_tags:
                    count_data(tag_hm,tag.lower())
                # NOTE(review): the original indentation was lost; the else
                # below is assumed to belong to the inner if - confirm.
                if pos > 0 and pos < len(date_sorter):
                    # date_sorter is sorted newest first, so pos-1 is the
                    # video watched right after this one
                    next_date = date_sorter[pos-1]
                    if video_duration > next_date - date:
                        video_duration = next_date - date
                    else:
                        video_duration = video_duration * watch_percentage / 100
                count_data(duration_hm,video_duration)
                count_data(channel_hm,channel_title)
            except (KeyError,IndexError):
                print("missing information for video '" + x['contentDetails']['videoId'] + "'")
                clear_videos_vars()
            except Exception:
                print("unknown error for video '" + x['contentDetails']['videoId'] + "'")
                clear_videos_vars()
        else:
            print("unable to access video '" + x['contentDetails']['videoId'] + "'")
            clear_videos_vars()

        # save date data in dicts
        count_data(year_hm,date.strftime('%Y'))
        count_data(month_hm,date.strftime('%Y/%m'))
        count_data(day_hm,date.strftime('%Y/%m/%d'))
        count_data(day_name_hm,date.strftime("%A"))

        # add all data to 'clean_history'
        videos.append({'pos':pos,'date':date.strftime('%d/%m/%y %H:%M'),'video':{'title':video_title,'description':video_description,'duration':str(video_duration),'thumbnails':video_thumbnails,'date':video_date,'location':video_location,'topics':video_topics,'tags':video_tags},'channel':{'title':channel_title,'id':channel_id}})
    if history:
        print("account " + str(history[0]['snippet']['channelTitle']) + " done")

# result files that are a plain "sorted by count" dump of one counter dict
sorted_sources = {'channels': channel_hm, 'topics': topic_hm, 'tags': tag_hm, 'days': day_hm, 'months': month_hm, 'years': year_hm}

# sort and save dicts in results files
for list_result, value in results.items():
    if not value:
        continue
    if list_result == 'clean_history':
        create_file(list_result, videos)
    elif list_result == 'average':
        # number of weeks covered by the days on which something was watched
        weeks = len(day_hm) / 7.0
        per_weekday = {}
        for day in Day:
            if day in day_name_hm:
                per_weekday[day] = day_name_hm[day]['nb'] / weeks
            else:
                per_weekday[day] = 0
        average_li.append({'videos per':{'day of the week':per_weekday,'year':average(year_hm),'month':average(month_hm),'day':average(day_hm),'channel':average(channel_hm)}})
        nb = dur_sum = 0
        for key in duration_hm:
            dur_sum += duration_hm[key]['nb'] * key.total_seconds()
            nb += duration_hm[key]['nb']
        # no duration collected: report None instead of dividing by zero
        average_li.append({'video length (min)':dur_sum/nb/60 if nb else None})
        create_file(list_result, average_li)
    elif list_result in sorted_sources:
        create_file(list_result, sorted_list(sorted_sources[list_result]))

# My average.json, if you try to beat me :P
RE: Youtube Watched History Analyzer - Aerosmite - Sep-04-2017 New update! Changelog
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Youtube Watched History Analyzer (updated version).

Reads one or more ``watch-history.json`` files, queries the YouTube Data
API (v3) for every watched video, its channel and its category, and
writes JSON statistics into ``results_dir``.

The empty list literals (``[]``) that the forum editor removed from the
posted code are restored here.  Only constructs available in both
Python 2.7 and Python 3 are used.
"""
import os
import re
import json
import operator
from datetime import datetime, timedelta
from apiclient import discovery
from apiclient.discovery import build

# Init
API_key = ";)"
# List of your watch-history files (allow several accounts)
files = ['/Users/mathieu/python/Historique Youtube/Aerosmite/watch-history.json','/Users/mathieu/python/Historique Youtube/Tetedecraft/watch-history.json','/Users/mathieu/python/Historique Youtube/mattraque2000/watch-history.json']
results_dir = '/Users/mathieu/python/Historique Youtube/Resultds'
# Load only videos after this date
min_date = "28/08/17" # DD/MM/YY
# List of allowed results files
results = {'average': True, 'clean_history': True, 'channels': True, 'topics': True, 'tags': True, 'country': True, 'category': True, 'days': True, 'months': True, 'years': True}
# Average percentage of total video watched
watch_percentage = 60 # %

service = build("youtube", "v3",developerKey=API_key)
Day = ['Monday','Tuesday','Friday','Wednesday','Thursday','Sunday','Saturday']
# ISO 8601 duration as returned in contentDetails.duration, e.g. "PT1H2M3S"
# or "P1DT2H" for videos of one day or more.
PT_format = re.compile(r'P((?P<days>\d+)D)?(T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+)S)?)?')
# Timestamp layout used for 'publishedAt' values throughout this script.
API_date_format = '%Y-%m-%dT%H:%M:%S.000Z'
min_date = datetime.strptime(min_date, '%d/%m/%y')
video_part = 'snippet,contentDetails,topicDetails,recordingDetails'

date_sorter = []
videos = []
average_li = []
channel_hm = {}
topic_hm = {}
tag_hm = {}
country_hm = {}
category_hm = {}
categories_hm = {}
duration_hm = {}
day_name_hm = {}
day_hm = {}
month_hm = {}
year_hm = {}


# remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    """Return a dict holding only the keyword arguments with a truthy value."""
    return {key: value for key, value in kwargs.items() if value}


# sample python code for videos.list
def videos_list_by_id(service, **kwargs):
    """Call ``service.videos().list`` with the non-empty kwargs and execute it."""
    kwargs = remove_empty_kwargs(**kwargs)
    return service.videos().list(**kwargs).execute()


# sample python code for channels.list
def channels_list_by_id(service, **kwargs):
    """Call ``service.channels().list`` with the non-empty kwargs and execute it."""
    kwargs = remove_empty_kwargs(**kwargs)
    return service.channels().list(**kwargs).execute()


# sample python code for videoCategories.list
def video_categories_list(service, **kwargs):
    """Call ``service.videoCategories().list`` with the non-empty kwargs and execute it."""
    kwargs = remove_empty_kwargs(**kwargs)
    return service.videoCategories().list(**kwargs).execute()


# sample python code for search.list
def search_list_by_keyword(service, **kwargs):
    """Call ``service.search().list`` with the non-empty kwargs and execute it."""
    kwargs = remove_empty_kwargs(**kwargs)
    return service.search().list(**kwargs).execute()


def get_new_videoId(video_title):
    """Search for a video whose title equals ``video_title``.

    Returns the ``videoId`` of the first exact title match among the top
    three search results, or ``None`` when there is none.
    """
    search_results = search_list_by_keyword(service,maxResults=3,part='snippet',q=video_title,type='video')
    for video_result in search_results['items']:
        if video_result['snippet']['title'] == video_title:
            return video_result['id']['videoId']
    return None


def parse_time(time_str):
    """Convert an ISO 8601 duration string into a ``timedelta``.

    Returns ``None`` when ``time_str`` does not match ``PT_format``.
    """
    parts = PT_format.match(time_str)
    if not parts:
        return None
    time_params = {name: int(param) for name, param in parts.groupdict().items() if param}
    return timedelta(**time_params)


def clear_videos_vars():
    """Reset the per-video module globals to ``None``.

    The names must be declared ``global``: plain assignments only create
    locals, which left the data of the previous video (or no data at all)
    in place when a lookup failed.
    """
    global video_title, video_description, video_duration, video_categoryId
    global video_categoryName, video_thumbnails, video_location, video_topics
    global video_tags, video_date, channel_id, channel_name, channel_country
    video_title = video_description = video_duration = video_categoryId = None
    video_categoryName = video_thumbnails = video_location = video_topics = None
    video_tags = video_date = channel_id = channel_name = channel_country = None


def count_data(hm_name, key):
    """Increment the counter stored under ``key`` in ``hm_name``."""
    hm_name[key] = hm_name.get(key, 0) + 1


def average(dict_name):
    """Return the mean value of ``dict_name``, or ``None`` if it is empty."""
    if not dict_name:
        return None
    return float(sum(dict_name.values())) / len(dict_name)


def create_file(name, data):
    """Dump ``data`` as indented JSON to ``<results_dir>/<name>.json``."""
    with open(os.path.join(results_dir,name + ".json"), 'w') as outfile:
        json.dump(data, outfile,indent=4)


def load_history(path):
    """Load one watch-history JSON file and close it again."""
    with open(path) as history_file:
        return json.load(history_file)


def watch_date(entry):
    """Return the ``snippet.publishedAt`` value of a history entry as a datetime."""
    return datetime.strptime(entry['snippet']['publishedAt'], API_date_format)


def sorted_counts(hm_name):
    """Return the ``(key, count)`` pairs of ``hm_name`` by descending count."""
    return sorted(hm_name.items(), key=operator.itemgetter(1), reverse=True)


# read every history file once (they were previously re-opened three times)
histories = [load_history(f) for f in files]

# save watching date of videos
for history in histories:
    for x in history:
        date = watch_date(x)
        if date > min_date:
            date_sorter.append(date)

if not date_sorter:
    raise Exception('No videos detected')
print(str(len(date_sorter)) + " videos detected" )

# sort videos per watching date
date_sorter.sort(reverse=True)

# index of the first occurrence of each date in date_sorter; replaces a
# linear scan of date_sorter for every single video
position_of = {}
for index, sorted_date in enumerate(date_sorter):
    position_of.setdefault(sorted_date, index)

for history in histories:
    for x in history:
        date = watch_date(x)
        if date < min_date:
            continue

        # define video position (len(date_sorter) when the date is not listed)
        pos = position_of.get(date, len(date_sorter))

        # access to video data
        if x['snippet']['title'] != "Deleted video" and x['snippet']['title'] != "Private video":
            try:
                video_data = videos_list_by_id(service,part=video_part,id=x['contentDetails']['videoId'])
                if len(video_data['items']) == 0:
                    # the stored id returned nothing: retry with a title search
                    new_videoId = get_new_videoId(x['snippet']['title'])
                    if new_videoId is not None:
                        video_data = videos_list_by_id(service,part=video_part,id=new_videoId)
                    else:
                        raise IndexError
                item = video_data['items'][0]
                snippet = item['snippet']

                video_title = snippet['title']
                video_description = snippet['description']
                video_duration = parse_time(item['contentDetails']['duration'])
                if video_duration is None:
                    # unparseable duration: handled as missing information
                    raise KeyError('duration')
                # NOTE(review): the original indentation was lost; the else
                # below is assumed to belong to the inner if - confirm.
                if pos > 0 and pos < len(date_sorter):
                    # date_sorter is sorted newest first, so pos-1 is the
                    # video watched right after this one
                    next_date = date_sorter[pos-1]
                    if video_duration > next_date - date:
                        video_duration = next_date - date
                    else:
                        video_duration = video_duration * watch_percentage / 100
                video_thumbnails = snippet['thumbnails']['default']['url']
                video_categoryId = snippet['categoryId']
                video_date = datetime.strptime(snippet['publishedAt'], API_date_format).strftime('%d/%m/%y %H:%M')
                channel_id = snippet['channelId']
                channel_name = snippet['channelTitle']

                recording_details = item.get('recordingDetails')
                if recording_details is not None:
                    video_location = recording_details.get('location')
                else:
                    video_location = None
                topic_details = item.get('topicDetails')
                if topic_details is not None:
                    video_topics = topic_details.get('topicCategories', [])
                else:
                    video_topics = []
                video_tags = snippet.get('tags', [])

                # try to get country code
                channel_data = channels_list_by_id(service,part='snippet',id=channel_id)
                channel_country = channel_data['items'][0]['snippet'].get('country',snippet.get('defaultLanguage'))
                video_categoryName = None
                if channel_country is not None:
                    channel_country = channel_country.lower()
                    count_data(country_hm,channel_country)

                    # try to get category name
                    try:
                        # normalise before the cache lookup: the cache is keyed
                        # by the normalised code, so 'en'/'en-xx' never hit it
                        if channel_country == 'en':
                            channel_country = 'us' # or "gb"
                        elif channel_country.split('-')[0] == 'en':
                            channel_country = channel_country.split('-')[1]
                        if channel_country not in categories_hm:
                            categories_hm[channel_country] = video_categories_list(service,part='snippet',regionCode=channel_country)['items']
                        for category in categories_hm[channel_country]:
                            if category['id'] == video_categoryId:
                                video_categoryName = category['snippet']['title']
                                count_data(category_hm,video_categoryName)
                                break
                    except Exception as e:
                        # best effort: the category stays None, but the
                        # failure is reported instead of silently dropped
                        print(str(e) + " while getting category for video '" + x['snippet']['title'] + "'")

                # save video data in dicts
                count_data(duration_hm,video_duration)
                count_data(channel_hm,channel_name)
                for topic in video_topics:
                    count_data(topic_hm,topic)
                for tag in video_tags:
                    count_data(tag_hm,tag.lower())
            except (KeyError,IndexError):
                print("missing information for video '" + x['snippet']['title'] + "'")
                clear_videos_vars()
            except Exception as e:
                print(str(e) + " for video '" + x['snippet']['title'] + "'")
                clear_videos_vars()
        else:
            print("unable to access a " + x['snippet']['title'])
            clear_videos_vars()

        # save date in dicts
        count_data(year_hm,date.strftime('%Y'))
        count_data(month_hm,date.strftime('%Y/%m'))
        count_data(day_hm,date.strftime('%Y/%m/%d'))
        count_data(day_name_hm,date.strftime("%A"))

        # add all data to 'clean_history'
        videos.append({'pos':pos,'date':date.strftime('%d/%m/%y %H:%M'),'video':{'title':video_title,'description':video_description,'duration':str(video_duration),'category':video_categoryName,'thumbnails':video_thumbnails,'date':video_date,'location':video_location,'topics':video_topics,'tags':video_tags,'channel':{'title':channel_name,'id':channel_id,'country':channel_country}}})
    if history:
        print("account " + str(history[0]['snippet']['channelTitle']) + " done")

# create new path if necessary
if not os.path.isdir(results_dir):
    os.makedirs(results_dir)

# result files that are a plain "sorted by count" dump of one counter dict
sorted_sources = {'channels': channel_hm, 'topics': topic_hm, 'tags': tag_hm, 'country': country_hm, 'category': category_hm, 'days': day_hm, 'months': month_hm, 'years': year_hm}

# sort and save dicts in results files
for list_result, value in results.items():
    if not value:
        continue
    if list_result == 'average':
        if day_hm:
            # number of weeks covered by the days on which something was watched
            weeks = len(day_hm) / 7.0
            per_weekday = {day: day_name_hm.get(day, 0) / weeks for day in Day}
            average_li.append({'videos per':{'day of the week':per_weekday,'year':average(year_hm),'month':average(month_hm),'day':average(day_hm),'channel':average(channel_hm)}})
        else:
            average_li.append({'videos per':None})
        if duration_hm:
            nb = dur_sum = 0
            for key in duration_hm:
                dur_sum += duration_hm[key] * key.total_seconds()
                nb += duration_hm[key]
            average_li.append({'video length (min)':dur_sum/nb/60})
        else:
            average_li.append({'video length (min)':None})
        create_file(list_result,average_li)
    elif list_result == 'clean_history':
        create_file(list_result,videos)
    elif list_result in sorted_sources:
        # 'years' previously called year_hm(...) as if it were a function
        create_file(list_result,sorted_counts(sorted_sources[list_result]))

# RE: Youtube Watched History Analyzer - Redoudou - Nov-05-2017 Hello, this is very usefull ! I can wait to be able to analyze my history. Thank you :) ligne 29 date_sorter = videos = average_li =should it be ? date_sorter = {} videos = {} average_li = {}Also would you have a text version of this code because I have an intending error and I think there might be a problem when copy paste. Thank you !
# RE: Youtube Watched History Analyzer - metulburr - Nov-05-2017 @[Redoudou] Its an issue with our editor. its suppose to be empty lists second bullet here https://python-forum.io/misc.php?action=help&hid=39
# RE: Youtube Watched History Analyzer - Redoudou - Nov-06-2017 Thank you @[metulburr], I've had done the correction. However I keep facing and intentation error in the follow function. # remove keyword arguments that are not set def remove_empty_kwargs(**kwargs): good_kwargs = {} if kwargs is not None: for key, value in kwargs.iteritems(): if value: good_kwargs[key] = value return good_kwargs For what I know from my research the syntax is good. It edited the file in nqq and check good_kwargs(singlespace)=(singlespace){}but the error stays. @[Aerosmite] Any idea where I should oriented my research ? Thank you :) |