Tracktor Swiss-knife Utility library.
uv add tracktolibWith specific extras:
uv add tracktolib[pg,api]Utility functions to initialize logging formatting and streams.
import logging
from tracktolib.logs import init_logging
logger = logging.getLogger()
formatter, stream_handler = init_logging(logger, 'json', version='0.0.1')Async PostgreSQL helpers using asyncpg.
uv add tracktolib[pg]Sync PostgreSQL helpers using psycopg (v3).
uv add tracktolib[pg-sync]from psycopg import connect
from tracktolib.pg_sync import insert_many, fetch_one, fetch_count, fetch_all
conn = connect('postgresql://user:pass@localhost/db')
data = [
{'foo': 'bar', 'value': 1},
{'foo': 'baz', 'value': 2}
]
insert_many(conn, 'public.test', data)
query = 'SELECT foo from public.test order by value asc'
value = fetch_one(conn, query, required=True) # {'foo': 'bar'}, raises if not found
assert fetch_count(conn, 'public.test') == 2
query = 'SELECT * from public.test order by value asc'
assert fetch_all(conn, query) == dataAsync S3 helpers using aiobotocore.
uv add tracktolib[s3]S3 helpers using minio.
uv add tracktolib[s3-minio]Async S3 helpers using niquests and botocore presigned URLs.
uv add tracktolib[s3-niquests]from tracktolib.s3.niquests import S3Session
async with S3Session(
endpoint_url='http://localhost:9000',
access_key='...',
secret_key='...',
region='us-east-1',
) as s3:
# Object operations
await s3.put_object('bucket', 'path/file.txt', b'content')
content = await s3.get_object('bucket', 'path/file.txt')
await s3.delete_object('bucket', 'path/file.txt')
# Streaming upload (multipart for large files)
async def data_stream():
yield b'chunk1'
yield b'chunk2'
await s3.file_upload('bucket', 'large-file.bin', data_stream())
# Bucket policy management
policy = {'Version': '2012-10-17', 'Statement': [...]}
await s3.put_bucket_policy('bucket', policy)
await s3.get_bucket_policy('bucket')
await s3.delete_bucket_policy('bucket')
# Empty a bucket (delete all objects)
deleted_count = await s3.empty_bucket('bucket')
# Sync a local directory to S3 (like aws s3 sync)
from pathlib import Path
result = await s3.sync_directory('bucket', Path('./local'), 'remote/prefix')
print(f"Uploaded: {result['uploaded']}, Deleted: {result['deleted']}, Skipped: {result['skipped']}")
# With delete flag to remove remote files not present locally
result = await s3.sync_directory('bucket', Path('./local'), 'remote/prefix', delete=True)HTTP client helpers using httpx.
uv add tracktolib[http]FastAPI utilities using fastapi and pydantic.
uv add tracktolib[api]Notion API helpers using niquests.
uv add tracktolib[notion]import niquests
from tracktolib.notion.fetch import fetch_database, get_notion_headers
from tracktolib.notion.cache import NotionCache
async with niquests.AsyncSession() as session:
session.headers.update(get_notion_headers())
# Without cache
db = await fetch_database(session, "database-id")
# With persistent cache (stored in ~/.cache/tracktolib/notion/cache.json)
cache = NotionCache()
db = await fetch_database(session, "database-id", cache=cache)
# Check cached databases
cache.get_databases() # All cached databases
cache.get_database("db-id") # Specific database (id, title, properties, cached_at)GitHub API helpers using niquests.
uv add tracktolib[gh]from tracktolib.gh import GitHubClient
async with GitHubClient() as gh: # Uses GITHUB_TOKEN env var
# Issue comments
comments = await gh.get_issue_comments("owner/repo", 123)
await gh.create_issue_comment("owner/repo", 123, "Hello!")
await gh.delete_comments_with_marker("owner/repo", 123, "<!-- bot -->")
# Labels
labels = await gh.get_issue_labels("owner/repo", 123)
await gh.add_labels("owner/repo", 123, ["bug", "priority"])
await gh.remove_label("owner/repo", 123, "wontfix")
# Deployments
deploys = await gh.get_deployments("owner/repo", environment="production")
await gh.mark_deployment_inactive("owner/repo", "preview-123")Cloudflare DNS API helpers using niquests.
uv add tracktolib[cf]from tracktolib.cf import CloudflareDNSClient
async with CloudflareDNSClient() as cf: # Uses CLOUDFLARE_API_TOKEN and CLOUDFLARE_ZONE_ID env vars
# Get a DNS record
record = await cf.get_dns_record("app.example.com", "CNAME")
# Create a DNS record
record = await cf.create_dns_record(
"app.example.com",
"target.example.com",
record_type="CNAME",
ttl=60,
proxied=True,
)
# Delete by ID or name
await cf.delete_dns_record(record["id"])
await cf.delete_dns_record_by_name("app.example.com", "CNAME")
# Check existence
exists = await cf.dns_record_exists("app.example.com")Testing utilities using deepdiff.
uv add tracktolib[tests]