Compare commits

..

No commits in common. "main" and "v2.0.0" have entirely different histories.
main ... v2.0.0

17 changed files with 90 additions and 264 deletions

View File

@ -1,25 +0,0 @@
name: Run tests
run-name: ${{ gitea.actor }} is testing out test
on: [push]
jobs:
Run-Test:
runs-on: ubuntu-act-latest
steps:
- run: echo "🎉 The job was automatically triggered by a ${{ gitea.event_name }} event."
- run: echo "🐧 This job is now running on a ${{ runner.os }} server hosted by Gitea!"
- run: echo "🔎 The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
- name: Check out repository code
uses: actions/checkout@v4
- run: echo "💡 The ${{ gitea.repository }} repository has been cloned to the runner."
- run: echo "🖥️ The workflow is now ready to test your code on the runner."
- name: List files in the repository
run: |
ls ${{ gitea.workspace }}
- run: python3 -m venv venv
- run: . venv/bin/activate
- run: pip install -r requirements.txt
- run: pip install time_machine
- run: cp config.py.example config.py
- run: python test.py
- run: echo "🍏 This job's status is ${{ job.status }}."

View File

@ -1,5 +0,0 @@
# Avalaible mirrors
https://inex.dev/localhost_frssoft/FMN_bot.git
https://git.macaw.me/localhost_frssoft/FMN_bot.git
https://code.criminallycute.fi/localhost_frssoft/FMN_bot.git
https://git.poridge.club/localhost_frssoft/FMN_bot.git

View File

