Producer-consumer design and implementation using Java 8























Please review my design and implementation and suggest any possible optimisations in terms of performance (time or space complexity), or any better approach to the design or implementation. All review comments are welcome so that I can learn to code and design better.



GitHub link: https://github.com/lalitkumar-kulkarni15/Consumer-Producer-case-study



Problem statement:



Consumer-Producer-case-study



# Case Study:

## Provided

Together with this document we provide a src directory containing source code, which is the basis for the task.

## Expectation

Implement the task as described below. You can start whenever you like and send us the result once you are done. Add documentation like you would in a real project. Bring the code to a production standard (not more, not less). Do not spend more than 2 hours on this exercise.

## User Story

There is a producer (Producer) of price updates for stocks. This producer will generate constant price updates for a fixed number of stocks. The producer should not block; every price update should be consumed as quickly as possible.

Furthermore, there is a load handler (LoadHandler) which will consume the price updates of the producer. In the current implementation the load handler just passes the update on to a consumer. This should be changed.

The consumer (Consumer) will receive the price updates from the load handler. (The current implementation will just print out all price updates for convenience's sake.) The consumer represents a legacy system that cannot consume more than a certain number of price updates per second. Otherwise it would fall over.

## Task

The task of this exercise is to extend the LoadHandler to limit the updates per second to the consumer to a certain given number (MAX_PRICE_UPDATES). In order to achieve this, it is allowed to drop price updates, since otherwise the application would run out of memory if it kept all of them. It is important that, if a price update is sent to the consumer, it is the most recent price.

## Result

- Fork the project
- Implement your solution
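The "limit the updates per second" part of the task can be sketched in isolation as a periodic tick that forwards a bounded batch. This is a minimal, hypothetical sketch and not the code under review: the class `ThrottledForwarder`, its methods, and the use of plain strings as updates are assumptions made for illustration. It only shows the rate limiting; the queue here is unbounded, so dropping stale prices would still have to be added to meet the memory requirement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch, not part of the provided source code.
class ThrottledForwarder {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final int maxPerSecond;
    private final Consumer<List<String>> sink;

    ThrottledForwarder(int maxPerSecond, Consumer<List<String>> sink) {
        this.maxPerSecond = maxPerSecond;
        this.sink = sink;
    }

    // Called by the producer; never blocks the caller.
    void receive(String update) {
        pending.add(update);
    }

    // One tick: forward at most maxPerSecond pending updates as one batch.
    void tick() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPerSecond) {
            String next = pending.poll();
            if (next == null) {
                break; // nothing left to send in this tick
            }
            batch.add(next);
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    // Runs tick() once per second on a single background thread.
    // The caller is responsible for shutting the returned scheduler down.
    ScheduledExecutorService start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::tick, 1, 1, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Keeping `tick()` separate from the scheduling makes the batching logic testable without waiting for real time to pass.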



Below are the classes:



1) Producer.java



    package com.exercise.producer;

    import com.exercise.model.PriceUpdate;
    import com.exercise.producer.Producer;
    import com.exercise.regulator.LoadHandler;

    public class Producer extends Thread {

        private LoadHandler loadHandler;

        public Producer(LoadHandler loadHandler) {
            this.loadHandler = loadHandler;
        }

        @Override
        public void run() {
            System.out.println("Run inside the producer is called.");
            try {
                generateUpdates();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void generateUpdates() throws InterruptedException {
            for (int i = 1; i < 10000000; i++) {
                System.out.println("Stock set start");
                Thread.sleep(5000);
                System.out.println("-----------------------");
                loadHandler.receive(new PriceUpdate("Apple", 97.85));
                loadHandler.receive(new PriceUpdate("Google", 160.71));
                loadHandler.receive(new PriceUpdate("Facebook", 91.66));
                loadHandler.receive(new PriceUpdate("Google", 160.73));
                loadHandler.receive(new PriceUpdate("Facebook", 91.71));
                loadHandler.receive(new PriceUpdate("Google", 160.76));
                loadHandler.receive(new PriceUpdate("Apple", 97.85));
                loadHandler.receive(new PriceUpdate("Google", 160.71));
                loadHandler.receive(new PriceUpdate("Facebook", 91.63));
                System.out.println("-----------------------");
                System.out.println("Stock set over");
            }
        }

    }


2) Consumer.java



    package com.excercise.consumer;

    import java.util.List;
    import com.exercise.model.PriceUpdate;

    /**
     * Please do not change the Consumer.
     */
    public class Consumer {

        public void send(List<PriceUpdate> priceUpdates) {
            System.out.println("List of price updates received at consumer class is size : " + priceUpdates.size());
            priceUpdates.forEach(System.out::println);
        }

    }


3) PriceUpdate.java



    package com.exercise.model;

    import java.time.LocalDateTime;

    public class PriceUpdate {

        private final String companyName;

        private final double price;

        private LocalDateTime localDateTime;

        public PriceUpdate(String companyName, double price) {
            this.companyName = companyName;
            this.price = price;
            this.localDateTime = LocalDateTime.now();
        }

        public String getCompanyName() {
            return this.companyName;
        }

        public double getPrice() {
            return this.price;
        }

        public LocalDateTime getLocalDateTime() {
            return localDateTime;
        }

        public void setLocalDateTime(LocalDateTime localDateTime) {
            this.localDateTime = localDateTime;
        }

        @Override
        public String toString() {
            return companyName + " - " + price + " - " + localDateTime;
        }

        @Override
        public boolean equals(Object obj) {

            if (null == obj) {
                return false;
            }

            else if (null != obj && obj instanceof PriceUpdate) {

                final PriceUpdate priceUpdate = (PriceUpdate) obj;

                if (null != priceUpdate
                        && priceUpdate.getCompanyName().equalsIgnoreCase(this.getCompanyName())
                        && priceUpdate.getPrice() == (this.getPrice())
                        && (priceUpdate.getLocalDateTime().equals(this.getLocalDateTime()))) {

                    System.out.println("Equals returning true");

                    return true;
                }

            }

            System.out.println("Equals returning false");
            return false;

        }

        @Override
        public int hashCode() {

            int hash = this.companyName.hashCode()
                    * Double.valueOf(this.price).hashCode()
                    * this.localDateTime.hashCode();
            return hash;
        }
    }
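Since `equals` and `hashCode` have to agree (equal objects must produce equal hash codes), one compact way to keep the two in sync is `java.util.Objects`. The following is a hypothetical sketch and not the class above: the name `Quote`, its two fields, and the case-sensitive name comparison are assumptions chosen for illustration.

```java
import java.util.Objects;

// Hypothetical value class used only for illustration.
final class Quote {

    private final String companyName;
    private final double price;

    Quote(String companyName, double price) {
        this.companyName = Objects.requireNonNull(companyName, "companyName");
        this.price = price;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Quote)) {
            return false; // also covers obj == null
        }
        Quote other = (Quote) obj;
        // Double.compare is consistent with the hash of a boxed Double.
        return companyName.equals(other.companyName)
                && Double.compare(price, other.price) == 0;
    }

    @Override
    public int hashCode() {
        // Uses exactly the fields that equals() compares.
        return Objects.hash(companyName, price);
    }
}
```

Because both methods use the same fields with the same comparison rules, instances behave predictably in a `HashSet` or as `HashMap` keys.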


4) LoadHandler.java



    package com.exercise.regulator;

    import java.util.LinkedList;
    import java.util.Queue;
    import com.excercise.consumer.Consumer;
    import com.exercise.model.PriceUpdate;

    public class LoadHandler {

        private static final int MAX_PRICE_UPDATES = 100;

        private final Consumer consumer;

        public LoadHandler(Consumer consumer) {
            this.consumer = consumer;
            Scheduler scheduler = new Scheduler(consumer);
            scheduler.startConsumerFeedJobThread();
        }

        private static Queue<PriceUpdate> priceUpdateQueue = new LinkedList<>();

        public static Queue<PriceUpdate> getPriceUpdateQueue() {
            return priceUpdateQueue;
        }

        public static void setPriceUpdateQueue(Queue<PriceUpdate> priceUpdateQueue) {
            LoadHandler.priceUpdateQueue = priceUpdateQueue;
        }

        public void receive(PriceUpdate priceUpdate) {

            if (null != priceUpdate) {

                if (priceUpdateQueue.size() < MAX_PRICE_UPDATES) {
                    priceUpdateQueue.add(priceUpdate);
                    System.out.println("Stock price added successfully.");
                } else {
                    priceUpdateQueue.poll();
                    System.out.println("Stock price polled successfully.");
                    priceUpdateQueue.add(priceUpdate);
                    System.out.println("Stock price added after poll successfully.");
                }

            }

        }

    }


5) RemoveOlderStcksPredicate.java



    package com.exercise.regulator;

    import java.time.LocalDateTime;
    import java.util.function.Predicate;

    import com.exercise.model.PriceUpdate;

    public class RemoveOlderStcksPredicate {

        public static Predicate<PriceUpdate> isStockEqual(LocalDateTime localDateTime) {
            System.out.println("Inside is stock equal localdateTime is ::" + localDateTime);
            return p -> p.getLocalDateTime().isBefore(localDateTime);
        }

    }


6) StockPredicate.java



    package com.exercise.regulator;

    import java.util.HashSet;
    import java.util.Queue;
    import java.util.function.Predicate;
    import com.exercise.model.PriceUpdate;

    public class StockPredicate {

        public static Predicate<PriceUpdate> isStockEqual(Queue<PriceUpdate> stocksSentToConsumerList) {

            return new HashSet<>(stocksSentToConsumerList)::contains;

        }

    }


7) Scheduler.java



    package com.exercise.regulator;

    import java.time.LocalDateTime;
    import java.util.Deque;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import com.excercise.consumer.Consumer;
    import com.exercise.model.PriceUpdate;

    public class Scheduler {

        private Consumer consumer;

        private static Deque<PriceUpdate> stocksSentToConsumerList = new LinkedList<>();

        private static LocalDateTime lastSentDateTime;

        public static void setLastSentDateTime(LocalDateTime lastSentDateTime) {
            Scheduler.lastSentDateTime = lastSentDateTime;
        }

        public Scheduler(final Consumer consumer) {
            this.consumer = consumer;
        }

        public void startConsumerFeedJobThread() {

            final Runnable stockReguRunnable = getRunnableForstockRegu();
            final Thread stockRegulatorThread = new Thread(stockReguRunnable);
            stockRegulatorThread.start();

        }

        private Runnable getRunnableForstockRegu() {

            final Runnable runnable = () -> {

                try {
                    sendRegulatedStcksToCnsmr();
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }

            };

            return runnable;
        }

        private void sendRegulatedStcksToCnsmr() throws InterruptedException {

            System.out.println("----Starting the scheduler for fetch in scheduler----");
            while (true) {

                askThreadToSleep();
                System.out.println("Got the stock price collection from main queue");
                Queue<PriceUpdate> priceUpdateQueue = LoadHandler.getPriceUpdateQueue();
                System.out.println("Price update queue size after fetching ::" + priceUpdateQueue.size());
                List<PriceUpdate> priceUpdateQueueCopy = new LinkedList<>(priceUpdateQueue);
                System.out.println("Copied the stock collection into new queue");

                System.out.println("Going to check for already sent stock prices");

                System.out.println("-----Printing stocks inside stocksSentToConsumerList------");
                stocksSentToConsumerList.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");
                System.out.println("-----Printing stocks inside priceUpdateQueueCopy------");
                priceUpdateQueueCopy.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");

                if (stocksSentToConsumerList.size() > 0) {
                    priceUpdateQueueCopy.removeIf(
                            StockPredicate.isStockEqual(stocksSentToConsumerList)
                                    .or(RemoveOlderStcksPredicate.isStockEqual(lastSentDateTime)));
                } else {
                    priceUpdateQueueCopy.removeIf(StockPredicate.isStockEqual(stocksSentToConsumerList));
                }
                System.out.println("-----Printing stocks inside priceUpdateQueueCopy after filtering------");
                priceUpdateQueueCopy.forEach(System.out::println);
                System.out.println("-----------------------------------------------------------");

                System.out.println("Got filtered stock list with size ::" + priceUpdateQueueCopy.size());
                this.consumer.send(priceUpdateQueueCopy);

                if (null != priceUpdateQueueCopy && priceUpdateQueueCopy.size() > 0) {
                    savePrevConsumdStcks(priceUpdateQueueCopy);
                }

            }
        }

        private void askThreadToSleep() throws InterruptedException {
            System.out.println("----Scheduler sleeping for 1 sec----");
            Thread.sleep(1000);
            System.out.println("----Scheduler woke up after 1 sec----");
        }

        private void savePrevConsumdStcks(final List<PriceUpdate> priceUpdateListToSend) {

            System.out.println("Clearing the stock sent to consumer list before adding the price update list");
            stocksSentToConsumerList.clear();
            stocksSentToConsumerList.addAll(priceUpdateListToSend);
            setLastSentDateTime(stocksSentToConsumerList.peekLast().getLocalDateTime());
            System.out.println("Added the stock price sent list to the collection for next cycle comparison.");
            System.out.println("Last sent timestamp is :" + lastSentDateTime);
        }

    }


8) Exercise.java (main class)



    package com.exercise.init;

    import com.excercise.consumer.Consumer;
    import com.exercise.producer.Producer;
    import com.exercise.regulator.LoadHandler;


    /**
     * Scenario: There is a producer (Producer) of price updates for stocks.
     * This producer will generate constant price updates for a fixed number of stocks.
     * The producer should not block; every price update should be consumed as quickly as possible.
     *
     * Furthermore, there is a load handler (LoadHandler) which will consume the price updates of the producer.
     * In the current implementation the load handler will just pass on the update to a consumer. This should be changed.
     *
     * The consumer (Consumer) will receive the price updates from the load handler.
     * (The current implementation will just print out all price updates for convenience's sake.)
     * The consumer should represent a legacy system that cannot consume more than a certain number of price updates per second.
     * Otherwise it will fall over.
     *
     * The task of this exercise is to extend the LoadHandler to limit the updates per second to the consumer to a certain given number (MAX_PRICE_UPDATES).
     * In order to achieve this, it is allowed to drop price updates, since otherwise the application would run out of memory if it kept all of them.
     * It is important that, if a price update is sent to the consumer, it is the most recent price.
     *
     * Example:
     *
     * Updates arrive in this order from the producer:
     *
     * Apple - 97.85
     * Google - 160.71
     * Facebook - 91.66
     * Google - 160.73
     *
     * The load handler has received all updates and is going to send them out to the consumer like this now:
     *
     * Apple - 97.85
     * Google - 160.73
     * Facebook - 91.66
     *
     * So the first Google update (160.71) has been dropped.
     *
     * In order to limit the number of updates per second to the consumer,
     * it will be necessary to write some sort of scheduler/timer.
     * It is acceptable to send the updates as bulk once per second.
     * Ideally the load should be spread out into smaller chunks during that second.
     *
     * Please consider that the number of stocks might be bigger than the number of allowed updates per second to the consumer.
     * Make sure that the application will not run out of memory, even if the number of stocks or updates per second might be bigger than MAX_PRICE_UPDATES.
     *
     * Please implement the <b>hashCode</b> and <b>equals</b> in PriceUpdate,
     * since those methods might be relevant for the task.
     *
     * It is fine to create additional classes and tests.
     *
     * You can use all features of Java 8 as well as any additional library as long as it is open source and will be provided with the solution.
     */
    public class Exercise {

        public static void main(String[] args) {
            Consumer consumer = new Consumer();
            LoadHandler loadHandler = new LoadHandler(consumer);
            Producer producer = new Producer(loadHandler);
            producer.run();
        }

    }
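The conflation rule from the Javadoc example (keep only the most recent price per stock, hand over at most a fixed number per batch) can be sketched as follows. This is a minimal, hypothetical sketch and not the code under review: the class `ConflatingBuffer`, its methods `offer`, `drain` and `pending`, and the choice of a `LinkedHashMap` keyed by company name are assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original exercise code.
class ConflatingBuffer {

    // Latest price per company. A repeated put() replaces the value but
    // keeps the key's original insertion position.
    private final Map<String, Double> latestByCompany = new LinkedHashMap<>();

    // Producer side: overwrites any older pending price for the company.
    synchronized void offer(String companyName, double price) {
        latestByCompany.put(companyName, price);
    }

    // Scheduler side: removes and returns at most maxUpdates pending
    // entries, oldest pending company first.
    synchronized Map<String, Double> drain(int maxUpdates) {
        Map<String, Double> batch = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Double>> it = latestByCompany.entrySet().iterator();
        while (it.hasNext() && batch.size() < maxUpdates) {
            Map.Entry<String, Double> entry = it.next();
            batch.put(entry.getKey(), entry.getValue());
            it.remove();
        }
        return batch;
    }

    // Number of companies that still have an unsent price.
    synchronized int pending() {
        return latestByCompany.size();
    }
}
```

With the four updates from the example and a large enough limit, `drain` returns Apple 97.85, Google 160.73 and Facebook 91.66 in that order. In this sketch memory is bounded by the number of distinct stocks rather than by the number of updates received.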





























  • Please post the relevant code in the post itself; GitHub links are only allowed as a supplement.
    – Patrick Hollweck
    Nov 7 at 10:17










  • @PatrickHollweck: Done, please have a look.
    – Hackmaster
    Nov 7 at 10:43










  • Looks good. Just so you know, you need to include the code on the website to prevent potential copyright issues for people reviewing the code. You can always add a GitHub link, but only in addition to the code on the website. I won't review this, since I barely know Java, but I am sure someone else with more Java knowledge will take a look.
    – Patrick Hollweck
    Nov 7 at 11:14










  • Sure @PatrickHollweck, thanks for your suggestions.
    – Hackmaster
    Nov 7 at 11:39










  • Always consider implementing the Runnable interface rather than extending the Thread class. Also, if your producer needs to be a separate thread, you should be calling producer.start() instead of calling the run method directly.
    – akshaya pandey
    Nov 9 at 9:37
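The difference this comment points at can be seen in a small sketch. The class `StartVersusRun` and its method names are hypothetical and only illustrate standard `java.lang.Thread` behaviour: calling `run()` executes the task on the calling thread, while `start()` executes it on a new thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the code under review.
class StartVersusRun {

    // Invokes Thread.run() directly: no new thread is started, so the
    // task records the name of the caller's thread.
    static String threadNameViaRun() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        new Thread(task, "producer").run();
        return seen.get();
    }

    // Invokes Thread.start(): the task runs on the new thread and
    // records that thread's name.
    static String threadNameViaStart() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        Thread worker = new Thread(task, "producer");
        worker.start();
        try {
            worker.join(); // wait until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", e);
        }
        return seen.get();
    }
}
```

Passing a `Runnable` to a `Thread` (or to an executor) also keeps the task independent of how it is scheduled, which is the usual argument for not extending `Thread`.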















this.companyName = companyName;
this.price = price;
this.localDateTime = LocalDateTime.now();
}

public String getCompanyName() {
return this.companyName;
}

public double getPrice() {
return this.price;
}

public LocalDateTime getLocalDateTime() {
return localDateTime;
}

public void setLocalDateTime(LocalDateTime localDateTime) {
this.localDateTime = localDateTime;
}

@Override
public String toString() {
return companyName + " - " + price +" - "+localDateTime;
}

@Override
public boolean equals(Object obj) {

if(null==obj) {
return false;
}

else if(null != obj && obj instanceof PriceUpdate) {

final PriceUpdate priceUpdate = (PriceUpdate) obj;

if(null!=priceUpdate

&& priceUpdate.getCompanyName().equalsIgnoreCase(this.getCompanyName())
&& priceUpdate.getPrice()==(this.getPrice())
&& (priceUpdate.getLocalDateTime().equals(this.getLocalDateTime()))) {

System.out.println("Equals returning true");

return true;
}

}

System.out.println("Equals returning false");
return false;

}

@Override
public int hashCode() {

int hash = this.companyName.hashCode()
* Double.valueOf(this.price).hashCode()
* this.localDateTime.hashCode();
return hash;
}
}


4) LoadHandler.java



    package com.exercise.regulator;

import java.util.LinkedList;
import java.util.Queue;
import com.excercise.consumer.Consumer;
import com.exercise.model.PriceUpdate;

public class LoadHandler {

private static final int MAX_PRICE_UPDATES = 100;

private final Consumer consumer;

public LoadHandler (Consumer consumer) {
this.consumer = consumer;
Scheduler scheduler = new Scheduler(consumer);
scheduler.startConsumerFeedJobThread();
}

private static Queue<PriceUpdate> priceUpdateQueue = new LinkedList<>();

public static Queue<PriceUpdate> getPriceUpdateQueue() {
return priceUpdateQueue;
}

public static void setPriceUpdateQueue(Queue<PriceUpdate> priceUpdateQueue) {
LoadHandler.priceUpdateQueue = priceUpdateQueue;
}

public void receive(PriceUpdate priceUpdate) {

if(null!=priceUpdate) {

if(priceUpdateQueue.size()<MAX_PRICE_UPDATES) {
priceUpdateQueue.add(priceUpdate);
System.out.println("Stock price added successfully.");
} else {
priceUpdateQueue.poll();
System.out.println("Stock price polled successfully.");
priceUpdateQueue.add(priceUpdate);
System.out.println("Stock price added after poll successfully.");
}

}

}

}


5) RemoveOlderStcksPredicate.java



    package com.exercise.regulator;

import java.time.LocalDateTime;
import java.util.function.Predicate;

import com.exercise.model.PriceUpdate;

public class RemoveOlderStcksPredicate {

public static Predicate<PriceUpdate> isStockEqual(LocalDateTime localDateTime){
System.out.println("Inside is stock equal localdateTime is ::"+localDateTime);
return p->p.getLocalDateTime().isBefore(localDateTime);

}


}


6) StockPredicate.java



    package com.exercise.regulator;

import java.util.HashSet;
import java.util.Queue;
import java.util.function.Predicate;
import com.exercise.model.PriceUpdate;

public class StockPredicate {

public static Predicate<PriceUpdate> isStockEqual(Queue<PriceUpdate> stocksSentToConsumerList){

return new HashSet<>(stocksSentToConsumerList)::contains;

}

}


7) Scheduler.java



    package com.exercise.regulator;

import java.time.LocalDateTime;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import com.excercise.consumer.Consumer;
import com.exercise.model.PriceUpdate;

public class Scheduler {

private Consumer consumer;

private static Deque<PriceUpdate> stocksSentToConsumerList = new LinkedList<>();

private static LocalDateTime lastSentDateTime;

public static void setLastSentDateTime(LocalDateTime lastSentDateTime) {
Scheduler.lastSentDateTime = lastSentDateTime;
}

public Scheduler(final Consumer consumer) {
this.consumer = consumer;
}

public void startConsumerFeedJobThread() {

final Runnable stockReguRunnable = getRunnableForstockRegu();
final Thread stockRegulatorThread = new Thread(stockReguRunnable);
stockRegulatorThread.start();

}

private Runnable getRunnableForstockRegu() {

final Runnable runnable = () -> {

try {
sendRegulatedStcksToCnsmr();
} catch (InterruptedException exception) {
exception.printStackTrace();
}

};

return runnable;
}

private void sendRegulatedStcksToCnsmr() throws InterruptedException {

System.out.println("----Starting the scheduler for fetch in scheduler----");
while (true) {

askThreadToSleep();
System.out.println("Got the stock price collection from main queue");
Queue<PriceUpdate> priceUpdateQueue = LoadHandler.getPriceUpdateQueue();
System.out.println("Price update queue size after fetching ::"+priceUpdateQueue.size());
List<PriceUpdate> priceUpdateQueueCopy = new LinkedList<>(priceUpdateQueue);
System.out.println("Copied the stock collection into new queue");

System.out.println("Going to check for already sent stock prices");

System.out.println("-----Printing stocks inside stocksSentToConsumerList------");
stocksSentToConsumerList.forEach(System.out::println);
System.out.println("-----------------------------------------------------------");
System.out.println("-----Printing stocks inside priceUpdateQueueCopy------");
priceUpdateQueueCopy.forEach(System.out::println);
System.out.println("-----------------------------------------------------------");

if(stocksSentToConsumerList.size()>0) {
priceUpdateQueueCopy.removeIf((StockPredicate.isStockEqual(stocksSentToConsumerList).or(RemoveOlderStcksPredicate.isStockEqual(lastSentDateTime))));
} else{
priceUpdateQueueCopy.removeIf((StockPredicate.isStockEqual(stocksSentToConsumerList)));
}
System.out.println("-----Printing stocks inside priceUpdateQueueCopy after filtering------");
priceUpdateQueueCopy.forEach(System.out::println);
System.out.println("-----------------------------------------------------------");

System.out.println("Got filtered stock list with size ::"+priceUpdateQueueCopy.size());
this.consumer.send(priceUpdateQueueCopy);

if(null!=priceUpdateQueueCopy && priceUpdateQueueCopy.size()>0) {
savePrevConsumdStcks(priceUpdateQueueCopy);
}

}
}

private void askThreadToSleep() throws InterruptedException {
System.out.println("----Scheduler sleeping for 1 sec----");
Thread.sleep(1000);
System.out.println("----Scheduler woke up after 1 sec----");
}

private void savePrevConsumdStcks(final List<PriceUpdate> priceUpdateListToSend) {

System.out.println("Clearing the stock sent to consumer list before adding the price update list");
stocksSentToConsumerList.clear();
stocksSentToConsumerList.addAll(priceUpdateListToSend);
setLastSentDateTime(stocksSentToConsumerList.peekLast().getLocalDateTime());
System.out.println("Added the stock price sent list to the collection for next cycle comparison.");
System.out.println("Last sent timestamp is :"+lastSentDateTime);
}

}


8) Exercise.java ( Main class )



    package com.exercise.init;

import com.excercise.consumer.Consumer;
import com.exercise.producer.Producer;
import com.exercise.regulator.LoadHandler;


/**
* Scenario: There is a producer (Producer) of price updates for stocks.
* This producer will generate constant price updates for a fix number of stocks.
* The producer should not block, every price update should be consumed as quickly as possible.
*
* Furthermore there is a load handler (LoadHandler) which will consume the price updates of the producer.
* In the current implementation the load handler will just pass on the update to a consumer. This should be changed.
*
* The consumer (Consumer) will receive the price updates from the load handler.
* (The current implementation will just print out all price updates for convenience sake.)
* The consumer should represent a legacy system that cannot consumer more than a certain number of price updates per second.
* Otherwise it will fall over.
*
* The task of this exercise is to extend the LoadHandler to limit the updates per second to the consumer to a certain given number (MAX_PRICE_UPDATES).
* In order to achieve this, it is a allowed to drop price updates, since otherwise the application will run out of memory, if the application will keep all of them.
* It is important that, if a price update will be send to the consumer, it has to be the most recent price.
*
* Example:
*
* Updates arrive in this order from the producer:
*
* Apple - 97.85
* Google - 160.71
* Facebook - 91.66
* Google - 160.73
*
* The load balancer has received all updates and is going to send them out to the consumer like this now:
*
* Apple - 97.85
* Google - 160.73
* Facebook - 91.66
*
* So the first Google update (160.71) has been dropped.
*
* In order to limit the number of updates per second to the consumer,
* it will be necessary to write some sort of scheduler/timer.
* It is acceptable to send the updates as bulk once per second.
* Ideally the load should be spread out into smaller chunks during that second.
*
* Please consider that the number of stocks might be bigger than the number of allowed updates per second to the consumer.
* Make sure that the application will not run out of memory, even if the number of stocks or updates per second might be bigger than MAX_PRICE_UPDATES.
*
* Please implement the <b>hashCode</b> and <b>equals</b> in PriceUpdate,
* since those methods might be relevant for the task.
*
* It is fine to create additional classes and tests.
*
* You can use all features of Java 8 as well as any additional library as long as it is open source and will be provided with the solution.
*
*
*/
public class Exercise {

public static void main(String[] args) {
Consumer consumer = new Consumer ();
LoadHandler loadHandler = new LoadHandler(consumer);
Producer producer = new Producer(loadHandler);
producer.run();
}

}















java performance algorithm multithreading producer-consumer














edited Nov 7 at 10:42

asked Nov 7 at 10:14 by Hackmaster








  • Please post the relevant code section in the post itself; GitHub links are only allowed as a bonus.
    – Patrick Hollweck
    Nov 7 at 10:17

  • @PatrickHollweck: Done, please have a look.
    – Hackmaster
    Nov 7 at 10:43

  • Looks good. Just so you know, you need to include the code on the website to prevent potential copyright issues for people reviewing the code. You can always add a GitHub link, but only in addition to the code on the website. I won't review this, since I barely know Java, but I am sure someone with more Java knowledge will take a look.
    – Patrick Hollweck
    Nov 7 at 11:14

  • Sure @PatrickHollweck, thanks for your suggestions.
    – Hackmaster
    Nov 7 at 11:39

  • Always consider implementing the Runnable interface rather than extending the Thread class. Also, if your producer needs to be a separate thread, you should call producer.start() instead of calling the run method directly.
    – akshaya pandey
    Nov 9 at 9:37
























1 Answer













I've taken a look at your GitHub files, and here are my thoughts:



IStockProducer



Your taste may differ from mine, but I still want to say it: marker interfaces are useless. They don't help, they don't add any value, and they only grow your hierarchy. You should remove it.



PriceUpdate



The constructor: this.localDateTime = LocalDateTime.now()

This may seem convenient, but don't make it a habit. It would be much better to pass the timestamp as a constructor argument. The way you've implemented it, the class cannot be tested deterministically, because the constructor has a static dependency on the system clock.



setLocalDateTime: I didn't find a place where you actually use this, so I guess it's meant for the future. Just remove the method and set the value in the constructor. That way your PriceUpdate objects can be immutable, which is preferable, especially in a multithreaded application.
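The two points above (pass the timestamp in, drop the setter) could look roughly like the sketch below. The class name TimestampedPriceUpdate and the factory method now(...) are made up for this illustration and are not part of the reviewed project.

```java
import java.time.LocalDateTime;

// Hypothetical immutable variant of PriceUpdate: every field is final and
// the timestamp is supplied by the caller instead of read from the clock.
final class TimestampedPriceUpdate {
    private final String companyName;
    private final double price;
    private final LocalDateTime timestamp;

    TimestampedPriceUpdate(String companyName, double price, LocalDateTime timestamp) {
        this.companyName = companyName;
        this.price = price;
        this.timestamp = timestamp;
    }

    // Convenience factory for production code that really wants "now".
    // Tests call the constructor directly with a fixed timestamp.
    static TimestampedPriceUpdate now(String companyName, double price) {
        return new TimestampedPriceUpdate(companyName, price, LocalDateTime.now());
    }

    @Override
    public String toString() {
        return companyName + " - " + price + " - " + timestamp;
    }
}
```

With a fixed timestamp, a test can assert on the exact string or on equality without depending on when it runs.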



equals: if (null != obj && obj instanceof PriceUpdate) is equivalent to if (obj instanceof PriceUpdate), because instanceof already evaluates to false for null.



hashCode: I am not a hashCode expert, but this implementation seems strange. If you have a good reason for it, fine. If not, look up what a conventional hashCode implementation looks like (see the book Effective Java).
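As a rough sketch of a conventional equals/hashCode pair, assuming the same three fields as in the question: the class name PriceUpdateValue is invented for this example. Note that the posted equals compares names with equalsIgnoreCase while the posted hashCode uses the case-sensitive companyName.hashCode(), so two updates could be equal and still hash differently; the sketch uses the same (case-sensitive) comparison in both methods.

```java
import java.time.LocalDateTime;
import java.util.Objects;

// Hypothetical value class; field names mirror the PriceUpdate in the question.
final class PriceUpdateValue {
    private final String companyName;
    private final double price;
    private final LocalDateTime timestamp;

    PriceUpdateValue(String companyName, double price, LocalDateTime timestamp) {
        this.companyName = Objects.requireNonNull(companyName);
        this.price = price;
        this.timestamp = Objects.requireNonNull(timestamp);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof PriceUpdateValue)) {
            return false;
        }
        final PriceUpdateValue other = (PriceUpdateValue) obj;
        return companyName.equals(other.companyName)
                && Double.compare(price, other.price) == 0
                && timestamp.equals(other.timestamp);
    }

    @Override
    public int hashCode() {
        // Uses exactly the fields that equals compares.
        return Objects.hash(companyName, price, timestamp);
    }
}
```

Objects.hash combines the field hashes additively with a multiplier, so a single field hashing to zero does not collapse the whole result to zero, which a plain product of hashes would.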



Most importantly: remove the getters and setters of this class. Your PriceUpdate class is currently just a data container, and that's not object-oriented. Instead, PriceUpdate itself should do the jobs you are currently doing from outside, such as comparing it with other updates.
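One possible reading of this advice, as a sketch only: the comparisons that the predicate classes perform through getters become methods on the update itself. The class name SelfComparingPriceUpdate and the method names supersedes and isOlderThan are assumptions made for this example.

```java
import java.time.LocalDateTime;

// Hypothetical variant of PriceUpdate that answers questions about itself
// instead of exposing its fields through getters.
final class SelfComparingPriceUpdate {
    private final String companyName;
    private final double price;
    private final LocalDateTime timestamp;

    SelfComparingPriceUpdate(String companyName, double price, LocalDateTime timestamp) {
        this.companyName = companyName;
        this.price = price;
        this.timestamp = timestamp;
    }

    // True if this update is for the same stock and was created later.
    boolean supersedes(SelfComparingPriceUpdate other) {
        return companyName.equals(other.companyName)
                && timestamp.isAfter(other.timestamp);
    }

    // Covers what RemoveOlderStcksPredicate does via getLocalDateTime().
    boolean isOlderThan(LocalDateTime moment) {
        return timestamp.isBefore(moment);
    }

    @Override
    public String toString() {
        return companyName + " - " + price + " - " + timestamp;
    }
}
```

A caller could then write something like updates.removeIf(u -> u.isOlderThan(lastSent)) without a separate predicate class.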



Producer



Thread: Don't inherit from Thread. Instead, implement Runnable and run it on a thread that is created outside the class.



run: This is dangerous:



try {
// ...
} catch (InterruptedException e) {
e.printStackTrace();
}


This way, you get some console output and that's all. In general: If you don't know what to do with an exception, add "throws" to your method signature or wrap it in another exception like this:



try {
    // ...
} catch (SomeException e) {
    throw new MaybeSomeOtherException("What went wrong from my current perspective.", e);
}
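
For InterruptedException specifically, one common convention is to restore the thread's interrupt flag before wrapping, so that code further up can still see that an interruption happened. The sketch below assumes the producer pauses between updates; the class name PausingTask and the choice of IllegalStateException are only illustrative:

```java
// Hypothetical task that sleeps between updates.
final class PausingTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1_000); // stand-in for the pause between two price updates
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again
            // so callers can still detect the interruption.
            Thread.currentThread().interrupt();
            // run() cannot declare checked exceptions, so wrap in an unchecked one.
            throw new IllegalStateException("Interrupted while waiting for the next update", e);
        }
    }
}
```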


generateUpdates: You should give the caller a way to stop your producer, for example via a boolean flag. That would be much better than a fixed for-loop.
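
A sketch of such a flag is shown below. Since the flag is written by one thread and read by another, it is declared volatile here; the class name StoppableProducer and the counter are assumptions for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical producer that runs until someone calls stop().
final class StoppableProducer implements Runnable {
    // volatile so that a write from another thread becomes visible to the loop
    private volatile boolean running = true;
    private final AtomicLong generated = new AtomicLong();

    @Override
    public void run() {
        while (running) {
            generated.incrementAndGet(); // stand-in for generating one price update
        }
    }

    // May be called from any thread; the loop exits at its next check of the flag.
    void stop() {
        running = false;
    }

    long generatedCount() {
        return generated.get();
    }
}
```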



LoadHandler



Static queue: This class is dangerous because it uses a single static Queue with a getter and a setter on it. Remove the static modifier and the accessors, and let your LoadHandler control this Queue completely.
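
Roughly, that could look like the following sketch. String is used as a stand-in element type, and pendingCount is a hypothetical method added only so the effect is observable:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical LoadHandler that owns its queue.
final class LoadHandler {
    // Instance field without getter or setter: nothing outside this class
    // can replace or modify the queue.
    private final Queue<String> pending = new ArrayDeque<>();

    void receive(String priceUpdate) {
        pending.add(priceUpdate);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Two LoadHandler instances now have independent queues, which also makes unit tests independent of each other.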



receive: if (null != priceUpdate): is it actually possible to receive nulls in your program, or would that be a bug? If it would be a bug, replace the outer if-statement with an Objects.requireNonNull(...) call. If nulls are intended, think twice about it. Nulls aren't good.
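
Assuming a null update would be a bug, the guard could be sketched like this (String again stands in for PriceUpdate, and the class name is hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical handler that fails fast on null input.
final class NullRejectingLoadHandler {
    private final Queue<String> pending = new ArrayDeque<>();

    void receive(String priceUpdate) {
        // Throws NullPointerException with this message instead of silently ignoring null.
        Objects.requireNonNull(priceUpdate, "priceUpdate must not be null");
        pending.add(priceUpdate);
    }

    int pendingCount() {
        return pending.size();
    }
}
```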



RemoveOlderStcksPredicate



This class should be removed. Let PriceUpdate take care of this stuff.



Schedular



Same as with LoadHandler: Remove "static" from the Deque.



StockPredicate



This seems like a job for one of the other classes (the same applies to the other predicate).



General



final: Sometimes you use final on your instance variables to indicate that their references won't change. This is very good. The problem is that you don't apply the rule everywhere, which is confusing. Stick to it and be consistent. My advice is to use it even for variables inside methods.



Line format: Personally, I dislike your line format. Sometimes you have empty lines after the method signature and before the end of the method, and sometimes you do the same for if-statements. I would advise you to drop that habit. Examples:



// Instead of:
public static Predicate<PriceUpdate> isStockEqual(Queue<PriceUpdate> stocksSentToConsumerList){

    return new HashSet<>(stocksSentToConsumerList)::contains;

}

// this:
public static Predicate<PriceUpdate> isStockEqual(Queue<PriceUpdate> stocksSentToConsumerList){
    return new HashSet<>(stocksSentToConsumerList)::contains;
}


and:



// instead of:
if(priceUpdateQueue.size()<MAX_PRICE_UPDATES) {

// this:
if(priceUpdateQueue.size() < MAX_PRICE_UPDATES) {


and:



// instead of:
public class LoadHandler {

    private static final int MAX_PRICE_UPDATES = 100;

    private final Consumer consumer;

// this:
public class LoadHandler {
    private static final int MAX_PRICE_UPDATES = 100;

    private final Consumer consumer;


Besides, some lines are missing indentation. It seems unimportant, but reading code takes time, and you don't want to annoy your readers with something that is so easy to fix.



Where is the multithreading?



I am confused about your program. Normally, a producer/consumer application has multiple threads that need some synchronization. Your application has two threads: the main thread and the Producer thread. Since the main thread doesn't do anything, your application is effectively single-threaded. I am not sure whether the exercise was meant like that.
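
To make the question concrete, here is one possible two-thread design, offered as a sketch under assumptions rather than as the intended solution. The producer thread calls receive, which only overwrites the latest price per stock and therefore does not block for long; a second thread calls flush once per second and forwards at most a fixed number of updates. The class name ConflatingLoadHandler, the String/Double stand-ins and the BiConsumer callback are all made up for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical load handler that keeps only the most recent price per stock.
final class ConflatingLoadHandler {
    private final int maxUpdatesPerFlush;
    private final BiConsumer<String, Double> consumer;
    private final Map<String, Double> latestPriceByStock = new ConcurrentHashMap<>();

    ConflatingLoadHandler(int maxUpdatesPerFlush, BiConsumer<String, Double> consumer) {
        this.maxUpdatesPerFlush = maxUpdatesPerFlush;
        this.consumer = consumer;
    }

    // Called by the producer thread: a newer price simply replaces the older one,
    // so memory use is bounded by the number of stocks.
    void receive(String stock, double price) {
        latestPriceByStock.put(stock, price);
    }

    // Called periodically by a scheduler thread: forwards at most
    // maxUpdatesPerFlush updates, each carrying the latest known price.
    void flush() {
        int sent = 0;
        for (String stock : latestPriceByStock.keySet()) {
            if (sent == maxUpdatesPerFlush) {
                break;
            }
            Double price = latestPriceByStock.remove(stock);
            if (price != null) {
                consumer.accept(stock, price);
                sent++;
            }
        }
    }
}
```

The periodic call could come from Executors.newSingleThreadScheduledExecutor() with scheduleAtFixedRate(handler::flush, 1, 1, TimeUnit.SECONDS). Stocks that do not fit into one flush stay in the map and go out in a later one, still with their most recent price.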






        answered 2 days ago









        Synth
