English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
Redis Stream es Redis 5.0 nueva estructura de datos añadida.
Redis Stream se utiliza principalmente para colas de mensajes (MQ, Message Queue), Redis tiene un Redis publicar y suscribirse (pub/sub) se utiliza para implementar la función de cola de mensajes, pero tiene la desventaja de que los mensajes no se pueden persistir, si se produce una interrupción de la red, elRedis falla, etc., los mensajes se perderán.
En términos simples, publicar y suscribirse (pub/sub) puede distribuir mensajes, pero no puede registrar mensajes históricos.
Redis Stream ofrece la función de persistencia de mensajes y replicación maestro-esclavo, lo que permite que cualquier cliente acceda a los datos en cualquier momento y recuerde la posición de acceso de cada cliente, y asegura que los mensajes no se pierdan.
La estructura de Redis Stream es como se muestra a continuación, tiene una lista de mensajes que une todos los mensajes añadidos, cada mensaje tiene un ID único y el contenido correspondiente:
Cada Stream tiene un nombre único, que es la clave de Redis, se crea automáticamente al usar la instrucción xadd por primera vez para agregar mensajes.
Análisis del gráfico superior:
Grupo de Consumidor :Grupo de consumo, creado con el comando XGROUP CREATE, un grupo de consumo tiene varios consumidores (Consumer).
last_delivered_id :Cursor, cada grupo de consumo tendrá un cursor last_delivered_id, y cualquier consumidor que lea mensajes moverá el cursor last_delivered_id hacia adelante.
pending_ids :Variable de estado del consumidor (Consumer), que tiene la función de mantener los ids no confirmados del consumidor. pending_ids registra los mensajes que han sido leídos por el cliente pero aún no han sido ack (carácter de reconocimiento: carácter de confirmación).
Comandos relacionados con la cola de mensajes:
XADD - Añadir mensaje al final
XTRIM - cortar el flujo, limitar la longitud
XDEL - eliminar mensajes
XLEN - obtener la cantidad de elementos en el flujo, es decir, la longitud de los mensajes
XRANGE - obtener lista de mensajes, se filtrarán automáticamente los mensajes eliminados
XREVRANGE - obtener lista de mensajes en orden inverso, ID de mayor a menor
XREAD - obtener lista de mensajes de manera bloqueada o no bloqueada
comandos relacionados con el grupo de consumidores:
XGROUP CREATE - crear grupo de consumidores
XREADGROUP GROUP - leer mensajes del grupo de consumidores
XACK - marcar mensajes como "procesados"
XGROUP SETID - establecer el nuevo ID de mensaje de entrega final para el grupo de consumidores
XGROUP DELCONSUMER - eliminar consumidor
XGROUP DESTROY - eliminar grupo de consumidores
XPENDING - mostrar información relacionada con los mensajes pendientes
XCLAIM - transferir la propiedad de propiedad de los mensajes
XINFO - ver información relacionada con el flujo y el grupo de consumidores
XINFO GROUPS - imprimir información del grupo de consumidores
XINFO STREAM - imprimir información de flujo
Utilice XADD para agregar mensajes a la cola, si la cola especificada no existe, se crea una cola, la sintaxis de XADD es:
XADD clave ID campo valor [campo valor ...]
key : Nombre de la cola, si no existe se crea
ID :ID de mensaje, utilizamos * representa el ID de mensaje generado por redis, puede ser personalizado, pero debe asegurarse de que sea creciente.
valor de campo : registro.
redis> XADD mystream * nombre Sara apellido OConnor "1601372323627-0" redis> XADD mystream * campo1 valor1 campo2 valor2 campo3 valor3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) ""1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) ""1601372323627-1" 2) 1) "campo"1" 2) "valor"1" 3) "campo"2" 4) "valor"2" 5) "campo"3" 6) "valor"3" redis>
Utilice XTRIM para cortar el flujo, limitar la longitud, la sintaxis es:
XTRIM clave MAXLEN [~] número
key :名称 de la cola
MAXLEN :longitud
count :数量
127.0.0.1:6379> XADD mystream * campo1 A campo2 B campo3 C campo4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (entero) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) ""1601372434568-0" 2) 1) "campo"1" 2) "A" 3) "campo"2" 4) "B" 5) "campo"3" 6) "C" 7) "campo"4" 8) "D" 127.0.0.1:6379> redis>
Utilice XDEL para eliminar mensajes, la sintaxis es:
XDEL clave ID [ID ...]
key:名称 de la cola
ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) ""1" 2) 1) 1538561701744-0 2) 1) "c" 2) ""3"
Utilice XLEN para obtener el número de elementos en el flujo, es decir, la longitud de los mensajes, la sintaxis es:
XLEN clave
key:名称 de la cola
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XRANGE key start end [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) ""1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) ""1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XREVRANGE key end start [COUNT count]
key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) ""1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis>
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count :数量
milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
# 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) ""1532" 3) "event"-id" 4) ""5" 5) "user"-id" 6) ""7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) ""812" 3) "event"-id" 4) ""9" 5) "user"-id" 6) ""388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-o-$] [SETID key groupname id]-o-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key : Nombre de la cola, si no existe se crea
groupname : Nombre del grupo.
$ : Indica comenzar desde el final, solo acepta nuevos mensajes, todas las mensajes del Stream actuales se ignorarán.
Consumir desde el principio:
XGROUP CREATE mystream consumer-group-name 0-0
Consumir desde el final:
XGROUP CREATE mystream consumer-group-name $
Para leer mensajes del grupo de consumo usando XREADGROUP GROUP, el formato de sintaxis es:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group : Nombre del grupo de consumo
consumer : Nombre del consumidor.
count : Cantidad de lectura.
milliseconds : Milisegundos de bloqueo.
key : Nombre de la cola.
ID : Identificador de mensaje.
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >