refactor: squash branch changes
This commit is contained in:
@@ -18,7 +18,12 @@
|
||||
*/
|
||||
|
||||
import type {IKVPipeline, IKVProvider, IKVSubscription} from '@fluxer/kv_client/src/IKVProvider';
|
||||
import {type IKVLogger, type KVClientConfig, resolveKVClientConfig} from '@fluxer/kv_client/src/KVClientConfig';
|
||||
import {
|
||||
type IKVLogger,
|
||||
type KVClientConfig,
|
||||
type ResolvedKVClientConfig,
|
||||
resolveKVClientConfig,
|
||||
} from '@fluxer/kv_client/src/KVClientConfig';
|
||||
import {KVClientError, KVClientErrorCode} from '@fluxer/kv_client/src/KVClientError';
|
||||
import {
|
||||
createStringEntriesFromPairs,
|
||||
@@ -29,7 +34,7 @@ import {
|
||||
} from '@fluxer/kv_client/src/KVCommandArguments';
|
||||
import {KVPipeline} from '@fluxer/kv_client/src/KVPipeline';
|
||||
import {KVSubscription} from '@fluxer/kv_client/src/KVSubscription';
|
||||
import Redis from 'ioredis';
|
||||
import Redis, {Cluster} from 'ioredis';
|
||||
|
||||
const RELEASE_LOCK_SCRIPT = `
|
||||
if redis.call('GET', KEYS[1]) == ARGV[1] then
|
||||
@@ -106,21 +111,47 @@ return 1
|
||||
`;
|
||||
|
||||
export class KVClient implements IKVProvider {
|
||||
private readonly client: Redis;
|
||||
private readonly client: Redis | Cluster;
|
||||
private readonly config: ResolvedKVClientConfig;
|
||||
private readonly logger: IKVLogger;
|
||||
private readonly url: string;
|
||||
private readonly timeoutMs: number;
|
||||
|
||||
constructor(config: KVClientConfig | string) {
|
||||
const resolvedConfig = resolveKVClientConfig(config);
|
||||
this.config = resolvedConfig;
|
||||
this.url = resolvedConfig.url;
|
||||
this.timeoutMs = resolvedConfig.timeoutMs;
|
||||
this.logger = resolvedConfig.logger;
|
||||
this.client = new Redis(this.url, {
|
||||
connectTimeout: this.timeoutMs,
|
||||
commandTimeout: this.timeoutMs,
|
||||
maxRetriesPerRequest: 1,
|
||||
retryStrategy: createRetryStrategy(),
|
||||
|
||||
if (resolvedConfig.mode === 'cluster') {
|
||||
this.client = this.createClusterClient(resolvedConfig);
|
||||
} else {
|
||||
this.client = new Redis(this.url, {
|
||||
connectTimeout: this.timeoutMs,
|
||||
commandTimeout: this.timeoutMs,
|
||||
maxRetriesPerRequest: 1,
|
||||
retryStrategy: createRetryStrategy(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private createClusterClient(clusterConfig: ResolvedKVClientConfig): Cluster {
|
||||
const nodes =
|
||||
clusterConfig.clusterNodes.length > 0 ? clusterConfig.clusterNodes : parseClusterNodesFromUrl(clusterConfig.url);
|
||||
|
||||
const natMap = clusterConfig.clusterNatMap;
|
||||
const hasNatMap = Object.keys(natMap).length > 0;
|
||||
|
||||
return new Cluster(nodes, {
|
||||
clusterRetryStrategy: createRetryStrategy(),
|
||||
redisOptions: {
|
||||
connectTimeout: clusterConfig.timeoutMs,
|
||||
commandTimeout: clusterConfig.timeoutMs,
|
||||
maxRetriesPerRequest: 1,
|
||||
},
|
||||
scaleReads: 'master',
|
||||
...(hasNatMap ? {natMap} : {}),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -374,6 +405,8 @@ export class KVClient implements IKVProvider {
|
||||
duplicate(): IKVSubscription {
|
||||
return new KVSubscription({
|
||||
url: this.url,
|
||||
mode: this.config.mode,
|
||||
clusterNodes: this.config.clusterNodes,
|
||||
timeoutMs: this.timeoutMs,
|
||||
logger: this.logger,
|
||||
});
|
||||
@@ -480,6 +513,15 @@ export class KVClient implements IKVProvider {
|
||||
}
|
||||
}
|
||||
|
||||
function parseClusterNodesFromUrl(url: string): Array<{host: string; port: number}> {
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
return [{host: parsed.hostname, port: Number.parseInt(parsed.port || '6379', 10)}];
|
||||
} catch {
|
||||
return [{host: '127.0.0.1', port: 6379}];
|
||||
}
|
||||
}
|
||||
|
||||
function createRetryStrategy(): (times: number) => number {
|
||||
return (times: number) => {
|
||||
const backoffMs = Math.min(times * 100, 2000);
|
||||
|
||||
@@ -24,14 +24,27 @@ export interface IKVLogger {
|
||||
error(obj: object, msg?: string): void;
|
||||
}
|
||||
|
||||
export type KVClientMode = 'standalone' | 'cluster';
|
||||
|
||||
export interface KVClusterNode {
|
||||
host: string;
|
||||
port: number;
|
||||
}
|
||||
|
||||
export interface KVClientConfig {
|
||||
url: string;
|
||||
mode?: KVClientMode;
|
||||
clusterNodes?: Array<KVClusterNode>;
|
||||
clusterNatMap?: Record<string, KVClusterNode>;
|
||||
timeoutMs?: number;
|
||||
logger?: IKVLogger;
|
||||
}
|
||||
|
||||
export interface ResolvedKVClientConfig {
|
||||
url: string;
|
||||
mode: KVClientMode;
|
||||
clusterNodes: Array<KVClusterNode>;
|
||||
clusterNatMap: Record<string, KVClusterNode>;
|
||||
timeoutMs: number;
|
||||
logger: IKVLogger;
|
||||
}
|
||||
@@ -45,6 +58,9 @@ export function resolveKVClientConfig(config: KVClientConfig | string): Resolved
|
||||
if (typeof config === 'string') {
|
||||
return {
|
||||
url: normalizeUrl(config),
|
||||
mode: 'standalone' as const,
|
||||
clusterNodes: [],
|
||||
clusterNatMap: {},
|
||||
timeoutMs: DEFAULT_KV_TIMEOUT_MS,
|
||||
logger: noopLogger,
|
||||
};
|
||||
@@ -52,6 +68,9 @@ export function resolveKVClientConfig(config: KVClientConfig | string): Resolved
|
||||
|
||||
return {
|
||||
url: normalizeUrl(config.url),
|
||||
mode: config.mode ?? 'standalone',
|
||||
clusterNodes: config.clusterNodes ?? [],
|
||||
clusterNatMap: config.clusterNatMap ?? {},
|
||||
timeoutMs: config.timeoutMs ?? DEFAULT_KV_TIMEOUT_MS,
|
||||
logger: config.logger ?? noopLogger,
|
||||
};
|
||||
|
||||
@@ -18,17 +18,21 @@
|
||||
*/
|
||||
|
||||
import type {IKVSubscription} from '@fluxer/kv_client/src/IKVProvider';
|
||||
import type {IKVLogger} from '@fluxer/kv_client/src/KVClientConfig';
|
||||
import type {IKVLogger, KVClientMode, KVClusterNode} from '@fluxer/kv_client/src/KVClientConfig';
|
||||
import Redis from 'ioredis';
|
||||
|
||||
interface KVSubscriptionConfig {
|
||||
url: string;
|
||||
mode?: KVClientMode;
|
||||
clusterNodes?: Array<KVClusterNode>;
|
||||
timeoutMs: number;
|
||||
logger: IKVLogger;
|
||||
}
|
||||
|
||||
export class KVSubscription implements IKVSubscription {
|
||||
private readonly url: string;
|
||||
private readonly mode: KVClientMode;
|
||||
private readonly clusterNodes: Array<KVClusterNode>;
|
||||
private readonly timeoutMs: number;
|
||||
private readonly logger: IKVLogger;
|
||||
private readonly channels: Set<string> = new Set();
|
||||
@@ -38,6 +42,8 @@ export class KVSubscription implements IKVSubscription {
|
||||
|
||||
constructor(config: KVSubscriptionConfig) {
|
||||
this.url = config.url;
|
||||
this.mode = config.mode ?? 'standalone';
|
||||
this.clusterNodes = config.clusterNodes ?? [];
|
||||
this.timeoutMs = config.timeoutMs;
|
||||
this.logger = config.logger;
|
||||
}
|
||||
@@ -47,7 +53,8 @@ export class KVSubscription implements IKVSubscription {
|
||||
return;
|
||||
}
|
||||
|
||||
const client = new Redis(this.url, {
|
||||
const connectionUrl = this.resolveSubscriptionUrl();
|
||||
const client = new Redis(connectionUrl, {
|
||||
autoResubscribe: true,
|
||||
connectTimeout: this.timeoutMs,
|
||||
commandTimeout: this.timeoutMs,
|
||||
@@ -144,6 +151,15 @@ export class KVSubscription implements IKVSubscription {
|
||||
this.errorCallbacks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private resolveSubscriptionUrl(): string {
|
||||
if (this.mode !== 'cluster' || this.clusterNodes.length === 0) {
|
||||
return this.url;
|
||||
}
|
||||
|
||||
const node = this.clusterNodes[0];
|
||||
return `redis://${node.host}:${node.port}`;
|
||||
}
|
||||
}
|
||||
|
||||
function createRetryStrategy(): (times: number) => number {
|
||||
|
||||
Reference in New Issue
Block a user