Computing average duration per user from a large CSV file that doesn't fit in memory






We have a large log file which stores user interactions with an application. The entries in the log file follow this schema: userId, timestamp, actionType, where actionType is one of two possible values: [open, close]



Constraints:



  1. The log file is too big to fit in memory on one machine. Also assume that the aggregated data doesn’t fit into memory.

  2. Code has to be able to run on a single machine.

  3. Should not use an out-of-the-box implementation of MapReduce or a third-party database; don't assume we have Hadoop, Spark, or another distributed computing framework.

  4. There can be multiple entries of each actionType for each user, and there might be missing entries in the log file. So a user might be missing a close record between two open records or vice versa.

  5. Timestamps will come in strictly ascending order.

For this problem, we need to implement a class (or classes) that computes the average time spent by each user between open and close. Keep in mind that there are missing entries for some users, so we will have to choose how to handle those entries in our calculations. The code should follow a consistent policy with regard to that choice.



The desired output for the solution should be [userId, timeSpent,….] for all the users in the log file.



Sample log file (comma-separated, text file)



1,1435456566,open 
2,1435457643,open
3,1435458912,open
1,1435459567,close
4,1435460345,open
1,1435461234,open
2,1435462567,close
1,1435463456,open
3,1435464398,close
4,1435465122,close
1,1435466775,close
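One consistent policy for the missing entries is to cap any unpaired gap at a timeout. As a minimal sketch of that policy (the `timeout` default and the exact pairing rules here are assumptions for illustration, not part of the problem statement), a single streaming pass looks like:

```python
def average_durations(lines, timeout=600000):
    # Streaming pass: memory is O(number of distinct users), one line at a time.
    # Policy (assumed): an open followed by another open contributes
    # min(gap, timeout); a close with no preceding open charges the full
    # timeout; a trailing open with no close is left uncounted.
    total, count, pending = {}, {}, {}
    for line in lines:
        user, ts, action = line.strip().split(",")
        ts = int(ts)
        if action == "open":
            if user in pending:  # missing close: cap the gap at the timeout
                total[user] = total.get(user, 0) + min(ts - pending[user], timeout)
                count[user] = count.get(user, 0) + 1
            pending[user] = ts
        else:  # action == "close"
            if user in pending:  # normal open -> close pair
                total[user] = total.get(user, 0) + (ts - pending[user])
                del pending[user]
            else:  # missing open: charge the capped default
                total[user] = total.get(user, 0) + timeout
            count[user] = count.get(user, 0) + 1
    return {u: total[u] / count[u] for u in total}
```

Running this over the sample log above pairs user 1's first open/close normally, treats the open at 1435463456 as evidence of a missing close for the open at 1435461234, and pairs the final close with the last open.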


Approach



Here is the code I've written in Python and Scala. It doesn't seem as efficient as the scenario demands, and I'd like feedback on how I could optimise it for the given constraints.





Scala implementation



import java.io.FileInputStream
import java.util.{Scanner, LinkedList}
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
    sys.exit(1)
  }
  val userMetrics = new UserMetrics()
  userMetrics.readInputFile(args(0), if (args.length == 1) 600000 else args(1).toInt)
}

case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics {

  val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

  def readInputFile(stArr: String, timeOut: Int): Unit = {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr)
      sc = new Scanner(inputStream, "UTF-8")
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine()
        processInput(line, timeOut)
      }

      for ((key, userLs) <- usermap) {
        val userInfo: UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence > 0) userInfo.timeSpent / userInfo.occurence else 0
        println("" + key + "," + timespent)
      }

      if (sc.ioException() != null) {
        throw sc.ioException()
      }
    } finally {
      if (inputStream != null) {
        inputStream.close()
      }
      if (sc != null) {
        sc.close()
      }
    }
  }

  def processInput(line: String, timeOut: Int): Unit = {
    val strSp = line.split(",")

    val userId: Integer = Integer.parseInt(strSp(0))
    val curTimeStamp = java.lang.Long.parseLong(strSp(1))
    val status = strSp(2)
    val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
    val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

    val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

    if (lsUserInfo != null && lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
      val prevStatus: String = lastUserInfo.prevStatus

      if (prevStatus.equals("open")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          // open followed by open: missing close, cap the gap at the timeout
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          val timeDiff = lastUserInfo.timeSpent + timeSelector
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        } else {
          // open followed by close: normal pair
          val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        }
      } else if (prevStatus.equals("close")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          // close followed by close: missing open, cap the gap at the timeout
          lsUserInfo.remove()
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence + 1))
        } else {
          // close followed by open: just record the new state
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    } else if (lsUserInfo.size() == 0) {
      lsUserInfo.add(uInfo)
    }
    usermap.put(userId, lsUserInfo)
  }
}





Python Implementation





import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.

    assert 0 <= block < number_of_blocks #Validate the block index
    assert 0 < number_of_blocks

    fp.seek(0, 2) #seek to end of file to compute block size
    file_size = fp.tell()

    ini = file_size * block // number_of_blocks #compute start & end point of file block
    end = file_size * (1 + block) // number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut):
    countPos, totTmPos, openTmPos, closeTmPos, nextEventPos = 0, 1, 2, 3, 4
    for row in chunk.splitlines():
        fields = row.split(",")
        if len(fields) != 3:
            continue
        userKeyID = fields[0]
        try:
            curTimeStamp = int(fields[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = fields[2]
        if userKeyID in avgTimeSpentDict and avgTimeSpentDict[userKeyID][nextEventPos] == 1 and curEvent == "close":
            #Already-seen userID with the expected close event (0 - open; 1 - close)
            #The list stored per user is [no. of paired events, total time spent (close tm - open tm), last open tm, last close tm, next expected event]
            curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to open

        elif userKeyID in avgTimeSpentDict and avgTimeSpentDict[userKeyID][nextEventPos] == 0 and curEvent == "open":
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to close

        elif userKeyID in avgTimeSpentDict and avgTimeSpentDict[userKeyID][nextEventPos] == 1 and curEvent == "open":
            #Missing close event: apply the timeout policy
            curTotalTime, closeTime = missingHandler(defaultTimeOut, avgTimeSpentDict[userKeyID][openTmPos], curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = closeTime
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif userKeyID in avgTimeSpentDict and avgTimeSpentDict[userKeyID][nextEventPos] == 0 and curEvent == "close":
            #Missing open event: apply the timeout policy
            curTotalTime, openTime = missingHandler(defaultTimeOut, avgTimeSpentDict[userKeyID][closeTmPos], curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][openTmPos] = openTime
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif curEvent == "open":
            #Initialize userid with an open event
            avgTimeSpentDict[userKeyID] = [0, 0, curTimeStamp, 0, 1]

        elif curEvent == "close":
            #Initialize userid via the missing handler since there is no open event for this user
            totaltime, openTime = missingHandler(defaultTimeOut, 0, curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1, totaltime, openTime, curTimeStamp, 0]

def missingHandler(defaultTimeOut, curTimeVal, lastTimeVal):
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut, curTimeVal
    else:
        return lastTimeVal - curTimeVal, curTimeVal

def computeAvg(avgTimeSpentDict, defaultTimeOut):
    resDict = {}
    for k, v in avgTimeSpentDict.items():
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1] / v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)

    fileObj = open(sys.argv[1])
    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    for chunk_number in range(number_of_chunks):
        for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
            computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print(computeAvg(avgTimeSpentDict, defaultTimeOut))
    avgTimeSpentDict.clear() #Reset the dictionary
    fileObj.close() #Close the file object


Both programs give the desired output, but efficiency is what matters for this particular scenario. Let me know if you have a better approach or any suggestions on the existing implementations.
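One gap worth noting: constraint 1 says even the aggregated data may not fit in memory, yet both implementations keep one entry per user in a dict/Map. A common single-machine workaround is to hash-partition the log by userId into spill files in a first pass, then aggregate one partition at a time. A sketch of that first pass (the function name, partition count, and file layout are assumptions for illustration):

```python
import os
import tempfile

def partition_log(path, num_partitions=16):
    # Pass 1: route each line to a partition file by hash(userId).
    # All lines for a given user land in the same partition, and within a
    # partition they stay in their original (ascending-timestamp) order,
    # so each partition can later be aggregated independently in memory.
    tmpdir = tempfile.mkdtemp()
    part_paths = [os.path.join(tmpdir, "part-%d.csv" % i) for i in range(num_partitions)]
    parts = [open(p, "w") for p in part_paths]
    try:
        with open(path) as f:
            for line in f:
                user = line.split(",", 1)[0]
                parts[hash(user) % num_partitions].write(line)
    finally:
        for p in parts:
            p.close()
    return part_paths
```

Each partition can then be processed with any in-memory per-user pass, and its results written out before the next partition is loaded, so neither the raw log nor the full aggregate is ever resident at once.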





















Comments

  • how is it possible that the aggregated data doesn't fit in memory? It's ~20 bytes per user - you really have a userbase of billions? – Oh My Goodness, yesterday

  • This is to bring out memory-efficient solutions and critical thinking among programmers on one of our internal org forums. – Wiki_91, yesterday

  • is the problem real or imaginary? – Oh My Goodness, yesterday

  • Imaginary. We handle such huge volumes in a distributed Hadoop cluster with Spark, but this challenge is to solve the same problem on a single machine. – Wiki_91, yesterday

  • you've taken a real problem and applied made-up constraints, like a programming puzzle would have, and got the worst of both worlds. The arbitrary One True Solution character of a puzzle is combined with the vagueness, length and tedium of a real problem. I suggest removing a bunch of detail to create a short puzzle, or dropping the fake restrictions and adding real context like "actual size of input" and "actual available memory" to describe an authentic engineering problem. – Oh My Goodness, yesterday

















2












$begingroup$


We have a large log file which stores user interactions with an application. The entries in the log file follow the following schema: userId, timestamp, actionType where actionType is one of two possible values: [open, close]



Constraints:



  1. The log file is too big to fit in memory on one machine. Also assume that the aggregated data doesn’t fit into memory.

  2. Code has to be able to run on a single machine.

  3. Should not use an out-of-the box implementation of mapreduce or 3rd party database; don’t assume we have a Hadoop or Spark or other distributed computing framework.

  4. There can be multiple entries of each actionType for each user, and there might be missing entries in the log file. So a user might be missing a close record between two open records or vice versa.

  5. Timestamps will come in strictly ascending order.

For this problem, we need to implement a class/classes that computes the average time spent by each user between open and close. Keep in mind that there are missing entries for some users, so we will have to make a choice about how to handle these entries when making our calculations. Code should follow a consistent policy with regards to how we make that choice.



The desired output for the solution should be [userId, timeSpent,….] for all the users in the log file.



Sample log file (comma-separated, text file)



1,1435456566,open 
2,1435457643,open
3,1435458912,open
1,1435459567,close
4,1435460345,open
1,1435461234,open
2,1435462567,close
1,1435463456,open
3,1435464398,close
4,1435465122,close
1,1435466775,close


Approach



Here is the code I've written in Python and Scala, which seems to be not efficient and up to the expectations of the scenario given. I'd like feedback on how I could optimise this code as per the given scenario.





Scala implementation



import java.io.FileInputStream
import java.util.Scanner, Map, LinkedList
import java.lang.Long
import scala.collection.mutable

object UserMetrics extends App
if (args.length == 0)
println("Please provide input data file name for processing")

val userMetrics = new UserMetrics()
userMetrics.readInputFile(args(0),if (args.length == 1) 600000 else args(1).toInt)


case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics

val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

def readInputFile(stArr:String, timeOut: Int)
var inputStream: FileInputStream = null
var sc: Scanner = null
try
inputStream = new FileInputStream(stArr);
sc = new Scanner(inputStream, "UTF-8");
while (sc.hasNextLine())
val line: String = sc.nextLine();
processInput(line, timeOut)


for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap)
val userInfo:UserInfo = userLs.get(0)
val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
println("" + key +","+timespent + "")


if (sc.ioException() != null)
throw sc.ioException();

finally
if (inputStream != null)
inputStream.close();

if (sc != null)
sc.close();




def processInput(line: String, timeOut: Int)
val strSp = line.split(",")

val userId: Integer = Integer.parseInt(strSp(0))
val curTimeStamp = Long.parseLong(strSp(1))
val status = strSp(2)
val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

if (lsUserInfo != null && lsUserInfo.size() > 0)
val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
val prevStatus: String = lastUserInfo.prevStatus

if (prevStatus.equals("open"))
if (status.equals(lastUserInfo.prevStatus))
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
val timeDiff = lastUserInfo.timeSpent + timeSelector
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
else if(!status.equals(lastUserInfo.prevStatus))
val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))

else if(prevStatus.equals("close"))
if (status.equals(lastUserInfo.prevStatus))
lsUserInfo.remove()
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence+1))
else if(!status.equals(lastUserInfo.prevStatus))

lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))


else if(lsUserInfo.size()==0)
lsUserInfo.add(uInfo)

usermap.put(userId, lsUserInfo)





Python Implementation





import sys

def fileBlockStream(fp, number_of_blocks, block):
#A generator that splits a file into blocks and iterates over the lines of one of the blocks.

assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
assert 0 < number_of_blocks

fp.seek(0,2) #seek to end of file to compute block size
file_size = fp.tell()

ini = file_size * block / number_of_blocks #compute start & end point of file block
end = file_size * (1 + block) / number_of_blocks

if ini <= 0:
fp.seek(0)
else:
fp.seek(ini-1)
fp.readline()

while fp.tell() < end:
yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
for rows in chunk.splitlines():
if len(rows.split(",")) != 3:
continue
userKeyID = rows.split(",")[0]
try:
curTimeStamp = int(rows.split(",")[1])
except ValueError:
print("Invalid Timestamp for ID:" + str(userKeyID))
continue
curEvent = rows.split(",")[2]
if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "close":
#Check if already existing userID with expected Close event 0 - Open; 1 - Close
#Array value within dictionary stores [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
avgTimeSpentDict[userKeyID][countPos] = eventCount
avgTimeSpentDict[userKeyID][totTmPos] = totalTime
avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open

elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "open":
avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close

elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "open":
curTotalTime,closeTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][openTmPos],curTimeStamp)
totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
avgTimeSpentDict[userKeyID][totTmPos]=totalTime
avgTimeSpentDict[userKeyID][closeTmPos]=closeTime
avgTimeSpentDict[userKeyID][openTmPos]=curTimeStamp
eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
avgTimeSpentDict[userKeyID][countPos] = eventCount

elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "close":
curTotalTime,openTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][closeTmPos],curTimeStamp)
totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
avgTimeSpentDict[userKeyID][totTmPos]=totalTime
avgTimeSpentDict[userKeyID][openTmPos]=openTime
eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
avgTimeSpentDict[userKeyID][countPos] = eventCount

elif curEvent == "open":
#Initialize userid with Open event
avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]

elif curEvent == "close":
#Initialize userid with missing handler function since there is no Open event for this User
totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]

def missingHandler(defaultTimeOut,curTimeVal,lastTimeVal):
if lastTimeVal - curTimeVal > defaultTimeOut:
return defaultTimeOut,curTimeVal
else:
return lastTimeVal - curTimeVal,curTimeVal

def computeAvg(avgTimeSpentDict,defaultTimeOut):
resDict =
for k,v in avgTimeSpentDict.iteritems():
if v[0] == 0:
resDict[k] = 0
else:
resDict[k] = v[1]/v[0]
return resDict

if __name__ == "__main__":
avgTimeSpentDict =
if len(sys.argv) < 2:
print("Please provide input data file name for processing")
sys.exit(1)

fileObj = open(sys.argv[1])
number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
for chunk_number in range(number_of_chunks):
for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
print (computeAvg(avgTimeSpentDict,defaultTimeOut))
avgTimeSpentDict.clear() #Nullify dictionary
fileObj.close #Close the file object


Both programs give the desired output, but efficiency is what matters for this particular scenario. Let me know if you have anything better or any suggestions on the existing implementation.










share|improve this question









New contributor




Wiki_91 is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.







$endgroup$











  • $begingroup$
    how is it possible that the aggregated data doesn't fit in memory? It's ~20 bytes per user - you really have a userbase of billions?
    $endgroup$
    – Oh My Goodness
    yesterday











  • $begingroup$
    This is to bring out memory efficient solution and critical thinking among programmers in one of our internal org forum.
    $endgroup$
    – Wiki_91
    yesterday










  • $begingroup$
    is the problem real or imaginary?
    $endgroup$
    – Oh My Goodness
    yesterday










  • $begingroup$
    Imaginery.. We handle such huge volume in distributed Hadoop cluster with spark. But this challenge is to avoid and handle the same solution in single machine.
    $endgroup$
    – Wiki_91
    yesterday






  • 3




    $begingroup$
    you've taken a real problem and applied made-up constraints, like a programming puzzle would have, and got the worst of both worlds. The arbitrary One True Solution character of a puzzle is combined with the vagueness, length and tedium of a real problem. I suggest to remove a bunch of detail to create a short puzzle, or drop the fake restrictions and add real context like "actual size of input" and "actual available memory" to describe an authentic engineering problem.
    $endgroup$
    – Oh My Goodness
    yesterday













2












2








2





$begingroup$


We have a large log file which stores user interactions with an application. The entries in the log file follow the following schema: userId, timestamp, actionType where actionType is one of two possible values: [open, close]



Constraints:



  1. The log file is too big to fit in memory on one machine. Also assume that the aggregated data doesn’t fit into memory.

  2. Code has to be able to run on a single machine.

  3. Should not use an out-of-the box implementation of mapreduce or 3rd party database; don’t assume we have a Hadoop or Spark or other distributed computing framework.

  4. There can be multiple entries of each actionType for each user, and there might be missing entries in the log file. So a user might be missing a close record between two open records or vice versa.

  5. Timestamps will come in strictly ascending order.

For this problem, we need to implement a class/classes that computes the average time spent by each user between open and close. Keep in mind that there are missing entries for some users, so we will have to make a choice about how to handle these entries when making our calculations. Code should follow a consistent policy with regards to how we make that choice.



The desired output for the solution should be [userId, timeSpent,….] for all the users in the log file.



Sample log file (comma-separated, text file)



1,1435456566,open 
2,1435457643,open
3,1435458912,open
1,1435459567,close
4,1435460345,open
1,1435461234,open
2,1435462567,close
1,1435463456,open
3,1435464398,close
4,1435465122,close
1,1435466775,close


Approach



Here is the code I've written in Python and Scala, which seems to be not efficient and up to the expectations of the scenario given. I'd like feedback on how I could optimise this code as per the given scenario.





Scala implementation



import java.io.FileInputStream
import java.util.Scanner, Map, LinkedList
import java.lang.Long
import scala.collection.mutable

object UserMetrics extends App
if (args.length == 0)
println("Please provide input data file name for processing")

val userMetrics = new UserMetrics()
userMetrics.readInputFile(args(0),if (args.length == 1) 600000 else args(1).toInt)


case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics

val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

def readInputFile(stArr:String, timeOut: Int)
var inputStream: FileInputStream = null
var sc: Scanner = null
try
inputStream = new FileInputStream(stArr);
sc = new Scanner(inputStream, "UTF-8");
while (sc.hasNextLine())
val line: String = sc.nextLine();
processInput(line, timeOut)


for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap)
val userInfo:UserInfo = userLs.get(0)
val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
println("" + key +","+timespent + "")


if (sc.ioException() != null)
throw sc.ioException();

finally
if (inputStream != null)
inputStream.close();

if (sc != null)
sc.close();




def processInput(line: String, timeOut: Int)
val strSp = line.split(",")

val userId: Integer = Integer.parseInt(strSp(0))
val curTimeStamp = Long.parseLong(strSp(1))
val status = strSp(2)
val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

if (lsUserInfo != null && lsUserInfo.size() > 0)
val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
val prevStatus: String = lastUserInfo.prevStatus

if (prevStatus.equals("open"))
if (status.equals(lastUserInfo.prevStatus))
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
val timeDiff = lastUserInfo.timeSpent + timeSelector
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
else if(!status.equals(lastUserInfo.prevStatus))
val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))

else if(prevStatus.equals("close"))
if (status.equals(lastUserInfo.prevStatus))
lsUserInfo.remove()
val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence+1))
else if(!status.equals(lastUserInfo.prevStatus))

lsUserInfo.remove()
lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))


else if(lsUserInfo.size()==0)
lsUserInfo.add(uInfo)

usermap.put(userId, lsUserInfo)





Python Implementation





import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.

    assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
    assert 0 < number_of_blocks

    fp.seek(0, 2) #seek to end of file to compute block size
    file_size = fp.tell()

    ini = file_size * block / number_of_blocks #compute start & end point of file block
    end = file_size * (1 + block) / number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut):
    countPos, totTmPos, openTmPos, closeTmPos, nextEventPos = 0, 1, 2, 3, 4
    for rows in chunk.splitlines():
        if len(rows.split(",")) != 3:
            continue
        userKeyID = rows.split(",")[0]
        try:
            curTimeStamp = int(rows.split(",")[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = rows.split(",")[2]
        if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos] == 1 and curEvent == "close":
            #Check if already existing userID with expected Close event 0 - Open; 1 - Close
            #Array value within dictionary stores [No. of pair events, total time spent (Close tm - Open tm), Last Open Tm, Last Close Tm, Next expected Event]
            curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos] == 0 and curEvent == "open":
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos] == 1 and curEvent == "open":
            curTotalTime, closeTime = missingHandler(defaultTimeOut, avgTimeSpentDict[userKeyID][openTmPos], curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = closeTime
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos] == 0 and curEvent == "close":
            curTotalTime, openTime = missingHandler(defaultTimeOut, avgTimeSpentDict[userKeyID][closeTmPos], curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][openTmPos] = openTime
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif curEvent == "open":
            #Initialize userid with Open event
            avgTimeSpentDict[userKeyID] = [0, 0, curTimeStamp, 0, 1]

        elif curEvent == "close":
            #Initialize userid with missing handler function since there is no Open event for this user
            totaltime, OpenTime = missingHandler(defaultTimeOut, 0, curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1, totaltime, OpenTime, curTimeStamp, 0]

def missingHandler(defaultTimeOut, curTimeVal, lastTimeVal):
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut, curTimeVal
    else:
        return lastTimeVal - curTimeVal, curTimeVal

def computeAvg(avgTimeSpentDict, defaultTimeOut):
    resDict = {}
    for k, v in avgTimeSpentDict.iteritems():
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1] / v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)

    fileObj = open(sys.argv[1])
    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    for chunk_number in range(number_of_chunks):
        for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
            computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print(computeAvg(avgTimeSpentDict, defaultTimeOut))
    avgTimeSpentDict.clear() #Nullify dictionary
    fileObj.close #Close the file object
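To illustrate the idea behind `fileBlockStream` (seek to a byte offset, discard the partial line straddling the boundary, then read until the block's end offset), here is a small self-contained sketch. The file name is made up for the demo, and it uses integer division (`//`), which the `/` above performs under Python 2:

```python
import os
import tempfile

def file_block_stream(fp, number_of_blocks, block):
    # Yield the lines that fall inside one byte-range block of the file.
    fp.seek(0, 2)                       # seek to end to learn the file size
    file_size = fp.tell()
    ini = file_size * block // number_of_blocks
    end = file_size * (block + 1) // number_of_blocks
    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()                   # skip the line straddling the boundary
    while fp.tell() < end:
        yield fp.readline()

path = os.path.join(tempfile.gettempdir(), "block_demo.csv")
with open(path, "w") as f:
    f.writelines("%d,1435456566,open\n" % i for i in range(100))

with open(path) as f:
    lines = [line for b in range(4) for line in file_block_stream(f, 4, b)]

# Every line lands in exactly one block, with no duplicates at the boundaries.
assert len(lines) == 100 and len(set(lines)) == 100
os.remove(path)
```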


Both programs give the desired output, but efficiency is what matters in this scenario. Let me know if you have a better approach or any suggestions on the existing implementations.
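As a sanity check (added here for illustration), the expected averages for the users in the sample log whose events pair up cleanly can be computed directly. User 1 is skipped because its missing close event makes the result depend on the chosen timeout policy:

```python
# Events from the sample log, excluding user 1 (its missing close needs a policy).
sample = [
    (2, 1435457643, "open"), (3, 1435458912, "open"), (4, 1435460345, "open"),
    (2, 1435462567, "close"), (3, 1435464398, "close"), (4, 1435465122, "close"),
]

open_ts = {}       # userId -> timestamp of the pending open event
durations = {}     # userId -> list of open-to-close gaps

for user, ts, event in sample:      # timestamps are already ascending
    if event == "open":
        open_ts[user] = ts
    elif user in open_ts:
        durations.setdefault(user, []).append(ts - open_ts.pop(user))

averages = {u: sum(d) // len(d) for u, d in durations.items()}
print(averages)  # -> {2: 4924, 3: 5486, 4: 4777}
```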










python csv scala statistics memory-optimization






edited 14 mins ago
200_success

asked yesterday
Wiki_91

  • how is it possible that the aggregated data doesn't fit in memory? It's ~20 bytes per user - you really have a userbase of billions?
    – Oh My Goodness
    yesterday

  • This is to bring out memory-efficient solutions and critical thinking among programmers in one of our internal org forums.
    – Wiki_91
    yesterday

  • is the problem real or imaginary?
    – Oh My Goodness
    yesterday

  • Imaginary. We handle such huge volumes in a distributed Hadoop cluster with Spark, but this challenge is to solve the same problem on a single machine.
    – Wiki_91
    yesterday

  • you've taken a real problem and applied made-up constraints, like a programming puzzle would have, and got the worst of both worlds. The arbitrary One True Solution character of a puzzle is combined with the vagueness, length and tedium of a real problem. I suggest removing a bunch of detail to create a short puzzle, or dropping the fake restrictions and adding real context like "actual size of input" and "actual available memory" to describe an authentic engineering problem.
    – Oh My Goodness
    yesterday















1 Answer
This is a comment on your Python solution (I don't know anything about Scala).



You don't need to iterate over chunks of your file unless you want to do parallel processing. However, since there might be a close event in a different block from an opening event, this process is not so easy to parallelize (you would have to keep track of dangling users in both directions, which you don't do as far as I can tell).



Also, the restriction that the aggregate does not fit into memory is, in my opinion, unrealistic: you would need more users than there are people in the world. In any case, your code does not respect this constraint either, since avgTimeSpentDict contains all users and will therefore not fit into memory. So I'm going to ignore this part.



Instead, just iterate over the file normally, with a for loop. This does not read the whole file into memory. Update a running mean with the new value whenever you find a matching event for each user.



At the same time keep a dictionary of users that are open to look out for a matching close event. If you have a close event without an open, it is a broken one and we can ignore it because you said it is guaranteed that the times are sorted (and time travel has not been invented yet, AFAIK). Or do something else with it. Same goes for an open event after a previous open, without any intervening close. Here I just added a print in those cases.



import sys
from collections import defaultdict

def update_mean(count, mean, new_value):
    count += 1. # float so it also works in Python 2
    mean += (new_value - mean) / count
    return count, mean

def average_timeout(file_name):
    open_users = {}
    time_spent = defaultdict(lambda: (0., 0.))

    with open(file_name) as f:
        for line in f:
            print(line.strip())
            try:
                user_id, timestamp, event = line.strip().split(",")
            except ValueError:
                print(f"misformed line: {line!r}")
                continue

            if event == "open":
                if user_id in open_users:
                    print("open with prior open, missed a close")
                open_users[user_id] = int(timestamp)
            elif event == "close":
                if user_id not in open_users:
                    print("close without open")
                else:
                    diff = int(timestamp) - open_users.pop(user_id)
                    time_spent[user_id] = update_mean(*time_spent[user_id], diff)
                    print(f"close with prior open, time difference {diff}")
            else:
                print(f"Unknown event: {event}")

    print(f"{len(open_users)} users left without close event")
    return time_spent

if __name__ == "__main__":
    time_spent = average_timeout(sys.argv[1])
    for user, (_, mean) in time_spent.items():
        print(f"{user} average timeout: {mean}")


In production you will obviously want to either remove most of the prints or at least make them logging.debug calls.



This can still run out of memory if, between a typical open event and its matching close, more distinct users open sessions than fit in memory, or if all events are broken and lack a close.
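For reference, update_mean is the standard one-pass running-mean recurrence, mean_k = mean_{k-1} + (x_k - mean_{k-1}) / k, which is why only a (count, mean) pair needs to be kept per user instead of all durations. A quick check against the batch mean, with made-up durations:

```python
import random

def update_mean(count, mean, new_value):
    # One-pass mean: incorporate new_value without storing past values.
    count += 1
    mean += (new_value - mean) / count
    return count, mean

random.seed(0)
durations = [random.randint(0, 10_000) for _ in range(1000)]

count, mean = 0, 0.0
for d in durations:
    count, mean = update_mean(count, mean, d)

# The running mean matches the ordinary batch mean up to float rounding.
assert count == len(durations)
assert abs(mean - sum(durations) / len(durations)) < 1e-6
```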



Python has an official style-guide, PEP8, which programmers are encouraged to follow. It recommends using lower_case for functions and variables and putting a space after each comma in an argument list.



fileObj.close does not actually close the file, because you never call it: that would be fileObj.close(). But even better is to use with, which takes care of closing the file for you, even if an exception occurs somewhere.
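A minimal illustration, using a throwaway file (the path is made up for the demo):

```python
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), "with_demo.csv")
with open(path, "w") as f:
    f.write("1,1435456566,open\n")

# The file is closed automatically when the with-block exits,
# even if an exception had been raised inside it.
with open(path) as file_obj:
    first_line = file_obj.readline().strip()

assert file_obj.closed
assert first_line == "1,1435456566,open"
os.remove(path)
```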



You should use Python 3. Python 2 will no longer be supported in less than a year.



You can use x in d to check whether some value x is in a dictionary d; there is no need for x in d.keys(). In Python 2 the distinction matters even more, since x in d is $\mathcal{O}(1)$ while x in d.keys() is $\mathcal{O}(n)$ (it builds a list first).
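A small demonstration; both spellings give the same answer, so the shorter x in d is preferable:

```python
avg_time_spent = {"42": [0, 0, 1435456566, 0, 1]}

assert "42" in avg_time_spent           # O(1): direct hash lookup
assert "42" in avg_time_spent.keys()    # same result; Python 2 built a key list first
assert "7" not in avg_time_spent
```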






share|improve this answer











$endgroup$













    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "196"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: false,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: null,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );






    Wiki_91 is a new contributor. Be nice, and check out our Code of Conduct.









    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f217434%2fcomputing-average-duration-per-user-from-a-large-csv-file-that-doesnt-fit-in-me%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    3












    $begingroup$

    This is a comment on your Python solution (I don't know anything about Scala).



    You don't need to iterate over chunks of your file unless you want to do parallel processing. However, since there might be a close event in a different block from an opening event, this process is not so easy to parallelize (you would have to keep track of dangling users in both directions, which you don't do as far as I can tell).



    Also, the restriction that the aggregate does not fit into memory is...unrealistic IMO. You would have to have more users than there are people in the world. Anyways, your code does not respect this constraint either, since avgTimeSpentDict contains all users and will therefore not fit into memory. So I'm going to ignore this part.



    Instead, just iterate over the file normally, with a for loop. This does not read the whole file into memory. Update a running mean with the new value whenever you find a matching event for each user.



    At the same time keep a dictionary of users that are open to look out for a matching close event. If you have a close event without an open, it is a broken one and we can ignore it because you said it is guaranteed that the times are sorted (and time travel has not been invented yet, AFAIK). Or do something else with it. Same goes for an open event after a previous open, without any intervening close. Here I just added a print in those cases.



    import sys
    from collections import defaultdict

    def update_mean(count, mean, new_value):
    count += 1. # float so it also works in Python 2
    mean += (new_value - mean) / count
    return count, mean

    def average_timeout(file_name):
    open_users =
    time_spent = defaultdict(lambda: (0., 0.))

    with open(file_name) as f:
    for line in f:
    print(line.strip())
    try:
    user_id, timestamp, event = line.strip().split(",")
    except ValueError:
    print(f"misformed line: line!r")
    continue

    if event == "open":
    if user_id in open_users:
    print("open with prior open, missed a close")
    open_users[user_id] = int(timestamp)
    elif event == "close":
    if user_id not in open_users:
    print("close without open")
    else:
    diff = int(timestamp) - open_users.pop(user_id)
    time_spent[user_id] = update_mean(*time_spent[user_id], diff)
    print(f"close with prior open, time difference diff")
    else:
    print(f"Unknown event: event")

    print(f"len(open_users) users left without close event")
    return time_spent

    if __name__ == "__main__":
    time_spent = average_timeout(sys.argv[1])
    for user, (_, mean) in time_spent.items():
    print(f"user average timeout: mean")


    In production you will obviously want to either remove most of the prints or at least make them logging.debug calls.



    This can still run out of memory if the average length between an open and a close event contains more open events by different users than there is memory. Or if all events are broken and lack a close.



    Python has an official style-guide, PEP8, which programmers are encouraged to follow. It recommends using lower_case for functions and variables and putting a space after each comma in an argument list.



    fileObj.close does not actually close the file if you don't call it, fileObj.close(). But even better is to use with which will take care of closing the file for you, even in the event of an exception occurring somewhere.



    You should use Python 3. Python 2 will no longer be supported in less than a year.



    You can use x in d to check if some value x is in a dictionary d. No need to do x in d.keys(). In Python 2 this distinction is even more important since x in d is $mathcalO(1)$, while x in d.keys() is $mathcalO(n)$ (since it is a list).






    share|improve this answer











    $endgroup$

















      3












      $begingroup$

      This is a comment on your Python solution (I don't know anything about Scala).



      You don't need to iterate over chunks of your file unless you want to do parallel processing. However, since there might be a close event in a different block from an opening event, this process is not so easy to parallelize (you would have to keep track of dangling users in both directions, which you don't do as far as I can tell).



      Also, the restriction that the aggregate does not fit into memory is...unrealistic IMO. You would have to have more users than there are people in the world. Anyways, your code does not respect this constraint either, since avgTimeSpentDict contains all users and will therefore not fit into memory. So I'm going to ignore this part.



      Instead, just iterate over the file normally, with a for loop. This does not read the whole file into memory. Update a running mean with the new value whenever you find a matching event for each user.



        This is a comment on your Python solution (I don't know anything about Scala).



        You don't need to iterate over chunks of your file unless you want to do parallel processing. However, since there might be a close event in a different block from an opening event, this process is not so easy to parallelize (you would have to keep track of dangling users in both directions, which you don't do as far as I can tell).
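For completeness, here is one hedged sketch (not part of the original code) of what tracking dangling users "in both directions" could look like: each chunk summary records, per user, the pairs matched inside the chunk, a trailing open with no close yet, and leading closes with no open. Adjacent summaries then merge in time order. `summarize_chunk` and `merge` are hypothetical helpers, and the sketch assumes at most one dangling open per user (multiple opens in a row are broken data anyway).

```python
def summarize_chunk(events):
    """Summarize one chunk of (user_id, timestamp, kind) events, sorted by time.

    Per user we keep (count, total, dangling_open, orphan_closes) so that
    adjacent chunk summaries can later be merged in order.
    """
    summary = {}
    for user, ts, kind in events:
        count, total, dangling, orphans = summary.get(user, (0, 0, None, []))
        if kind == "open":
            # A second open before a close is broken data; the newer open wins.
            dangling = ts
        elif kind == "close":
            if dangling is not None:
                count, total, dangling = count + 1, total + (ts - dangling), None
            else:
                # May match a dangling open from an earlier chunk.
                orphans.append(ts)
        summary[user] = (count, total, dangling, orphans)
    return summary


def merge(left, right):
    """Merge two adjacent chunk summaries (left precedes right in time)."""
    merged = dict(left)
    for user, (c2, t2, d2, o2) in right.items():
        c1, t1, d1, o1 = merged.get(user, (0, 0, None, []))
        o2 = list(o2)  # copy before popping so the input summary stays intact
        if d1 is not None and o2:
            # left's dangling open pairs with right's first orphan close.
            c1, t1, d1 = c1 + 1, t1 + (o2.pop(0) - d1), None
        dangling = d2 if d2 is not None else d1
        merged[user] = (c1 + c2, t1 + t2, dangling, o1 + o2)
    return merged
```

Keeping per-user totals instead of means makes the merge trivial; the mean is `total / count` once all chunks are combined.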



        Also, the restriction that the aggregate does not fit into memory is... unrealistic, IMO. You would need more users than there are people in the world. Anyway, your code does not respect this constraint either, since avgTimeSpentDict contains all users and will therefore not fit into memory. So I'm going to ignore this part.



        Instead, just iterate over the file normally, with a for loop. This does not read the whole file into memory. Update each user's running mean whenever you find a matching close event.
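The incremental-mean idea can be sanity-checked on its own: folding one value at a time into the mean gives the same result as averaging the whole list, without ever storing it. This uses the same update rule as the code below.

```python
def update_mean(count, mean, new_value):
    # Fold one new value into a running mean without keeping past values.
    count += 1
    mean += (new_value - mean) / count
    return count, mean

count, mean = 0, 0.0
for value in [2, 4, 6, 8]:
    count, mean = update_mean(count, mean, value)

print(count, mean)  # 4 5.0
```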



        At the same time keep a dictionary of users that are open to look out for a matching close event. If you have a close event without an open, it is a broken one and we can ignore it because you said it is guaranteed that the times are sorted (and time travel has not been invented yet, AFAIK). Or do something else with it. Same goes for an open event after a previous open, without any intervening close. Here I just added a print in those cases.



        import sys
        from collections import defaultdict

        def update_mean(count, mean, new_value):
            count += 1.  # float so it also works in Python 2
            mean += (new_value - mean) / count
            return count, mean

        def average_timeout(file_name):
            open_users = {}
            time_spent = defaultdict(lambda: (0., 0.))

            with open(file_name) as f:
                for line in f:
                    print(line.strip())
                    try:
                        user_id, timestamp, event = line.strip().split(",")
                    except ValueError:
                        print(f"malformed line: {line!r}")
                        continue

                    if event == "open":
                        if user_id in open_users:
                            print("open with prior open, missed a close")
                        open_users[user_id] = int(timestamp)
                    elif event == "close":
                        if user_id not in open_users:
                            print("close without open")
                        else:
                            diff = int(timestamp) - open_users.pop(user_id)
                            time_spent[user_id] = update_mean(*time_spent[user_id], diff)
                            print(f"close with prior open, time difference {diff}")
                    else:
                        print(f"Unknown event: {event}")

            print(f"{len(open_users)} users left without close event")
            return time_spent

        if __name__ == "__main__":
            time_spent = average_timeout(sys.argv[1])
            for user, (_, mean) in time_spent.items():
                print(f"{user} average timeout: {mean}")


        In production you will obviously want to either remove most of the prints or at least make them logging.debug calls.



        This can still run out of memory if, between a typical open event and its close, more distinct users open sessions than fit in memory. Or if all events are broken and lack a close.



        Python has an official style-guide, PEP8, which programmers are encouraged to follow. It recommends using lower_case for functions and variables and putting a space after each comma in an argument list.



        fileObj.close does not actually close the file: without the parentheses it merely references the method instead of calling it, fileObj.close(). But even better is to use with, which will take care of closing the file for you, even if an exception occurs somewhere.



        You should use Python 3. Python 2 will no longer be supported in less than a year.



        You can use x in d to check if some value x is in a dictionary d. No need to do x in d.keys(). In Python 2 this distinction is even more important since x in d is $\mathcal{O}(1)$, while x in d.keys() is $\mathcal{O}(n)$ (since it is a list).
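A quick illustration of the two spellings:

```python
d = {"alice": 1, "bob": 2}

# Membership tests go straight against the dict's hash table.
print("alice" in d)         # True
print("carol" in d)         # False

# Works, but needlessly indirect; in Python 2, d.keys() builds a list first.
print("alice" in d.keys())  # True
```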













        edited 8 hours ago

        answered 8 hours ago by Graipher




















            Wiki_91 is a new contributor. Be nice, and check out our Code of Conduct.








