|
@@ -0,0 +1,35 @@
|
|
|
|
+#!/usr/bin/env python
|
|
|
|
+
|
|
|
|
+from redistimeseries.client import Client as RedisTimeSeries
|
|
|
|
+import redis
|
|
|
|
+import time
|
|
|
|
+
|
|
|
|
+redis_host = "redis"
|
|
|
|
+redis_port = 6379
|
|
|
|
+
|
|
|
|
+rts = RedisTimeSeries(host=redis_host, port=redis_port)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+pool = redis.ConnectionPool(host=redis_host, port=redis_port)
|
|
|
|
+r = redis.Redis(connection_pool=pool)
|
|
|
|
+
|
|
|
|
+try:
|
|
|
|
+ r.xadd("mystream", {'event_type': 'startup', 'user': 'root'})
|
|
|
|
+ r.xgroup_create("mystream", "consumerGroup", '$')
|
|
|
|
+except:
|
|
|
|
+ print("group already exists")
|
|
|
|
+
|
|
|
|
+while True:
|
|
|
|
+ msgs = r.xreadgroup("consumerGroup", "consumerName", streams={"mystream": '>'}, count=10, block=1000, noack=False)
|
|
|
|
+ for msg in msgs:
|
|
|
|
+ for m in msg[1]:
|
|
|
|
+ evnt = m[1]['event_type']
|
|
|
|
+ try:
|
|
|
|
+ rts.info(evnt)
|
|
|
|
+ except:
|
|
|
|
+ rts.create(evnt, retentionSecs=60, labels={'event_type': evnt})
|
|
|
|
+ rts.create(evnt+"_minute", retentionSecs=0, labels={'event_type': evnt})
|
|
|
|
+ rts.createrule(evnt, evnt+"_minute", 'count', 60)
|
|
|
|
+
|
|
|
|
+ rts.incrby(evnt,1)
|