Compare commits

10 Commits

Author SHA1 Message Date
Jiralite
77a6897180 chore: add status page disclaimer to issue templates
Some checks failed
test cassandra-backup / Test latest Cassandra backup (push) Has been cancelled
2026-02-27 20:24:23 +00:00
Sky Walters
9e8a9dafb8 fix(api): Fix nats filter subject 2026-02-27 03:28:30 -05:00
Hampus Kraft
7b1aa6ff2e fix(api): various subtle memory leaks
(and some not so subtle ones, *cough* ReportService *cough*)
2026-02-27 04:51:36 +00:00
M0N7Y5
848269a4d4 Added support for developing in devcontainer (#480) 2026-02-23 00:24:48 +01:00
Hampus Kraft
fd59bc219c [skip ci] fix(gateway): perf optimisations 2026-02-22 18:47:45 +00:00
Hampus Kraft
d843d6f3f8 fix(gateway): harden REQUEST_GUILD_MEMBERS path against DoS floods 2026-02-22 13:52:53 +00:00
Hampus Kraft
4f5704fa1f fix(docs): remove unreferenced image 2026-02-21 16:57:09 +00:00
Hampus Kraft
f54f62ae3c fix(docs): mintlify-produced syntax error & self-produced typos 2026-02-21 16:57:09 +00:00
Hampus Kraft
2db53689a1 fix(app): add masking of phones to settings 2026-02-21 16:57:09 +00:00
Hampus Kraft
a129b162b7 fix(app): remove broken app icon from oauth2 page 2026-02-21 16:57:09 +00:00
33 changed files with 1069 additions and 153 deletions

View File

@@ -0,0 +1,54 @@
# Like dev/Caddyfile.dev, but LiveKit and Mailpit are referenced by their
# Docker Compose hostnames instead of 127.0.0.1.

# Global options: plain HTTP only (no certificate management) and no admin API.
{
	auto_https off
	admin off
}

# Single dev entrypoint: every app surface is multiplexed behind one port.
:48763 {
	# Liveness endpoint used by the process-compose readiness probe.
	handle /_caddy_health {
		respond "OK" 200
	}

	@gateway path /gateway /gateway/*
	handle @gateway {
		uri strip_prefix /gateway
		reverse_proxy 127.0.0.1:49107
	}

	@marketing path /marketing /marketing/*
	handle @marketing {
		uri strip_prefix /marketing
		reverse_proxy 127.0.0.1:49531
	}

	@server path /admin /admin/* /api /api/* /s3 /s3/* /queue /queue/* /media /media/* /_health /_ready /_live /.well-known/fluxer
	handle @server {
		reverse_proxy 127.0.0.1:49319
	}

	# LiveKit and Mailpit run in sibling Compose containers, so they are
	# addressed by service hostname rather than loopback.
	@livekit path /livekit /livekit/*
	handle @livekit {
		uri strip_prefix /livekit
		reverse_proxy livekit:7880
	}

	redir /mailpit /mailpit/
	handle_path /mailpit/* {
		rewrite * /mailpit{path}
		reverse_proxy mailpit:8025
	}

	# Fallback: the app dev server. Connection/Upgrade headers are forwarded
	# explicitly so WebSocket upgrades (e.g. HMR) survive the proxy hop.
	handle {
		reverse_proxy 127.0.0.1:49427 {
			header_up Connection {http.request.header.Connection}
			header_up Upgrade {http.request.header.Upgrade}
		}
	}

	log {
		output stdout
		format console
	}
}

40
.devcontainer/Dockerfile Normal file
View File

@@ -0,0 +1,40 @@
# Language runtimes (Node.js, Go, Rust, Python) are installed via devcontainer
# features. This Dockerfile handles Erlang/OTP (no feature available) and
# tools like Caddy, process-compose, rebar3, uv, ffmpeg, and exiftool.

# Stage used only as a COPY source for a prebuilt Erlang/OTP tree.
FROM erlang:28-slim AS erlang

FROM mcr.microsoft.com/devcontainers/base:debian-13

ARG DEBIAN_FRONTEND=noninteractive
ARG REBAR3_VERSION=3.24.0
ARG PROCESS_COMPOSE_VERSION=1.90.0

# Both erlang:28-slim and debian-13 are Trixie-based, so OpenSSL versions match.
COPY --from=erlang /usr/local/lib/erlang /usr/local/lib/erlang
# Expose erl/erlc/escript etc. on PATH.
RUN ln -sf /usr/local/lib/erlang/bin/* /usr/local/bin/

# Runtime libs needed by the copied Erlang tree (ncurses, SCTP), general build
# tooling, media tools, SQLite and OpenSSL headers, and small CLI utilities.
RUN apt-get update && apt-get install -y --no-install-recommends \
libncurses6 libsctp1 \
build-essential pkg-config \
ffmpeg libimage-exiftool-perl \
sqlite3 libsqlite3-dev \
libssl-dev openssl \
gettext-base lsof iproute2 \
&& rm -rf /var/lib/apt/lists/*

# rebar3 ships as a self-contained escript; download the pinned release.
RUN curl -fsSL "https://github.com/erlang/rebar3/releases/download/${REBAR3_VERSION}/rebar3" \
-o /usr/local/bin/rebar3 \
&& chmod +x /usr/local/bin/rebar3

# NOTE(review): the caddy, process-compose, and uv downloads below hard-code
# amd64/x86_64, so this image will not build on arm64 hosts (e.g. Apple
# Silicon). Confirm whether arm64 support is needed — TODO.
# NOTE(review): the Caddy download is unpinned (latest build from the
# download API) and uv uses the "latest" release tag — confirm that floating
# versions are acceptable here.
RUN curl -fsSL "https://caddyserver.com/api/download?os=linux&arch=amd64" \
-o /usr/local/bin/caddy \
&& chmod +x /usr/local/bin/caddy

RUN curl -fsSL "https://github.com/F1bonacc1/process-compose/releases/download/v${PROCESS_COMPOSE_VERSION}/process-compose_linux_amd64.tar.gz" \
| tar xz -C /usr/local/bin process-compose \
&& chmod +x /usr/local/bin/process-compose

RUN curl -fsSL "https://github.com/astral-sh/uv/releases/latest/download/uv-x86_64-unknown-linux-gnu.tar.gz" \
| tar xz --strip-components=1 -C /usr/local/bin \
&& chmod +x /usr/local/bin/uv /usr/local/bin/uvx

View File

@@ -0,0 +1,75 @@
{
"name": "Fluxer",
"dockerComposeFile": "docker-compose.yml",
"service": "app",
"workspaceFolder": "/workspace",
"features": {
"ghcr.io/devcontainers/features/node:1": {
"version": "24",
"pnpmVersion": "10.29.3"
},
"ghcr.io/devcontainers/features/go:1": {
"version": "1.24"
},
"ghcr.io/devcontainers/features/rust:1": {
"version": "1.93.0",
"targets": "wasm32-unknown-unknown"
},
"ghcr.io/devcontainers/features/python:1": {
"version": "os-provided",
"installTools": false
}
},
"onCreateCommand": ".devcontainer/on-create.sh",
"remoteEnv": {
"FLUXER_CONFIG": "${containerWorkspaceFolder}/config/config.json",
"FLUXER_DATABASE": "sqlite"
},
"forwardPorts": [48763, 6379, 7700, 7880],
"portsAttributes": {
"48763": {
"label": "Fluxer (Caddy)",
"onAutoForward": "openBrowser",
"protocol": "http"
},
"6379": {
"label": "Valkey",
"onAutoForward": "silent"
},
"7700": {
"label": "Meilisearch",
"onAutoForward": "silent"
},
"7880": {
"label": "LiveKit",
"onAutoForward": "silent"
},
"9229": {
"label": "Node.js Debugger",
"onAutoForward": "silent"
}
},
"customizations": {
"vscode": {
"extensions": [
"TypeScriptTeam.native-preview",
"biomejs.biome",
"clinyong.vscode-css-modules",
"pgourlain.erlang",
"golang.go",
"rust-lang.rust-analyzer"
],
"settings": {
"typescript.preferences.includePackageJsonAutoImports": "auto",
"typescript.suggest.autoImports": true,
"typescript.experimental.useTsgo": true
}
}
}
}

View File

@@ -0,0 +1,64 @@
# Devcontainer stack: `app` is the workspace container VS Code attaches to;
# the rest are backing services reachable by service hostname.
services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      # `cached` relaxes host/container consistency for faster file I/O.
      - ..:/workspace:cached
    # Keep the container alive; VS Code execs processes into it.
    command: sleep infinity
  valkey:
    image: valkey/valkey:8-alpine
    restart: unless-stopped
    # AOF persistence plus an RDB snapshot every 60s if >=1 key changed.
    command: ['valkey-server', '--appendonly', 'yes', '--save', '60', '1', '--loglevel', 'warning']
    volumes:
      - valkey-data:/data
    healthcheck:
      test: ['CMD', 'valkey-cli', 'ping']
      interval: 10s
      timeout: 5s
      retries: 5
  meilisearch:
    image: getmeili/meilisearch:v1.14
    restart: unless-stopped
    environment:
      MEILI_NO_ANALYTICS: 'true'
      MEILI_ENV: development
      # Must match the api_key that on-create.sh patches into config.json.
      MEILI_MASTER_KEY: fluxer-devcontainer-meili-master-key
    volumes:
      - meilisearch-data:/meili_data
    healthcheck:
      test: ['CMD', 'curl', '-f', 'http://localhost:7700/health']
      interval: 10s
      timeout: 5s
      retries: 5
  livekit:
    image: livekit/livekit-server:v1.9
    restart: unless-stopped
    command: --config /etc/livekit.yaml
    volumes:
      # Keys in livekit.yaml must match the config.json patch in on-create.sh.
      - ./livekit.yaml:/etc/livekit.yaml:ro
  mailpit:
    image: axllent/mailpit:latest
    restart: unless-stopped
    # Serve the web UI under /mailpit/ so Caddy can proxy it on a subpath.
    command: ['--webroot', '/mailpit/']
  # Two NATS instances: a plain core server and a JetStream-enabled one with
  # persistent storage, matching services.nats.{core_url,jetstream_url}.
  nats-core:
    image: nats:2-alpine
    restart: unless-stopped
    command: ['--port', '4222']
  nats-jetstream:
    image: nats:2-alpine
    restart: unless-stopped
    command: ['--port', '4223', '--jetstream', '--store_dir', '/data']
    volumes:
      - nats-jetstream-data:/data
volumes:
  valkey-data:
  meilisearch-data:
  nats-jetstream-data:

View File

@@ -0,0 +1,30 @@
# Credentials here must match the values on-create.sh writes to config.json.
port: 7880
keys:
  # api_key: api_secret pair used by the app to mint access tokens.
  fluxer-devcontainer-key: fluxer-devcontainer-secret-key-00000000
rtc:
  tcp_port: 7881
  # Narrow UDP media port range to keep dev port exposure manageable.
  port_range_start: 50000
  port_range_end: 50100
  # Dev-only: no STUN-based external IP discovery; advertise loopback.
  use_external_ip: false
  node_ip: 127.0.0.1
turn:
  enabled: true
  domain: localhost
  udp_port: 3478
webhook:
  api_key: fluxer-devcontainer-key
  urls:
    # Delivered to the server running inside the `app` Compose container.
    - http://app:49319/api/webhooks/livekit
room:
  auto_create: true
  max_participants: 100
  # Empty-room timeout before the room is closed (LiveKit default unit is
  # seconds — confirm against the pinned server version).
  empty_timeout: 300
development: true

70
.devcontainer/on-create.sh Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env bash
# Runs once when the container is first created.
set -euo pipefail

# Resolve the repo root relative to this script so it works from any CWD.
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
export FLUXER_CONFIG="${FLUXER_CONFIG:-$REPO_ROOT/config/config.json}"

# Minimal colored logging helper for devcontainer setup output.
GREEN='\033[0;32m'
NC='\033[0m'
info() { printf "%b\n" "${GREEN}[devcontainer]${NC} $1"; }

info "Installing pnpm dependencies..."
pnpm install

# Codegen outputs (e.g. MasterZodSchema.generated.tsx) are gitignored.
info "Generating config schema..."
pnpm --filter @fluxer/config generate

# Only seed a config on first creation; never clobber an existing one.
if [ ! -f "$FLUXER_CONFIG" ]; then
info "Creating config from development template..."
cp "$REPO_ROOT/config/config.dev.template.json" "$FLUXER_CONFIG"
fi

# Point services at Docker Compose hostnames and adjust settings that differ
# from the default dev template.
info "Patching config for Docker Compose networking..."
# Write to a temp file first so a failed jq run cannot truncate the config.
jq '
# rspack defaults public_scheme to "https" when unset
.domain.public_scheme = "http" |
# Relative path so the app works on any hostname (localhost, 127.0.0.1, etc.)
.app_public.bootstrap_api_endpoint = "/api" |
.internal.kv = "redis://valkey:6379/0" |
.integrations.search.url = "http://meilisearch:7700" |
.integrations.search.api_key = "fluxer-devcontainer-meili-master-key" |
# Credentials must match .devcontainer/livekit.yaml
.integrations.voice.url = "ws://livekit:7880" |
.integrations.voice.webhook_url = "http://app:49319/api/webhooks/livekit" |
.integrations.voice.api_key = "fluxer-devcontainer-key" |
.integrations.voice.api_secret = "fluxer-devcontainer-secret-key-00000000" |
.integrations.email.smtp.host = "mailpit" |
.integrations.email.smtp.port = 1025 |
.services.nats.core_url = "nats://nats-core:4222" |
.services.nats.jetstream_url = "nats://nats-jetstream:4223" |
# Bluesky OAuth requires HTTPS + loopback IPs (RFC 8252), incompatible with
# the HTTP-only devcontainer setup.
.auth.bluesky.enabled = false
' "$FLUXER_CONFIG" > "$FLUXER_CONFIG.tmp" && mv "$FLUXER_CONFIG.tmp" "$FLUXER_CONFIG"

info "Running bootstrap..."
"$REPO_ROOT/scripts/dev_bootstrap.sh"

# Best-effort warm-up: a failure here is logged but does not abort setup,
# since the gateway will simply compile on first start instead.
info "Pre-compiling Erlang gateway dependencies..."
(cd "$REPO_ROOT/fluxer_gateway" && rebar3 compile) || {
info "Gateway pre-compilation failed (non-fatal, will compile on first start)"
}

info "Devcontainer setup complete."
info ""
info "  Start all dev processes: process-compose -f .devcontainer/process-compose.yml up"
info "  Open the app: http://127.0.0.1:48763"
info "  Dev email inbox: http://127.0.0.1:48763/mailpit/"
info ""

View File

@@ -0,0 +1,57 @@
# Application processes only — backing services (Valkey, Meilisearch, LiveKit,
# Mailpit, NATS) run via Docker Compose.
# process-compose -f .devcontainer/process-compose.yml up
is_tui_disabled: false
log_level: info
log_configuration:
  # Flush per line so `tail -f` on the per-process log files stays current.
  flush_each_line: true
processes:
  caddy:
    command: caddy run --config .devcontainer/Caddyfile.dev --adapter caddyfile
    log_location: dev/logs/caddy.log
    # Considered ready once Caddy answers its /_caddy_health route.
    readiness_probe:
      http_get:
        host: 127.0.0.1
        port: 48763
        path: /_caddy_health
    availability:
      restart: always
  fluxer_server:
    command: pnpm --filter fluxer_server dev
    log_location: dev/logs/fluxer_server.log
    availability:
      restart: always
  fluxer_app:
    command: ./scripts/dev_fluxer_app.sh
    environment:
      # FORCE_COLOR keeps colored output when stdout is a pipe/log file.
      - FORCE_COLOR=1
      # Must match the fallback reverse_proxy port in Caddyfile.dev.
      - FLUXER_APP_DEV_PORT=49427
    log_location: dev/logs/fluxer_app.log
    availability:
      restart: always
  fluxer_gateway:
    command: ./scripts/dev_gateway.sh
    environment:
      - FLUXER_GATEWAY_NO_SHELL=1
    log_location: dev/logs/fluxer_gateway.log
    availability:
      restart: always
  marketing_dev:
    command: pnpm --filter fluxer_marketing dev
    environment:
      - FORCE_COLOR=1
    log_location: dev/logs/marketing_dev.log
    availability:
      restart: always
  css_watch:
    command: ./scripts/dev_css_watch.sh
    log_location: dev/logs/css_watch.log
    availability:
      restart: always

View File

@@ -7,7 +7,7 @@ body:
value: |
Thanks for the report.
Please check for existing issues before filing.
Please check our status page at https://fluxerstatus.com and search for existing issues before filing.
Security issues should go to https://fluxer.app/security.
- type: textarea
id: summary

View File

@@ -7,7 +7,7 @@ body:
value: |
Thanks.
Please check for existing issues before filing.
Please check our status page at https://fluxerstatus.com and search for existing issues before filing.
Security issues should go to https://fluxer.app/security.
- type: textarea
id: issue

84
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,84 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug: fluxer_server",
"program": "${workspaceFolder}/fluxer_server/src/startServer.tsx",
"runtimeArgs": ["--import", "tsx"],
"cwd": "${workspaceFolder}/fluxer_server",
"env": {
"FLUXER_CONFIG": "${workspaceFolder}/config/config.json",
"FLUXER_DATABASE": "sqlite"
},
"console": "integratedTerminal",
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
},
{
"type": "node",
"request": "launch",
"name": "Debug: fluxer_api (standalone)",
"program": "${workspaceFolder}/fluxer_api/src/AppEntrypoint.tsx",
"runtimeArgs": ["--import", "tsx"],
"cwd": "${workspaceFolder}/fluxer_api",
"env": {
"FLUXER_CONFIG": "${workspaceFolder}/config/config.json",
"FLUXER_DATABASE": "sqlite"
},
"console": "integratedTerminal",
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
},
{
"type": "node",
"request": "launch",
"name": "Debug: fluxer_marketing",
"program": "${workspaceFolder}/fluxer_marketing/src/index.tsx",
"runtimeArgs": ["--import", "tsx"],
"cwd": "${workspaceFolder}/fluxer_marketing",
"env": {
"FLUXER_CONFIG": "${workspaceFolder}/config/config.json"
},
"console": "integratedTerminal",
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
},
{
"type": "node",
"request": "launch",
"name": "Debug: fluxer_app (DevServer)",
"program": "${workspaceFolder}/fluxer_app/scripts/DevServer.tsx",
"runtimeArgs": ["--import", "tsx"],
"cwd": "${workspaceFolder}/fluxer_app",
"env": {
"FLUXER_APP_DEV_PORT": "49427",
"FORCE_COLOR": "1"
},
"console": "integratedTerminal",
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
},
{
"type": "node",
"request": "launch",
"name": "Debug: Test Current File",
"program": "${workspaceFolder}/node_modules/vitest/vitest.mjs",
"args": ["run", "--no-coverage", "${relativeFile}"],
"autoAttachChildProcesses": true,
"console": "integratedTerminal",
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
},
{
"type": "node",
"request": "attach",
"name": "Attach to Node Process",
"port": 9229,
"restart": true,
"skipFiles": ["<node_internals>/**", "**/node_modules/**"]
}
],
"compounds": [
{
"name": "Debug: Server + App",
"configurations": ["Debug: fluxer_server", "Debug: fluxer_app (DevServer)"]
}
]
}

View File

@@ -73,7 +73,7 @@ TBD
### Devenv development environment
Fluxer supports development through **devenv** only. It provides a reproducible Nix environment and a single, declarative process manager for the dev stack. If you need a different setup, it is currently unsupported.
Fluxer supports development through **devenv** only. It provides a reproducible Nix environment and a single, declarative process manager for the dev stack.
1. Install Nix and devenv using the [devenv getting started guide](https://devenv.sh/getting-started/).
2. Enter the environment:
@@ -108,6 +108,20 @@ If you develop on a remote VM behind Cloudflare Tunnels (or a similar HTTP-only
The bootstrap script configures LiveKit automatically based on `domain.base_domain` in your `config.json`. When set to a non-localhost domain, it enables external IP discovery so clients can connect directly for media while signaling continues through the tunnel.
### Devcontainer (experimental)
There is experimental support for developing in a **VS Code Dev Container** / GitHub Codespace without Nix. The `.devcontainer/` directory provides a Docker Compose setup with all required tooling and backing services.
```bash
# Inside the dev container, start all processes:
process-compose -f .devcontainer/process-compose.yml up
```
Open the app at `http://localhost:48763` and the dev email inbox at `http://localhost:48763/mailpit/`. Predefined VS Code debugging targets are available in `.vscode/launch.json`.
> [!WARNING]
> Bluesky OAuth is disabled in the devcontainer because it requires HTTPS. All other features work normally.
### Documentation
To develop the documentation site with live preview:

View File

@@ -37,6 +37,7 @@ const AccountSecurityTab: React.FC = observer(() => {
const {t} = useLingui();
const user = UserStore.currentUser;
const [showMaskedEmail, setShowMaskedEmail] = useState(false);
const [showMaskedPhone, setShowMaskedPhone] = useState(false);
const [passkeys, setPasskeys] = useState<Array<UserActionCreators.WebAuthnCredential>>([]);
const [loadingPasskeys, setLoadingPasskeys] = useState(false);
const [enablingSmsMfa, setEnablingSmsMfa] = useState(false);
@@ -100,9 +101,11 @@ const AccountSecurityTab: React.FC = observer(() => {
loadingPasskeys={loadingPasskeys}
enablingSmsMfa={enablingSmsMfa}
disablingSmsMfa={disablingSmsMfa}
showMaskedPhone={showMaskedPhone}
loadPasskeys={loadPasskeys}
setEnablingSmsMfa={setEnablingSmsMfa}
setDisablingSmsMfa={setDisablingSmsMfa}
setShowMaskedPhone={setShowMaskedPhone}
/>
</SettingsSection>

View File

@@ -36,6 +36,7 @@ export const AccountSecurityInlineTab = observer(() => {
const {t} = useLingui();
const user = UserStore.currentUser;
const [showMaskedEmail, setShowMaskedEmail] = useState(false);
const [showMaskedPhone, setShowMaskedPhone] = useState(false);
const [passkeys, setPasskeys] = useState<Array<UserActionCreators.WebAuthnCredential>>([]);
const [loadingPasskeys, setLoadingPasskeys] = useState(false);
const [enablingSmsMfa, setEnablingSmsMfa] = useState(false);
@@ -89,9 +90,11 @@ export const AccountSecurityInlineTab = observer(() => {
loadingPasskeys={loadingPasskeys}
enablingSmsMfa={enablingSmsMfa}
disablingSmsMfa={disablingSmsMfa}
showMaskedPhone={showMaskedPhone}
loadPasskeys={loadPasskeys}
setEnablingSmsMfa={setEnablingSmsMfa}
setDisablingSmsMfa={setDisablingSmsMfa}
setShowMaskedPhone={setShowMaskedPhone}
/>
</SettingsSection>
<SettingsSection id="danger_zone" title={t`Danger Zone`}>

View File

@@ -97,6 +97,48 @@
gap: 0.5rem;
}
.phoneRow {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
@media (min-width: 640px) {
.phoneRow {
flex-direction: row;
align-items: center;
gap: 0.5rem;
}
}
.phoneText {
color: var(--text-primary-muted);
font-size: 0.875rem;
}
.phoneTextSelectable {
user-select: text;
-webkit-user-select: text;
}
.toggleButton {
margin-top: 0.1em;
text-align: left;
color: var(--text-link);
font-size: 0.875rem;
cursor: pointer;
}
.toggleButton:hover {
text-decoration: underline;
}
@media (min-width: 640px) {
.toggleButton {
text-align: center;
}
}
.claimButton {
align-self: flex-start;
}

View File

@@ -41,6 +41,15 @@ import type React from 'react';
const logger = new Logger('SecurityTab');
// Mask a phone number for display. Values of four characters or fewer are
// starred out entirely; longer values keep only the final two characters
// visible, with every digit before them replaced by '*' (non-digit
// formatting characters such as '+' or spaces are left as-is).
const maskPhone = (phone: string): string => {
	if (phone.length <= 4) {
		return '*'.repeat(phone.length);
	}
	const visibleSuffix = phone.slice(-2);
	const hiddenPrefix = phone.slice(0, -2).replace(/\d/g, '*');
	return hiddenPrefix + visibleSuffix;
};
interface SecurityTabProps {
user: UserRecord;
isClaimed: boolean;
@@ -51,9 +60,11 @@ interface SecurityTabProps {
loadingPasskeys: boolean;
enablingSmsMfa: boolean;
disablingSmsMfa: boolean;
showMaskedPhone: boolean;
loadPasskeys: () => Promise<void>;
setEnablingSmsMfa: React.Dispatch<React.SetStateAction<boolean>>;
setDisablingSmsMfa: React.Dispatch<React.SetStateAction<boolean>>;
setShowMaskedPhone: (show: boolean) => void;
}
export const SecurityTabContent: React.FC<SecurityTabProps> = observer(
@@ -67,9 +78,11 @@ export const SecurityTabContent: React.FC<SecurityTabProps> = observer(
loadingPasskeys,
enablingSmsMfa,
disablingSmsMfa,
showMaskedPhone,
loadPasskeys,
setEnablingSmsMfa,
setDisablingSmsMfa,
setShowMaskedPhone,
}) => {
const {t, i18n} = useLingui();
@@ -344,13 +357,24 @@ export const SecurityTabContent: React.FC<SecurityTabProps> = observer(
<div className={styles.label}>
<Trans>Phone Number</Trans>
</div>
<div className={styles.description}>
{user.phone ? (
<Trans>Phone number added: {user.phone}</Trans>
) : (
{user.phone ? (
<div className={styles.phoneRow}>
<span className={`${styles.phoneText} ${showMaskedPhone ? styles.phoneTextSelectable : ''}`}>
{showMaskedPhone ? user.phone : maskPhone(user.phone)}
</span>
<button
type="button"
className={styles.toggleButton}
onClick={() => setShowMaskedPhone(!showMaskedPhone)}
>
{showMaskedPhone ? t`Hide` : t`Reveal`}
</button>
</div>
) : (
<div className={styles.description}>
<Trans>Add a phone number to enable SMS two-factor authentication</Trans>
)}
</div>
</div>
)}
</div>
{user.phone ? (
<Button

View File

@@ -63,6 +63,7 @@
gap: 1rem;
align-items: center;
padding: 0.875rem 1rem;
padding-left: 0;
border-radius: 0.75rem;
background: none;
border: none;

View File

@@ -370,16 +370,6 @@ const OAuthAuthorizePage: React.FC = observer(() => {
const appName = publicApp?.name?.trim();
const clientLabel = appName || t`This application`;
const appAvatarUrl = useMemo<string | null>(() => {
if (!publicApp?.id || !publicApp.icon) {
return null;
}
const url = AvatarUtils.getUserAvatarURL({id: publicApp.id, avatar: publicApp.icon}, false);
return url ?? null;
}, [publicApp?.icon, publicApp?.id]);
const appInitial = clientLabel.charAt(0).toUpperCase();
const formattedPermissions = useMemo(() => {
if (!hasBotScope || !authParams?.permissions) return authParams?.permissions ?? undefined;
return formatBotPermissionsQuery(Array.from(selectedPermissions ?? []));
@@ -587,16 +577,6 @@ const OAuthAuthorizePage: React.FC = observer(() => {
</div>
<div className={styles.heroCard}>
<div className={styles.heroAvatarShell}>
<BaseAvatar
size={48}
avatarUrl={appAvatarUrl || ''}
shouldPlayAnimated={false}
className={!appAvatarUrl ? styles.appAvatarFallback : undefined}
userTag={clientLabel}
/>
{!appAvatarUrl && <span className={styles.appAvatarInitial}>{appInitial}</span>}
</div>
<div className={styles.heroCopy}>
<h1 className={styles.heroTitle}>
<Trans>Configure bot permissions</Trans>
@@ -724,17 +704,6 @@ const OAuthAuthorizePage: React.FC = observer(() => {
)}
<div className={styles.heroCard}>
<div className={styles.heroAvatarShell}>
<BaseAvatar
size={48}
avatarUrl={appAvatarUrl || ''}
shouldPlayAnimated={false}
className={!appAvatarUrl ? styles.appAvatarFallback : undefined}
userTag={clientLabel}
/>
{!appAvatarUrl && <span className={styles.appAvatarInitial}>{appInitial}</span>}
</div>
<div className={styles.heroCopy}>
<p className={styles.eyebrow}>
<Trans>Authorization request</Trans>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -1,6 +1,6 @@
---
title: "Quickstart: ping-pong bot"
description: "Create a bot application, invite it to a guild, and reply "pong" to "ping"."
description: 'Create a bot application, invite it to a guild, and reply "pong" to "ping".'
---
<Warning>
@@ -97,13 +97,13 @@ Second, install the required dependencies:
npm i -E @discordjs/core@2.4.0 @discordjs/rest@2.6.0 @discordjs/ws@2.0.4
```
Second, store your token from Step 4 in a `.env` file in this folder:
Third, store your token from Step 4 in a `.env` file in this folder:
```
FLUXER_BOT_TOKEN=<your token goes here>
```
Third, create a new `bot.mjs` file, looking like so:
Fourth, create a new `bot.mjs` file, looking like so:
```javascript
import {Client, GatewayDispatchEvents} from '@discordjs/core';

View File

@@ -30,6 +30,10 @@
-define(VOICE_RATE_LIMIT_TABLE, voice_update_rate_limit).
-define(VOICE_QUEUE_PROCESS_INTERVAL, 100).
-define(MAX_VOICE_QUEUE_LENGTH, 64).
-define(RATE_LIMIT_WINDOW_MS, 60000).
-define(RATE_LIMIT_MAX_EVENTS, 120).
-define(REQUEST_GUILD_MEMBERS_RATE_LIMIT_WINDOW_MS, 10000).
-define(REQUEST_GUILD_MEMBERS_RATE_LIMIT_MAX_EVENTS, 3).
-type state() :: #{
version := 1 | undefined,
@@ -40,6 +44,7 @@
socket_pid := pid() | undefined,
peer_ip := binary() | undefined,
rate_limit_state := map(),
request_guild_members_pid := pid() | undefined,
otel_span_ctx := term(),
voice_queue_timer := reference() | undefined
}.
@@ -54,7 +59,12 @@ new_state() ->
heartbeat_state => #{},
socket_pid => undefined,
peer_ip => undefined,
rate_limit_state => #{events => [], window_start => undefined},
rate_limit_state => #{
events => [],
request_guild_members_events => [],
window_start => undefined
},
request_guild_members_pid => undefined,
otel_span_ctx => undefined,
voice_queue_timer => undefined
}.
@@ -140,7 +150,7 @@ handle_incoming_data(Data, State = #{encoding := Encoding, compress_ctx := Compr
-spec handle_decode({ok, map()} | {error, term()}, state()) -> ws_result().
handle_decode({ok, #{<<"op">> := Op} = Payload}, State) ->
OpAtom = constants:gateway_opcode(Op),
case check_rate_limit(State) of
case check_rate_limit(State, OpAtom) of
{ok, RateLimitedState} ->
handle_gateway_payload(OpAtom, Payload, RateLimitedState);
rate_limited ->
@@ -160,6 +170,10 @@ websocket_info({session_backpressure_error, Details}, State) ->
handle_session_backpressure_error(Details, State);
websocket_info({'DOWN', _, process, Pid, _}, State = #{session_pid := Pid}) ->
handle_session_down(State);
websocket_info(
{'DOWN', _, process, Pid, _}, State = #{request_guild_members_pid := Pid}
) ->
{ok, State#{request_guild_members_pid => undefined}};
websocket_info({process_voice_queue}, State) ->
NewState = process_queued_voice_updates(State#{voice_queue_timer => undefined}),
{ok, NewState};
@@ -541,9 +555,13 @@ handle_voice_state_update(Pid, Data, State) ->
end.
-spec handle_request_guild_members(map(), pid(), state()) -> ws_result().
handle_request_guild_members(
_Data, _Pid, State = #{request_guild_members_pid := RequestPid}
) when is_pid(RequestPid) ->
{ok, State};
handle_request_guild_members(Data, Pid, State) ->
SocketPid = self(),
spawn(fun() ->
{WorkerPid, _Ref} = spawn_monitor(fun() ->
try
case gen_server:call(Pid, {get_state}, 5000) of
SessionState when is_map(SessionState) ->
@@ -555,7 +573,7 @@ handle_request_guild_members(Data, Pid, State) ->
_:_ -> ok
end
end),
{ok, State}.
{ok, State#{request_guild_members_pid => WorkerPid}}.
-spec handle_lazy_request(map(), pid(), state()) -> ws_result().
handle_lazy_request(Data, Pid, State) ->
@@ -578,23 +596,41 @@ handle_lazy_request(Data, Pid, State) ->
schedule_heartbeat_check() ->
erlang:send_after(constants:heartbeat_interval() div 3, self(), {heartbeat_check}).
-spec check_rate_limit(state()) -> {ok, state()} | rate_limited.
check_rate_limit(State = #{rate_limit_state := RateLimitState}) ->
-spec check_rate_limit(state(), atom()) -> {ok, state()} | rate_limited.
check_rate_limit(State = #{rate_limit_state := RateLimitState}, Op) ->
Now = erlang:system_time(millisecond),
Events = maps:get(events, RateLimitState, []),
WindowStart = maps:get(window_start, RateLimitState, Now),
WindowDuration = 60000,
MaxEvents = 120,
EventsInWindow = [T || T <- Events, (Now - T) < WindowDuration],
case length(EventsInWindow) >= MaxEvents of
EventsInWindow = [T || T <- Events, (Now - T) < ?RATE_LIMIT_WINDOW_MS],
case length(EventsInWindow) >= ?RATE_LIMIT_MAX_EVENTS of
true ->
rate_limited;
false ->
NewEvents = [Now | EventsInWindow],
NewRateLimitState = #{events => NewEvents, window_start => WindowStart},
{ok, State#{rate_limit_state => NewRateLimitState}}
case check_opcode_rate_limit(Op, RateLimitState, Now) of
rate_limited ->
rate_limited;
{ok, OpRateLimitState} ->
NewEvents = [Now | EventsInWindow],
NewRateLimitState =
OpRateLimitState#{events => NewEvents, window_start => WindowStart},
{ok, State#{rate_limit_state => NewRateLimitState}}
end
end.
-spec check_opcode_rate_limit(atom(), map(), integer()) -> {ok, map()} | rate_limited.
check_opcode_rate_limit(request_guild_members, RateLimitState, Now) ->
RequestEvents = maps:get(request_guild_members_events, RateLimitState, []),
RequestEventsInWindow =
[T || T <- RequestEvents, (Now - T) < ?REQUEST_GUILD_MEMBERS_RATE_LIMIT_WINDOW_MS],
case length(RequestEventsInWindow) >= ?REQUEST_GUILD_MEMBERS_RATE_LIMIT_MAX_EVENTS of
true ->
rate_limited;
false ->
{ok, RateLimitState#{request_guild_members_events => [Now | RequestEventsInWindow]}}
end;
check_opcode_rate_limit(_, RateLimitState, _Now) ->
{ok, RateLimitState}.
-spec extract_client_ip(cowboy_req:req()) -> binary().
extract_client_ip(Req) ->
case cowboy_req:header(<<"x-forwarded-for">>, Req) of
@@ -954,4 +990,42 @@ adjust_status_test() ->
?assertEqual(online, adjust_status(online)),
?assertEqual(idle, adjust_status(idle)).
check_rate_limit_blocks_general_flood_test() ->
Now = erlang:system_time(millisecond),
Events = lists:duplicate(?RATE_LIMIT_MAX_EVENTS, Now - 1000),
State = (new_state())#{
rate_limit_state => #{
events => Events,
request_guild_members_events => [],
window_start => Now
}
},
?assertEqual(rate_limited, check_rate_limit(State, heartbeat)).
check_rate_limit_blocks_request_guild_members_burst_test() ->
Now = erlang:system_time(millisecond),
RequestEvents =
lists:duplicate(?REQUEST_GUILD_MEMBERS_RATE_LIMIT_MAX_EVENTS, Now - 1000),
State = (new_state())#{
rate_limit_state => #{
events => [],
request_guild_members_events => RequestEvents,
window_start => Now
}
},
?assertEqual(rate_limited, check_rate_limit(State, request_guild_members)).
check_rate_limit_allows_other_ops_when_request_guild_members_is_hot_test() ->
Now = erlang:system_time(millisecond),
RequestEvents =
lists:duplicate(?REQUEST_GUILD_MEMBERS_RATE_LIMIT_MAX_EVENTS, Now - 1000),
State = (new_state())#{
rate_limit_state => #{
events => [],
request_guild_members_events => RequestEvents,
window_start => Now
}
},
?assertMatch({ok, _}, check_rate_limit(State, heartbeat)).
-endif.

View File

@@ -130,7 +130,19 @@ filter_sessions_for_event(Event, FinalData, SessionIdOpt, Sessions, UpdatedState
)
end;
false ->
guild_sessions:filter_sessions_exclude_session(Sessions, SessionIdOpt)
FilteredSessions = guild_sessions:filter_sessions_exclude_session(
Sessions, SessionIdOpt
),
case Event of
guild_member_add ->
[
{Sid, SessionData}
|| {Sid, SessionData} <- FilteredSessions,
maps:get(bot, SessionData, false) =:= true
];
_ ->
FilteredSessions
end
end
end.
@@ -1149,9 +1161,17 @@ filter_sessions_for_event_guild_wide_goes_to_all_sessions_test() ->
S2 = #{session_id => <<"s2">>, user_id => 11, pid => self()},
Sessions = #{<<"s1">> => S1, <<"s2">> => S2},
State = #{sessions => Sessions, data => #{<<"members">> => #{}}},
Result = filter_sessions_for_event(guild_member_add, #{}, undefined, Sessions, State),
Result = filter_sessions_for_event(guild_update, #{}, undefined, Sessions, State),
?assertEqual(2, length(Result)).
filter_sessions_for_event_guild_member_add_bots_only_test() ->
S1 = #{session_id => <<"s1">>, user_id => 10, pid => self(), bot => false},
S2 = #{session_id => <<"s2">>, user_id => 11, pid => self(), bot => true},
Sessions = #{<<"s1">> => S1, <<"s2">> => S2},
State = #{sessions => Sessions, data => #{<<"members">> => #{}}},
Result = filter_sessions_for_event(guild_member_add, #{}, undefined, Sessions, State),
?assertEqual([{<<"s2">>, S2}], Result).
extract_channel_id_message_create_uses_channel_id_field_test() ->
Data = #{<<"channel_id">> => <<"42">>},
?assertEqual(42, extract_channel_id(message_create, Data)).

View File

@@ -373,12 +373,9 @@ get_sorted_members_for_list(ListId, State) ->
Data = maps:get(data, State, #{}),
Members = guild_data_index:member_values(Data),
FilteredMembers = guild_member_list_common:filter_members_for_list(ListId, Members, State),
lists:sort(
fun(A, B) ->
guild_member_list_common:get_member_sort_key(A) =< guild_member_list_common:get_member_sort_key(B)
end,
FilteredMembers
).
Decorated = [{guild_member_list_common:get_member_sort_key(M), M} || M <- FilteredMembers],
Sorted = lists:sort(fun({KeyA, _}, {KeyB, _}) -> KeyA =< KeyB end, Decorated),
[M || {_, M} <- Sorted].
-spec build_full_items(list_id(), guild_state(), [map()]) -> [list_item()].
build_full_items(ListId, State, SortedMembers) ->

View File

@@ -24,6 +24,15 @@
-define(CHUNK_SIZE, 1000).
-define(MAX_USER_IDS, 100).
-define(MAX_NONCE_LENGTH, 32).
-define(FULL_MEMBER_LIST_LIMIT, 100000).
-define(DEFAULT_QUERY_LIMIT, 25).
-define(MAX_MEMBER_QUERY_LIMIT, 100).
-define(REQUEST_MEMBERS_RATE_LIMIT_TABLE, guild_request_members_rate_limit).
-define(REQUEST_MEMBERS_RATE_LIMIT_WINDOW_MS, 10000).
-define(REQUEST_MEMBERS_RATE_LIMIT_MAX_EVENTS, 5).
-define(REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, guild_request_members_guild_rate_limit).
-define(REQUEST_MEMBERS_GUILD_RATE_LIMIT_WINDOW_MS, 10000).
-define(REQUEST_MEMBERS_GUILD_RATE_LIMIT_MAX_EVENTS, 25).
-type session_state() :: map().
-type request_data() :: map().
@@ -121,7 +130,8 @@ ensure_binary(Value) when is_binary(Value) -> Value;
ensure_binary(_) -> <<>>.
-spec ensure_limit(term()) -> non_neg_integer().
ensure_limit(Limit) when is_integer(Limit), Limit >= 0 -> Limit;
ensure_limit(Limit) when is_integer(Limit), Limit >= 0 ->
min(Limit, ?MAX_MEMBER_QUERY_LIMIT);
ensure_limit(_) -> 0.
-spec normalize_nonce(term()) -> binary() | null.
@@ -135,13 +145,99 @@ process_request(Request, SocketPid, SessionState) ->
#{guild_id := GuildId, query := Query, limit := Limit, user_ids := UserIds} = Request,
UserIdBin = maps:get(user_id, SessionState),
UserId = type_conv:to_integer(UserIdBin),
case check_permission(UserId, GuildId, Query, Limit, UserIds, SessionState) of
case check_request_rate_limit(UserId) of
ok ->
fetch_and_send_members(Request, SocketPid, SessionState);
case check_guild_request_rate_limit(GuildId) of
ok ->
case check_permission(UserId, GuildId, Query, Limit, UserIds, SessionState) of
ok ->
fetch_and_send_members(Request, SocketPid, SessionState);
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
-spec check_request_rate_limit(integer() | undefined) -> ok | {error, atom()}.
%% Sliding-window per-user rate limit for REQUEST_GUILD_MEMBERS, backed by a
%% public named ETS table. Allows at most
%% ?REQUEST_MEMBERS_RATE_LIMIT_MAX_EVENTS requests per user inside
%% ?REQUEST_MEMBERS_RATE_LIMIT_WINDOW_MS. Returns {error, invalid_session}
%% for anything that is not a positive integer user id.
%% NOTE(review): table keys are never evicted, so the table grows with the
%% number of distinct users seen — acceptable for now, but a periodic sweep
%% may be needed at scale.
check_request_rate_limit(UserId) when is_integer(UserId), UserId > 0 ->
    ensure_request_rate_limit_table(),
    Now = erlang:system_time(millisecond),
    case ets:lookup(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, UserId) of
        [] ->
            ets:insert(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, {UserId, [Now]}),
            ok;
        [{UserId, Timestamps}] ->
            RecentTimestamps =
                [T || T <- Timestamps, (Now - T) < ?REQUEST_MEMBERS_RATE_LIMIT_WINDOW_MS],
            case length(RecentTimestamps) >= ?REQUEST_MEMBERS_RATE_LIMIT_MAX_EVENTS of
                true ->
                    %% Persist the pruned window even on denial so stale
                    %% timestamps do not linger indefinitely for users that
                    %% keep getting rate limited.
                    ets:insert(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, {UserId, RecentTimestamps}),
                    {error, rate_limited};
                false ->
                    ets:insert(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, {UserId, [Now | RecentTimestamps]}),
                    ok
            end
    end;
check_request_rate_limit(_) ->
    {error, invalid_session}.
-spec check_guild_request_rate_limit(integer()) -> ok | {error, atom()}.
%% Sliding-window per-guild rate limit for REQUEST_GUILD_MEMBERS, mirroring
%% check_request_rate_limit/1 but keyed by guild id with its own (larger)
%% event budget. Returns {error, invalid_guild_id} for anything that is not
%% a positive integer guild id.
check_guild_request_rate_limit(GuildId) when is_integer(GuildId), GuildId > 0 ->
    ensure_guild_request_rate_limit_table(),
    Now = erlang:system_time(millisecond),
    case ets:lookup(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, GuildId) of
        [] ->
            ets:insert(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, {GuildId, [Now]}),
            ok;
        [{GuildId, Timestamps}] ->
            RecentTimestamps =
                [T || T <- Timestamps, (Now - T) < ?REQUEST_MEMBERS_GUILD_RATE_LIMIT_WINDOW_MS],
            case length(RecentTimestamps) >= ?REQUEST_MEMBERS_GUILD_RATE_LIMIT_MAX_EVENTS of
                true ->
                    %% Persist the pruned window even on denial so stale
                    %% timestamps are dropped for continuously-flooded guilds.
                    ets:insert(
                        ?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, {GuildId, RecentTimestamps}
                    ),
                    {error, rate_limited};
                false ->
                    ets:insert(
                        ?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, {GuildId, [Now | RecentTimestamps]}
                    ),
                    ok
            end
    end;
check_guild_request_rate_limit(_) ->
    {error, invalid_guild_id}.
-spec ensure_request_rate_limit_table() -> ok.
%% Lazily create the per-user rate-limit ETS table. Two processes may race
%% past the whereis check; the loser's ets:new/2 raises badarg (table already
%% exists), which is deliberately swallowed.
ensure_request_rate_limit_table() ->
    Table = ?REQUEST_MEMBERS_RATE_LIMIT_TABLE,
    case ets:whereis(Table) of
        undefined ->
            try ets:new(Table, [named_table, public, set]) of
                _Tid -> ok
            catch
                error:badarg -> ok
            end;
        _Existing ->
            ok
    end.
-spec ensure_guild_request_rate_limit_table() -> ok.
%% Lazily create the per-guild rate-limit ETS table. The try/catch absorbs
%% the badarg raised when a concurrent process creates the named table
%% between the whereis check and ets:new/2.
ensure_guild_request_rate_limit_table() ->
    case ets:whereis(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE) of
        undefined ->
            try
                ets:new(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, [named_table, public, set]),
                ok
            catch
                error:badarg ->
                    ok
            end;
        _ ->
            ok
    end.
-spec check_permission(
integer(), integer(), binary(), non_neg_integer(), [integer()], session_state()
) ->
@@ -215,18 +311,9 @@ fetch_and_send_members(Request, _SocketPid, SessionState) ->
-spec fetch_members(pid(), binary(), non_neg_integer(), [integer()]) -> [member()].
fetch_members(GuildPid, _Query, _Limit, UserIds) when UserIds =/= [] ->
case gen_server:call(GuildPid, {list_guild_members, #{limit => 100000, offset => 0}}, 10000) of
#{members := AllMembers} ->
filter_members_by_ids(AllMembers, UserIds);
_ ->
[]
end;
fetch_members_by_user_ids(GuildPid, UserIds);
fetch_members(GuildPid, Query, Limit, []) ->
ActualLimit =
case Limit of
0 -> 100000;
L -> L
end,
ActualLimit = resolve_member_limit(Query, Limit),
case
gen_server:call(GuildPid, {list_guild_members, #{limit => ActualLimit, offset => 0}}, 10000)
of
@@ -241,17 +328,33 @@ fetch_members(GuildPid, Query, Limit, []) ->
[]
end.
-spec filter_members_by_ids([member()], [integer()]) -> [member()].
filter_members_by_ids(Members, UserIds) ->
UserIdSet = sets:from_list(UserIds),
lists:filter(
fun(Member) ->
UserId = extract_user_id(Member),
UserId =/= undefined andalso sets:is_element(UserId, UserIdSet)
-spec fetch_members_by_user_ids(pid(), [integer()]) -> [member()].
fetch_members_by_user_ids(GuildPid, UserIds) ->
lists:filtermap(
fun(UserId) ->
try
case gen_server:call(GuildPid, {get_guild_member, #{user_id => UserId}}, 5000) of
#{success := true, member_data := Member} when is_map(Member) ->
{true, Member};
_ ->
false
end
catch
exit:_ ->
false
end
end,
Members
lists:usort(UserIds)
).
-spec resolve_member_limit(binary(), non_neg_integer()) -> pos_integer().
%% Decide the effective member-list limit: no query and no explicit limit
%% means a full scan; a query with no explicit limit gets a small default
%% page; an explicit non-zero limit is used as-is.
resolve_member_limit(Query, 0) ->
    case Query of
        <<>> -> ?FULL_MEMBER_LIST_LIMIT;
        _NonEmpty -> ?DEFAULT_QUERY_LIMIT
    end;
resolve_member_limit(_Query, Limit) ->
    Limit.
-spec filter_members_by_query([member()], binary(), non_neg_integer()) -> [member()].
filter_members_by_query(Members, Query, Limit) ->
NormalizedQuery = string:lowercase(binary_to_list(Query)),
@@ -518,6 +621,64 @@ ensure_limit_negative_test() ->
%% Non-integer limits collapse to 0 (treated as "no explicit limit").
ensure_limit_non_integer_test() ->
    ?assertEqual(0, ensure_limit(<<"10">>)).
%% Limits above the cap are clamped to ?MAX_MEMBER_QUERY_LIMIT.
ensure_limit_clamped_test() ->
    ?assertEqual(?MAX_MEMBER_QUERY_LIMIT, ensure_limit(?MAX_MEMBER_QUERY_LIMIT + 1)).
%% Empty query + zero limit -> full member scan.
resolve_member_limit_full_scan_test() ->
    ?assertEqual(?FULL_MEMBER_LIST_LIMIT, resolve_member_limit(<<>>, 0)).
%% Non-empty query + zero limit -> small default page.
resolve_member_limit_query_default_test() ->
    ?assertEqual(?DEFAULT_QUERY_LIMIT, resolve_member_limit(<<"ab">>, 0)).
%% Explicit non-zero limit is passed through unchanged.
resolve_member_limit_explicit_test() ->
    ?assertEqual(25, resolve_member_limit(<<"ab">>, 25)).
%% A user with no recorded history is allowed through. Entries are cleared
%% before and after so the shared ETS table does not leak between tests.
check_request_rate_limit_allows_initial_request_test() ->
    UserId = 987654321,
    clear_request_rate_limit(UserId),
    ?assertEqual(ok, check_request_rate_limit(UserId)),
    clear_request_rate_limit(UserId).
%% Seeding the window with MAX_EVENTS recent timestamps trips the limiter.
check_request_rate_limit_blocks_burst_test() ->
    UserId = 987654322,
    clear_request_rate_limit(UserId),
    ensure_request_rate_limit_table(),
    Now = erlang:system_time(millisecond),
    Timestamps = lists:duplicate(?REQUEST_MEMBERS_RATE_LIMIT_MAX_EVENTS, Now - 1000),
    ets:insert(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, {UserId, Timestamps}),
    ?assertEqual({error, rate_limited}, check_request_rate_limit(UserId)),
    clear_request_rate_limit(UserId).
%% Anything that is not a positive integer user id is rejected outright.
check_request_rate_limit_invalid_user_test() ->
    ?assertEqual({error, invalid_session}, check_request_rate_limit(undefined)).
%% A guild with no recorded history is allowed through; state is cleared
%% around the assertion to keep the shared ETS table test-isolated.
check_guild_request_rate_limit_allows_initial_request_test() ->
    GuildId = 87654321,
    clear_guild_request_rate_limit(GuildId),
    ?assertEqual(ok, check_guild_request_rate_limit(GuildId)),
    clear_guild_request_rate_limit(GuildId).
%% Seeding MAX_EVENTS recent timestamps for the guild trips the limiter.
check_guild_request_rate_limit_blocks_burst_test() ->
    GuildId = 87654322,
    clear_guild_request_rate_limit(GuildId),
    ensure_guild_request_rate_limit_table(),
    Now = erlang:system_time(millisecond),
    Timestamps = lists:duplicate(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_MAX_EVENTS, Now - 1000),
    ets:insert(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, {GuildId, Timestamps}),
    ?assertEqual({error, rate_limited}, check_guild_request_rate_limit(GuildId)),
    clear_guild_request_rate_limit(GuildId).
%% Anything that is not a positive integer guild id is rejected outright.
check_guild_request_rate_limit_invalid_guild_test() ->
    ?assertEqual({error, invalid_guild_id}, check_guild_request_rate_limit(undefined)).
%% Test helper: drop any recorded request timestamps for UserId so a test
%% starts from a clean rate-limit window.
clear_request_rate_limit(UserId) ->
    ensure_request_rate_limit_table(),
    ets:delete(?REQUEST_MEMBERS_RATE_LIMIT_TABLE, UserId).
%% Test helper: same as above, but for the per-guild table.
clear_guild_request_rate_limit(GuildId) ->
    ensure_guild_request_rate_limit_table(),
    ets:delete(?REQUEST_MEMBERS_GUILD_RATE_LIMIT_TABLE, GuildId).
%% An integer guild id is accepted as-is and wrapped in {ok, _}.
validate_guild_id_integer_test() ->
    ?assertEqual({ok, 123}, validate_guild_id(123)).
@@ -589,30 +750,6 @@ chunk_presences_no_matching_presences_test() ->
Result = chunk_presences(Presences, [Members]),
?assertEqual([[]], Result).
filter_members_by_ids_basic_test() ->
Members = [
#{<<"user">> => #{<<"id">> => <<"1">>}},
#{<<"user">> => #{<<"id">> => <<"2">>}},
#{<<"user">> => #{<<"id">> => <<"3">>}}
],
Result = filter_members_by_ids(Members, [1, 3]),
?assertEqual(2, length(Result)).
filter_members_by_ids_empty_ids_test() ->
Members = [#{<<"user">> => #{<<"id">> => <<"1">>}}],
Result = filter_members_by_ids(Members, []),
?assertEqual([], Result).
filter_members_by_ids_no_match_test() ->
Members = [#{<<"user">> => #{<<"id">> => <<"1">>}}],
Result = filter_members_by_ids(Members, [999]),
?assertEqual([], Result).
filter_members_by_ids_skips_invalid_members_test() ->
Members = [#{}, #{<<"user">> => #{}}, #{<<"user">> => #{<<"id">> => <<"1">>}}],
Result = filter_members_by_ids(Members, [1]),
?assertEqual(1, length(Result)).
filter_members_by_query_case_insensitive_test() ->
Members = [
#{<<"user">> => #{<<"id">> => <<"1">>, <<"username">> => <<"Alice">>}},

View File

@@ -228,6 +228,11 @@ get_cache_stats() ->
-spec do_handle_message_create(map(), state()) -> state().
do_handle_message_create(Params, State) ->
spawn(fun() -> run_eligibility_and_dispatch(Params, State) end),
State.
-spec run_eligibility_and_dispatch(map(), state()) -> ok.
run_eligibility_and_dispatch(Params, State) ->
MessageData = maps:get(message_data, Params),
UserIds = maps:get(user_ids, Params),
GuildId = maps:get(guild_id, Params),
@@ -274,7 +279,7 @@ do_handle_message_create(Params, State) ->
),
case EligibleUsers of
[] ->
State;
ok;
_ ->
push_dispatcher:enqueue_send_notifications(
EligibleUsers,
@@ -286,5 +291,5 @@ do_handle_message_create(Params, State) ->
ChannelName,
State
),
State
ok
end.

View File

@@ -25,7 +25,7 @@ import {KVAccountDeletionQueueService} from '@fluxer/api/src/infrastructure/KVAc
import {initializeMetricsService} from '@fluxer/api/src/infrastructure/MetricsService';
import {InstanceConfigRepository} from '@fluxer/api/src/instance/InstanceConfigRepository';
import {ipBanCache} from '@fluxer/api/src/middleware/IpBanMiddleware';
import {initializeServiceSingletons} from '@fluxer/api/src/middleware/ServiceMiddleware';
import {initializeServiceSingletons, shutdownReportService} from '@fluxer/api/src/middleware/ServiceMiddleware';
import {
ensureVoiceResourcesInitialized,
getKVClient,
@@ -207,6 +207,13 @@ export function createShutdown(logger: ILogger): () => Promise<void> {
logger.error({error}, 'Error shutting down IP ban cache');
}
try {
shutdownReportService();
logger.info('Report service shut down');
} catch (error) {
logger.error({error}, 'Error shutting down report service');
}
logger.info('API service shutdown complete');
};
}

View File

@@ -42,8 +42,17 @@ export class ClamAV {
const socket = createConnection(this.port, this.host);
let response = '';
let isResolved = false;
const MAX_RESPONSE_SIZE = 10 * 1024 * 1024;
const CONNECT_TIMEOUT_MS = 5000;
const connectTimeout = setTimeout(() => {
if (!isResolved) {
doReject(new Error('ClamAV connection timeout (5s)'));
}
}, CONNECT_TIMEOUT_MS);
const cleanup = () => {
clearTimeout(connectTimeout);
if (!socket.destroyed) {
socket.destroy();
}
@@ -64,6 +73,7 @@ export class ClamAV {
};
socket.on('connect', () => {
clearTimeout(connectTimeout);
try {
socket.write('zINSTREAM\0');
@@ -92,6 +102,9 @@ export class ClamAV {
socket.on('data', (data) => {
response += data.toString();
if (response.length > MAX_RESPONSE_SIZE) {
doReject(new Error(`ClamAV response exceeded ${(MAX_RESPONSE_SIZE / 1024 / 1024).toFixed(0)} MB limit`));
}
});
socket.on('end', () => {

View File

@@ -246,6 +246,7 @@ export class GatewayService {
private circuitBreakerOpenUntilMs = 0;
private readonly CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5;
private readonly CIRCUIT_BREAKER_COOLDOWN_MS = ms('10 seconds');
private readonly PENDING_REQUEST_TIMEOUT_MS = ms('30 seconds');
constructor() {
this.rpcClient = GatewayRpcClient.getInstance();
@@ -260,9 +261,29 @@ export class GatewayService {
this.circuitBreakerOpenUntilMs = 0;
return false;
}
this.rejectAllPendingRequests(new ServiceUnavailableError('Gateway circuit breaker open'));
return true;
}
private rejectAllPendingRequests(error: Error): void {
this.pendingGuildDataRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingGuildDataRequests.clear();
this.pendingGuildMemberRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingGuildMemberRequests.clear();
this.pendingPermissionRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingPermissionRequests.clear();
this.pendingBatchRequestCount = 0;
}
private recordCircuitBreakerSuccess(): void {
this.circuitBreakerConsecutiveFailures = 0;
}
@@ -626,8 +647,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingGuildDataRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: GuildResponse) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingGuildDataRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingGuildDataRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@@ -651,8 +689,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingGuildMemberRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: {success: boolean; memberData?: GuildMemberResponse}) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingGuildMemberRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingGuildMemberRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@@ -804,8 +859,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingPermissionRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: boolean) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingPermissionRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingPermissionRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@@ -817,6 +889,25 @@ export class GatewayService {
});
}
private removePendingRequest<T>(
map: Map<string, Array<PendingRequest<T>>>,
key: string,
resolve: (value: T) => void,
reject: (error: Error) => void,
): void {
const pending = map.get(key);
if (pending) {
const index = pending.findIndex((r) => r.resolve === resolve || r.reject === reject);
if (index >= 0) {
pending.splice(index, 1);
this.pendingBatchRequestCount--;
if (pending.length === 0) {
map.delete(key);
}
}
}
}
async canManageRoles({guildId, userId, targetUserId, roleId}: CanManageRolesParams): Promise<boolean> {
const result = await this.call<{can_manage: boolean}>('guild.can_manage_roles', {
guild_id: guildId.toString(),

View File

@@ -30,6 +30,7 @@ export class SnowflakeReservationService {
private initialized = false;
private reloadPromise: Promise<void> | null = null;
private kvSubscription: IKVSubscription | null = null;
private messageHandler: ((channel: string) => void) | null = null;
constructor(
private repository: SnowflakeReservationRepository,
@@ -50,13 +51,14 @@ export class SnowflakeReservationService {
this.kvSubscription = subscription;
await subscription.connect();
await subscription.subscribe(SNOWFLAKE_RESERVATION_REFRESH_CHANNEL);
subscription.on('message', (channel) => {
this.messageHandler = (channel: string) => {
if (channel === SNOWFLAKE_RESERVATION_REFRESH_CHANNEL) {
this.reload().catch((error) => {
Logger.error({error}, 'Failed to reload snowflake reservations');
});
}
});
};
subscription.on('message', this.messageHandler);
} catch (error) {
Logger.error({error}, 'Failed to subscribe to snowflake reservation refresh channel');
}
@@ -99,9 +101,13 @@ export class SnowflakeReservationService {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@@ -45,6 +45,7 @@ export class LimitConfigService {
private kvSubscription: IKVSubscription | null = null;
private subscriberInitialized = false;
private readonly cacheKey: string;
private messageHandler: ((channel: string) => void) | null = null;
constructor(repository: InstanceConfigRepository, cacheService: ICacheService, kvClient: IKVProvider | null = null) {
this.repository = repository;
@@ -144,17 +145,21 @@ export class LimitConfigService {
const subscription = this.kvClient.duplicate();
this.kvSubscription = subscription;
this.messageHandler = (channel: string) => {
if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh limit config from pubsub');
});
}
};
subscription
.connect()
.then(() => subscription.subscribe(LIMIT_CONFIG_REFRESH_CHANNEL))
.then(() => {
subscription.on('message', (channel) => {
if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh limit config from pubsub');
});
}
});
if (this.messageHandler) {
subscription.on('message', this.messageHandler);
}
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to limit config refresh channel');
@@ -164,12 +169,16 @@ export class LimitConfigService {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.quit().catch((err) => {
Logger.error({err}, 'Failed to close KV subscription');
});
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@@ -52,6 +52,7 @@ class IpBanCache {
private kvClient: IKVProvider | null = null;
private kvSubscription: IKVSubscription | null = null;
private subscriberInitialized = false;
private messageHandler: ((channel: string) => void) | null = null;
constructor() {
this.singleIpBans = this.createFamilyMaps();
@@ -78,23 +79,27 @@ class IpBanCache {
const subscription = this.kvClient.duplicate();
this.kvSubscription = subscription;
this.messageHandler = (channel: string) => {
if (channel === IP_BAN_REFRESH_CHANNEL) {
this.refresh().catch((err) => {
this.consecutiveFailures++;
const message = err instanceof Error ? err.message : String(err);
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
Logger.error({error: message}, 'Failed to refresh IP ban cache after notification');
} else {
Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification');
}
});
}
};
subscription
.connect()
.then(() => subscription.subscribe(IP_BAN_REFRESH_CHANNEL))
.then(() => {
subscription.on('message', (channel) => {
if (channel === IP_BAN_REFRESH_CHANNEL) {
this.refresh().catch((err) => {
this.consecutiveFailures++;
const message = err instanceof Error ? err.message : String(err);
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
Logger.error({error: message}, 'Failed to refresh IP ban cache after notification');
} else {
Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification');
}
});
}
});
if (this.messageHandler) {
subscription.on('message', this.messageHandler);
}
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to IP ban refresh channel');
@@ -203,10 +208,14 @@ class IpBanCache {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@@ -173,6 +173,15 @@ import {createMiddleware} from 'hono/factory';
const errorI18nService = new ErrorI18nService();
let _reportService: ReportService | null = null;
/**
 * Dispose of the lazily-created ReportService singleton during process
 * shutdown. Safe to call when no instance was ever constructed.
 */
export function shutdownReportService(): void {
  if (!_reportService) {
    return;
  }
  // Shut down first, then drop the reference (matching the original order
  // so the reference survives if shutdown() throws).
  _reportService.shutdown();
  _reportService = null;
}
let _testEmailService: TestEmailService | null = null;
function getTestEmailService(): TestEmailService {
if (!_testEmailService) {
@@ -617,19 +626,21 @@ export const ServiceMiddleware = createMiddleware<HonoEnv>(async (ctx, next) =>
const desktopHandoffService = new DesktopHandoffService(cacheService);
const authRequestService = new AuthRequestService(authService, ssoService, cacheService, desktopHandoffService);
const reportSearchService = getReportSearchService();
const reportService = new ReportService(
reportRepository,
channelRepository,
guildRepository,
userRepository,
inviteRepository,
emailService,
emailDnsValidationService,
snowflakeService,
storageService,
reportSearchService,
);
if (!_reportService) {
_reportService = new ReportService(
reportRepository,
channelRepository,
guildRepository,
userRepository,
inviteRepository,
emailService,
emailDnsValidationService,
snowflakeService,
storageService,
getReportSearchService(),
);
}
const reportService = _reportService;
const reportRequestService = new ReportRequestService(reportService);
const adminService = new AdminService(

View File

@@ -34,6 +34,7 @@ export class VoiceTopology {
private subscribers: Set<Subscriber> = new Set();
private serverRotationIndex: Map<string, number> = new Map();
private kvSubscription: IKVSubscription | null = null;
private messageHandler: ((channel: string) => void) | null = null;
constructor(
private voiceRepository: IVoiceRepository,
@@ -53,13 +54,14 @@ export class VoiceTopology {
this.kvSubscription = subscription;
await subscription.connect();
await subscription.subscribe(VOICE_CONFIGURATION_CHANNEL);
subscription.on('message', (channel) => {
this.messageHandler = (channel: string) => {
if (channel === VOICE_CONFIGURATION_CHANNEL) {
this.reload().catch((error) => {
Logger.error({error}, 'Failed to reload voice topology from KV notification');
});
}
});
};
subscription.on('message', this.messageHandler);
} catch (error) {
Logger.error({error}, 'Failed to subscribe to voice configuration channel');
}
@@ -239,9 +241,13 @@ export class VoiceTopology {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@@ -70,6 +70,7 @@ export class JetStreamWorkerQueue {
} catch {
await jsm.consumers.add(STREAM_NAME, {
durable_name: CONSUMER_NAME,
filter_subject: `${SUBJECT_PREFIX}>`,
ack_policy: AckPolicy.Explicit,
max_deliver: MAX_DELIVER,
ack_wait: nanos(ACK_WAIT_MS),