2016-03-22

Amazon Kinesis Client (KCL) V2 IRecordProcessor

If you are using the KCL in version 1.5.0 or later, there are some changes, as described in the official documentation [http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html].

There is a new version of the record processor interface.
In this article I want to give some details about its initialize method.

IRecordProcessor V2
 void initialize(InitializationInput initializationInput)  
 void processRecords(ProcessRecordsInput processRecordsInput)  
 void shutdown(ShutdownInput shutdownInput)  

initialize

InitializationInput

In addition to the shardId of the shard currently being processed, you get an instance of ExtendedSequenceNumber. The documentation states that this is either the record processor's starting sequence number or the last uncommitted two-phase checkpoint.

I use this sequence number to initialize an internal last processed marker in my record processor implementation.
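A minimal sketch of that idea follows. It does not use the KCL classes: InitInput and Processor are hypothetical stand-ins so that the example runs on its own, and the sequence numbers are made up. With the real library the two values would presumably come from the getters on InitializationInput (getShardId() and getExtendedSequenceNumber()).

```java
public class MarkerSketch {

    /** Hypothetical stand-in for the KCL InitializationInput, reduced to plain strings. */
    static final class InitInput {
        final String shardId;
        final String sequenceNumber;

        InitInput(String shardId, String sequenceNumber) {
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /** Hypothetical processor that only tracks its position in the shard. */
    static final class Processor {
        private String shardId;
        private String lastProcessed;

        /** Seeds the internal marker with the sequence number handed in at start-up. */
        void initialize(InitInput input) {
            shardId = input.shardId;
            lastProcessed = input.sequenceNumber;
        }

        /** Advances the marker after a record has been handled. */
        void onRecord(String sequenceNumber) {
            lastProcessed = sequenceNumber;
        }

        String shardId() {
            return shardId;
        }

        String lastProcessed() {
            return lastProcessed;
        }
    }

    public static void main(String[] args) {
        Processor processor = new Processor();
        // "100" and "101" are invented values, not real Kinesis sequence numbers.
        processor.initialize(new InitInput("shardId-000000000000", "100"));
        processor.onRecord("101");
        System.out.println(processor.shardId() + " -> " + processor.lastProcessed());
    }
}
```

Note that the marker is whatever string the library passes in, so any code that later treats it as a number has to be sure it really is one.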

 

Edge case

After some problems with the DynamoDB metadata table, a colleague dropped its content. From that point on my record processor behaved strangely. After checking the log file, I was puzzled by the sequence number it had been given: "LATEST".

 

Solution

After some investigation I found some static definitions in the ExtendedSequenceNumber class. These are symbolic pointers into the shard stream. Using them, you can tell whether the current sequence number is really a sequence number or a stream pointer. I found some helper methods for this check, but all of them are declared private.

ExtendedSequenceNumber

   /**
    * Special value for LATEST.
    */
   public static final ExtendedSequenceNumber LATEST =
       new ExtendedSequenceNumber(SentinelCheckpoint.LATEST.toString());
   /**
    * Special value for SHARD_END.
    */
   public static final ExtendedSequenceNumber SHARD_END =
       new ExtendedSequenceNumber(SentinelCheckpoint.SHARD_END.toString());
   /**
    * Special value for TRIM_HORIZON.
    */
   public static final ExtendedSequenceNumber TRIM_HORIZON =
       new ExtendedSequenceNumber(SentinelCheckpoint.TRIM_HORIZON.toString());
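Since the library's own helpers are private, a check like the following can stand in for them. This is only a sketch working on plain strings: the class and method names are my own, the sentinel names are taken from the constants listed above, and it assumes that a real sequence number consists of decimal digits only. With the KCL you would presumably feed it the string behind the ExtendedSequenceNumber (getSequenceNumber()).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SequenceNumberKind {

    // Names of the sentinel values from the ExtendedSequenceNumber constants above.
    private static final Set<String> SENTINELS =
            new HashSet<>(Arrays.asList("LATEST", "TRIM_HORIZON", "SHARD_END"));

    /** True if the value is one of the symbolic stream pointers. */
    static boolean isSentinel(String value) {
        return value != null && SENTINELS.contains(value);
    }

    /** True if the value looks like a real sequence number (assumption: ASCII digits only). */
    static boolean isRealSequenceNumber(String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // "12345" is an invented value used for illustration only.
        for (String value : new String[] {"LATEST", "TRIM_HORIZON", "12345"}) {
            System.out.println(value + ": sentinel=" + isSentinel(value)
                    + ", real=" + isRealSequenceNumber(value));
        }
    }
}
```

In initialize, a processor can then decide whether to trust the given value as a last processed marker or to start without one.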

Some reverse engineering

But I still wondered why these "pointer" sequence numbers are provided. After digging into the code and examining the client initialization I found the reason for this behavior.
A LeaseManager instance looks for the DynamoDB metadata table. If it doesn't exist, a new one is created. After that, a new KinesisClientLease instance is created for the InitialPositionInStream enumeration value that was passed in via the KinesisClientLibConfiguration instance when building the Worker instance. That value is converted into the corresponding ExtendedSequenceNumber instance. The KinesisClientLease instance is then serialized into the metadata table. Finally, the initialize method is called with the sequence number from the KinesisClientLease.
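The flow can be modeled roughly as below. This is a simplified sketch of my reading of the code, not the library's implementation: the enum is a stand-in for InitialPositionInStream, a HashMap plays the role of the DynamoDB metadata table, and checkpointFor is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

public class InitialCheckpointSketch {

    /** Stand-in for the KCL InitialPositionInStream enumeration. */
    enum InitialPosition { LATEST, TRIM_HORIZON }

    /**
     * Returns the checkpoint a processor for the shard would be initialized with.
     * If the table has no entry for the shard, the configured initial position
     * is stored under its symbolic name and returned.
     */
    static String checkpointFor(String shardId, InitialPosition position,
                                Map<String, String> leaseTable) {
        return leaseTable.computeIfAbsent(shardId, id -> position.name());
    }

    public static void main(String[] args) {
        Map<String, String> leaseTable = new HashMap<>();
        String shardId = "shardId-000000000000";

        // First start: no lease yet, so the processor sees the symbolic pointer.
        System.out.println(checkpointFor(shardId, InitialPosition.LATEST, leaseTable));

        // After a checkpoint the stored value is an (invented) sequence number.
        leaseTable.put(shardId, "12345");
        System.out.println(checkpointFor(shardId, InitialPosition.LATEST, leaseTable));

        // Dropping the table content brings the symbolic pointer back.
        leaseTable.clear();
        System.out.println(checkpointFor(shardId, InitialPosition.LATEST, leaseTable));
    }
}
```

The last step mirrors the edge case: once the table content is gone, the next start looks like a first start and the processor is handed the symbolic pointer again.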

2016-03-09

Native Docker Feeling For Windows Using Git Bash

Docker on Windows is easy because of the native client port. The CLI tools are pretty well integrated for use with PowerShell. But all available examples are written in Linux shell syntax, and translating them into PowerShell syntax is mostly annoying and alienating.

But the Docker Toolbox comes to the rescue, as it requires Git for Windows. This Git distribution installs a version of MinGW and the MSYS tools, providing the Git Bash shell, which is similar to what you will find on Linux machines.

Using bash you can control any Docker host on a reachable machine. If you have problems with the default terminal emulation, you must use winpty. There is a little culture clash between Linux and Windows console output.

For me this is really a productivity booster, because combined with ssh you can use one shell instance for almost all adventures on remote Linux machines.