chore: remove chunked uploads for now
This commit is contained in:
@@ -1,119 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2026 Fluxer Contributors
|
||||
*
|
||||
* This file is part of Fluxer.
|
||||
*
|
||||
* Fluxer is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Fluxer is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
import {createChannelID} from '@fluxer/api/src/BrandedTypes';
|
||||
import {DefaultUserOnly, LoginRequired} from '@fluxer/api/src/middleware/AuthMiddleware';
|
||||
import {RateLimitMiddleware} from '@fluxer/api/src/middleware/RateLimitMiddleware';
|
||||
import {OpenAPI} from '@fluxer/api/src/middleware/ResponseTypeMiddleware';
|
||||
import {RateLimitConfigs} from '@fluxer/api/src/RateLimitConfig';
|
||||
import type {HonoApp} from '@fluxer/api/src/types/HonoEnv';
|
||||
import {Validator} from '@fluxer/api/src/Validator';
|
||||
import {
|
||||
ChunkedUploadChunkParam,
|
||||
ChunkedUploadParam,
|
||||
CompleteChunkedUploadRequest,
|
||||
CompleteChunkedUploadResponse,
|
||||
CreateChunkedUploadRequest,
|
||||
CreateChunkedUploadResponse,
|
||||
UploadChunkResponse,
|
||||
} from '@fluxer/schema/src/domains/channel/ChunkedUploadSchemas';
|
||||
import {ChannelIdParam} from '@fluxer/schema/src/domains/common/CommonParamSchemas';
|
||||
|
||||
export function ChunkedUploadController(app: HonoApp) {
|
||||
app.post(
|
||||
'/channels/:channel_id/chunked-uploads',
|
||||
RateLimitMiddleware(RateLimitConfigs.CHANNEL_CHUNKED_UPLOAD_CREATE),
|
||||
LoginRequired,
|
||||
DefaultUserOnly,
|
||||
Validator('param', ChannelIdParam),
|
||||
Validator('json', CreateChunkedUploadRequest),
|
||||
OpenAPI({
|
||||
operationId: 'create_chunked_upload',
|
||||
summary: 'Initiate a chunked upload session',
|
||||
description:
|
||||
'Creates a new chunked upload session for uploading large files. Returns the upload ID, expected chunk size, and total chunk count. The client should then upload each chunk individually and complete the upload when all chunks are uploaded.',
|
||||
responseSchema: CreateChunkedUploadResponse,
|
||||
statusCode: 201,
|
||||
security: ['bearerToken', 'sessionToken'],
|
||||
tags: ['Channels', 'Attachments'],
|
||||
}),
|
||||
async (ctx) => {
|
||||
const user = ctx.get('user');
|
||||
const channelId = createChannelID(ctx.req.valid('param').channel_id);
|
||||
const body = ctx.req.valid('json');
|
||||
const chunkedUploadService = ctx.get('chunkedUploadService');
|
||||
const result = await chunkedUploadService.initiateUpload(user.id, channelId, body);
|
||||
return ctx.json(result, 201);
|
||||
},
|
||||
);
|
||||
|
||||
app.put(
|
||||
'/channels/:channel_id/chunked-uploads/:upload_id/chunks/:chunk_index',
|
||||
RateLimitMiddleware(RateLimitConfigs.CHANNEL_CHUNKED_UPLOAD_CHUNK),
|
||||
LoginRequired,
|
||||
DefaultUserOnly,
|
||||
Validator('param', ChunkedUploadChunkParam),
|
||||
OpenAPI({
|
||||
operationId: 'upload_chunk',
|
||||
summary: 'Upload a file chunk',
|
||||
description:
|
||||
'Uploads a single chunk of a file as part of a chunked upload session. The chunk index is zero-based. Returns an ETag that must be provided when completing the upload.',
|
||||
responseSchema: UploadChunkResponse,
|
||||
statusCode: 200,
|
||||
security: ['bearerToken', 'sessionToken'],
|
||||
tags: ['Channels', 'Attachments'],
|
||||
}),
|
||||
async (ctx) => {
|
||||
const user = ctx.get('user');
|
||||
const {upload_id, chunk_index} = ctx.req.valid('param');
|
||||
const arrayBuffer = await ctx.req.arrayBuffer();
|
||||
const body = new Uint8Array(arrayBuffer);
|
||||
const chunkedUploadService = ctx.get('chunkedUploadService');
|
||||
const result = await chunkedUploadService.uploadChunk(user.id, upload_id, chunk_index, body);
|
||||
return ctx.json(result);
|
||||
},
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/channels/:channel_id/chunked-uploads/:upload_id/complete',
|
||||
RateLimitMiddleware(RateLimitConfigs.CHANNEL_CHUNKED_UPLOAD_COMPLETE),
|
||||
LoginRequired,
|
||||
DefaultUserOnly,
|
||||
Validator('param', ChunkedUploadParam),
|
||||
Validator('json', CompleteChunkedUploadRequest),
|
||||
OpenAPI({
|
||||
operationId: 'complete_chunked_upload',
|
||||
summary: 'Complete a chunked upload',
|
||||
description:
|
||||
'Completes a chunked upload session by assembling all uploaded chunks. Requires ETags for all chunks. Returns the upload filename that can be referenced when sending a message with the uploaded file.',
|
||||
responseSchema: CompleteChunkedUploadResponse,
|
||||
statusCode: 200,
|
||||
security: ['bearerToken', 'sessionToken'],
|
||||
tags: ['Channels', 'Attachments'],
|
||||
}),
|
||||
async (ctx) => {
|
||||
const user = ctx.get('user');
|
||||
const {upload_id} = ctx.req.valid('param');
|
||||
const body = ctx.req.valid('json');
|
||||
const chunkedUploadService = ctx.get('chunkedUploadService');
|
||||
const result = await chunkedUploadService.completeUpload(user.id, upload_id, body);
|
||||
return ctx.json(result);
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -19,7 +19,6 @@
|
||||
|
||||
import {CallController} from '@fluxer/api/src/channel/controllers/CallController';
|
||||
import {ChannelController} from '@fluxer/api/src/channel/controllers/ChannelController';
|
||||
import {ChunkedUploadController} from '@fluxer/api/src/channel/controllers/ChunkedUploadController';
|
||||
import {MessageController} from '@fluxer/api/src/channel/controllers/MessageController';
|
||||
import {MessageInteractionController} from '@fluxer/api/src/channel/controllers/MessageInteractionController';
|
||||
import {ScheduledMessageController} from '@fluxer/api/src/channel/controllers/ScheduledMessageController';
|
||||
@@ -31,7 +30,6 @@ export function registerChannelControllers(app: HonoApp) {
|
||||
MessageInteractionController(app);
|
||||
MessageController(app);
|
||||
ScheduledMessageController(app);
|
||||
ChunkedUploadController(app);
|
||||
CallController(app);
|
||||
StreamController(app);
|
||||
}
|
||||
|
||||
@@ -1,227 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2026 Fluxer Contributors
|
||||
*
|
||||
* This file is part of Fluxer.
|
||||
*
|
||||
* Fluxer is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Fluxer is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
import type {ChannelID, UserID} from '@fluxer/api/src/BrandedTypes';
|
||||
import {Config} from '@fluxer/api/src/Config';
|
||||
import type {AuthenticatedChannel} from '@fluxer/api/src/channel/services/AuthenticatedChannel';
|
||||
import {getContentType} from '@fluxer/api/src/channel/services/message/MessageHelpers';
|
||||
import type {IStorageService} from '@fluxer/api/src/infrastructure/IStorageService';
|
||||
import type {LimitConfigService} from '@fluxer/api/src/limits/LimitConfigService';
|
||||
import {resolveLimitSafe} from '@fluxer/api/src/limits/LimitConfigUtils';
|
||||
import {createLimitMatchContext} from '@fluxer/api/src/limits/LimitMatchContextBuilder';
|
||||
import type {Channel} from '@fluxer/api/src/models/Channel';
|
||||
import type {IUserRepository} from '@fluxer/api/src/user/IUserRepository';
|
||||
import {Permissions} from '@fluxer/constants/src/ChannelConstants';
|
||||
import {
|
||||
ATTACHMENT_MAX_SIZE_NON_PREMIUM,
|
||||
CHUNKED_UPLOAD_CHUNK_SIZE,
|
||||
CHUNKED_UPLOAD_MAX_CHUNKS,
|
||||
CHUNKED_UPLOAD_SESSION_TTL_SECONDS,
|
||||
} from '@fluxer/constants/src/LimitConstants';
|
||||
import {ChunkedUploadChunkIndexOutOfRangeError} from '@fluxer/errors/src/domains/channel/ChunkedUploadChunkIndexOutOfRangeError';
|
||||
import {ChunkedUploadIncompleteError} from '@fluxer/errors/src/domains/channel/ChunkedUploadIncompleteError';
|
||||
import {ChunkedUploadNotFoundError} from '@fluxer/errors/src/domains/channel/ChunkedUploadNotFoundError';
|
||||
import {ChunkedUploadNotOwnedError} from '@fluxer/errors/src/domains/channel/ChunkedUploadNotOwnedError';
|
||||
import {FileSizeTooLargeError} from '@fluxer/errors/src/domains/core/FileSizeTooLargeError';
|
||||
import {UnknownUserError} from '@fluxer/errors/src/domains/user/UnknownUserError';
|
||||
import type {IKVProvider} from '@fluxer/kv_client/src/IKVProvider';
|
||||
import type {
|
||||
CompleteChunkedUploadRequest,
|
||||
CompleteChunkedUploadResponse,
|
||||
CreateChunkedUploadRequest,
|
||||
CreateChunkedUploadResponse,
|
||||
UploadChunkResponse,
|
||||
} from '@fluxer/schema/src/domains/channel/ChunkedUploadSchemas';
|
||||
|
||||
/**
 * State for one in-progress chunked upload, persisted as JSON in the KV
 * store (see sessionKey) with a TTL of CHUNKED_UPLOAD_SESSION_TTL_SECONDS.
 * IDs are stored as strings because the record round-trips through JSON.
 */
interface ChunkedUploadSession {
	// Stringified ID of the user who initiated the upload (ownership check).
	userId: string;
	// Stringified ID of the channel the file is destined for.
	channelId: string;
	// Identifier of the underlying S3 multipart upload.
	s3UploadId: string;
	// Storage key (random UUID) the assembled object lives under.
	uploadFilename: string;
	// Original client-supplied filename.
	filename: string;
	// Declared total file size in bytes.
	fileSize: number;
	// Size of each chunk in bytes; the last chunk may be smaller since
	// chunkCount is ceil(fileSize / chunkSize).
	chunkSize: number;
	// Total number of chunks expected before completion is allowed.
	chunkCount: number;
	// MIME type derived from the filename at session creation.
	contentType: string;
}
|
||||
|
||||
function sessionKey(uploadId: string): string {
|
||||
return `chunked_upload:${uploadId}`;
|
||||
}
|
||||
|
||||
export class ChunkedUploadService {
|
||||
constructor(
|
||||
private storageService: IStorageService,
|
||||
private kvProvider: IKVProvider,
|
||||
private userRepository: IUserRepository,
|
||||
private limitConfigService: LimitConfigService,
|
||||
private getChannelAuthenticated: (params: {userId: UserID; channelId: ChannelID}) => Promise<AuthenticatedChannel>,
|
||||
private ensureTextChannel: (channel: Channel) => void,
|
||||
) {}
|
||||
|
||||
async initiateUpload(
|
||||
userId: UserID,
|
||||
channelId: ChannelID,
|
||||
request: CreateChunkedUploadRequest,
|
||||
): Promise<CreateChunkedUploadResponse> {
|
||||
const {channel, guild, checkPermission} = await this.getChannelAuthenticated({userId, channelId});
|
||||
this.ensureTextChannel(channel);
|
||||
|
||||
if (guild) {
|
||||
await checkPermission(Permissions.SEND_MESSAGES | Permissions.ATTACH_FILES);
|
||||
}
|
||||
|
||||
const user = await this.userRepository.findUnique(userId);
|
||||
if (!user) {
|
||||
throw new UnknownUserError();
|
||||
}
|
||||
|
||||
const fallbackMaxSize = ATTACHMENT_MAX_SIZE_NON_PREMIUM;
|
||||
const ctx = createLimitMatchContext({user, guildFeatures: guild?.features ?? null});
|
||||
const maxFileSize = resolveLimitSafe(
|
||||
this.limitConfigService.getConfigSnapshot(),
|
||||
ctx,
|
||||
'max_attachment_file_size',
|
||||
fallbackMaxSize,
|
||||
);
|
||||
|
||||
if (request.file_size > maxFileSize) {
|
||||
throw new FileSizeTooLargeError(maxFileSize);
|
||||
}
|
||||
|
||||
const chunkCount = Math.ceil(request.file_size / CHUNKED_UPLOAD_CHUNK_SIZE);
|
||||
if (chunkCount > CHUNKED_UPLOAD_MAX_CHUNKS) {
|
||||
throw new FileSizeTooLargeError(maxFileSize);
|
||||
}
|
||||
|
||||
const uploadFilename = crypto.randomUUID();
|
||||
const contentType = getContentType(request.filename);
|
||||
|
||||
const {uploadId: s3UploadId} = await this.storageService.createMultipartUpload({
|
||||
bucket: Config.s3.buckets.uploads,
|
||||
key: uploadFilename,
|
||||
contentType,
|
||||
});
|
||||
|
||||
const uploadId = crypto.randomUUID();
|
||||
|
||||
const session: ChunkedUploadSession = {
|
||||
userId: userId.toString(),
|
||||
channelId: channelId.toString(),
|
||||
s3UploadId,
|
||||
uploadFilename,
|
||||
filename: request.filename,
|
||||
fileSize: request.file_size,
|
||||
chunkSize: CHUNKED_UPLOAD_CHUNK_SIZE,
|
||||
chunkCount,
|
||||
contentType,
|
||||
};
|
||||
|
||||
await this.kvProvider.setex(sessionKey(uploadId), CHUNKED_UPLOAD_SESSION_TTL_SECONDS, JSON.stringify(session));
|
||||
|
||||
return {
|
||||
upload_id: uploadId,
|
||||
upload_filename: uploadFilename,
|
||||
chunk_size: CHUNKED_UPLOAD_CHUNK_SIZE,
|
||||
chunk_count: chunkCount,
|
||||
};
|
||||
}
|
||||
|
||||
async uploadChunk(
|
||||
userId: UserID,
|
||||
uploadId: string,
|
||||
chunkIndex: number,
|
||||
body: Uint8Array,
|
||||
): Promise<UploadChunkResponse> {
|
||||
const session = await this.getSession(uploadId);
|
||||
this.verifyOwnership(session, userId);
|
||||
|
||||
if (chunkIndex < 0 || chunkIndex >= session.chunkCount) {
|
||||
throw new ChunkedUploadChunkIndexOutOfRangeError();
|
||||
}
|
||||
|
||||
const {etag} = await this.storageService.uploadPart({
|
||||
bucket: Config.s3.buckets.uploads,
|
||||
key: session.uploadFilename,
|
||||
uploadId: session.s3UploadId,
|
||||
partNumber: chunkIndex + 1,
|
||||
body,
|
||||
});
|
||||
|
||||
return {etag};
|
||||
}
|
||||
|
||||
async completeUpload(
|
||||
userId: UserID,
|
||||
uploadId: string,
|
||||
request: CompleteChunkedUploadRequest,
|
||||
): Promise<CompleteChunkedUploadResponse> {
|
||||
const session = await this.getSession(uploadId);
|
||||
this.verifyOwnership(session, userId);
|
||||
|
||||
if (request.etags.length !== session.chunkCount) {
|
||||
throw new ChunkedUploadIncompleteError();
|
||||
}
|
||||
|
||||
const seenIndices = new Set<number>();
|
||||
for (const entry of request.etags) {
|
||||
if (entry.chunk_index < 0 || entry.chunk_index >= session.chunkCount) {
|
||||
throw new ChunkedUploadChunkIndexOutOfRangeError();
|
||||
}
|
||||
if (seenIndices.has(entry.chunk_index)) {
|
||||
throw new ChunkedUploadIncompleteError();
|
||||
}
|
||||
seenIndices.add(entry.chunk_index);
|
||||
}
|
||||
|
||||
const parts = request.etags.map((entry) => ({
|
||||
partNumber: entry.chunk_index + 1,
|
||||
etag: entry.etag,
|
||||
}));
|
||||
|
||||
await this.storageService.completeMultipartUpload({
|
||||
bucket: Config.s3.buckets.uploads,
|
||||
key: session.uploadFilename,
|
||||
uploadId: session.s3UploadId,
|
||||
parts,
|
||||
});
|
||||
|
||||
await this.kvProvider.del(sessionKey(uploadId));
|
||||
|
||||
return {
|
||||
upload_filename: session.uploadFilename,
|
||||
file_size: session.fileSize,
|
||||
content_type: session.contentType,
|
||||
};
|
||||
}
|
||||
|
||||
private async getSession(uploadId: string): Promise<ChunkedUploadSession> {
|
||||
const raw = await this.kvProvider.get(sessionKey(uploadId));
|
||||
if (!raw) {
|
||||
throw new ChunkedUploadNotFoundError();
|
||||
}
|
||||
return JSON.parse(raw) as ChunkedUploadSession;
|
||||
}
|
||||
|
||||
private verifyOwnership(session: ChunkedUploadSession, userId: UserID): void {
|
||||
if (session.userId !== userId.toString()) {
|
||||
throw new ChunkedUploadNotOwnedError();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,6 @@
|
||||
*/
|
||||
|
||||
import type {ChannelID} from '@fluxer/api/src/BrandedTypes';
|
||||
import {Config} from '@fluxer/api/src/Config';
|
||||
import {
|
||||
type AttachmentRequestData,
|
||||
mergeUploadWithClientData,
|
||||
@@ -26,7 +25,6 @@ import {
|
||||
} from '@fluxer/api/src/channel/AttachmentDTOs';
|
||||
import type {IChannelRepository} from '@fluxer/api/src/channel/IChannelRepository';
|
||||
import type {MessageRequest, MessageUpdateRequest} from '@fluxer/api/src/channel/MessageTypes';
|
||||
import {getContentType} from '@fluxer/api/src/channel/services/message/MessageHelpers';
|
||||
import type {GuildService} from '@fluxer/api/src/guild/services/GuildService';
|
||||
import type {LimitConfigService} from '@fluxer/api/src/limits/LimitConfigService';
|
||||
import {resolveLimitSafe} from '@fluxer/api/src/limits/LimitConfigUtils';
|
||||
@@ -172,14 +170,11 @@ export async function parseMultipartMessageData(
|
||||
const fileIds = new Set(filesWithIndices.map((f) => f.index));
|
||||
|
||||
const inlineNewAttachments: Array<ClientAttachmentRequest> = [];
|
||||
const preUploadedNewAttachments: Array<ClientAttachmentRequest> = [];
|
||||
|
||||
for (const att of newAttachments) {
|
||||
const id = typeof att.id === 'string' ? parseInt(att.id, 10) : att.id;
|
||||
if (fileIds.has(id)) {
|
||||
inlineNewAttachments.push(att);
|
||||
} else if (att.uploaded_filename) {
|
||||
preUploadedNewAttachments.push(att);
|
||||
} else {
|
||||
throw InputValidationError.fromCode('attachments', ValidationErrorCodes.NO_FILE_FOR_ATTACHMENT_METADATA, {
|
||||
attachmentId: att.id,
|
||||
@@ -232,32 +227,7 @@ export async function parseMultipartMessageData(
|
||||
});
|
||||
}
|
||||
|
||||
let processedPreUploadedAttachments: Array<AttachmentRequestData> = [];
|
||||
if (preUploadedNewAttachments.length > 0) {
|
||||
const storageService = ctx.get('storageService');
|
||||
|
||||
processedPreUploadedAttachments = await Promise.all(
|
||||
preUploadedNewAttachments.map(async (clientData) => {
|
||||
const uploadFilename = clientData.uploaded_filename!;
|
||||
const metadata = await storageService.getObjectMetadata(Config.s3.buckets.uploads, uploadFilename);
|
||||
if (!metadata) {
|
||||
throw InputValidationError.fromCode('attachments', ValidationErrorCodes.NO_FILE_FOR_ATTACHMENT_METADATA, {
|
||||
attachmentId: clientData.id,
|
||||
});
|
||||
}
|
||||
const uploaded: UploadedAttachment = {
|
||||
id: typeof clientData.id === 'string' ? parseInt(clientData.id, 10) : clientData.id,
|
||||
upload_filename: uploadFilename,
|
||||
filename: clientData.filename,
|
||||
file_size: metadata.contentLength,
|
||||
content_type: getContentType(clientData.filename),
|
||||
};
|
||||
return mergeUploadWithClientData(uploaded, clientData);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
data.attachments = [...existingAttachments, ...processedInlineAttachments, ...processedPreUploadedAttachments];
|
||||
data.attachments = [...existingAttachments, ...processedInlineAttachments];
|
||||
}
|
||||
|
||||
return data as MessageRequest | MessageUpdateRequest;
|
||||
|
||||
@@ -1,396 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2026 Fluxer Contributors
|
||||
*
|
||||
* This file is part of Fluxer.
|
||||
*
|
||||
* Fluxer is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Fluxer is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
import type {TestAccount} from '@fluxer/api/src/auth/tests/AuthTestUtils';
|
||||
import {createMultipartFormData, setupTestGuildAndChannel} from '@fluxer/api/src/channel/tests/AttachmentTestUtils';
|
||||
import type {ApiTestHarness} from '@fluxer/api/src/test/ApiTestHarness';
|
||||
import {createApiTestHarness} from '@fluxer/api/src/test/ApiTestHarness';
|
||||
import {createBuilder} from '@fluxer/api/src/test/TestRequestBuilder';
|
||||
import {APIErrorCodes} from '@fluxer/constants/src/ApiErrorCodes';
|
||||
import {CHUNKED_UPLOAD_CHUNK_SIZE} from '@fluxer/constants/src/LimitConstants';
|
||||
import type {ChannelResponse} from '@fluxer/schema/src/domains/channel/ChannelSchemas';
|
||||
import type {
|
||||
CompleteChunkedUploadResponse,
|
||||
CreateChunkedUploadResponse,
|
||||
UploadChunkResponse,
|
||||
} from '@fluxer/schema/src/domains/channel/ChunkedUploadSchemas';
|
||||
import type {MessageResponse} from '@fluxer/schema/src/domains/message/MessageResponseSchemas';
|
||||
import {afterAll, beforeAll, beforeEach, describe, expect, it} from 'vitest';
|
||||
|
||||
// Shared API harness for the whole file; built once, reset between tests.
let harness: ApiTestHarness;

beforeAll(async () => {
	harness = await createApiTestHarness();
});

// Wipe persisted state so individual tests stay independent.
beforeEach(async () => {
	await harness.reset();
});

afterAll(async () => {
	// Optional-chained so teardown is a no-op if beforeAll failed.
	await harness?.shutdown();
});
|
||||
|
||||
async function initiateChunkedUpload(
|
||||
token: string,
|
||||
channelId: string,
|
||||
filename: string,
|
||||
fileSize: number,
|
||||
): Promise<CreateChunkedUploadResponse> {
|
||||
return createBuilder<CreateChunkedUploadResponse>(harness, token)
|
||||
.post(`/channels/${channelId}/chunked-uploads`)
|
||||
.body({filename, file_size: fileSize})
|
||||
.expect(201)
|
||||
.execute();
|
||||
}
|
||||
|
||||
async function uploadChunk(
|
||||
token: string,
|
||||
channelId: string,
|
||||
uploadId: string,
|
||||
chunkIndex: number,
|
||||
data: Buffer,
|
||||
): Promise<UploadChunkResponse> {
|
||||
const response = await harness.app.request(
|
||||
`/channels/${channelId}/chunked-uploads/${uploadId}/chunks/${chunkIndex}`,
|
||||
{
|
||||
method: 'PUT',
|
||||
headers: new Headers({
|
||||
Authorization: token,
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'x-forwarded-for': '127.0.0.1',
|
||||
}),
|
||||
body: data,
|
||||
},
|
||||
);
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
return (await response.json()) as UploadChunkResponse;
|
||||
}
|
||||
|
||||
async function completeChunkedUpload(
|
||||
token: string,
|
||||
channelId: string,
|
||||
uploadId: string,
|
||||
etags: Array<{chunk_index: number; etag: string}>,
|
||||
): Promise<CompleteChunkedUploadResponse> {
|
||||
return createBuilder<CompleteChunkedUploadResponse>(harness, token)
|
||||
.post(`/channels/${channelId}/chunked-uploads/${uploadId}/complete`)
|
||||
.body({etags})
|
||||
.execute();
|
||||
}
|
||||
|
||||
// End-to-end coverage for the chunked upload API: session creation,
// per-chunk upload, completion, ownership enforcement, and sending
// messages that reference pre-uploaded files.
describe('Chunked Uploads', () => {
	let account: TestAccount;
	let channel: ChannelResponse;

	// Fresh guild + channel (and owning account) for every test.
	beforeEach(async () => {
		const setup = await setupTestGuildAndChannel(harness);
		account = setup.account;
		channel = setup.channel;
	});

	describe('POST /channels/:channel_id/chunked-uploads', () => {
		it('should initiate a chunked upload session', async () => {
			// 2 full chunks + 100 bytes -> expect 3 chunks total.
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE * 2 + 100;
			const result = await initiateChunkedUpload(account.token, channel.id, 'large-file.bin', fileSize);

			expect(result.upload_id).toBeDefined();
			expect(result.upload_filename).toBeDefined();
			expect(result.chunk_size).toBe(CHUNKED_UPLOAD_CHUNK_SIZE);
			expect(result.chunk_count).toBe(3);
		});

		it('should reject when file size exceeds the limit', async () => {
			// 10 GiB — assumed to be over any configured attachment limit.
			const hugeSize = 1024 * 1024 * 1024 * 10;
			await createBuilder(harness, account.token)
				.post(`/channels/${channel.id}/chunked-uploads`)
				.body({filename: 'huge.bin', file_size: hugeSize})
				.expect(400, APIErrorCodes.FILE_SIZE_TOO_LARGE)
				.execute();
		});

		it('should reject without authentication', async () => {
			await createBuilder(harness, '')
				.post(`/channels/${channel.id}/chunked-uploads`)
				.body({filename: 'file.bin', file_size: 1024})
				.expect(401)
				.execute();
		});
	});

	describe('PUT /channels/:channel_id/chunked-uploads/:upload_id/chunks/:chunk_index', () => {
		it('should upload a chunk and return an etag', async () => {
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE + 100;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'test.bin', fileSize);

			const chunkData = Buffer.alloc(CHUNKED_UPLOAD_CHUNK_SIZE, 0xab);
			const result = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunkData);

			expect(result.etag).toBeDefined();
			expect(typeof result.etag).toBe('string');
		});

		it('should reject chunk index out of range', async () => {
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE + 100;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'test.bin', fileSize);

			// Raw request (not the uploadChunk helper) because the helper asserts a 200.
			const chunkData = Buffer.alloc(100, 0xab);
			const response = await harness.app.request(
				`/channels/${channel.id}/chunked-uploads/${initResult.upload_id}/chunks/99`,
				{
					method: 'PUT',
					headers: new Headers({
						Authorization: account.token,
						'Content-Type': 'application/octet-stream',
						'x-forwarded-for': '127.0.0.1',
					}),
					body: chunkData,
				},
			);

			expect(response.status).toBe(400);
			const body = (await response.json()) as {code: string};
			expect(body.code).toBe(APIErrorCodes.CHUNKED_UPLOAD_CHUNK_INDEX_OUT_OF_RANGE);
		});

		it('should reject for non-existent upload session', async () => {
			const chunkData = Buffer.alloc(100, 0xab);
			const response = await harness.app.request(`/channels/${channel.id}/chunked-uploads/non-existent-id/chunks/0`, {
				method: 'PUT',
				headers: new Headers({
					Authorization: account.token,
					'Content-Type': 'application/octet-stream',
					'x-forwarded-for': '127.0.0.1',
				}),
				body: chunkData,
			});

			expect(response.status).toBe(404);
			const body = (await response.json()) as {code: string};
			expect(body.code).toBe(APIErrorCodes.CHUNKED_UPLOAD_NOT_FOUND);
		});
	});

	describe('POST /channels/:channel_id/chunked-uploads/:upload_id/complete', () => {
		it('should complete a chunked upload', async () => {
			const chunkSize = CHUNKED_UPLOAD_CHUNK_SIZE;
			const fileSize = chunkSize * 2;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'two-chunks.bin', fileSize);

			const chunk0 = Buffer.alloc(chunkSize, 0xaa);
			const chunk1 = Buffer.alloc(chunkSize, 0xbb);

			const etag0 = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunk0);
			const etag1 = await uploadChunk(account.token, channel.id, initResult.upload_id, 1, chunk1);

			const result = await completeChunkedUpload(account.token, channel.id, initResult.upload_id, [
				{chunk_index: 0, etag: etag0.etag},
				{chunk_index: 1, etag: etag1.etag},
			]);

			expect(result.upload_filename).toBe(initResult.upload_filename);
			expect(result.file_size).toBe(fileSize);
			expect(result.content_type).toBeDefined();
		});

		it('should reject when not all chunks have been provided', async () => {
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE * 2;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'test.bin', fileSize);

			// Upload only the first of two chunks, then attempt completion.
			const chunk0 = Buffer.alloc(CHUNKED_UPLOAD_CHUNK_SIZE, 0xaa);
			const etag0 = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunk0);

			await createBuilder(harness, account.token)
				.post(`/channels/${channel.id}/chunked-uploads/${initResult.upload_id}/complete`)
				.body({etags: [{chunk_index: 0, etag: etag0.etag}]})
				.expect(400, APIErrorCodes.CHUNKED_UPLOAD_INCOMPLETE)
				.execute();
		});

		it('should reject duplicate chunk indices', async () => {
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE * 2;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'test.bin', fileSize);

			const chunk0 = Buffer.alloc(CHUNKED_UPLOAD_CHUNK_SIZE, 0xaa);
			const etag0 = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunk0);

			// Correct etag count, but the same index listed twice.
			await createBuilder(harness, account.token)
				.post(`/channels/${channel.id}/chunked-uploads/${initResult.upload_id}/complete`)
				.body({
					etags: [
						{chunk_index: 0, etag: etag0.etag},
						{chunk_index: 0, etag: etag0.etag},
					],
				})
				.expect(400, APIErrorCodes.CHUNKED_UPLOAD_INCOMPLETE)
				.execute();
		});
	});

	describe('Upload ownership', () => {
		it('should reject chunk upload from a different user', async () => {
			const fileSize = CHUNKED_UPLOAD_CHUNK_SIZE + 100;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'test.bin', fileSize);

			// A second, unrelated account attempts to write into the session.
			const otherSetup = await setupTestGuildAndChannel(harness);
			const otherAccount = otherSetup.account;

			const chunkData = Buffer.alloc(100, 0xab);
			const response = await harness.app.request(
				`/channels/${channel.id}/chunked-uploads/${initResult.upload_id}/chunks/0`,
				{
					method: 'PUT',
					headers: new Headers({
						Authorization: otherAccount.token,
						'Content-Type': 'application/octet-stream',
						'x-forwarded-for': '127.0.0.1',
					}),
					body: chunkData,
				},
			);

			expect(response.status).toBe(403);
			const body = (await response.json()) as {code: string};
			expect(body.code).toBe(APIErrorCodes.CHUNKED_UPLOAD_NOT_OWNED);
		});
	});

	describe('End-to-end: chunked upload + message send', () => {
		it('should send a message with a pre-uploaded file', async () => {
			const chunkSize = CHUNKED_UPLOAD_CHUNK_SIZE;
			const fileSize = chunkSize + 500;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'uploaded-file.txt', fileSize);

			const chunk0 = Buffer.alloc(chunkSize, 0x41);
			const chunk1 = Buffer.alloc(500, 0x42);

			const etag0 = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunk0);
			const etag1 = await uploadChunk(account.token, channel.id, initResult.upload_id, 1, chunk1);

			await completeChunkedUpload(account.token, channel.id, initResult.upload_id, [
				{chunk_index: 0, etag: etag0.etag},
				{chunk_index: 1, etag: etag1.etag},
			]);

			// Reference the completed upload by upload_filename; no inline file parts.
			const payload = {
				content: 'Message with chunked upload',
				attachments: [
					{
						id: 0,
						filename: 'uploaded-file.txt',
						uploaded_filename: initResult.upload_filename,
					},
				],
			};

			const {body, contentType} = createMultipartFormData(payload, []);

			const mergedHeaders = new Headers();
			mergedHeaders.set('Content-Type', contentType);
			mergedHeaders.set('Authorization', account.token);
			mergedHeaders.set('x-forwarded-for', '127.0.0.1');

			const response = await harness.app.request(`/channels/${channel.id}/messages`, {
				method: 'POST',
				headers: mergedHeaders,
				body,
			});

			expect(response.status).toBe(200);
			const message = (await response.json()) as MessageResponse;
			expect(message.content).toBe('Message with chunked upload');
			expect(message.attachments).toBeDefined();
			expect(message.attachments!.length).toBe(1);
			expect(message.attachments![0].filename).toBe('uploaded-file.txt');
		});

		it('should send a message with both inline and pre-uploaded files', async () => {
			const chunkSize = CHUNKED_UPLOAD_CHUNK_SIZE;
			const fileSize = chunkSize + 100;
			const initResult = await initiateChunkedUpload(account.token, channel.id, 'large.bin', fileSize);

			const chunk0 = Buffer.alloc(chunkSize, 0xcc);
			const chunk1 = Buffer.alloc(100, 0xdd);

			const etag0 = await uploadChunk(account.token, channel.id, initResult.upload_id, 0, chunk0);
			const etag1 = await uploadChunk(account.token, channel.id, initResult.upload_id, 1, chunk1);

			await completeChunkedUpload(account.token, channel.id, initResult.upload_id, [
				{chunk_index: 0, etag: etag0.etag},
				{chunk_index: 1, etag: etag1.etag},
			]);

			// Attachment 0 travels inline in the multipart body; attachment 1
			// references the pre-uploaded file.
			const smallFileData = Buffer.from('small inline file content');
			const payload = {
				content: 'Mixed upload message',
				attachments: [
					{
						id: 0,
						filename: 'small.txt',
					},
					{
						id: 1,
						filename: 'large.bin',
						uploaded_filename: initResult.upload_filename,
					},
				],
			};

			const {response, json} = await sendMixedMessage(account.token, channel.id, payload, [
				{index: 0, filename: 'small.txt', data: smallFileData},
			]);

			expect(response.status).toBe(200);
			expect(json.content).toBe('Mixed upload message');
			expect(json.attachments).toBeDefined();
			expect(json.attachments!.length).toBe(2);
		});
	});
});
|
||||
|
||||
async function sendMixedMessage(
|
||||
token: string,
|
||||
channelId: string,
|
||||
payload: Record<string, unknown>,
|
||||
files: Array<{index: number; filename: string; data: Buffer}>,
|
||||
): Promise<{response: Response; json: MessageResponse}> {
|
||||
const {body, contentType} = createMultipartFormData(payload, files);
|
||||
|
||||
const mergedHeaders = new Headers();
|
||||
mergedHeaders.set('Content-Type', contentType);
|
||||
mergedHeaders.set('Authorization', token);
|
||||
mergedHeaders.set('x-forwarded-for', '127.0.0.1');
|
||||
|
||||
const response = await harness.app.request(`/channels/${channelId}/messages`, {
|
||||
method: 'POST',
|
||||
headers: mergedHeaders,
|
||||
body,
|
||||
});
|
||||
|
||||
const text = await response.text();
|
||||
let json: MessageResponse = undefined as unknown as MessageResponse;
|
||||
if (text.length > 0) {
|
||||
try {
|
||||
json = JSON.parse(text) as MessageResponse;
|
||||
} catch {}
|
||||
}
|
||||
|
||||
return {response, json};
|
||||
}
|
||||
@@ -35,7 +35,6 @@ import {Config} from '@fluxer/api/src/Config';
|
||||
import {ChannelRepository} from '@fluxer/api/src/channel/ChannelRepository';
|
||||
import {ChannelRequestService} from '@fluxer/api/src/channel/services/ChannelRequestService';
|
||||
import {ChannelService} from '@fluxer/api/src/channel/services/ChannelService';
|
||||
import {ChunkedUploadService} from '@fluxer/api/src/channel/services/ChunkedUploadService';
|
||||
import {MessageRequestService} from '@fluxer/api/src/channel/services/message/MessageRequestService';
|
||||
import {ScheduledMessageService} from '@fluxer/api/src/channel/services/ScheduledMessageService';
|
||||
import {StreamPreviewService} from '@fluxer/api/src/channel/services/StreamPreviewService';
|
||||
@@ -160,13 +159,11 @@ import {WebhookRequestService} from '@fluxer/api/src/webhook/WebhookRequestServi
|
||||
import {WebhookService} from '@fluxer/api/src/webhook/WebhookService';
|
||||
import type {ICacheService} from '@fluxer/cache/src/ICacheService';
|
||||
import {KVCacheProvider} from '@fluxer/cache/src/providers/KVCacheProvider';
|
||||
import {TEXT_BASED_CHANNEL_TYPES} from '@fluxer/constants/src/ChannelConstants';
|
||||
import {EmailI18nService} from '@fluxer/email/src/EmailI18nService';
|
||||
import type {EmailConfig, UserBouncedEmailChecker} from '@fluxer/email/src/EmailProviderTypes';
|
||||
import {EmailService} from '@fluxer/email/src/EmailService';
|
||||
import type {IEmailService} from '@fluxer/email/src/IEmailService';
|
||||
import {TestEmailService} from '@fluxer/email/src/TestEmailService';
|
||||
import {CannotSendMessageToNonTextChannelError} from '@fluxer/errors/src/domains/channel/CannotSendMessageToNonTextChannelError';
|
||||
import {createMockLogger} from '@fluxer/logger/src/mock';
|
||||
import {RateLimitService} from '@fluxer/rate_limit/src/RateLimitService';
|
||||
import type {ISmsProvider} from '@fluxer/sms/src/providers/ISmsProvider';
|
||||
@@ -474,19 +471,6 @@ export const ServiceMiddleware = createMiddleware<HonoEnv>(async (ctx, next) =>
|
||||
mediaService,
|
||||
);
|
||||
|
||||
const chunkedUploadService = new ChunkedUploadService(
|
||||
storageService,
|
||||
kvClient,
|
||||
userRepository,
|
||||
limitConfigService,
|
||||
channelService.getChannelAuthenticated.bind(channelService),
|
||||
(channel) => {
|
||||
if (!TEXT_BASED_CHANNEL_TYPES.has(channel.type)) {
|
||||
throw new CannotSendMessageToNonTextChannelError();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
const scheduledMessageRepository = new ScheduledMessageRepository();
|
||||
const scheduledMessageService = new ScheduledMessageService(
|
||||
channelService,
|
||||
@@ -886,7 +870,6 @@ export const ServiceMiddleware = createMiddleware<HonoEnv>(async (ctx, next) =>
|
||||
ctx.set('cacheService', cacheService);
|
||||
ctx.set('channelService', channelService);
|
||||
ctx.set('channelRequestService', channelRequestService);
|
||||
ctx.set('chunkedUploadService', chunkedUploadService);
|
||||
ctx.set('messageRequestService', messageRequestService);
|
||||
ctx.set('channelRepository', channelRepository);
|
||||
ctx.set('connectionService', connectionService);
|
||||
|
||||
@@ -140,19 +140,4 @@ export const ChannelRateLimitConfigs = {
|
||||
bucket: 'channel:stream:preview:post::stream_key',
|
||||
config: {limit: 20, windowMs: ms('10 seconds')},
|
||||
} as RouteRateLimitConfig,
|
||||
|
||||
CHANNEL_CHUNKED_UPLOAD_CREATE: {
|
||||
bucket: 'channel:chunked_upload:create::channel_id',
|
||||
config: {limit: 5, windowMs: ms('10 seconds')},
|
||||
} as RouteRateLimitConfig,
|
||||
|
||||
CHANNEL_CHUNKED_UPLOAD_CHUNK: {
|
||||
bucket: 'channel:chunked_upload:chunk::channel_id',
|
||||
config: {limit: 50, windowMs: ms('10 seconds')},
|
||||
} as RouteRateLimitConfig,
|
||||
|
||||
CHANNEL_CHUNKED_UPLOAD_COMPLETE: {
|
||||
bucket: 'channel:chunked_upload:complete::channel_id',
|
||||
config: {limit: 5, windowMs: ms('10 seconds')},
|
||||
} as RouteRateLimitConfig,
|
||||
} as const;
|
||||
|
||||
@@ -31,7 +31,6 @@ import type {IBlueskyOAuthService} from '@fluxer/api/src/bluesky/IBlueskyOAuthSe
|
||||
import type {IChannelRepository} from '@fluxer/api/src/channel/IChannelRepository';
|
||||
import type {ChannelRequestService} from '@fluxer/api/src/channel/services/ChannelRequestService';
|
||||
import type {ChannelService} from '@fluxer/api/src/channel/services/ChannelService';
|
||||
import type {ChunkedUploadService} from '@fluxer/api/src/channel/services/ChunkedUploadService';
|
||||
import type {MessageRequestService} from '@fluxer/api/src/channel/services/message/MessageRequestService';
|
||||
import type {ScheduledMessageService} from '@fluxer/api/src/channel/services/ScheduledMessageService';
|
||||
import type {StreamPreviewService} from '@fluxer/api/src/channel/services/StreamPreviewService';
|
||||
@@ -134,7 +133,6 @@ export interface HonoEnv {
|
||||
cacheService: ICacheService;
|
||||
channelService: ChannelService;
|
||||
channelRequestService: ChannelRequestService;
|
||||
chunkedUploadService: ChunkedUploadService;
|
||||
messageRequestService: MessageRequestService;
|
||||
channelRepository: IChannelRepository;
|
||||
connectionService: ConnectionService;
|
||||
|
||||
Reference in New Issue
Block a user