There is a new version of the record processor interface in the Amazon Kinesis Client Library (KCL).
In this article I want to share some details about its initialize method.
IRecordProcessor V2
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
initialize
InitializationInput
In addition to the ID of the shard currently being processed (shardId), you get an instance of ExtendedSequenceNumber. According to the documentation, this is either the record processor's starting sequence number or the last uncommitted two-phase checkpoint. I use this sequence number to initialize an internal "last processed" marker in my record processor implementation.
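A minimal sketch of that idea follows. To stay self-contained it does not depend on the KCL jar: InitInputStub is a hypothetical stand-in for InitializationInput that models only the shard ID and the string form of the ExtendedSequenceNumber, and markProcessed is an illustrative hook, not part of the KCL interface.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the KCL InitializationInput. Only the two values
 * discussed in the text (shard ID and starting sequence number) are modeled.
 */
final class InitInputStub {
    final String shardId;
    final String sequenceNumber; // string form of the ExtendedSequenceNumber

    InitInputStub(String shardId, String sequenceNumber) {
        this.shardId = Objects.requireNonNull(shardId);
        this.sequenceNumber = Objects.requireNonNull(sequenceNumber);
    }
}

/** Sketch of a record processor that remembers where it starts. */
final class MarkerRecordProcessor {
    private String shardId;
    // Internal "last processed" marker, seeded from the initialization input.
    private String lastProcessed;

    void initialize(InitInputStub input) {
        this.shardId = input.shardId;
        this.lastProcessed = input.sequenceNumber;
    }

    /** Hypothetical hook: advance the marker after a record has been handled. */
    void markProcessed(String sequenceNumber) {
        this.lastProcessed = Objects.requireNonNull(sequenceNumber);
    }

    String shardId() { return shardId; }

    String lastProcessed() { return lastProcessed; }
}
```

Note that this naive version copies whatever value it receives into the marker, which is exactly what goes wrong in the edge case described next.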
Edge case
After some problems with the DynamoDB metadata table, a colleague dropped its content. From that point on, my record processor behaved strangely. When I checked the log file, I was puzzled by the sequence number passed in: "LATEST".
Solution
After some investigation I found some static definitions in the ExtendedSequenceNumber class. These are symbolic pointers into the shard stream. Using them, you can tell whether the current sequence number is a real sequence number or a stream pointer. I found some helper methods for this check, but all of them are declared private.
ExtendedSequenceNumber
/**
 * Special value for LATEST.
 */
public static final ExtendedSequenceNumber LATEST =
        new ExtendedSequenceNumber(SentinelCheckpoint.LATEST.toString());
/**
 * Special value for SHARD_END.
 */
public static final ExtendedSequenceNumber SHARD_END =
        new ExtendedSequenceNumber(SentinelCheckpoint.SHARD_END.toString());
/**
 * Special value for TRIM_HORIZON.
 */
public static final ExtendedSequenceNumber TRIM_HORIZON =
        new ExtendedSequenceNumber(SentinelCheckpoint.TRIM_HORIZON.toString());
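Since the KCL helpers are private, a small public helper can perform the same check. The sketch below assumes that the string form of a sentinel is simply its name (the constants are built from SentinelCheckpoint.X.toString(), and the log showed the plain value "LATEST") and that real Kinesis sequence numbers consist only of decimal digits. The Sentinel enum and the SequenceNumbers class are hypothetical local names, not KCL types.

```java
/**
 * Local mirror of the three sentinel names used by the ExtendedSequenceNumber
 * constants above. Illustration only; this is not the KCL SentinelCheckpoint enum.
 */
enum Sentinel { LATEST, SHARD_END, TRIM_HORIZON }

/** Hypothetical public helpers replacing the private ones in the library. */
final class SequenceNumbers {
    private SequenceNumbers() {}

    /** True if the value is one of the symbolic stream pointers. */
    static boolean isSentinel(String value) {
        if (value == null) {
            return false;
        }
        for (Sentinel s : Sentinel.values()) {
            if (s.name().equals(value)) {
                return true;
            }
        }
        return false;
    }

    /** True if the value looks like a real sequence number (ASCII digits only, assumed). */
    static boolean isRealSequenceNumber(String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }
}
```

In initialize, a processor could then seed its internal marker only when isRealSequenceNumber(...) returns true and treat a sentinel as "nothing processed yet".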
Some reverse engineering
But I still wondered why these "pointer" sequence numbers are provided at all. After digging into the code and examining the client initialization, I found the reason for this behavior. First, a LeaseManager instance looks for the DynamoDB metadata table; if it doesn't exist, a new one is created. Next, a new KinesisClientLease instance is created for the InitialPositionInStream value that was set on the KinesisClientLibConfiguration instance used to build the Worker. That value is converted into the matching ExtendedSequenceNumber constant. The KinesisClientLease instance is then serialized into the metadata table. Finally, the initialize method is called with the sequence number stored in the KinesisClientLease. This is why, after the table content was dropped, my record processor received the symbolic "LATEST" instead of a real sequence number.
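The sequence above can be modeled in a few lines to see why an emptied table leads to "LATEST". This is a heavily simplified, hypothetical sketch: a Map stands in for the DynamoDB metadata table, InitialPosition mirrors only the LATEST and TRIM_HORIZON values of InitialPositionInStream, and none of the real LeaseManager or KinesisClientLease logic is reproduced.

```java
import java.util.Map;

/** Hypothetical local mirror of two InitialPositionInStream values. */
enum InitialPosition { LATEST, TRIM_HORIZON }

/**
 * Simplified model of the bootstrap described in the text. The map plays the
 * role of the metadata table: shard ID -> checkpoint string.
 */
final class LeaseBootstrapSketch {
    private final Map<String, String> leaseTable;

    LeaseBootstrapSketch(Map<String, String> leaseTable) {
        this.leaseTable = leaseTable;
    }

    /**
     * Returns the checkpoint that would be handed to initialize() for a shard.
     * If no lease exists (for example because the table was emptied), a new one
     * is stored whose checkpoint is the sentinel name of the configured position.
     */
    String checkpointForInitialize(String shardId, InitialPosition initialPosition) {
        return leaseTable.computeIfAbsent(shardId, id -> initialPosition.name());
    }
}
```

With an emptied table every shard gets a fresh lease whose checkpoint is the sentinel name, which matches the "LATEST" seen in the log; once a real checkpoint has been written, that value is returned instead.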