top of page

TwitchPlays_Connection.py
for Python 3.9.x
[ published on 2022-08-16 ]

  1. # DougDoug Note: 

  2. # This is the code that connects to Twitch / Youtube and checks for new messages.

  3. # You should not need to modify anything in this file, just use as is.

  4. ​

  5. # This code is based on Wituz's "Twitch Plays" tutorial, updated for Python 3.9.X

  6. # http://www.wituz.com/make-your-own-twitch-plays-stream.html

  7. # Updated for Youtube by DDarknut, with help by Ottomated

  8. ​

  9. import requests

  10. import sys

  11. import socket

  12. import re

  13. import random

  14. import time

  15. import os

  16. import json

  17. import concurrent.futures

  18. ​

  19. MAX_TIME_TO_WAIT_FOR_LOGIN = 3

  20. YOUTUBE_FETCH_INTERVAL = 1

  21. ​

  22. class Twitch:

  23.     re_prog = None

  24.     sock = None

  25.     partial = b''

  26.     login_ok = False

  27.     channel = ''

  28.     login_timestamp = 0

  29. ​

  30.     def twitch_connect(self, channel):

  31.         if self.sock: self.sock.close()

  32.         self.sock = None

  33.         self.partial = b''

  34.         self.login_ok = False

  35.         self.channel = channel

  36. ​

  37.         # Compile regular expression

  38.         self.re_prog = re.compile(b'^(?::(?:([^ !\r\n]+)![^ \r\n]*|[^ \r\n]*) )?([^ \r\n]+)(?: ([^:\r\n]*))?(?: :([^\r\n]*))?\r\n', re.MULTILINE)

  39. ​

  40.         # Create socket

  41.         print('Connecting to Twitch...')

  42.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

  43. ​

  44.         # Attempt to connect socket

  45.         self.sock.connect(('irc.chat.twitch.tv', 6667))

  46. ​

  47.         # Log in anonymously

  48.         user = 'justinfan%i' % random.randint(10000, 99999)

  49.         print('Connected to Twitch. Logging in anonymously...')

  50.         self.sock.send(('PASS asdf\r\nNICK %s\r\n' % user).encode())

  51. ​

  52.         self.sock.settimeout(1.0/60.0)

  53. ​

  54.         self.login_timestamp = time.time()

  55. ​

  56.     # Attempt to reconnect after a delay

  57.     def reconnect(self, delay):

  58.         time.sleep(delay)

  59.         self.twitch_connect(self.channel)

  60. ​

  61.     # Returns a list of irc messages received

  62.     def receive_and_parse_data(self):

  63.         buffer = b''

  64.         while True:

  65.             received = b''

  66.             try:

  67.                 received = self.sock.recv(4096)

  68.             except socket.timeout:

  69.                 break

  70.             # except OSError as e:

  71.             #     if e.winerror == 10035:

  72.             #         # This "error" is expected -- we receive it if timeout is set to zero, and there is no data to read on the socket.

  73.             #         break

  74.             except Exception as e:

  75.                 print('Unexpected connection error. Reconnecting in a second...', e)

  76.                 self.reconnect(1)

  77.                 return []

  78.             if not received:

  79.                 print('Connection closed by Twitch. Reconnecting in 5 seconds...')

  80.                 self.reconnect(5)

  81.                 return []

  82.             buffer += received

  83. ​

  84.         if buffer:

  85.             # Prepend unparsed data from previous iterations

  86.             if self.partial:

  87.                 buffer = self.partial + buffer

  88.                 self.partial = []

  89. ​

  90.             # Parse irc messages

  91.             res = []

  92.             matches = list(self.re_prog.finditer(buffer))

  93.             for match in matches:

  94.                 res.append({

  95.                     'name':     (match.group(1) or b'').decode(errors='replace'),

  96.                     'command':  (match.group(2) or b'').decode(errors='replace'),

  97.                     'params':   list(map(lambda p: p.decode(errors='replace'), (match.group(3) or b'').split(b' '))),

  98.                     'trailing': (match.group(4) or b'').decode(errors='replace'),

  99.                 })

  100.  

  101.             # Save any data we couldn't parse for the next iteration

  102.             if not matches:

  103.                 self.partial += buffer

  104.             else:

  105.                 end = matches[-1].end()

  106.                 if end < len(buffer):

  107.                     self.partial = buffer[end:]

  108. ​

  109.                 if matches[0].start() != 0:

  110.                     # If we get here, we might have missed a message. pepeW

  111.                     print('either ddarknut fucked up or twitch is bonkers, or both I mean who really knows anything at this point')

  112.  

  113.             return res

  114. ​

  115.         return []

  116. ​

  117.     def twitch_receive_messages(self):

  118.         privmsgs = []

  119.         for irc_message in self.receive_and_parse_data():

  120.             cmd = irc_message['command']

  121.             if cmd == 'PRIVMSG':

  122.                 privmsgs.append({

  123.                     'username': irc_message['name'],

  124.                     'message': irc_message['trailing'],

  125.                 })

  126.             elif cmd == 'PING':

  127.                 self.sock.send(b'PONG :tmi.twitch.tv\r\n')

  128.             elif cmd == '001':

  129.                 print('Successfully logged in. Joining channel %s.' % self.channel)

  130.                 self.sock.send(('JOIN #%s\r\n' % self.channel).encode())

  131.                 self.login_ok = True

  132.             elif cmd == 'JOIN':

  133.                 print('Successfully joined channel %s' % irc_message['params'][0])

  134.             elif cmd == 'NOTICE':

  135.                 print('Server notice:', irc_message['params'], irc_message['trailing'])

  136.             elif cmd == '002': continue

  137.             elif cmd == '003': continue

  138.             elif cmd == '004': continue

  139.             elif cmd == '375': continue

  140.             elif cmd == '372': continue

  141.             elif cmd == '376': continue

  142.             elif cmd == '353': continue

  143.             elif cmd == '366': continue

  144.             else:

  145.                 print('Unhandled irc message:', irc_message)

  146. ​

  147.         if not self.login_ok:

  148.             # We are still waiting for the initial login message. If we've waited longer than we should, try to reconnect.

  149.             if time.time() - self.login_timestamp > MAX_TIME_TO_WAIT_FOR_LOGIN:

  150.                 print('No response from Twitch. Reconnecting...')

  151.                 self.reconnect(0)

  152.                 return []

  153.  

  154.         return privmsgs

  155.  

  156. # Thanks to Ottomated for helping with the yt side of things!

  157. class YouTube:

  158.     session = None

  159.     config = {}

  160.     payload = {}

  161. ​

  162.     thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

  163.     fetch_job = None

  164.     next_fetch_time = 0

  165. ​

  166.     re_initial_data = re.compile('(?:window\\s*\\[\\s*[\\"\']ytInitialData[\\"\']\\s*\\]|ytInitialData)\\s*=\\s*({.+?})\\s*;')

  167.     re_config = re.compile('(?:ytcfg\\s*.set)\\(({.+?})\\)\\s*;')

  168. ​

  169.     def get_continuation_token(self, data):

  170.         cont = data['continuationContents']['liveChatContinuation']['continuations'][0]

  171.         if 'timedContinuationData' in cont:

  172.             return cont['timedContinuationData']['continuation']

  173.         else:

  174.             return cont['invalidationContinuationData']['continuation']

  175. ​

  176.     def reconnect(self, delay):

  177.        if self.fetch_job and self.fetch_job.running():

  178.             if not fetch_job.cancel():

  179.                 print("Waiting for fetch job to finish...")

  180.                 self.fetch_job.result()

  181.         print(f"Retrying in {delay}...")

  182.         if self.session: self.session.close()

  183.         self.session = None

  184.         self.config = {}

  185.         self.payload = {}

  186.         self.fetch_job = None

  187.         self.next_fetch_time = 0

  188.         time.sleep(delay)

  189.         self.youtube_connect(self.channel_id, self.stream_url)

  190. ​

  191.     def youtube_connect(self, channel_id, stream_url=None):

  192.         print("Connecting to YouTube...")

  193. ​

  194.         self.channel_id = channel_id

  195.         self.stream_url = stream_url

  196. ​

  197.         # Create http client session

  198.         self.session = requests.Session()

  199.         # Spoof user agent so yt thinks we're an upstanding browser

  200.         self.session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36'

  201.         # Add consent cookie to bypass google's consent page

  202.         requests.utils.add_dict_to_cookiejar(self.session.cookies, {'CONSENT': 'YES+'})

  203. ​

  204.         # Connect using stream_url if provided, otherwise use the channel_id

  205.         if stream_url is not None:

  206.             live_url = self.stream_url

  207.         else:

  208.             live_url = f"https://youtube.com/channel/{self.channel_id}/live"

  209.  

  210.         res = self.session.get(live_url)

  211.         if res.status_code == 404:

  212.             live_url = f"https://youtube.com/c/{self.channel_id}/live"

  213.             res = self.session.get(live_url)

  214.         if not res.ok:

  215.             if stream_url is not None:

  216.                 print(f"Couldn't load the stream URL ({res.status_code} {res.reason}). Is the stream URL correct? {self.stream_url}")

  217.             else:

  218.                 print(f"Couldn't load livestream page ({res.status_code} {res.reason}). Is the channel ID correct? {self.channel_id}")

  219.             time.sleep(5)

  220.             exit(1)

  221.         livestream_page = res.text

  222. ​

  223.         # Find initial data in livestream page

  224.         matches = list(self.re_initial_data.finditer(livestream_page))

  225.         if len(matches) == 0:

  226.             print("Couldn't find initial data in livestream page")

  227.             time.sleep(5)

  228.             exit(1)

  229.         initial_data = json.loads(matches[0].group(1))

  230. ​

  231.         # Get continuation token for live chat iframe

  232.         iframe_continuation = None

  233.         try:

  234.             iframe_continuation = initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData']['continuation']

  235.         except Exception as e:

  236.             print(f"Couldn't find the livestream chat. Is the channel not live? url: {live_url}")

  237.             time.sleep(5)

  238.             exit(1)

  239. ​

  240.         # Fetch live chat page

  241.         res = self.session.get(f'https://youtube.com/live_chat?continuation={iframe_continuation}')

  242.         if not res.ok:

  243.             print(f"Couldn't load live chat page ({res.status_code} {res.reason})")

  244.             time.sleep(5)

  245.             exit(1)

  246.         live_chat_page = res.text

  247. ​

  248.         # Find initial data in live chat page

  249.         matches = list(self.re_initial_data.finditer(live_chat_page))

  250.         if len(matches) == 0:

  251.             print("Couldn't find initial data in live chat page")

  252.             time.sleep(5)

  253.             exit(1)

  254.         initial_data = json.loads(matches[0].group(1))

  255. ​

  256.         # Find config data

  257.         matches = list(self.re_config.finditer(live_chat_page))

  258.         if len(matches) == 0:

  259.             print("Couldn't find config data in live chat page")

  260.             time.sleep(5)

  261.             exit(1)

  262.         self.config = json.loads(matches[0].group(1))

  263. ​

  264.         # Create payload object for making live chat requests

  265.         token = self.get_continuation_token(initial_data)

  266.         self.payload = {

  267.             "context": self.config['INNERTUBE_CONTEXT'],

  268.             "continuation": token,

  269.             "webClientInfo": {

  270.                 "isDocumentHidden": False

  271.             },

  272.         }

  273.         print("Connected.")

  274. ​

  275.     def fetch_messages(self):

  276.         payload_bytes = bytes(json.dumps(self.payload), "utf8")

  277.         res = self.session.post(f"https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key={self.config['INNERTUBE_API_KEY']}&prettyPrint=false", payload_bytes)

  278.         if not res.ok:

  279.             print(f"Failed to fetch messages. {res.status_code} {res.reason}")

  280.             print("Body:", res.text)

  281.             print("Payload:", payload_bytes)

  282.             self.session.close()

  283.             self.session = None

  284.             return []

  285.         data = json.loads(res.text)

  286.         self.payload['continuation'] = self.get_continuation_token(data)

  287.         cont = data['continuationContents']['liveChatContinuation']

  288.         messages = []

  289.         if 'actions' in cont:

  290.             for action in cont['actions']:

  291.                 if 'addChatItemAction' in action:

  292.                     item = action['addChatItemAction']['item']['liveChatTextMessageRenderer']

  293.                     messages.append({'author': item['authorName']['simpleText'], 'content': item['message']['runs']})

  294.         return messages

  295. ​

  296.     def twitch_receive_messages(self):

  297.         if self.session == None:

  298.             self.reconnect(1)

  299.         messages = []

  300.         if not self.fetch_job:

  301.             time.sleep(1.0/60.0)

  302.             if time.time() > self.next_fetch_time:

  303.                 self.fetch_job = self.thread_pool.submit(self.fetch_messages)

  304.         else:

  305.             res = []

  306.             timed_out = False

  307.             try:

  308.                 res = self.fetch_job.result(1.0/60.0)

  309.             except concurrent.futures.TimeoutError:

  310.                 timed_out = True

  311.             except Exception as e:

  312.                 print(e)

  313.                 self.session.close()

  314.                 self.session = None

  315.                 return

  316.             if not timed_out:

  317.                 self.fetch_job = None

  318.                 self.next_fetch_time = time.time() + YOUTUBE_FETCH_INTERVAL

  319.             for item in res:

  320.                 msg = {

  321.                     'username': item['author'],

  322.                     'message': ''

  323.                 }

  324.                 for part in item['content']:

  325.                     if 'text' in part:

  326.                         msg['message'] += part['text']

  327.                     elif 'emoji' in part:

  328.                         msg['message'] += part['emoji']['emojiId']

  329.                 messages.append(msg)

  330.         return messages

  331. ​

End of Document

bottom of page