Reprocess messages that are not acknowledged by the consumer

@TomF btw, what will happen if the process crashes because of an unhandled exception without calling commit() or rollback()? Thanks