import requests
import os
import json
from urllib.parse import quote
from datetime import datetime, timezone

# To set your environment variable, run the following line in your terminal:
# export 'BEARER_TOKEN'='<your_bearer_token>'

areas = ['lekki', 'ikeja', 'surulere', 'yaba', 'ajah', 'festac', 'lagos',
         'maryland', 'ilupeju', 'gbagada', 'ikorodu', 'magodo', 'jibowu',
         'akoka', 'phase one', 'phase 1', 'jakande']


def auth():
    # Read the bearer token from the environment rather than hard-coding it.
    return os.environ.get("BEARER_TOKEN")


def create_url(area, next_token=None):
    query = f"where can I get in {area}"
    # Tweet fields are adjustable.
    # Options include:
    # attachments, author_id, context_annotations,
    # conversation_id, created_at, entities, geo, id,
    # in_reply_to_user_id, lang, non_public_metrics, organic_metrics,
    # possibly_sensitive, promoted_metrics, public_metrics, referenced_tweets,
    # source, text, and withheld
    tweet_fields = "tweet.fields=created_at,author_id"
    user_fields = "user.fields=username"
    expansions = "expansions=author_id"
    url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}".format(
        quote(query), tweet_fields, expansions, user_fields
    )
    # Restrict results to tweets posted since midnight UTC today.
    today = datetime.now(timezone.utc).date()
    start = datetime(today.year, today.month, today.day)
    url = "{}&next_token={}".format(url, next_token) if next_token else url
    url = "{}&start_time={}Z".format(url, start.isoformat())
    # url = "{}&since_id={}".format(url, since_id) if since_id else url
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    # Optional proxies; currently unused. Pass proxies=proxy to requests.get to enable.
    proxy = {
        'http': '165.22.199.170:8080',
        'https': '159.65.131.211:8080'
    }
    response = requests.get(url, headers=headers)
    # print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()


def main():
    bearer_token = auth()

    # Load previously saved tweets so existing areas survive if a request fails.
    tweets = {}
    if os.path.exists('data/tweets.json'):
        with open('data/tweets.json', 'r') as fp:
            json_text = fp.read()
            if json_text:
                tweets = json.loads(json_text)

    new_tweets = []  # only used by the commented-out merge logic below

    with open('data/tweets.json', 'w') as fh:
        for a in areas:
            try:
                headers = create_headers(bearer_token)
                ar = []
                next_token = None
                while True:
                    url = create_url(a, next_token)
                    json_response = connect_to_endpoint(url, headers)
                    if json_response.get('data'):
                        # Expanded user objects are de-duplicated, so match them to
                        # tweets by author_id instead of relying on list order.
                        users = {u['id']: u for u in json_response['includes']['users']}
                        for tweet in json_response['data']:
                            user = users[tweet['author_id']]
                            tweet['url'] = f"https://www.twitter.com/{user['username']}/status/{tweet['id']}"
                            ar.append(tweet)
                    next_token = json_response.get('meta', {}).get('next_token')
                    if not next_token:
                        break
                # if len(ar):
                #     if not last_tweet_id:
                #         tweets[a] = ar
                #         new_tweets.extend(ar)
                #     else:
                #         combined = ar + tweets[a]
                #         tweets[a] = combined
                tweets[a] = ar
            except Exception as e:
                print(e)
        fh.write(json.dumps(tweets, indent=4, sort_keys=True))


if __name__ == "__main__":
    main()
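
# Illustrative sketch of the structure this script writes to data/tweets.json
# (placeholder values, not real data), given the tweet.fields requested above
# and the url added to each tweet before saving:
#
# {
#     "ikeja": [
#         {
#             "author_id": "<author id>",
#             "created_at": "<ISO-8601 timestamp, e.g. 2021-06-01T09:15:00.000Z>",
#             "id": "<tweet id>",
#             "text": "<tweet text>",
#             "url": "https://www.twitter.com/<username>/status/<tweet id>"
#         },
#         ...
#     ],
#     "lekki": [ ... ]
# }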