@ -4,10 +4,8 @@
Special for [XXIVProduction](https://xxivproduction.video/) and [drq@mastodon.ml](https://mastodon.ml/@drq).
## Первичная инициализация
* Установка зависимостей python3 и виртуального окружения
* Установка зависимостей python3
```
python3 -m venv venv
. venv/bin/activate
pip install -r requirements.txt
```
@ -46,16 +44,15 @@ python3 fmn_bot.py
```
## Использование
Просто упомяните бота (упоминающий должен быть прописан в администраторах бота см. config.py), в воскресенье/понедельник, когда Fediverse Movie Night окончен (В новой версии, теперь встроена функция автоматического срабатывания в полночь, если упоминания не было). Бот инициализирует сборщика предложений и будет собирать фильмы, которые будут поступать в виде ссылок на imdb.com (также теперь есть поддержка фронтедов [libremdb](https://github.com/zyachel/libremdb)) и kinopoisk.ru в том треде, где его упомянули. Сбор будет завершен во вторник 16:00 (по-умолчанию) MSK+3. После этого всех кто не успел предложить фильм бот будет уведомлять, что сбор завершен и даст ссылку на голосовалку. По завершению голосовалки (суббота 16:00), будет вычисляться фильм-победитель на FMN, если у нескольких фильмов одинаковые голоса, то будет создан tie breaker. Победивший фильм будет записан сразу как "просмотренный", чтобы не добавлять его на следующий FMN повторно. Далее бот будет ждать очередного упоминания.
Просто упомяните бота (упоминающий должен быть прописан в администраторах бота см. config.py), в воскресенье/понедельник, когда Fediverse Movie Night окончен. Бот инициализирует сборщика предложений и будет собирать фильмы, которые будут поступать в виде ссылок на imdb.com (также теперь есть поддержка фронтедов [libremdb](https://github.com/zyachel/libremdb)) и kinopoisk.ru в том треде, где его упомянули. Сбор будет завершен во вторник 16:00 (по-умолчанию) MSK+3. После этого всех кто не успел предложить фильм бот будет уведомлять, что сбор завершен и даст ссылку на голосовалку. По завершению голосовалки (суббота 16:00), будет вычисляться фильм-победитель на FMN, если у нескольких фильмов одинаковые голоса, то будет создан tie breaker. Победивший фильм будет записан сразу как "просмотренный", чтобы не добавлять его на следующий FMN повторно. Далее бот будет ждать очередного упоминания.
Note: Рекомендуется использовать ссылки на imdb.com, так как локальная база IMDB надёжнее, чем сетевой сторонний API Кинопоиска.
Note2: Список доступных для приёма инстансов libremdb обновляется вручную и может не соотвествовать официальному.
## Список поддерживаемых инстансов libremdb:
* https://libremdb.leemoon.network/ (Спасибо [Саре](https://lamp.leemoon.network/@sarahquartz) в рамках self-host проекта [Leemoon Network 🍋](https://leemoon.network))
* https://libremdb.herokuapp.com/
* https://libremdb.pussthecat.org/
* https://libremdbeu.herokuapp.com/
* https://libremdb.herokuapp.com
* https://libremdb.pussthecat.org
* https://libremdbeu.herokuapp.com
* https://lmdb.tokhmi.xyz/
* https://libremdb.esmailelbob.xyz/
* http://libremdb.lqs5fjmajyp7rvp4qvyubwofzi6d4imua7vs237rkc4m5qogitqwrgyd.onion/

View File

@ -2,9 +2,6 @@ admins_bot = ('drq@mastodon.ml',) # Адреса админов бота, кот
# Example: ('admin_user', 'another_admin_user2@example.example') or ('admin_user',)
bot_acct = 'fmn' # Ник бота на инстансе
instance = 'pleroma.dark-alexandr.net' # Инстанс, где будет запущен бот
# Ссылка на live stream FMN (API)
peertube_stream_url = 'https://xxivproduction.video/api/v1/videos/1FZeVVVzWBFShaxQVkYiXd'
# Лимиты
limit_movies_per_user = 2 # Ограничение количества фильмов на одного пользователя

View File

@ -1,10 +1,11 @@
from src import listener_context, listener_mention
from src import listener_context, listener_mention, imdb_datasets_worker
from config import logger_default_level
from loguru import logger
import time
import sys
def main():
logger.remove()
logger.add(sink=sys.stderr, level=logger_default_level)
@ -15,6 +16,5 @@ def main():
listener_context.scan_context_thread() # Слушаем тред на новые предложения фильмов
if __name__ == '__main__':
main()

BIN
src/FMN.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 788 KiB

View File

@ -1 +0,0 @@
FMN_prev.webp

Binary file not shown.

Before

Width:  |  Height:  |  Size: 154 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 98 KiB

View File

@ -1,5 +1,6 @@
from config import instance
import time
import json
import requests
from loguru import logger
@ -34,6 +35,7 @@ def get_notifications():
logger.info('Retrying get notificatios...')
def mark_as_read_notification(id_notification):
success = 0
while success == 0:
@ -49,21 +51,17 @@ def mark_as_read_notification(id_notification):
def get_status_context(status_id):
retry = 0
success = 0
while success == 0:
try:
r = s.get(instance_point + f"/statuses/{status_id}/context", timeout=30)
r = s.get(instance_point + f"/statuses/{status_id}/context")
r.raise_for_status()
success = 1
return r.json()
except Exception as E:
except:
logger.exception(f'Ошибка получения контекста треда {status_id}')
time.sleep(30)
logger.info('Повторный запрос треда...')
retry += 1
if retry > 5:
raise IOError(f'Фетчинг треда поломан! {E}')
def get_status(status_id):
@ -80,7 +78,8 @@ def get_status(status_id):
logger.info(f'Retrying get status {status_id}')
def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None, visibility='unlisted'):
def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None):
poll = None
if poll_options is not None:
poll = {
@ -91,7 +90,7 @@ def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=3
params = {
"status": text,
"in_reply_to_id": reply_to_status_id,
"visibility": visibility,
"visibility": "unlisted",
"content_type": "text/plain",
"language": "ru",
"poll": poll
@ -117,9 +116,17 @@ def upload_attachment(file_path):
params = {
"description": "Fediverse Movie Night\nВоскресенье, 21:00\nLIVE ON XXIV Production",
}
r = s.post(instance_point + "/media", params, files=file, timeout=30)
success = 0
while success == 0:
try:
r = s.post(instance_point + "/media", params, files=file)
r.raise_for_status()
success = 1
return r.json()['id']
except:
logger.exception(f'Error uploading {file_path} attachment')
time.sleep(5)
logger.info(f'Retrying upload {file_path}...')
def mute_user(acct_id=str, acct=str, duration=None):
@ -137,3 +144,4 @@ def mute_user(acct_id=str, acct=str, duration=None):
logger.exception(f'Ошибка глушения {acct}')
time.sleep(5)
logger.info(f'Повторное глушение {acct}...')

View File

@ -113,3 +113,4 @@ def reset_poll():
'''Сброс содержимого предложки-опроса'''
c.execute("DELETE FROM poll")
conn.commit()

View File

@ -1,5 +1,5 @@
from src.fedi_api import get_status, post_status, upload_attachment
from src.fmn_states_db import states_stor, write_states
from src.fmn_states_db import read_states, write_states
from src.fmn_database import get_movies_for_poll, write_votes, read_votes, mark_as_watched_movie, get_already_watched, rewrite_db, reset_poll, get_count_all_watched_movies, force_commit
from collections import Counter
from loguru import logger
@ -16,9 +16,7 @@ def text_create_poll():
return text_poll
@logger.catch
def create_poll_movies(text=text_create_poll(), poll_expires=345600):
logger.debug('Creating poll')
formated_poll_options = []
raw_poll = get_movies_for_poll()
for i in raw_poll:
@ -27,7 +25,6 @@ def create_poll_movies(text=text_create_poll(), poll_expires=345600):
ru_name = i[2]
year = i[3]
poll_option_string = f"{ru_name} / {orig_name}, {year} ({acct})"
logger.debug(f"Adding option in poll: {poll_option_string}")
if ru_name is None:
poll_option_string = f"{orig_name}, {year} ({acct})"
if orig_name is None:
@ -36,25 +33,19 @@ def create_poll_movies(text=text_create_poll(), poll_expires=345600):
poll_option_string = poll_option_string[0:199] # Обрезка на 200 символов.
formated_poll_options.append(poll_option_string)
attaches = []
try:
attaches = [upload_attachment('src/FMN.webp')]
except Exception as E:
logger.error(f"attachements can't do upload: {E}")
poll_status_id = post_status(text, None, formated_poll_options,
poll_expires=poll_expires, attachments=attaches)
poll_expires=poll_expires, attachments=[upload_attachment('src/FMN.png')])
logger.success('Голосовалка создана')
states_stor.states['poll_expires_at'] = int(time.time()) + poll_expires
states_stor.states['poll_status_id'] = poll_status_id['id']
write_states(states_stor.states)
states = read_states()
states['poll_expires_at'] = int(time.time()) + poll_expires
states['poll_status_id'] = poll_status_id['id']
write_states(states)
return poll_status_id
@logger.catch
def get_winner_movie(poll_status_id=str):
'''Отмечаем победивший фильм на голосовании как просмотренный или постим tie breaker'''
states = states_stor.states
states = read_states()
votes_counters = []
status_with_poll = get_status(poll_status_id)
poll = status_with_poll['poll']
@ -92,19 +83,19 @@ def get_winner_movie(poll_status_id=str):
text_winned = f"{expired_poll_count} голосование завершилось! Победил вариант предложенный @{acct_suggested}:\n{win_variant}"
logger.success("Победил " + str(movie))
post_status(text_winned, attachments=[upload_attachment('src/FMN.webp')])
states_stor.states = {}
post_status(text_winned, attachments=[upload_attachment('src/FMN.png')])
write_states()
reset_poll()
@logger.catch
def create_tie_breaker(count_tie=1):
'''Создание tie breaker'''
if count_tie == 1:
states_stor.states['tie_breaker'] = 1
write_states(states_stor.states)
states = read_states()
states['tie_breaker'] = 1
write_states(states)
poll_expires = 8*60*60
else:
poll_expires = 4*60*60
tie_poll = create_poll_movies("TIE BREAKER!!!\n\nВыбираем из победителей!", poll_expires)

View File

@ -3,11 +3,6 @@ from loguru import logger
states_file = 'fmn_states.json'
class states_stor:
states = None
@logger.catch
def read_states():
try:
@ -21,13 +16,10 @@ def read_states():
@logger.catch
def write_states(new_states={}):
def write_states(states={}):
with open(states_file, 'wt') as f:
f.write(json.dumps(new_states, indent=4))
if new_states == {}:
f.write(json.dumps(states, indent=4))
if states == {}:
logger.info('states empty wrote')
return new_states
return states
if not states_stor.states:
states_stor.states = read_states()

View File

@ -3,10 +3,13 @@ from src.fedi_api import get_status_context, get_status, post_status, mute_user
from src.kinopoisk_api import get_kinopoisk_movie_to_imdb
from src.imdb_datasets_worker import get_title_by_id
from src.fmn_database import add_movie_to_poll, get_already_watched, get_suggested_movies_count
from src.fmn_states_db import states_stor
from src.fmn_states_db import read_states, write_states
from src.fmn_poll import create_poll_movies, get_winner_movie
import re
import time
from datetime import datetime
from dateutil.parser import parse as dateutilparse
from dateutil.relativedelta import relativedelta, TU
from collections import Counter
from loguru import logger
@ -20,7 +23,7 @@ def parse_links(text=str):
def parse_links_imdb(text=str):
regex = r"imdb\.com/|libremdb\.leemoon\.network/|libremdb\.pussthecat\.org/|libremdb\.esmailelbob\.xyz/|libremdb\.herokuapp\.com/|libremdbeu\.herokuapp\.com/|lmdb\.tokhmi\.xyz/|libremdb\.lqs5fjmajyp7rvp4qvyubwofzi6d4imua7vs237rkc4m5qogitqwrgyd\.onion/"
regex = r"imdb\.com/|libremdb\.pussthecat\.org/|libremdb\.esmailelbob\.xyz/|libremdb\.herokuapp\.com/|libremdbeu\.herokuapp\.com/|lmdb\.tokhmi\.xyz/|libremdb\.lqs5fjmajyp7rvp4qvyubwofzi6d4imua7vs237rkc4m5qogitqwrgyd\.onion/"
if re.search(regex, text.lower(), flags=re.MULTILINE):
imdb_ids = re.findall(r"tt(\d{1,})", text.lower())
if imdb_ids != []:
@ -30,14 +33,13 @@ def parse_links_imdb(text=str):
def scan_context_thread():
fail_limit = Counter()
while True:
states = states_stor.states
states = read_states()
status_id = states.get('last_thread_id')
poll_created = states.get('poll_status_id')
stop_thread_scan = states.get('stop_thread_scan')
time_now = int(time.time())
reserve_time = False
while status_id is None or stop_thread_scan is None:
states = states_stor.states
fail_limit = Counter()
status_id = states.get('last_thread_id')
stop_thread_scan = states.get('stop_thread_scan')
@ -48,7 +50,7 @@ def scan_context_thread():
logger.debug('Сбор завершён, сканирование треда на опоздавших')
if poll_created is None:
create_poll_movies()
poll_created = states_stor.states.get('poll_status_id')
poll_created = states.get('poll_status_id')
else:
if time_now >= int(states.get('poll_expires_at')):
get_winner_movie(poll_created)
@ -69,10 +71,6 @@ def scan_context_thread():
for status in descendants:
id_st = status['id']
visibility = status['visibility']
if visibility == 'direct':
# Игнорируем личку
continue
in_reply_acct = status['in_reply_to_account_id']
in_reply_id = status['in_reply_to_id']
muted = status['muted']
@ -105,29 +103,18 @@ def scan_context_thread():
index_name = 2
index_ru_name = 3
index_year = 4
message_writer = []
success = False
if parsed_result and parsed_result_imdb:
post_status('Не смешивайте IMDB и кинопоиск в одном посте, пожалуйста.', id_st)
fail_limit[acct] += 1
continue
if parsed_result is not None:
print(parsed_result)
suggested_movies = get_kinopoisk_movie_to_imdb(parsed_result)
message_writer.append('⚠️ внимание при использовании Кинопоиска стабильность может быть понижена')
if suggested_movies is None:
post_status('Не удалось выполнить запрос: возможно некорректный тип фильма, попробуйте использовать https://libremdb.leemoon.network', id_st)
post_status('Не удалось выполнить запрос: возможно некорректный тип фильма, попробуйте использовать imdb.com', id_st)
fail_limit[acct] += 1
continue
elif parsed_result_imdb is not None:
suggested_movies = get_title_by_id(parsed_result_imdb)
if suggested_movies is None:
post_status('❌ Фильм(ы) не найден в базе данных IMDB, пожалуйста обратитесь к администратору, чтобы обновить базу. Примечание: IMDB выкладывает новые изменения не сразу.', id_st)
fail_limit[acct] += 1
continue
message_writer = []
success = False
for movie in suggested_movies:
logger.debug(str(movie))
if movie[index_type] == "404":
@ -159,7 +146,7 @@ def scan_context_thread():
fail_limit[acct] += 1
break
if get_already_watched(name, name_ru, year) is True:
if get_already_watched(name, name_ru, year) == True:
message_writer.append(f" Этот фильм уже был на FMN: {movie_string}")
logger.info(f'Попытка предложить уже просмотренный фильм: {acct} {name} {name_ru} {year}')
fail_limit[acct] += 1
@ -185,3 +172,5 @@ def scan_context_thread():
post_status('\n'.join(message_writer) + message, id_st)
time.sleep(30)

View File

@ -1,37 +1,24 @@
from src.fedi_api import get_notifications, mark_as_read_notification, post_status, upload_attachment
from src.fmn_states_db import write_states, states_stor
from src.sheduler import check_stop_thread_scan
from config import admins_bot, limit_movies_per_user, limit_all_movies_poll, hour_poll_posting, fmn_next_watching_hour, peertube_stream_url
from src.fmn_states_db import write_states, read_states
from config import admins_bot, limit_movies_per_user, limit_all_movies_poll, hour_poll_posting, fmn_next_watching_hour
import threading
import time
import requests
import threading, time
from datetime import datetime
from dateutil.parser import parse as dateutilparse
from dateutil.relativedelta import relativedelta, TU, SU
from loguru import logger
@logger.catch
def get_peertube_stream_name():
try:
return requests.get(peertube_stream_url).json()['name']
except Exception as E:
return f"[не удалось получить название {E}]"
@logger.catch
def get_control_mention():
while True:
states = states_stor.states
states = read_states()
time.sleep(30)
time_now = datetime.now()
now_week = time_now.weekday()
now_hour = time_now.hour
if now_week not in (0, 6):
continue
if now_week == 6 and now_hour < fmn_next_watching_hour:
# Предотвращение работы в холстую до начала сеанса
if now_week == 6 and now_hour < fmn_next_watching_hour: # Предотвращение работы в холстую до начала сеанса
continue
post_exists = states.get('last_thread_id')
if post_exists:
@ -39,90 +26,39 @@ def get_control_mention():
logger.debug('Wait for from admin mention...')
notif = get_notifications()
for i in notif:
if i['type'] not in ("mention"):
if i['type'] != "mention":
continue
seen = i['pleroma']['is_seen']
acct_mention = i['account']['acct']
reply_to_id = i['status']['in_reply_to_id']
if acct_mention in admins_bot and seen is False and reply_to_id is None and now_week in (0, 6):
if acct_mention in admins_bot and seen == False and reply_to_id == None and now_week in (0, 6):
logger.success(f'Найдено упоминание от {acct_mention}')
st_id = i['status']['id']
st_date = i['status']['created_at']
thread_created_at = dateutilparse(st_date)
delta = relativedelta(
hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
delta = relativedelta(hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
# if check_stop_thread_scan(stop_thread_scan) == stop_thread_scan: # broken because pytz
# logger.success("проверка на корректность даты пройдена")
# else:
# logger.warning("проверка на корректность даты не пройдена")
# stop_thread_scan = check_stop_thread_scan(stop_thread_scan)
movies_accept_time = stop_thread_scan.strftime(
'%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(
time.struct_time(stop_thread_scan.timetuple()))
movies_accept_time = stop_thread_scan.strftime('%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(
hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching_delta = relativedelta(hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
# Глушение до следующего сеанса FMN.
max_mute_time = time.mktime(
time.struct_time(next_movie_watching.timetuple()))
max_mute_time = time.mktime(time.struct_time(next_movie_watching.timetuple())) # Глушение до следующего сеанса FMN.
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching),
st_id, attachments=[upload_attachment('src/FMN.webp')])
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching), st_id, attachments=[upload_attachment('src/FMN.png')])
time.sleep(0.2)
mark_as_read_notification(i['id'])
states_stor.states['max_mute_time'] = int(max_mute_time)
states_stor.states['stop_thread_scan'] = int(stop_thread_scan)
states_stor.states['last_thread_id'] = st_id
write_states(states_stor.states)
break
if now_hour == 0:
logger.warning('Автоматический триггер в полночи сработал')
thread_created_at = time_now
delta = relativedelta(
hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
if check_stop_thread_scan(stop_thread_scan) == stop_thread_scan:
logger.success("проверка на корректность даты пройдена")
else:
logger.warning("проверка на корректность даты не пройдена")
# stop_thread_scan = check_stop_thread_scan(stop_thread_scan)
movies_accept_time = stop_thread_scan.strftime(
'%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(
time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(
hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
# Глушение до следующего сеанса FMN.
max_mute_time = time.mktime(
time.struct_time(next_movie_watching.timetuple()))
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
watched_movie_name = get_peertube_stream_name()
st_id = post_status("Спасибо что посмотрели " + watched_movie_name + "\n\n" + start_collect_movies_text(movies_accept_time, next_movie_watching) +
'\n\n@rf@mastodon.ml', reply_to_status_id=None, attachments=[upload_attachment('src/FMN.webp')], visibility="public")['id']
time.sleep(0.2)
states_stor.states['max_mute_time'] = int(max_mute_time)
states_stor.states['stop_thread_scan'] = int(stop_thread_scan)
states_stor.states['last_thread_id'] = st_id
write_states(states_stor.states)
states['max_mute_time'] = int(max_mute_time)
states['stop_thread_scan'] = int(stop_thread_scan)
states['last_thread_id'] = st_id
write_states(states)
break
time.sleep(30)
@ -133,20 +69,19 @@ def start_collect_movies_text(movies_accept_time=str, next_movie_watching=str):
Напоминаем правила:
- Мы принимаем на просмотр полнометражные художественные фильмы;
- Прием варианта осуществляется путем публикации ссылки на этот фильм на IMDB https://libremdb.leemoon.network/ или Кинопоиске в этом треде;
- Прием варианта осуществляется путем публикации ссылки на этот фильм на IMDB (libremdb) или Кинопоиске в этом треде;
- Нам не подходят: сериалы, короткометражные и документальные фильмы;
- Максимальное количество вариантов, предложенных одним человеком не должно превышать {limit_movies_per_user};
- Всего может быть собрано до {limit_all_movies_poll} фильмов;
- Заявки принимаются до крайнего срока, после чего будет объявлено голосование по собранным вариантам.
Крайний срок подачи заявки - {movies_accept_time}.
Рекомендуем посетить список, чтобы случайно не предложить уже просмотренный фильм: https://pub.phreedom.club/~localhost/fmn_watched.gmi
Желаем удачи.
'''.replace('\t', '')
return text
def run_scan_notif():
scan_notif = threading.Thread(target=get_control_mention, daemon=True)
scan_notif.start()

View File

@ -1,23 +0,0 @@
import calendar
import datetime
from config import hour_poll_posting
def check_stop_thread_scan(suggested_date):
suggested_month = suggested_date.month
suggested_year = suggested_date.year
tuesdays = []
for i in calendar.Calendar().itermonthdays4(suggested_year, suggested_month):
if i[3] == 1 and datetime.datetime(year=i[0], month=i[1], day=i[2]) > suggested_date:
tuesdays.append(datetime.datetime(
year=i[0], month=i[1], day=i[2], hour=hour_poll_posting))
if tuesdays == []:
shift_for_next_week = suggested_date + datetime.timedelta(days=1)
suggested_month = shift_for_next_week.month
suggested_year = shift_for_next_week.year
for i in calendar.Calendar().itermonthdays4(suggested_year, suggested_month):
if i[3] == 1 and datetime.datetime(year=i[0], month=i[1], day=i[2]) > suggested_date:
tuesdays.append(datetime.datetime(
year=i[0], month=i[1], day=i[2], hour=hour_poll_posting))
return tuesdays[0] # near tuesday

30
test.py
View File

@ -1,30 +0,0 @@
import unittest
import datetime
import time_machine
from src import sheduler
class TestDate(unittest.TestCase):
def test_date_tue(self):
"""
test near tue
тестирует корректность расчета даты на след. вторник
условия: расчет начат от воскресенья или понедельника (случайно)
"""
import calendar
import random
for month in range(1, 13):
sundays = [i for i in calendar.Calendar().itermonthdays4(year=2024, month=month) if i[3] == 6]
for sunday in sundays:
fake_date = datetime.datetime(year=sunday[0], month=sunday[1], day=sunday[2], hour=random.randint(21, 23)) + datetime.timedelta(days=random.randint(0, 1))
with time_machine.travel(fake_date):
try:
result = sheduler.check_stop_thread_scan(fake_date)
print("current fake date: " + fake_date.strftime("%Y.%m.%d %H:%M") + f" near tue: {result}")
except Exception as E:
print("fail fake date: " + fake_date.strftime("%Y.%m.%d %H:%M") + f", err: {E}")
if __name__ == '__main__':
unittest.main()