Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13
File renamed without changes.
1 change: 1 addition & 0 deletions command_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

MAX_HISTORY_SIZE = 10


class CommandHistory:
def __init__(self):
self.history = deque(maxlen=MAX_HISTORY_SIZE)
Expand Down
1 change: 0 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# /Users/roman/work/itter/config.py
import os
import sys
from dotenv import load_dotenv
Expand Down
17 changes: 8 additions & 9 deletions database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# /Users/roman/work/itter/database.py
import asyncio
from typing import Optional, Dict, Any, List
from supabase import Client
Expand Down Expand Up @@ -47,6 +46,7 @@ async def db_get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
debug_log(f"[DB ERROR] get_user_by_id: {e}")
return None


async def db_username_exists_case_insensitive(username: str) -> Optional[str]:
if not supabase_client:
raise RuntimeError("Database not initialized")
Expand Down Expand Up @@ -84,7 +84,10 @@ async def db_create_user(username: str, public_key: str) -> None:


async def db_update_profile(
username: str, new_display_name: Optional[str], new_email: Optional[str], reset: Optional[bool] = False
username: str,
new_display_name: Optional[str],
new_email: Optional[str],
reset: Optional[bool] = False,
) -> None:
if not supabase_client:
raise RuntimeError("Database not initialized")
Expand All @@ -94,7 +97,7 @@ async def db_update_profile(
user = await db_get_user_by_username(username)
if not user:
raise ValueError("User not found for profile update.")

update_data = {}
if new_display_name is not None:
update_data["display_name"] = new_display_name
Expand Down Expand Up @@ -594,9 +597,7 @@ async def db_post_eet(

try:
await asyncio.to_thread(
supabase_client.table("posts")
.insert(post_data)
.execute
supabase_client.table("posts").insert(post_data).execute
)
except Exception as e:
debug_log(f"[DB ERROR] db_post_eet: {e}")
Expand Down Expand Up @@ -644,9 +645,7 @@ async def db_get_filtered_timeline_posts(
)
return []
rpc_name = "get_channel_timeline"
rpc_params["p_channel_tag"] = (
filter_value.lower()
)
rpc_params["p_channel_tag"] = filter_value.lower()
elif filter_type == "user":
if not filter_value or not isinstance(filter_value, str):
debug_log(
Expand Down
1 change: 0 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# /Users/roman/work/itter/main.py
import asyncio
import sys
import traceback
Expand Down
14 changes: 14 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[project]
name = "itter"
version = "0.1.0"
description = "Micro-Blogging via Terminal"
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"asyncssh>=2.21.0",
"python-dotenv>=1.1.0",
"supabase>=2.15.2",
"textwrap ; python_full_version < '3.9'",
"typer>=0.15.4",
"wcwidth>=0.2.13",
]
7 changes: 0 additions & 7 deletions requirements.txt

This file was deleted.

655 changes: 436 additions & 219 deletions ssh_server.py

Large diffs are not rendered by default.

126 changes: 81 additions & 45 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# /Users/roman/work/itter/utils.py
import re
import hashlib
from datetime import datetime, timezone
Expand All @@ -12,7 +11,7 @@
# Styles
BOLD = "\033[1m"
DIM = "\033[2m"
ITALIC = "\033[3m" # Might not work everywhere
ITALIC = "\033[3m" # Might not work everywhere
UNDERLINE = "\033[4m"
BLINK = "\033[5m"
# Colors (Foreground)
Expand All @@ -24,7 +23,7 @@
FG_MAGENTA = "\033[35m"
FG_CYAN = "\033[36m"
FG_WHITE = "\033[37m"
FG_BRIGHT_BLACK = "\033[90m" # Often used for Grey/Dim
FG_BRIGHT_BLACK = "\033[90m" # Often used for Grey/Dim
FG_BRIGHT_RED = "\033[91m"
FG_BRIGHT_GREEN = "\033[92m"
FG_BRIGHT_YELLOW = "\033[93m"
Expand All @@ -34,7 +33,7 @@
FG_BRIGHT_WHITE = "\033[97m"

# Regex to strip ANSI escape codes
ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


# --- Logging ---
Expand All @@ -43,62 +42,93 @@ def debug_log(msg: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(f"[{timestamp} DEBUG] {msg}")


# --- Time Formatting ---
def time_ago(iso_str: Optional[str]) -> str:
# ... (keep existing time_ago function) ...
if not iso_str: return "some time ago"
if not iso_str:
return "some time ago"
parsed_dt = None
if isinstance(iso_str, datetime): parsed_dt = iso_str
if isinstance(iso_str, datetime):
parsed_dt = iso_str
else:
try:
iso_str_cleaned = iso_str.split(".")[0].replace("Z", "+00:00")
if "+" not in iso_str_cleaned: iso_str_cleaned += "+00:00"
if "+" not in iso_str_cleaned:
iso_str_cleaned += "+00:00"
parsed_dt = datetime.fromisoformat(iso_str_cleaned)
except ValueError: debug_log(f"time_ago parse error for: {iso_str}"); return "a while ago"
if not parsed_dt.tzinfo: parsed_dt = parsed_dt.replace(tzinfo=timezone.utc)
diff = datetime.now(timezone.utc) - parsed_dt; seconds = int(diff.total_seconds())
if seconds < 10: return "just now";
if seconds < 60: return f"{seconds}s ago";
minutes = seconds // 60;
if minutes < 60: return f"{minutes}m ago";
hours = minutes // 60;
if hours < 24: return f"{hours}h ago";
days = hours // 24;
if days < 7: return f"{days}d ago";
weeks = days // 7;
if weeks < 5: return f"{weeks}w ago";
months = days // 30;
if months < 12: return f"{months}mo ago";
years = days // 365; return f"{years}y ago"
except ValueError:
debug_log(f"time_ago parse error for: {iso_str}")
return "a while ago"
if not parsed_dt.tzinfo:
parsed_dt = parsed_dt.replace(tzinfo=timezone.utc)
diff = datetime.now(timezone.utc) - parsed_dt
seconds = int(diff.total_seconds())
if seconds < 10:
return "just now"
if seconds < 60:
return f"{seconds}s ago"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m ago"
hours = minutes // 60
if hours < 24:
return f"{hours}h ago"
days = hours // 24
if days < 7:
return f"{days}d ago"
weeks = days // 7
if weeks < 5:
return f"{weeks}w ago"
months = days // 30
if months < 12:
return f"{months}mo ago"
years = days // 365
return f"{years}y ago"


# --- Input Parsing ---
CMD_SPLIT_RE = re.compile(r"^\s*(\S+)(?:\s+(.*))?$")
HASHTAG_RE = re.compile(r"(?<!\w)#(\w(?:[\w-]*\w)?)")
USER_RE = re.compile(r"(?<!\w)@(\w{3,20})")


def parse_input_line(line: str) -> Tuple[Optional[str], str, List[str], List[str]]:
# ... (keep existing parse_input_line function) ...
m = CMD_SPLIT_RE.match(line.strip());
if not m: return None, "", [], [];
cmd = m.group(1).lower(); raw_text = m.group(2) or "";
hashtags = list(set(HASHTAG_RE.findall(raw_text.lower()))); user_refs = list(set(USER_RE.findall(raw_text)));
m = CMD_SPLIT_RE.match(line.strip())
if not m:
return None, "", [], []
cmd = m.group(1).lower()
raw_text = m.group(2) or ""
hashtags = list(set(HASHTAG_RE.findall(raw_text.lower())))
user_refs = list(set(USER_RE.findall(raw_text)))
return cmd, raw_text, hashtags, user_refs


def parse_target_filter(raw_text: str) -> Dict[str, Optional[str]]:
# ... (keep existing parse_target_filter function from v13) ...
text = raw_text.strip().lower();
if not text or text == "all": return {"type": "all", "value": None};
if text == "mine": return {"type": "mine", "value": None};
text = raw_text.strip().lower()
if not text or text == "all":
return {"type": "all", "value": None}
if text == "mine":
return {"type": "mine", "value": None}
if text.startswith("#"):
channel_name = text[1:];
if re.match(r"^[a-zA-Z0-9][a-zA-Z0-9-]*$", channel_name): return {"type": "channel", "value": channel_name};
else: debug_log(f"Invalid channel format '{text}', defaulting to 'all'"); return {"type": "all", "value": None};
debug_log(f"Unrecognized filter '{text}', defaulting to 'all'"); return {"type": "all", "value": None}
channel_name = text[1:]
if re.match(r"^[a-zA-Z0-9][a-zA-Z0-9-]*$", channel_name):
return {"type": "channel", "value": channel_name}
else:
debug_log(f"Invalid channel format '{text}', defaulting to 'all'")
return {"type": "all", "value": None}
debug_log(f"Unrecognized filter '{text}', defaulting to 'all'")
return {"type": "all", "value": None}


# --- Eet Content Formatting ---
def format_eet_content(content: str, current_username: Optional[str] = None, current_user_color: str = FG_BRIGHT_YELLOW) -> str:
def format_eet_content(
content: str,
current_username: Optional[str] = None,
current_user_color: str = FG_BRIGHT_YELLOW,
) -> str:
"""Applies ANSI color codes to all hashtags and mentions in eet content.
Highlights the current user's mentions in a specific color."""

Expand All @@ -121,7 +151,7 @@ def replacer(match_obj: re.Match) -> str:
mention_text = match_obj.group(4) # The username
if current_username and mention_text.lower() == current_username.lower():
return f"{current_user_color}{mention_char}{mention_text}{RESET}"
return f"{FG_CYAN}{mention_char}{mention_text}{RESET}" # Other users
return f"{FG_CYAN}{mention_char}{mention_text}{RESET}" # Other users
# This fallback should ideally not be reached if the pattern is correct
# and only matches what we intend for hashtags or usernames.
return match_obj.group(0)
Expand Down Expand Up @@ -153,37 +183,43 @@ def hash_ip(ip_address: str) -> Optional[str]:
# --- String Utils ---
def strip_ansi(text: str) -> str:
"""Removes ANSI escape codes from a string."""
return ANSI_ESCAPE_RE.sub('', text)
return ANSI_ESCAPE_RE.sub("", text)

def truncate_str_with_wcwidth(text: str, max_visual_width: int, placeholder: str = "...") -> str:

def truncate_str_with_wcwidth(
text: str, max_visual_width: int, placeholder: str = "..."
) -> str:
"""Truncates a string to a maximum visual width, accounting for wide characters."""
if not text:
return ""

text_visual_width = wcswidth(text)
if text_visual_width <= max_visual_width:
return text

placeholder_visual_width = wcswidth(placeholder)

if max_visual_width < placeholder_visual_width:
# Not enough space for placeholder, just truncate as much as possible
current_width = 0
for i, char in enumerate(text):
char_width = get_char_width(char)
if char_width == -1: char_width = 1 # Treat error as width 1
if char_width == -1:
char_width = 1 # Treat error as width 1
if current_width + char_width > max_visual_width:
return text[:i]
current_width += char_width
return text # Should not happen if text_visual_width > max_visual_width
return text # Should not happen if text_visual_width > max_visual_width

target_text_width = max_visual_width - placeholder_visual_width
current_width = 0
end_idx = 0
for i, char in enumerate(text):
char_width = get_char_width(char)
if char_width == -1: char_width = 1 # Treat error as width 1
if current_width + char_width > target_text_width: break
if char_width == -1:
char_width = 1 # Treat error as width 1
if current_width + char_width > target_text_width:
break
current_width += char_width
end_idx = i + 1
return text[:end_idx] + placeholder
return text[:end_idx] + placeholder
Loading