Real-time anomaly detection via Random Cut Forest in Amazon Kinesis Data Analytics

Real-time anomaly detection describes a use case for detecting and flagging unexpected behavior in streaming data as it occurs. Online machine learning (ML) algorithms are popular for this use case because they don't require any explicit rules and are able to adapt to a changing baseline, which is particularly useful for continuous streams of data where incoming data changes continuously over time.

Random Cut Forest (RCF) is one such algorithm widely used for anomaly detection use cases. In typical setups, we want to be able to run the RCF algorithm on input data with large throughput, and streaming data processing frameworks can help with that. We are excited to share that RCF is possible with Amazon Kinesis Data Analytics for Apache Flink. Apache Flink is a popular open-source framework for real-time, stateful computations over data streams, and can be used to run RCF on input streams with large throughput.

This post demonstrates how we can use Kinesis Data Analytics for Apache Flink to run an online RCF algorithm for anomaly detection.

Solution overview

The following diagram illustrates our architecture, which consists of three components: an input data stream using Amazon Kinesis Data Streams, a Flink job, and an output Kinesis data stream. In terms of data flow, we use a Python script to generate anomalous sine wave data into the input data stream, the data is then processed by RCF in a Flink job, and the resulting anomaly score is delivered to the output data stream.

The following chart shows an example of our expected result, which indicates that the anomaly score peaked when the sine wave data source anomalously dropped to a constant -17.

We can implement this solution in three simple steps:

  1. Set up AWS resources via AWS CloudFormation.
  2. Set up a data generator to produce data into the source data stream.
  3. Run the RCF Flink Java code on Kinesis Data Analytics.

Set up AWS resources via AWS CloudFormation

The following CloudFormation stack creates all the AWS resources we need for this tutorial, including two Kinesis data streams, a Kinesis Data Analytics application, and an Amazon Simple Storage Service (Amazon S3) bucket.

Sign in to your AWS account, then choose Launch Stack:


Follow the steps on the AWS CloudFormation console to create the stack.

Set up a data generator

Run the following Python script to populate the input data stream with the anomalous sine wave data:

import json
import math

import boto3

# Replace with the name of the input data stream created by the CloudFormation stack
STREAM_NAME = "ExampleInputStream"


def get_data(time):
    rad = (time / 100) % 360
    val = math.sin(rad) * 10 + 10

    # Inject the anomaly: drop to a constant -17 in this window
    if rad > 2.4 and rad < 2.6:
        val = -17

    return {'time': time, 'value': val}


def generate(stream_name, kinesis_client):
    time = 0

    while True:
        data = get_data(time)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

        time += 1


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name="us-west-2"))
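To see exactly where the injected anomaly lands, note that rad = (time / 100) % 360 falls inside the window (2.4, 2.6) for time steps 241 through 259, where the script emits a constant -17 instead of the sine value. The following sketch reuses the get_data logic from the script above to confirm this:

```python
import math


def get_data(time):
    # Same logic as the data generator above
    rad = (time / 100) % 360
    val = math.sin(rad) * 10 + 10
    if rad > 2.4 and rad < 2.6:
        val = -17
    return {'time': time, 'value': val}


# Points inside the anomaly window are forced to -17
anomalous = [t for t in range(1000) if get_data(t)['value'] == -17]
print(anomalous[0], anomalous[-1])  # → 241 259

# A regular point outside the window follows the sine wave
print(get_data(100)['value'])  # sin(1.0) * 10 + 10 ≈ 18.41
```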

Run the RCF Flink Java code on Kinesis Data Analytics

The CloudFormation stack automatically downloaded and packaged the RCF Flink job jar file for you. Therefore, you can simply go to the Kinesis Data Analytics console to run your application.

That's it! We now have a running Flink job that continuously reads in data from an input Kinesis data stream and computes the anomaly score for each new data point given the previous data points it has seen.

The following sections explain the RCF implementation and Flink job code in more detail.

RCF implementation

Various RCF implementations are publicly available. For this tutorial, we use the AWS implementation by wrapping it in a custom wrapper (RandomCutForestOperator) to be used in our Flink job.

RandomCutForestOperator is implemented as an Apache Flink ProcessFunction, which is a function that allows us to write custom logic to process every element in the stream. Our custom logic starts with a data transformation via inputDataMapper.apply, followed by getting the anomaly score by calling the AWS RCF library via rcf.getAnomalyScore. The code implementation of RandomCutForestOperator can be found on GitHub.
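To make the operator's control flow concrete, here is a Python analogy of what that process step does: map the raw input to a point, ask the forest for an anomaly score (which also updates the model), then join score and input via the result mapper. The class names below (PyRcfOperator, StubForest) are illustrative stand-ins, not the actual Java API:

```python
class StubForest:
    """Stand-in for the AWS Random Cut Forest model: it scores a point,
    then folds that point into its internal state (online learning)."""

    def __init__(self):
        self.seen = []

    def get_anomaly_score(self, point):
        # Real RCF scores by how much the point displaces tree structure;
        # this stub returns 0.0 so only the plumbing is demonstrated.
        score = 0.0
        self.seen.append(point)
        return score


class PyRcfOperator:
    """Illustrative analogy of the Flink ProcessFunction wrapper."""

    def __init__(self, rcf, input_mapper, result_mapper):
        self.rcf = rcf
        self.input_mapper = input_mapper
        self.result_mapper = result_mapper

    def process_element(self, value):
        point = self.input_mapper(value)           # e.g. float -> [float]
        score = self.rcf.get_anomaly_score(point)  # score, then update model
        return self.result_mapper(value, score)    # e.g. (value, score) pair


op = PyRcfOperator(
    StubForest(),
    input_mapper=lambda v: [float(v)],
    result_mapper=lambda v, s: (v, s),
)
print(op.process_element(3.5))  # → (3.5, 0.0)
```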

RandomCutForestOperatorBuilder requires two main types of parameters:

  • RandomCutForestOperator hyperparameters – We use the following:
    • Dimensions – We set this to 1 because our input data is a 1-dimensional sine wave consisting of the float data type.
    • ShingleSize – We set this to 1, which means the anomaly score for each point is computed based on the current point and the previously seen data points. Note that this can be increased to account for seasonality in the data.
    • SampleSize – We set this to 628, which means a maximum of 628 data points is kept in the data sample for each tree.
  • DataMapper parameters for input and output processing – We use the following:
    • InputDataMapper – We use RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER to map input data from float to float[].
    • ResultMapper – We use RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER, which is a BiFunction that joins the anomaly score with the corresponding sine wave data point into a tuple.
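ShingleSize deserves a quick illustration: a shingle of size k replaces each point with the sliding window of its k most recent values, so the forest scores short patterns rather than isolated points. The following is a generic sketch of shingling, not the operator's internal code:

```python
from collections import deque


def shingle_stream(points, shingle_size):
    """Yield one shingle (the shingle_size most recent points) per input
    point, once enough points have arrived to fill the window."""
    window = deque(maxlen=shingle_size)
    for p in points:
        window.append(p)
        if len(window) == shingle_size:
            yield list(window)


# With shingle_size=1 (as in this post), each shingle is just the point itself
print(list(shingle_stream([1.0, 2.0, 3.0], 1)))  # → [[1.0], [2.0], [3.0]]

# With shingle_size=3, each shingle captures a short trajectory,
# which helps the forest pick up seasonal or shape anomalies
print(list(shingle_stream([1.0, 2.0, 3.0, 4.0], 3)))
# → [[1.0, 2.0, 3.0], [2.0, 3.0, 4.0]]
```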

Flink job code

The following code snippet illustrates the core streaming structure of our Apache Flink streaming Java code. It first reads in data from the source Kinesis data stream, then processes it using the RCF algorithm. The computed anomaly score is then written to an output Kinesis data stream.

DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

sineWaveSource
        .process(
                RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                        .setDimensions(1)
                        .setShingleSize(1)
                        .setSampleSize(628)
                        .setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER)
                        .setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER)
                        .build(),
                TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class))
        .addSink(createSinkFromStaticConfig());

In this example, our baseline input data is a sine wave. As shown in the following screenshot, a low anomaly score is returned when the data is regular. However, when there is an anomaly in the data (when the sine wave input data drops to a constant), a high anomaly score is returned. The anomaly score is delivered into an output Kinesis data stream. You can visualize this result by creating a Kinesis Data Analytics Studio app; for instructions, refer to Interactive analysis of streaming data.
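A common consumer-side step, once scores land in the output stream, is turning raw anomaly scores into alerts by flagging scores that sit far above the recent score distribution. The helper below and its threshold policy are ours, not part of the solution; tune the threshold for your stream:

```python
from statistics import mean, stdev


def flag_anomalies(scores, num_stddevs=2.0):
    """Return the indices of scores more than num_stddevs standard
    deviations above the mean score. Illustrative threshold policy only."""
    mu = mean(scores)
    sigma = stdev(scores)
    return [i for i, s in enumerate(scores) if s > mu + num_stddevs * sigma]


# Mostly-low scores with one spike, like the sine wave demo's output
scores = [0.1, 0.2, 0.15, 0.1, 0.2, 5.0, 0.1, 0.2]
print(flag_anomalies(scores))  # → [5]
```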

Because this is an unsupervised algorithm, you don't need to provide any explicit rules or labeled datasets for this operator. In other words, only the input data stream, data transformations, and some hyperparameters were provided. The RCF algorithm itself determined the expected baseline based on the input data and identified any unexpected behavior.

Furthermore, this means the model will continuously adapt even if the baseline changes over time. As such, minimal re-training cadence is required. This is powerful for anomaly detection on streaming data because the data will often drift slowly over time due to seasonal patterns, inflation, equipment calibration drift, and so on.
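The adapting-baseline idea isn't unique to RCF. A much simpler online detector, an exponentially weighted mean and variance with z-score scoring, shows the same property in a few lines. This is a stand-in to illustrate the concept, not what the RCF library does:

```python
class OnlineZScoreDetector:
    """Tracks an exponentially weighted mean and variance of the stream
    and scores each point by its z-score against that moving baseline."""

    def __init__(self, alpha=0.05):
        self.alpha = alpha  # how fast the baseline adapts
        self.mean = None
        self.var = 1.0

    def score(self, x):
        if self.mean is None:
            self.mean = x
            return 0.0
        z = abs(x - self.mean) / (self.var ** 0.5 + 1e-9)
        # Update the baseline after scoring, so the model keeps adapting
        diff = x - self.mean
        self.mean += self.alpha * diff
        self.var = (1 - self.alpha) * (self.var + self.alpha * diff * diff)
        return z


detector = OnlineZScoreDetector()
steady = [detector.score(10.0) for _ in range(200)]  # flat baseline
spike = detector.score(-17.0)                         # sudden drop

# Steady points score near 0; the drop to -17 scores very high
print(max(steady), spike)
```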

Clean up

To avoid incurring future charges, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket created by the CloudFormation stack.
  2. On the AWS CloudFormation console, delete the CloudFormation stack.


This post demonstrated how to perform anomaly detection on input streaming data with RCF, an online unsupervised ML algorithm, using Kinesis Data Analytics. We also showed how this algorithm learns the data baseline on its own, and can adapt to changes in the baseline over time. We hope you consider this solution for your real-time anomaly detection use cases.

About the Authors

Daren Wong is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Aleksandr Pilipenko is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Hong Liang Teoh is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.
