Tuesday, April 17, 2012

StatsD and Graphite

I have been looking for a monitoring solution for quite a while. Several solutions exist, including Nagios and Ganglia. Ganglia offers monitoring, while Nagios offers alerting as well. Both Ganglia and Nagios seem a little complicated and need considerable effort to configure. To be honest, I am not familiar with Nagios or Ganglia and have only taken a quick look at both of them.

After reading the post Measure Anything, Measure Everything from Etsy, I decided to give StatsD and Graphite a try. StatsD is very simple to set up and easy to integrate with an existing system, but Graphite gave me a really hard time. I first tried to set up Graphite on Ubuntu 8.04, which eventually failed because its Python is not compatible with py2cairo.

After reinstalling Ubuntu 10.04 Server and following the instructions on how to install Graphite on Ubuntu, both StatsD and Graphite started up successfully.

The latest version of Graphite is 0.9.9 and its documentation is sparse. I am looking forward to 1.0, which was planned for release on 2012-01-01 but has been hugely delayed.

StatsD itself is very simple: it is a Node.js daemon, and its stats.js contains only about 300 lines of code. By reviewing the code, you can fully understand the concepts introduced in Measure Anything, Measure Everything, especially counters and timers.
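For reference, the wire format StatsD accepts is plain text over UDP, one metric per packet: "name:value|c" for counters and "name:value|ms" for timers, which is exactly what the Java test client below sends. A minimal Python sketch (host is an assumption; 8125 is the default StatsD UDP port):

import socket

# Hypothetical host; 8125 is StatsD's default UDP port.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(b"ramoncounter:3|c", ("127.0.0.1", 8125))   # counter: add 3 to ramoncounter
sock.sendto(b"ramontimer:27|ms", ("127.0.0.1", 8125))   # timer: one 27 ms sample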

Now it is time to run a test.

1. Launch Graphite (by default Graphite is installed to /opt/graphite).
> sudo /etc/init.d/apache2 restart # start Apache (graphite-web).
> cd /opt/graphite/
> sudo ./bin/carbon-cache.py start # start carbon to receive incoming stats.

2. Launch Statsd
> cd $STATSD_HOME
> sudo cp exampleConfig.js myconfig.js # change the configuration according to your requirement
> sudo node stats.js myconfig.js

3. Run a StatsD client (the simple Java client below).
> java Main

import java.util.Random;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;

public class Main{
  public static void main(String args[]) throws Exception{
    DatagramSocket socket = new DatagramSocket();
    while(true) {
      // send counter stats
      int c = new Random().nextInt(10);
      String data = "ramoncounter:" + c + "|c";
      byte[] d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      // send timer stats
      data = "";
      data += "ramontimer:" + new Random().nextInt(30) + "|ms";
      d = data.getBytes();
      socket.send(new DatagramPacket(d, d.length, InetAddress.getByName("192.168.2.221"), 8125));
      System.out.println("[Send] " + data + "...Sleep 10 seconds!");

      Thread.sleep(10 * 1000);
    }
  }
}

Now open a web browser and access http://GRAPHITE_HOST; you will be welcomed by the Graphite web UI.

How does Graphite maintain the incoming data points?

This is a big topic. Graphite consists of three components:
  1. carbon - a Twisted daemon that listens for time-series data.
  2. whisper - a simple database library for storing time-series data (similar in design to RRD)
  3. graphite webapp - A Django webapp that renders graphs on-demand using Cairo
Whisper is the data storage engine. A Whisper file contains one or more archives, each with a specific data resolution and retention (defined in number of points or max timestamp age). Archives are ordered from the highest-resolution, shortest-retention archive to the lowest-resolution, longest-retention archive.
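To make the resolution/retention trade-off concrete, here is a small arithmetic sketch using the archive list that the ramoncounter metric ends up with later in this post, [[10,2160],[60,10080],[600,262974]]:

archives = [(10, 2160), (60, 10080), (600, 262974)]   # (secondsPerPoint, points)
for seconds_per_point, points in archives:
    retention = seconds_per_point * points   # seconds of history this archive keeps
    print("%ds per point for %d points covers %d seconds" % (seconds_per_point, points, retention))
# -> 21600 s (6 hours), 604800 s (7 days) and 157784400 s (about 5 years),
#    exactly the three retention values shown in the header dump below.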

As the official documentation is very limited and lacks examples, it is hard to really understand how Whisper works. I modified whisper.py to output more log messages in order to understand its mechanism.

The main functions of whisper.py are:

  • def create(path,archiveList,xFilesFactor=None,aggregationMethod=None)
  • def update(path,value,timestamp=None)
  • def fetch(path,fromTime,untilTime=None)
  • def info(path) - get header information.
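A usage sketch of these functions (the path and values here are hypothetical; the archive list is the one created for ramoncounter, and this assumes the stock whisper module rather than my amended copy, whose trailing test lines run on import):

import time
import whisper

path = "/tmp/ramoncounter.wsp"                        # hypothetical path
archives = [(10, 2160), (60, 10080), (600, 262974)]   # (secondsPerPoint, points)

whisper.create(path, archives)             # allocate the whole fixed-size file up front
whisper.update(path, 0.5)                  # store one data point, timestamp defaults to now
print(whisper.info(path)['maxRetention'])  # -> 157784400

(timeInfo, values) = whisper.fetch(path, int(time.time()) - 3600)  # last hour
print(timeInfo)   # (fromInterval, untilInterval, step)
print(values)     # one slot per step, None where no data point exists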

By running the test client (Main.class), I got a ramoncounter.wsp file which contains all data points of the given metric. Its header information is shown below:

maxRetention: 157784400
xFilesFactor: 0.5
aggregationMethod: average
fileSize: 3302620

Archive 0
retention: 21600
secondsPerPoint: 10
points: 2160
size: 25920
offset: 52

Archive 1
retention: 604800
secondsPerPoint: 60
points: 10080
size: 120960
offset: 25972

Archive 2
retention: 157784400
secondsPerPoint: 600
points: 262974
size: 3155688
offset: 146932
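These numbers can be reproduced from the struct formats used in whisper.py: the metadata is packed with "!2LfL" (16 bytes), each archive header with "!3L" (12 bytes), and each data point with "!Ld" (12 bytes). A quick check:

import struct

metadata_size = struct.calcsize("!2LfL")    # 16: aggregationType, maxRetention, xFilesFactor, archiveCount
archive_info_size = struct.calcsize("!3L")  # 12: offset, secondsPerPoint, points
point_size = struct.calcsize("!Ld")         # 12: timestamp, value

archives = [(10, 2160), (60, 10080), (600, 262974)]
offset = metadata_size + archive_info_size * len(archives)   # header ends at byte 52
for seconds_per_point, points in archives:
    print("offset=%d size=%d retention=%d" % (offset, points * point_size, seconds_per_point * points))
    offset += points * point_size
print("fileSize=%d" % offset)   # 3302620, matching the header dump above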

The amended whisper.py:
  1 #!/usr/bin/env python
  2 # Copyright 2008 Orbitz WorldWide
  3 #
  4 # Licensed under the Apache License, Version 2.0 (the "License");
  5 # you may not use this file except in compliance with the License.
  6 # You may obtain a copy of the License at
  7 #
  8 #   http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 # Unless required by applicable law or agreed to in writing, software
 11 # distributed under the License is distributed on an "AS IS" BASIS,
 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 # See the License for the specific language governing permissions and
 14 # limitations under the License.
 15 #
 16 #
 17 # This module is an implementation of the Whisper database API
 18 # Here is the basic layout of a whisper data file
 19 #
 20 # File = Header,Data
 21 #       Header = Metadata,ArchiveInfo+
 22 #               Metadata = aggregationType,maxRetention,xFilesFactor,archiveCount
 23 #               ArchiveInfo = Offset,SecondsPerPoint,Points
 24 #       Data = Archive+
 25 #               Archive = Point+
 26 #                       Point = timestamp,value
 27 
 28 import os, struct, time, logging
 29 
 30 logging.basicConfig(filename="whisper.log",level=logging.DEBUG)
 31 
 32 try:
 33   import fcntl
 34   CAN_LOCK = True
 35 except ImportError:
 36   CAN_LOCK = False
 37 
 38 LOCK = False
 39 CACHE_HEADERS = False
 40 AUTOFLUSH = False
 41 __headerCache = {}
 42 
 43 longFormat = "!L"
 44 longSize = struct.calcsize(longFormat)
 45 floatFormat = "!f"
 46 floatSize = struct.calcsize(floatFormat)
 47 valueFormat = "!d"
 48 valueSize = struct.calcsize(valueFormat)
 49 pointFormat = "!Ld"
 50 pointSize = struct.calcsize(pointFormat)
 51 metadataFormat = "!2LfL"
 52 metadataSize = struct.calcsize(metadataFormat)
 53 archiveInfoFormat = "!3L"
 54 archiveInfoSize = struct.calcsize(archiveInfoFormat)
 55 
 56 aggregationTypeToMethod = dict({
 57   1: 'average',
 58   2: 'sum',
 59   3: 'last',
 60   4: 'max',
 61   5: 'min'
 62 })
 63 aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()])
 64 aggregationMethods = aggregationTypeToMethod.values()
 65 
 66 debug = startBlock = endBlock = lambda *a,**k: None
 67 
 68 UnitMultipliers = {
 69   's' : 1,
 70   'm' : 60,
 71   'h' : 60 * 60,
 72   'd' : 60 * 60 * 24,
 73   'y' : 60 * 60 * 24 * 365,
 74 }
 75 
 76 
 77 def parseRetentionDef(retentionDef):
 78   (precision, points) = retentionDef.strip().split(':')
 79 
 80   if precision.isdigit():
 81     precisionUnit = 's'
 82     precision = int(precision)
 83   else:
 84     precisionUnit = precision[-1]
 85     precision = int( precision[:-1] )
 86 
 87   if points.isdigit():
 88     pointsUnit = None
 89     points = int(points)
 90   else:
 91     pointsUnit = points[-1]
 92     points = int( points[:-1] )
 93 
 94   if precisionUnit not in UnitMultipliers:
 95     raise ValueError("Invalid unit: '%s'" % precisionUnit)
 96 
 97   if pointsUnit not in UnitMultipliers and pointsUnit is not None:
 98     raise ValueError("Invalid unit: '%s'" % pointsUnit)
 99 
100   precision = precision * UnitMultipliers[precisionUnit]
101 
102   if pointsUnit:
103     points = points * UnitMultipliers[pointsUnit] / precision
104 
105   return (precision, points)
106 
107 class WhisperException(Exception):
108     """Base class for whisper exceptions."""
109 
110 
111 class InvalidConfiguration(WhisperException):
112     """Invalid configuration."""
113 
114 
115 class InvalidAggregationMethod(WhisperException):
116     """Invalid aggregation method."""
117 
118 
119 class InvalidTimeInterval(WhisperException):
120     """Invalid time interval."""
121 
122 
123 class TimestampNotCovered(WhisperException):
124     """Timestamp not covered by any archives in this database."""
125 
126 class CorruptWhisperFile(WhisperException):
127   def __init__(self, error, path):
128     Exception.__init__(self, error)
129     self.error = error
130     self.path = path
131 
132   def __repr__(self):
133     return "<CorruptWhisperFile[%s] %s>" % (self.path, self.error)
134 
135   def __str__(self):
136     return "%s (%s)" % (self.error, self.path)
137 
138 def enableDebug():
139   global open, debug, startBlock, endBlock
140   class open(file):
141     def __init__(self,*args,**kwargs):
142       file.__init__(self,*args,**kwargs)
143       self.writeCount = 0
144       self.readCount = 0
145 
146     def write(self,data):
147       self.writeCount += 1
148       debug('WRITE %d bytes #%d' % (len(data),self.writeCount))
149       return file.write(self,data)
150 
151     def read(self,bytes):
152       self.readCount += 1
153       debug('READ %d bytes #%d' % (bytes,self.readCount))
154       return file.read(self,bytes)
155 
156   def debug(message):
157     print 'DEBUG :: %s' % message
158 
159   __timingBlocks = {}
160 
161   def startBlock(name):
162     __timingBlocks[name] = time.time()
163 
164   def endBlock(name):
165     debug("%s took %.5f seconds" % (name,time.time() - __timingBlocks.pop(name)))
166 
167 
168 def __readHeader(fh):
169   info = __headerCache.get(fh.name)
170   if info:
171     return info
172 
173   originalOffset = fh.tell()
174   fh.seek(0)
175   packedMetadata = fh.read(metadataSize)
176 
177   try:
178     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
179   except:
180     raise CorruptWhisperFile("Unable to read header", fh.name)
181 
182   archives = []
183 
184   for i in xrange(archiveCount):
185     packedArchiveInfo = fh.read(archiveInfoSize)
186     try:
187       (offset,secondsPerPoint,points) = struct.unpack(archiveInfoFormat,packedArchiveInfo)
188     except:
189       raise CorruptWhisperFile("Unable to read archive %d metadata" % i, fh.name)
190 
191     archiveInfo = {
192       'offset' : offset,
193       'secondsPerPoint' : secondsPerPoint,
194       'points' : points,
195       'retention' : secondsPerPoint * points,
196       'size' : points * pointSize,
197     }
198     archives.append(archiveInfo)
199 
200   fh.seek(originalOffset)
201   info = {
202     'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'),
203     'maxRetention' : maxRetention,
204     'xFilesFactor' : xff,
205     'archives' : archives,
206   }
207   if CACHE_HEADERS:
208     __headerCache[fh.name] = info
209 
210   return info
211 
212 
213 def setAggregationMethod(path, aggregationMethod):
214   """setAggregationMethod(path,aggregationMethod)
215 
216 path is a string
217 aggregationMethod specifies the method to use when propogating data (see ``whisper.aggregationMethods``)
218 """
219   fh = open(path,'r+b')
220   if LOCK:
221     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
222 
223   packedMetadata = fh.read(metadataSize)
224 
225   try:
226     (aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
227   except:
228     raise CorruptWhisperFile("Unable to read header", fh.name)
229 
230   try:
231     newAggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] )
232   except KeyError:
233     raise InvalidAggregationMethod("Unrecognized aggregation method: %s" %
234           aggregationMethod)
235 
236   fh.seek(0)
237   fh.write(newAggregationType)
238 
239   if AUTOFLUSH:
240     fh.flush()
241     os.fsync(fh.fileno())
242 
243   if CACHE_HEADERS and fh.name in __headerCache:
244     del __headerCache[fh.name]
245 
246   fh.close()
247 
248   return aggregationTypeToMethod.get(aggregationType, 'average')
249 
250 
251 def validateArchiveList(archiveList):
252   """ Validates an archiveList. archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
253   An ArchiveList must:
254   1. Have at least one archive config. Example: (60, 86400) 
255   2. No archive may be a duplicate of another. 
256   3. Higher precision archives' precision must evenly divide all lower precision archives' precision.
257   4. Lower precision archives must cover larger time intervals than higher precision archives.
258 
259   Returns True or False
260   """
261 
262   try:
263     if not archiveList:
264       raise InvalidConfiguration("You must specify at least one archive configuration!")
265 
266     archiveList.sort(key=lambda a: a[0]) #sort by precision (secondsPerPoint)
267 
268     for i,archive in enumerate(archiveList):
269       if i == len(archiveList) - 1:
270         break
271 
272       next = archiveList[i+1]
273       if not (archive[0] < next[0]):
274         raise InvalidConfiguration("You cannot configure two archives "
275           "with the same precision %s,%s" % (archive,next))
276 
277       if (next[0] % archive[0]) != 0:
278         raise InvalidConfiguration("Higher precision archives' precision "
279           "must evenly divide all lower precision archives' precision %s,%s" \
280           % (archive[0],next[0]))
281 
282       retention = archive[0] * archive[1]
283       nextRetention = next[0] * next[1]
284 
285       if not (nextRetention > retention):
286         raise InvalidConfiguration("Lower precision archives must cover "
287           "larger time intervals than higher precision archives %s,%s" \
288           % (archive,next))
289 
290   except:
291         # RAMON: no exceptions will be thrown out???
292     return False
293   return True
294 
295 def create(path,archiveList,xFilesFactor=None,aggregationMethod=None):
296   """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')
297 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
298 path is a string
299 archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
300 xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
301 aggregationMethod specifies the function to use when propogating data (see ``whisper.aggregationMethods``)
302 """
303   # Set default params
304   if xFilesFactor is None:
305     xFilesFactor = 0.5
306   if aggregationMethod is None:
307     aggregationMethod = 'average'
308 
309   #Validate archive configurations...
310   validArchive = validateArchiveList(archiveList)
311   if not validArchive:
312     raise InvalidConfiguration("There was a problem creating %s due to an invalid schema config." % path)
313 
314   #Looks good, now we create the file and write the header
315   if os.path.exists(path):
316     raise InvalidConfiguration("File %s already exists!" % path)
317 
318   fh = open(path,'wb')
319   if LOCK:
320     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
321 
322   aggregationType = struct.pack( longFormat, aggregationMethodToType.get(aggregationMethod, 1) )
 323   # for example, an archive list: [[10,2160],[60,10080],[600,262974]]
324   oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1]
325   # the retention of [600,262974] is 600*262974
326   maxRetention = struct.pack( longFormat, oldest )
327   # xFilesFactor = 0.5
328   xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) )
329   # archiveCount = 3
330   archiveCount = struct.pack(longFormat, len(archiveList))
331   packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
332   fh.write(packedMetadata)
333   # !2LfL means L+L+f+L
334   # Metadata    !2LfL   aggregationType,maxRetention,xFilesFactor,archiveCount
335   #     ArchiveInfo     !3L             Offset,SecondsPerPoint,Points
336   headerSize = metadataSize + (archiveInfoSize * len(archiveList))
337   archiveOffsetPointer = headerSize
338 
339   for secondsPerPoint,points in archiveList:
 340         # record the start point (offset) of each archive.
341     archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
342     fh.write(archiveInfo)
343     archiveOffsetPointer += (points * pointSize)
344 
 345   # pre-allocate the disk space for all archives.
346   zeroes = '\x00' * (archiveOffsetPointer - headerSize)
347   fh.write(zeroes)
348 
349   if AUTOFLUSH:
350     fh.flush()
351     os.fsync(fh.fileno())
352 
353   fh.close()
354 
355 def __aggregate(aggregationMethod, knownValues):
356   if aggregationMethod == 'average':
357     return float(sum(knownValues)) / float(len(knownValues))
358   elif aggregationMethod == 'sum':
359     return float(sum(knownValues))
360   elif aggregationMethod == 'last':
361     return knownValues[len(knownValues)-1]
362   elif aggregationMethod == 'max':
363     return max(knownValues)
364   elif aggregationMethod == 'min':
365     return min(knownValues)
366   else:
367     raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
368             aggregationMethod)
369 
370 def __propagate(fh,header,timestamp,higher,lower):
371   aggregationMethod = header['aggregationMethod']
372   xff = header['xFilesFactor']
373 
374   # guarantee the timestamp can be evenly divided by secondsPerPoint...but 
375   # it will lose data precision, right??
376   lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
377   logging.info("timestamp is %d" % timestamp)
378   logging.info("lowerIntervalStart is %d" % lowerIntervalStart)
379   lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
380   logging.info("lowerIntervalEnd is %d" % lowerIntervalEnd)
381 
382   fh.seek(higher['offset'])
383   packedPoint = fh.read(pointSize)
384   (higherBaseInterval,higherBaseValue) = struct.unpack(pointFormat,packedPoint)
385   logging.info("higherBaseInterval,higherBaseValue is %d,%d" % (higherBaseInterval,higherBaseValue))
386 
387   if higherBaseInterval == 0:
388     higherFirstOffset = higher['offset']
389   else:
390     timeDistance = lowerIntervalStart - higherBaseInterval
391     pointDistance = timeDistance / higher['secondsPerPoint']
392     logging.info("higher['secondsPerPoint'] is %d" % higher['secondsPerPoint'])
393     byteDistance = pointDistance * pointSize
 394         # higher[:higherFirstOffset]: from the first data point up to the point whose timestamp matches the given timestamp
395     higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
396     logging.info("higherFirstOffset is %d" % higherFirstOffset)
397 
398   higherPoints = lower['secondsPerPoint'] / higher['secondsPerPoint']
399   higherSize = higherPoints * pointSize
400   relativeFirstOffset = higherFirstOffset - higher['offset']
401   logging.info("relativeFirstOffset is %d" % relativeFirstOffset)
402   relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
403   logging.info("relativeLastOffset is %d" % relativeLastOffset)
404   higherLastOffset = relativeLastOffset + higher['offset']
405   logging.info("higherLastOffset is %d" % relativeLastOffset)
406   fh.seek(higherFirstOffset)
407 
408   if higherFirstOffset < higherLastOffset: #we don't wrap the archive
409     seriesString = fh.read(higherLastOffset - higherFirstOffset)
410   else: #We do wrap the archive..round robin
411     higherEnd = higher['offset'] + higher['size']
412     seriesString = fh.read(higherEnd - higherFirstOffset)
413     fh.seek(higher['offset'])
414     seriesString += fh.read(higherLastOffset - higher['offset'])
415 
416   #Now we unpack the series data we just read
417   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
418   points = len(seriesString) / pointSize
419   logging.info("points is %d" % points)
420   seriesFormat = byteOrder + (pointTypes * points)
421   unpackedSeries = struct.unpack(seriesFormat, seriesString)
422   logging.info("unpackedSeries is %s" % str(unpackedSeries))
423 
424   #And finally we construct a list of values
425   neighborValues = [None] * points
426   currentInterval = lowerIntervalStart
427   logging.info("currentInterval is %d" % currentInterval)
428   step = higher['secondsPerPoint']
429   logging.info("step is %d" % step)
430 
431   # the value of unpackedSeries will like below:
432   # (1334806980, 0.29999999999999999, 1334806990, 0.20000000000000001, 1334828600, 12.0, 1334807010, 0.29999999999999999, 1334807020, 0.5)
433   # and the xrange(0,len(unpackedSeries),2) will return [0,2,4,6...]
434   for i in xrange(0,len(unpackedSeries),2):
435     pointTime = unpackedSeries[i]
436     if pointTime == currentInterval:
437           # what does this mean???...check above comments.
438       neighborValues[i/2] = unpackedSeries[i+1]
439     currentInterval += step
440   logging.info("neighborValues is %s" % str(neighborValues))
441 
442   #Propagate aggregateValue to propagate from neighborValues if we have enough known points
443   knownValues = [v for v in neighborValues if v is not None]
444   if not knownValues:
445     return False
446   logging.info("knownValues is %s" % str(knownValues))
447 
448   knownPercent = float(len(knownValues)) / float(len(neighborValues))
449   logging.info("knownPercent is %f" % knownPercent)
450   logging.info("xff is %f" % xff)
451   if knownPercent >= xff: #we have enough data to propagate a value!
452     logging.info("aggregationMethod is %s" % str(aggregationMethod))
453     aggregateValue = __aggregate(aggregationMethod, knownValues)
454     logging.info("aggregateValue is %f" % aggregateValue)
455     myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
456     fh.seek(lower['offset'])
457     packedPoint = fh.read(pointSize)
458     (lowerBaseInterval,lowerBaseValue) = struct.unpack(pointFormat,packedPoint)
459 
460         # create or update
461     if lowerBaseInterval == 0: #First propagated update to this lower archive
462       fh.seek(lower['offset'])
463       fh.write(myPackedPoint)
464     else: #Not our first propagated update to this lower archive
465       timeDistance = lowerIntervalStart - lowerBaseInterval
466       pointDistance = timeDistance / lower['secondsPerPoint']
467       byteDistance = pointDistance * pointSize
468       lowerOffset = lower['offset'] + (byteDistance % lower['size'])
469       fh.seek(lowerOffset)
470       fh.write(myPackedPoint)
471 
472     return True
473 
474   else:
475     return False
476 
477 
478 def update(path,value,timestamp=None):
479   """update(path,value,timestamp=None)
480 
481 path is a string
482 value is a float
483 timestamp is either an int or float
484 """
485   value = float(value)
486   fh = open(path,'r+b')
487   return file_update(fh, value, timestamp)
488 
489 
490 def file_update(fh, value, timestamp):
491   if LOCK:
492     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
493 
494   header = __readHeader(fh)
495   now = int( time.time() )
496   if timestamp is None:
497     timestamp = now
498 
499   timestamp = int(timestamp)
500   diff = now - timestamp
501   logging.info("diff(now - timestamp) is %d" % diff)
502   if not ((diff < header['maxRetention']) and diff >= 0):
503     raise TimestampNotCovered("Timestamp not covered by any archives in "
504       "this database.")
505 
506   # for [[10,2160],[60,10080],[600,262974]], [10,2160] is the highest-precision archive, while 
507   # [600,262974] is the lowest-precision archive...the archive list is sorted from highest-precision
508   # to lowest-precision.
509   for i,archive in enumerate(header['archives']): #Find the highest-precision archive that covers timestamp
510     if archive['retention'] < diff: continue
511     lowerArchives = header['archives'][i+1:] #We'll pass on the update to these lower precision archives later
512     break
513 
514   # The scope of variable 'archive' in for loop is beyond the for loop...a little strange feature!
515   # First we update the highest-precision archive
516   myInterval = timestamp - (timestamp % archive['secondsPerPoint'])
517   myPackedPoint = struct.pack(pointFormat,myInterval,value)
518   fh.seek(archive['offset'])
519   packedPoint = fh.read(pointSize)
520   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
521 
522   if baseInterval == 0: #This file's first update
523     # seek(offset) will reach the absolute position specified by offset.
524     fh.seek(archive['offset'])
525     fh.write(myPackedPoint)
526     baseInterval,baseValue = myInterval,value
527   else: #Not our first update
528     timeDistance = myInterval - baseInterval
529     pointDistance = timeDistance / archive['secondsPerPoint']
530     byteDistance = pointDistance * pointSize
531         # byteDistance % archive['size'] round-robin
532     myOffset = archive['offset'] + (byteDistance % archive['size'])
533     fh.seek(myOffset)
534     fh.write(myPackedPoint)
535 
536   #Now we propagate the update to lower-precision archives
537   higher = archive
538   logging.info("higher archive:" + str(higher))
539   for lower in lowerArchives:
540     if not __propagate(fh, header, myInterval, higher, lower):
541       break
542     higher = lower
543 
544   if AUTOFLUSH:
545     fh.flush()
546     os.fsync(fh.fileno())
547 
548   fh.close()
549 
550 
551 def update_many(path,points):
552   """update_many(path,points)
553 
554 path is a string
555 points is a list of (timestamp,value) points
556 """
557   if not points: return
558   points = [ (int(t),float(v)) for (t,v) in points]
559   points.sort(key=lambda p: p[0],reverse=True) #order points by timestamp, newest first
560   fh = open(path,'r+b')
561   return file_update_many(fh, points)
562 
563 
564 def file_update_many(fh, points):
565   if LOCK:
566     fcntl.flock( fh.fileno(), fcntl.LOCK_EX )
567 
568   header = __readHeader(fh)
569   now = int( time.time() )
570   archives = iter( header['archives'] )
571   currentArchive = archives.next()
572   currentPoints = []
573 
574   for point in points:
575     age = now - point[0]
576 
577     while currentArchive['retention'] < age: #we can't fit any more points in this archive
578       if currentPoints: #commit all the points we've found that it can fit
579         currentPoints.reverse() #put points in chronological order
580         __archive_update_many(fh,header,currentArchive,currentPoints)
581         currentPoints = []
582       try:
583         currentArchive = archives.next()
584       except StopIteration:
585         currentArchive = None
586         break
587 
588     if not currentArchive:
589       break #drop remaining points that don't fit in the database
590 
591     currentPoints.append(point)
592 
593   if currentArchive and currentPoints: #don't forget to commit after we've checked all the archives
594     currentPoints.reverse()
595     __archive_update_many(fh,header,currentArchive,currentPoints)
596 
597   if AUTOFLUSH:
598     fh.flush()
599     os.fsync(fh.fileno())
600 
601   fh.close()
602 
603 
604 def __archive_update_many(fh,header,archive,points):
605   step = archive['secondsPerPoint']
606   alignedPoints = [ (timestamp - (timestamp % step), value)
607                     for (timestamp,value) in points ]
608   #Create a packed string for each contiguous sequence of points
609   packedStrings = []
610   previousInterval = None
611   currentString = ""
612   for (interval,value) in alignedPoints:
613     if (not previousInterval) or (interval == previousInterval + step):
614       currentString += struct.pack(pointFormat,interval,value)
615       previousInterval = interval
616     else:
617       numberOfPoints = len(currentString) / pointSize
618       startInterval = previousInterval - (step * (numberOfPoints-1))
619       packedStrings.append( (startInterval,currentString) )
620       currentString = struct.pack(pointFormat,interval,value)
621       previousInterval = interval
622   if currentString:
623     numberOfPoints = len(currentString) / pointSize
624     startInterval = previousInterval - (step * (numberOfPoints-1))
625     packedStrings.append( (startInterval,currentString) )
626 
627   #Read base point and determine where our writes will start
628   fh.seek(archive['offset'])
629   packedBasePoint = fh.read(pointSize)
630   (baseInterval,baseValue) = struct.unpack(pointFormat,packedBasePoint)
631   if baseInterval == 0: #This file's first update
632     baseInterval = packedStrings[0][0] #use our first string as the base, so we start at the start
633 
634   #Write all of our packed strings in locations determined by the baseInterval
635   for (interval,packedString) in packedStrings:
636     timeDistance = interval - baseInterval
637     pointDistance = timeDistance / step
638     byteDistance = pointDistance * pointSize
639     myOffset = archive['offset'] + (byteDistance % archive['size'])
640     fh.seek(myOffset)
641     archiveEnd = archive['offset'] + archive['size']
642     bytesBeyond = (myOffset + len(packedString)) - archiveEnd
643 
644     if bytesBeyond > 0:
645       fh.write( packedString[:-bytesBeyond] )
646       assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (archiveEnd,fh.tell(),bytesBeyond,len(packedString))
647       fh.seek( archive['offset'] )
648       fh.write( packedString[-bytesBeyond:] ) #safe because it can't exceed the archive (retention checking logic above)
649     else:
650       fh.write(packedString)
651 
652   #Now we propagate the updates to lower-precision archives
653   higher = archive
654   lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
655 
656   for lower in lowerArchives:
657     fit = lambda i: i - (i % lower['secondsPerPoint'])
658     lowerIntervals = [fit(p[0]) for p in alignedPoints]
659     uniqueLowerIntervals = set(lowerIntervals)
660     propagateFurther = False
661     for interval in uniqueLowerIntervals:
662       if __propagate(fh, header, interval, higher, lower):
663         propagateFurther = True
664 
665     if not propagateFurther:
666       break
667     higher = lower
668 
669 
670 def info(path):
671   """info(path)
672 
673 path is a string
674 """
675   fh = open(path,'rb')
676   info = __readHeader(fh)
677   fh.close()
678   return info
679 
680 
681 def fetch(path,fromTime,untilTime=None):
682   """fetch(path,fromTime,untilTime=None)
683 
684 path is a string
685 fromTime is an epoch time
686 untilTime is also an epoch time, but defaults to now
687 """
688   fh = open(path,'rb')
689   return file_fetch(fh, fromTime, untilTime)
690 
691 
692 def file_fetch(fh, fromTime, untilTime):
693   header = __readHeader(fh)
694   now = int( time.time() )
695   if untilTime is None:
696     untilTime = now
697   fromTime = int(fromTime)
698   untilTime = int(untilTime)
699 
700   oldestTime = now - header['maxRetention']
701   if fromTime < oldestTime:
702     fromTime = oldestTime
703 
704   if not (fromTime < untilTime):
705     raise InvalidTimeInterval("Invalid time interval")
706   if untilTime > now:
707     untilTime = now
708   if untilTime < fromTime:
709     untilTime = now
710 
711   diff = now - fromTime
712   for archive in header['archives']:
713     if archive['retention'] >= diff:
714       break
715 
716   fromInterval = int( fromTime - (fromTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
717   untilInterval = int( untilTime - (untilTime % archive['secondsPerPoint']) ) + archive['secondsPerPoint']
718   fh.seek(archive['offset'])
719   packedPoint = fh.read(pointSize)
720   (baseInterval,baseValue) = struct.unpack(pointFormat,packedPoint)
721 
722   if baseInterval == 0:
723     step = archive['secondsPerPoint']
724     points = (untilInterval - fromInterval) / step
725     timeInfo = (fromInterval,untilInterval,step)
726     valueList = [None] * points
727     return (timeInfo,valueList)
728 
729   #Determine fromOffset
730   timeDistance = fromInterval - baseInterval
731   pointDistance = timeDistance / archive['secondsPerPoint']
732   byteDistance = pointDistance * pointSize
733   fromOffset = archive['offset'] + (byteDistance % archive['size'])
734 
735   #Determine untilOffset
736   timeDistance = untilInterval - baseInterval
737   pointDistance = timeDistance / archive['secondsPerPoint']
738   byteDistance = pointDistance * pointSize
739   untilOffset = archive['offset'] + (byteDistance % archive['size'])
740 
741   #Read all the points in the interval
742   fh.seek(fromOffset)
743   if fromOffset < untilOffset: #If we don't wrap around the archive
744     seriesString = fh.read(untilOffset - fromOffset)
745   else: #We do wrap around the archive, so we need two reads
746     archiveEnd = archive['offset'] + archive['size']
747     seriesString = fh.read(archiveEnd - fromOffset)
748     fh.seek(archive['offset'])
749     seriesString += fh.read(untilOffset - archive['offset'])
750 
751   #Now we unpack the series data we just read (anything faster than unpack?)
752   byteOrder,pointTypes = pointFormat[0],pointFormat[1:]
753   points = len(seriesString) / pointSize
754   seriesFormat = byteOrder + (pointTypes * points)
755   unpackedSeries = struct.unpack(seriesFormat, seriesString)
756 
757   #And finally we construct a list of values (optimize this!)
758   valueList = [None] * points #pre-allocate entire list for speed
759   currentInterval = fromInterval
760   step = archive['secondsPerPoint']
761 
762   for i in xrange(0,len(unpackedSeries),2):
763     pointTime = unpackedSeries[i]
764     if pointTime == currentInterval:
765       pointValue = unpackedSeries[i+1]
766       valueList[i/2] = pointValue #in-place reassignment is faster than append()
767     currentInterval += step
768 
769   fh.close()
770   timeInfo = (fromInterval,untilInterval,step)
771   return (timeInfo,valueList)
772 
773 now = int( time.time() - 24*24*60 )
774 print 'update'
775 update("e:/tmp/ramoncounter-2.wsp", 12, now)
776 
777 

Note that ramoncounter.wsp was created at about 4 PM on 2012/04/18, kept receiving data points until 10 AM on 2012/04/19, and I ran 'python whisper.py' at about 5 PM on 2012/04/19. The last three lines of whisper.py call the update() method, which is the most complicated one. The input timestamp parameter is 'int( time.time() - 24*24*60 )'; shifting the timestamp into the past is necessary because all existing data points fall between 4 PM 2012/04/18 and 10 AM 2012/04/19.

Below is the whisper.log:

INFO:root:diff(now - timestamp) is 34560
INFO:root:higher archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}
INFO:root:timestamp is 1334795580
INFO:root:lowerIntervalStart is 1334795400
INFO:root:lowerIntervalEnd is 1334796000
INFO:root:higherBaseInterval,higherBaseValue is 1334745540,0
INFO:root:higher['secondsPerPoint'] is 60
INFO:root:higherFirstOffset is 35944
INFO:root:relativeFirstOffset is 9972
INFO:root:relativeLastOffset is 10092
INFO:root:higherLastOffset is 10092
INFO:root:points is 10
INFO:root:unpackedSeries is (1334795400, 0.51666666666666672, 1334795460, 0.25, 1334795520, 0.40000000000000008, 1334795580, 12.0, 1334795640, 0.46666666666666673, 1334795700, 0.5, 1334795760, 0.31666666666666665, 1334795820, 0.71666666666666667, 1334795880, 0.56666666666666665, 1334795940, 0.5)
INFO:root:currentInterval is 1334795400
INFO:root:step is 60
INFO:root:neighborValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownValues is [0.51666666666666672, 0.25, 0.40000000000000008, 12.0, 0.46666666666666673, 0.5, 0.31666666666666665, 0.71666666666666667, 0.56666666666666665, 0.5]
INFO:root:knownPercent is 1.000000
INFO:root:xff is 0.500000
INFO:root:aggregationMethod is average
INFO:root:aggregateValue is 1.623333

It is important to understand how Whisper applies an update and propagates it to the lower-precision archives.
Based on the log, you can see that the first archive selected is 'archive:{'retention': 604800, 'secondsPerPoint': 60, 'points': 10080, 'size': 120960, 'offset': 25972}', not 'archive:{'retention': 21600, 'secondsPerPoint': 10, 'points': 2160, 'size': 25920, 'offset': 52}'. Why?

We ran 'python whisper.py' at 5 PM on 2012/04/19, and the timestamp of the incoming data point is 'int( time.time() - 24*24*60 )', i.e. 34560 seconds (about 9.6 hours) in the past, which matches the diff reported in the log. That distance is greater than the retention of the highest-precision archive (secondsPerPoint: 10, retention: 21600 seconds), so the next archive is selected. Check the code for the details.
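The selection is just a linear scan over the archives; a small sketch with the numbers from this run (diff = 34560):

# Mirrors the "find the highest-precision archive that covers timestamp" loop in update().
archives = [
    {'secondsPerPoint': 10,  'retention': 21600},      # 6 hours: too short for diff=34560
    {'secondsPerPoint': 60,  'retention': 604800},     # 7 days: selected
    {'secondsPerPoint': 600, 'retention': 157784400},  # about 5 years
]
diff = 34560   # now - timestamp, as reported in whisper.log

for i, archive in enumerate(archives):
    if archive['retention'] < diff:
        continue
    lowerArchives = archives[i + 1:]   # the update will later be propagated to these
    break

print(archive['secondsPerPoint'])   # -> 60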

How does propagation occur?
It is best explained with an example (the official Graphite documentation really lacks one).
If the input data point is "1334795700, 0.5", Whisper will first update archive(secondsPerPoint:10) and then propagate the update to the other two lower-precision archives. Let's focus on how it updates archive(secondsPerPoint:60).
1) It aligns the timestamp to the lower archive's interval:

lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])

2) Find the position of that timestamp (lowerIntervalStart) in archive(secondsPerPoint:10).
3) Read the next 6 data points from archive(secondsPerPoint:10). Why 6 points? secondsPerPoint_of_lower_archive / secondsPerPoint_of_higher_archive = 60/10 = 6.
4) Apply the aggregation method to those 6 data points.
5) Write the aggregated value into archive(secondsPerPoint:60) at timestamp lowerIntervalStart (see the sketch below).
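A small numeric sketch of steps 1) to 5) (the six values are hypothetical; the alignment, xFilesFactor check and averaging mirror __propagate() in the listing above):

timestamp = 1334795700
higher_spp, lower_spp = 10, 60    # secondsPerPoint of the 10s and 60s archives

# 1) align the timestamp to the lower archive's interval
lowerIntervalStart = timestamp - (timestamp % lower_spp)   # 1334795700, already aligned

# 2)-3) the 60/10 = 6 ten-second points covering that interval (hypothetical values)
neighborValues = [0.5, 0.3, None, 0.7, 0.4, 0.6]           # None = empty slot

# 4) aggregate if enough points are known (xFilesFactor = 0.5 by default)
knownValues = [v for v in neighborValues if v is not None]
if float(len(knownValues)) / len(neighborValues) >= 0.5:
    aggregateValue = sum(knownValues) / float(len(knownValues))   # average -> 0.5
    # 5) (lowerIntervalStart, aggregateValue) is written into the 60-second archive
    print("%d %.2f" % (lowerIntervalStart, aggregateValue))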

---------------------------------------------------
Some terminology of Graphite's Whisper (a round-robin database):
- data point: A float value paired with a timestamp in seconds since the UNIX epoch (1970-01-01); see the packing sketch below.
- resolution: The number of seconds per data point. For example, in our test StatsD flushes the counter ('ramoncounter') every 10 seconds; from Whisper's perspective it receives one data point every 10 seconds, so the resolution is 10 seconds. A resolution of 10 seconds is higher than a resolution of 60 seconds.
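On disk each data point is exactly that pair, packed with the "!Ld" format used throughout whisper.py: a 4-byte unsigned timestamp and an 8-byte double value, 12 bytes per point. For example:

import struct

point = struct.pack("!Ld", 1334795700, 0.5)   # one data point
print(len(point))                             # -> 12
print(struct.unpack("!Ld", point))            # -> (1334795700, 0.5)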


Wednesday, April 11, 2012

DBUnit for integration tests.

I have published a post, Dive into Spring test framework, which demonstrates how the Spring test framework can hugely improve our integration tests. But be aware that in that post the testing code and the tested code run in the same process, and therefore in the same transaction, as all integration tests call javax.servlet.HttpServlet.doPost() directly.
Now we face a new situation: how do we maintain data consistency when our testing code calls the tested code remotely, for example over HTTP or a TCP socket? In this case the testing code and the tested code run in separate processes, and Spring can't manage both the client-side and server-side transactions. How do we automate such an integration test scenario?

The main challenge in such remote integration tests is keeping the data consistent for each test case. Only a definite input can produce a definite output, so how do we keep the underlying database in a definite state for each test case? DBUnit was born for that.

Before running a test case, we can clean the database and insert a given initial test data set. This guarantees that the test case runs against known test data, so we can expect a known output and finally compare the expected result against the underlying database.

Code explains everything. Below is a base test class which all test classes should extend.

  1 package net.mpos.igpe.test;
  2 
  3 import java.io.File;
  4 import java.io.FileOutputStream;
  5 import java.io.IOException;
  6 import java.sql.Connection;
  7 import java.sql.Driver;
  8 import java.sql.DriverManager;
  9 import java.sql.ResultSet;
 10 import java.sql.SQLException;
 11 import java.sql.Statement;
 12 import java.text.SimpleDateFormat;
 13 import java.util.Date;
 14 import java.util.HashMap;
 15 import java.util.LinkedList;
 16 import java.util.List;
 17 import java.util.Map;
 18 
 19 import net.mpos.igpe.common.tlvutilities.TLVElement;
 20 import net.mpos.igpe.common.tlvutilities.TLVParser;
 21 import net.mpos.igpe.core.Constants;
 22 import net.mpos.igpe.util.SecurityMeasurements;
 23 
 24 import org.apache.commons.logging.Log;
 25 import org.apache.commons.logging.LogFactory;
 26 import org.dbunit.Assertion;
 27 import org.dbunit.DatabaseUnitException;
 28 import org.dbunit.database.DatabaseConnection;
 29 import org.dbunit.database.IDatabaseConnection;
 30 import org.dbunit.database.QueryDataSet;
 31 import org.dbunit.dataset.DataSetException;
 32 import org.dbunit.dataset.IDataSet;
 33 import org.dbunit.dataset.ITable;
 34 import org.dbunit.dataset.SortedTable;
 35 import org.dbunit.dataset.filter.DefaultColumnFilter;
 36 import org.dbunit.dataset.xml.FlatXmlDataSet;
 37 import org.dbunit.ext.oracle.Oracle10DataTypeFactory;
 38 import org.dbunit.operation.DatabaseOperation;
 39 import org.dbunit.util.fileloader.FlatXmlDataFileLoader;
 40 import org.junit.After;
 41 import org.junit.Before;
 42 
 43 public class BaseAcceptanceTest {
 44   protected Log logger = LogFactory.getLog(BaseAcceptanceTest.class);
 45   public IDatabaseConnection dbConn;
 46   // NOTE: DATA_KEY and MAC_KEY must match the values in table 'operator_session'.
 47   // Before running the test, you must reload the test data into the db in order to
 48   // set operator_session.create_time to the current time.
 49   public String dataKey = "BS1ZbvLkmOyESBpyZ0XqoiZH8WkYsL2g";
 50   public String macKey = "7yuxr9fYh/2lmtv5YnybIQTm+jdAr58V+ifRZskMfO8=";
 51   public String igpeHost = "192.168.2.107";
 52   public int igpePort = 3000;
 53   // public String igpeHost = "192.168.2.136";
 54   // public int igpePort = 8899;
 55   public String opLoginName = "OPERATOR-LOGIN";
 56   public String batchNo = "200901";
 57   public long deviceId = 111;
 58 
 59   /**
 60    * Load test data for each test case automatically. As all IGPE integration
 61    * tests are TE backed, the master test data (oracle_masterdata.sql) that is
 62    * used to support IGPE/TE launching must be imported manually.
 63    */
 64   @Before
 65   public void setUp() throws Exception {
 66     // we can reuse IDatabaseConnection, it represents a specific underlying
 67     // database connection.
 68     // must use this constructor which specify 'scheme', otherwise a
 69     // 'AmbiguousTableNameException' will be thrown out.
 70     dbConn = new DatabaseConnection(setupConnection(), "ramonal", true);
 71     /**
 72      * Refer to DBUnit FAQ.
 73      * <p>
 74      * Why am I getting an "The configured data type factory 'class
 75      * org.dbunit.dataset.datatype.DefaultDataTypeFactory' might cause
 76      * problems with the current database ..." ?
 77      * <p>
 78      * This warning occurs when no data type factory has been configured and
 79      * DbUnit defaults to its
 80      * org.dbunit.dataset.datatype.DefaultDataTypeFactory which supports a
 81      * limited set of RDBMS.
 82      */
 83     dbConn.getConfig().setProperty("http://www.dbunit.org/properties/datatypeFactory",
 84             new Oracle10DataTypeFactory());
 85 
 86     // initialize database
 87     FlatXmlDataFileLoader loader = new FlatXmlDataFileLoader();
 88     /**
 89      * DbUnit uses the first tag for a table to define the columns to be
 90      * populated. If the following records for this table contain extra
 91      * columns, these ones will therefore not be populated.
 92      * <p>
 93      * To solve this, either define all the columns of the table in the
 94      * first row (using NULL values for the empty columns), or use a DTD to
 95      * define the metadata of the tables you use.
 96      */
 97     Map replaceMap = new HashMap();
 98     replaceMap.put("[NULL]", null);
 99     replaceMap.put("[SYS_TIMESTAMP]",
100             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
101     loader.addReplacementObjects(replaceMap);
102     // lookup data file from classpath
103     IDataSet testData = loader.load("/testdata.xml");
104     DatabaseOperation.CLEAN_INSERT.execute(dbConn, testData);
105     logger.info("Load test data successfully");
106   }
107 
108   @After
109   public void tearDown() throws Exception {
110     // release database connection
111     if (dbConn != null)
112       dbConn.close();
113   }
114 
115   protected void assertTable(List<DbAssertTable> actualAssertTables)
116           throws SQLException, DataSetException, DatabaseUnitException, IOException {
117     this.assertTable(null, actualAssertTables);
118   }
119 
120   /**
 121    * Assert data set. Load the expected data set file from the classpath; this file
 122    * must be located in the same package as the test class and have a
 123    * name which follows the convention
124    * '{TestClassName}.{TestMethodName}.expected.xml', for example
125    * 'PayoutAcceptanceTest.testPayout_WithoutLGPrize_OldPOS_OK.expected.xml'
126    */
127   protected void assertTable(Map replacementMap, List<DbAssertTable> actualAssertTables)
128           throws SQLException, DataSetException, DatabaseUnitException, IOException {
129     String className = this.getClass().getCanonicalName();
130     className = className.replace(".", "/");
131     this.assertTable(replacementMap, actualAssertTables, PATH_MODE_CLASSPATH + "/" + className
132             + "." + getCurrentMethodName() + ".expected.xml");
133   }
134 
135   protected String getCurrentMethodName() {
136     StackTraceElement e[] = Thread.currentThread().getStackTrace();
137     for (StackTraceElement s : e) {
138       // only test case method name can start with 'testXXX'.
139       if (s.getMethodName().startsWith("test"))
140         return s.getMethodName();
141     }
142     return null;
143   }
144 
145   /**
146    * Assert data set in <code>expectedDataSetFile</code> against underlying
147    * database. Only data set defined in <code>expectedDataSetFile</code> will
148    * be compared.
149    * <p>
150    * You can specify <code>expectedDataSetFile</code> in two styles:
151    * <ul>
152    * <li>lookup data set file from file system, it must start with "file://",
153    * for example, "file://e:/tmp/expected.xml"</li>
154    * <li>lookup data set file from classpath, it must start with
155    * "classpath://", for example,
156    * "classpath:///net/mpos/igpe/transactions/payout/Payout.testPayout_WithLGPrize_NewPOS_OK.xml"
157    * </li>
158    * </ul>
159    * If <code>expectedDataSetFile</code> doesn't start with either "file:" or
160    * "classpath://", default "classpath://" will be assumed.
161    * 
 162    * @param actualAssertTables The database table definitions which are used to
163    *            limit the returned rows and also may apply ordering.
164    * @param expectedDataSetFile The file of expected data set.
 165    * @param replacementMap A replacement map used to replace placeholders in
166    *            expected data set file.
167    */
168   protected void assertTable(Map replacementMap, List<DbAssertTable> actualAssertTables,
169           String expectedDataSetFile) throws SQLException, DataSetException,
170           DatabaseUnitException, IOException {
171     // only compare all tables defined in expectedDataSetFile
172     IDataSet expectedDataSet = this.loadDataSet(expectedDataSetFile, replacementMap);
173     String[] expectedTableNames = expectedDataSet.getTableNames();
174     for (String expectedTableName : expectedTableNames) {
175       if (logger.isDebugEnabled())
176         logger.debug("Start to compare expected table - " + expectedTableName);
177       ITable actualTable = null;
178       ITable expectedTable = expectedDataSet.getTable(expectedTableName);
179       DbAssertTable dbTableDef = this.lookupDbAssertTable(actualAssertTables,
180               expectedTableName);
181       if (dbTableDef == null) {
182         // match all rows in underlying database table.
183         actualTable = dbConn.createTable(expectedTableName);
184       } else {
185         if (dbTableDef.getQuery() != null)
186           actualTable = dbConn.createQueryTable(expectedTableName, dbTableDef.getQuery());
187         if (dbTableDef.getSort() != null) {
188           // By default, database table snapshot taken by DbUnit are
189           // sorted by
190           // primary keys. If a table does not have a primary key or
191           // the primary
192           // key is automatically generated by your database, the rows
193           // ordering is
194           // not predictable and assertEquals will fail.
195           actualTable = new SortedTable(actualTable, dbTableDef.getSort());
196           // must be invoked immediately after the constructor
197           ((SortedTable) actualTable).setUseComparable(true);
198           expectedTable = new SortedTable(expectedTable, dbTableDef.getSort());
199           // must be invoked immediately after the constructor
200           ((SortedTable) expectedTable).setUseComparable(true);
201         }
202       }
203       // Ignoring some columns in comparison, only compare columns defined
204       // in expected table.
205       actualTable = DefaultColumnFilter.includedColumnsTable(actualTable, expectedTable
206               .getTableMetaData().getColumns());
207 
208       // assert
209       Assertion.assertEquals(expectedTable, actualTable);
210     }
211   }
212 
213   private DbAssertTable lookupDbAssertTable(List<DbAssertTable> tables, String tableName) {
214     for (DbAssertTable dbTable : tables) {
215       if (tableName.equalsIgnoreCase(dbTable.getTableName()))
216         return dbTable;
217     }
218     logger.info("No DbAssertTable found by tableName=" + tableName);
219     return null;
220   }
221 
222   protected static final String PATH_MODE_FILE = "file://";
223   protected static final String PATH_MODE_CLASSPATH = "classpath://";
224 
225   protected IDataSet loadDataSet(String datasetFile, Map replacementMap) throws IOException,
226           DataSetException {
227     FlatXmlDataFileLoader loader = new FlatXmlDataFileLoader();
228     if (replacementMap != null)
229       loader.addReplacementObjects(replacementMap);
230     IDataSet dataset = null;
231     if (datasetFile.startsWith(PATH_MODE_FILE)) {
232       dataset = loader.getBuilder().build(
233               new File(datasetFile.substring(PATH_MODE_FILE.length())));
234     } else if (datasetFile.startsWith(PATH_MODE_CLASSPATH)) {
235       dataset = loader.load(datasetFile.substring(PATH_MODE_CLASSPATH.length()));
236     }
237     return dataset;
238   }
239 
240   /**
 241    * Represents the underlying database table which will be compared
242    * with expected tables.
243    * 
244    * @author Ramon Li
245    */
246   protected class DbAssertTable {
247     private String tableName;
248     private String query;
249     private String[] sort;
250 
251     public DbAssertTable(String tableName, String query, String[] sort) {
252       if (tableName == null)
253         throw new IllegalArgumentException("argument 'tableName' can't be null");
254       this.tableName = tableName;
255       this.query = query;
256       this.sort = sort;
257     }
258 
259     /**
 260      * The name of the asserted table; actually it should be the SQL result name
 261      * of <code>query</code>, and the name must match a table defined
 262      * in the expected test data file.
263      */
264     public String getTableName() {
265       return tableName;
266     }
267 
268     /**
269      * A SQl query used to retrieve specific number of rows from underlying
 270      * database. If you want to get some rows which satisfy a specific
271      * condition of a given table, a query should be specified.
272      * <p>
273      * Also you may want to retrieve a result by join different real
274      * database tables, in this case, the <code>tableName</code> doesn't
275      * need to be a real database table name, but it must match with the
 276      * corresponding table defined in the expected data set file.
277      * <p>
278      * If query is null, the <code>tableName</code> must be a real database
279      * table name, and all rows in that table will be returned.
280      */
281     public String getQuery() {
282       return query;
283     }
284 
285     /**
 286      * As DBUnit doesn't guarantee the order of returned rows, we had
 287      * better specify the columns used to sort rows explicitly.
288      * <p>
289      * If sort columns are null, no sort operation will be performed.
290      */
291     public String[] getSort() {
292       return sort;
293     }
294   }
295 
296   /**
297    * Run a given SQL against underlying database.
298    */
299   protected void sqlExec(IDatabaseConnection conn, String sql) throws Exception {
300     Statement state = conn.getConnection().createStatement();
301     state.execute(sql);
302     state.close();
303     conn.getConnection().commit();
304   }
305 
306   /**
307    * Run a given SQL against underlying database, and return the int value of
308    * sql result.
309    */
310   protected int sqlQueryInt(IDatabaseConnection conn, String sql) throws Exception {
311     int result = 0;
312     Statement state = conn.getConnection().createStatement();
313     ResultSet rs = state.executeQuery(sql);
314     if (rs.next()) {
315       result = rs.getInt(1);
316     }
317     rs.close();
318     state.close();
319     conn.getConnection().commit();
320     return result;
321   }
322 
323   public IgpeClient setupIgpeClient() throws Exception {
324     Connection conn = null;
325     try {
326       // retrieve latest data/mac key
327       conn = setupConnection();
328       return new IntegrationTestIgpeClient(igpeHost, igpePort);
329     } finally {
330       if (conn != null)
331         conn.close();
332     }
333   }
334 
335   /**
336    * Generate timestamp string of current time.
337    */
338   protected String generateTimestamp() {
339     SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
340     return sdf.format(new Date());
341   }
342 
343   /**
344    * Decrypt the message body into plain text.
345    * 
346    * @param respTlvs The collection of responded TLVs.
347    * @return the plain text of message body which is encrypted.
348    * @throws Exception when encounter any exceptions.
349    */
350   protected String getPlainMessageBody(LinkedList<TLVElement> respTlvs) throws Exception {
351     return this.getPlainMessageBody(respTlvs, this.dataKey);
352   }
353 
354   protected String getPlainMessageBody(LinkedList<TLVElement> respTlvs, String key)
355           throws Exception {
356     TLVElement msgBodyTlv = TLVParser.GetObjectFromList(respTlvs, Constants.TAG_MESSAGE_BODY);
357     return SecurityMeasurements.TripleDESCBCDecryptToString(msgBodyTlv.GetValueAsString(), key);
358   }
359 
360   public static Connection setupConnection(String url, Driver driver, String userName,
361           String passwd) throws SQLException {
362     DriverManager.registerDriver(driver);
363     return DriverManager.getConnection(url, userName, passwd);
364   }
365 
366   public static Connection setupConnection() throws SQLException {
367     return setupConnection("jdbc:oracle:thin:@192.168.2.9:1521/orcl",
368             new oracle.jdbc.driver.OracleDriver(), "ramonal", "ramonal");
369   }
370 
371   private final static int MODE_LOAD = 1;
372   private final static int MODE_EXPORT = 2;
373 
374   /**
375    * How to extract a flat XML dataset from my database?
376    */
377   public static void main(String args[]) throws Exception {
378     if (args.length != 2) {
379       System.out.println("[USAGE]");
380       System.out.println("java " + BaseAcceptanceTest.class.getCanonicalName()
381               + " -l [test data source file to be loaded]");
382       System.out.println("OR");
383       System.out.println("java " + BaseAcceptanceTest.class.getCanonicalName()
384               + " -e [test data destination file to be exported]");
385       System.exit(0);
386     }
387 
388     int mode = -1;
389     if ("-l".equals(args[0]))
390       mode = MODE_LOAD;
391     else if ("-e".equals(args[0]))
392       mode = MODE_EXPORT;
393     else
 394       throw new IllegalArgumentException("unsupported mode: " + args[0]);
395     String testDataFile = args[1];
396 
397     // we can reuse IDatabaseConnection, it represents a specific underlying
398     // database connection.
399     // must use this constructor which specify 'scheme', otherwise a
400     // 'AmbiguousTableNameException' will be thrown out.
401     IDatabaseConnection connection = new DatabaseConnection(setupConnection(), "ramonal", true);
402     connection.getConfig().setProperty("http://www.dbunit.org/properties/datatypeFactory",
403             new Oracle10DataTypeFactory());
404 
405     if (MODE_LOAD == mode) {
406       FlatXmlDataFileLoader loader = new FlatXmlDataFileLoader();
407       Map replaceMap = new HashMap();
408       replaceMap.put("[NULL]", null);
409       loader.addReplacementObjects(replaceMap);
410       IDataSet testData = loader.getBuilder().build(new File(testDataFile));
411       DatabaseOperation.CLEAN_INSERT.execute(connection, testData);
412       System.out.println("Loaded test data (" + testDataFile + ") successfully");
413       return;
414     }
415 
416     if (MODE_EXPORT == mode) {
417       // partial database export
418       QueryDataSet partialDataSet = new QueryDataSet(connection);
419       partialDataSet.addTable("TE_SEQUENCE");
420       partialDataSet.addTable("SYS_CONFIGURATION");
421       partialDataSet.addTable("TELCO");
422       partialDataSet.addTable("GPE");
423       partialDataSet.addTable("MERCHANT");
424       partialDataSet.addTable("TELCO_MERCHANT");
425       partialDataSet.addTable("DEVICE_PHYSICAL_AVAILABILITY");
426       partialDataSet.addTable("DEVICE_TYPE");
427       partialDataSet.addTable("DEVICES");
428       partialDataSet.addTable("HMAC_KEY");
429       partialDataSet.addTable("DEPARTMENT");
430       partialDataSet.addTable("ROLE");
431       partialDataSet.addTable("LANGUAGES");
432       partialDataSet.addTable("OPERATOR");
433       partialDataSet.addTable("OPERATOR_MERCHANT");
434       partialDataSet.addTable("LOTTO_FUN_TYPE");
435       partialDataSet.addTable("LOTTO_OPERATION_PARAMETERS");
436       partialDataSet.addTable("IG_OPERATION_PARAMETERS");
437       partialDataSet.addTable("WINNER_TAX_POLICY");
438       partialDataSet.addTable("TAX_DATE_RANGE");
439       partialDataSet.addTable("WINNER_TAX_THRESHOLDS");
440       partialDataSet.addTable("GAME_TYPE");
441       partialDataSet.addTable("GAME");
442       partialDataSet.addTable("GAME_MERCHANT");
443       partialDataSet.addTable("GAME_INSTANCE");
444       partialDataSet.addTable("GAME_RESULTS");
445       partialDataSet.addTable("TE_TRANSACTION");
446       partialDataSet.addTable("TE_TRANSACTION_MSG");
447       partialDataSet.addTable("TE_TICKET");
448       partialDataSet.addTable("TE_LOTTO_ENTRY");
449       partialDataSet.addTable("WINNING");
450       partialDataSet.addTable("WINNING_STATISTICS");
451       partialDataSet.addTable("PRIZE_PARAMETERS");
452       partialDataSet.addTable("PAYOUT_DETAIL");
453       partialDataSet.addTable("PAYOUT");
454       partialDataSet.addTable("MERCHANT_GAME_PROPERTIES");
455       partialDataSet.addTable("IG_GAME_INSTANCE");
456       partialDataSet.addTable("INSTANT_TICKET");
457       partialDataSet.addTable("INSTANT_TICKET_VIRN");
458       partialDataSet.addTable("OPERATOR_SESSION");
459       partialDataSet.addTable("ACCESS_RIGHT");
460       partialDataSet.addTable("ROLE_ACCESS");
461       partialDataSet.addTable("BD_LOGO");
462       partialDataSet.addTable("BD_MARKETING_MESSAGE");
463       partialDataSet.addTable("WINNING_OBJECT");
464       partialDataSet.addTable("BD_PRIZE_LOGIC");
465       partialDataSet.addTable("BD_PRIZE_OBJECT");
466       partialDataSet.addTable("BD_PRIZE_LEVEL");
467       partialDataSet.addTable("BD_PRIZE_LEVEL_ITEM");
468       partialDataSet.addTable("BD_PRIZE_GROUP");
469       partialDataSet.addTable("BD_PRIZE_GROUP_ITEM");
470       partialDataSet.addTable("OBJECT_PRIZE_PARAMETERS");
471       partialDataSet.addTable("DW_OPERATOR");
472       partialDataSet.addTable("DW_CARD");
473       partialDataSet.addTable("DW_MERCHANT_TOPUP_LOG");
474       partialDataSet.addTable("WINNING_DAILY_CASH");
475       partialDataSet.addTable("PRIZE_LOGIC");
476       FlatXmlDataSet.write(partialDataSet, new FileOutputStream(testDataFile));
477       System.out.println("Exported test data (" + testDataFile + ") successfully!");
478       return;
479     }
480   }
481 }
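The main() method above doubles as a small command-line tool for preparing test data. For example (the file name here is only an illustration):
> java net.mpos.igpe.test.BaseAcceptanceTest -e testdata.xml # export the listed tables into testdata.xml
> java net.mpos.igpe.test.BaseAcceptanceTest -l testdata.xml # reload testdata.xml into the database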
In this base test class, setUp() loads all initial test data from "testdata.xml", which is located in the root package.
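setUp() itself is defined earlier in BaseAcceptanceTest; conceptually it boils down to something like the sketch below. This is my own rough outline, not a copy of the real method (the use of CLEAN_INSERT and the classpath lookup are assumptions):

// Rough sketch only -- the real setUp() in BaseAcceptanceTest may differ.
// Uses org.junit.Before, org.dbunit.database.DatabaseConnection,
// org.dbunit.util.fileloader.FlatXmlDataFileLoader and org.dbunit.operation.DatabaseOperation.
@Before
public void setUp() throws Exception {
  IDatabaseConnection connection = new DatabaseConnection(setupConnection(), "ramonal", true);
  connection.getConfig().setProperty("http://www.dbunit.org/properties/datatypeFactory",
          new Oracle10DataTypeFactory());
  try {
    // testdata.xml sits in the root package, so it can be read from the classpath
    IDataSet testData = new FlatXmlDataFileLoader().getBuilder()
            .build(getClass().getResource("/testdata.xml"));
    // wipe the listed tables and insert the initial rows before every test
    DatabaseOperation.CLEAN_INSERT.execute(connection, testData);
  } finally {
    connection.close();
  }
}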

Below is a test class that extends BaseAcceptanceTest.

 1 package net.mpos.igpe.transactions.payout;
 2 
 3 import static org.junit.Assert.assertEquals;
 4 
 5 import java.util.Arrays;
 6 import java.util.HashMap;
 7 import java.util.LinkedList;
 8 import java.util.Map;
 9 
10 import net.mpos.igpe.common.tlvutilities.TLVElement;
11 import net.mpos.igpe.common.tlvutilities.TLVParser;
12 import net.mpos.igpe.core.Constants;
13 import net.mpos.igpe.test.BaseAcceptanceTest;
14 import net.mpos.igpe.test.IgpeClient;
15 
16 import org.junit.Test;
17 
18 public class PayoutAcceptanceTest extends BaseAcceptanceTest {
19 
20   @Test
21   public void testPayout_WithoutLGPrize_OldPOS_OK() throws Exception {
22     IgpeClient igpeClient = this.setupIgpeClient();
23     String traceMsgId = generateTimestamp();
24     LinkedList<TLVElement> resp = igpeClient
25             .igpe("1.4", traceMsgId, generateTimestamp(), opLoginName,
26                     Constants.REQ_PAYOUT + "", deviceId + "", batchNo,
27                     Constants.INVALIDATION_VALUE + "", "#S-123456#PIN-111#2#", dataKey, macKey);
28     String respCode = TLVParser.GetObjectFromList(resp, Constants.TAG_RESPONSE_CODE)
29             .GetValueAsString();
30     assertEquals("#1#200##", respCode);
31     String msgBody = this.getPlainMessageBody(resp);
32     System.out.println(msgBody);
33 
34     // ----- assert database
35     Map replacementMap = new HashMap();
36     // dynamically replace placeholders in the expected test data file.
37     replacementMap.put("${TICKET_SERIALNO}", "Dwl8yOheqKjhNA2RNW9GFQ==");
38     replacementMap.put("${TRACE_MSG_ID}", traceMsgId);
39     String transId = TLVParser.GetObjectFromList(resp, Constants.TAG_TRANSACTION_ID)
40             .GetValueAsString();
41     this.assertTable(replacementMap, Arrays.asList(new DbAssertTable("TE_TICKET",
42             "select * from TE_TICKET where SERIAL_NO='Dwl8yOheqKjhNA2RNW9GFQ=='",
43             new String[] { "GAME_INSTANCE_ID" }),
44     // only one expected row, so no need to set sorting columns.
45             new DbAssertTable("TE_Transaction", "select * from TE_TRANSACTION where ID='"
46                     + transId + "'", null)));
47   }
48 
49 }
The test case testPayout_WithoutLGPrize_OldPOS_OK() calls assertTable(), which compares the expected data set against the underlying database. You can check assertTable() for the detailed implementation; it is easy to understand (a rough sketch also follows the data file below). Our expected test data file, PayoutAcceptanceTest.testPayout_WithLGPrize_NewPOS_OK.expected.xml:
1 <?xml version='1.0' encoding='UTF-8'?>
2 <dataset>
3   <TE_TICKET ID="TICKET-111" GAME_INSTANCE_ID="GII-111" TRANSACTION_ID="TRANS-111" VERSION="1" SERIAL_NO="Dwl8yOheqKjhNA2RNW9GFQ==" TOTAL_AMOUNT="2500.1" IS_WINNING="0" STATUS="5" PIN="f5e09f731f7dffc2a603a7b9b977b2ca" IS_OFFLINE="0" IS_COUNT_IN_POOL="1" IS_BLOCK_PAYOUT="0" SETTLEMENT_FLAG="0" OPERATOR_ID="OPERATOR-111" DEV_ID="111" MERCHANT_ID="111" EXTEND_TEXT="90091b1caee72b14c5269c9214e66dab" TICKET_TYPE="1"/>
4   <TE_TICKET ID="TICKET-112" GAME_INSTANCE_ID="GII-112" TRANSACTION_ID="TRANS-111" VERSION="1" SERIAL_NO="Dwl8yOheqKjhNA2RNW9GFQ==" TOTAL_AMOUNT="2500.1" IS_WINNING="0" STATUS="5" PIN="f5e09f731f7dffc2a603a7b9b977b2ca" IS_OFFLINE="0" IS_COUNT_IN_POOL="1" IS_BLOCK_PAYOUT="0" SETTLEMENT_FLAG="0" OPERATOR_ID="OPERATOR-111" DEV_ID="111" MERCHANT_ID="111" EXTEND_TEXT="90091b1caee72b14c5269c9214e66dab" TICKET_TYPE="1"/>
5   <TE_TICKET ID="TICKET-113" GAME_INSTANCE_ID="GII-113" TRANSACTION_ID="TRANS-111" VERSION="1" SERIAL_NO="Dwl8yOheqKjhNA2RNW9GFQ==" TOTAL_AMOUNT="2500.1" IS_WINNING="0" STATUS="1" PIN="f5e09f731f7dffc2a603a7b9b977b2ca" IS_OFFLINE="0" IS_COUNT_IN_POOL="1" IS_BLOCK_PAYOUT="0" SETTLEMENT_FLAG="0" OPERATOR_ID="OPERATOR-111" DEV_ID="111" MERCHANT_ID="111" EXTEND_TEXT="90091b1caee72b14c5269c9214e66dab" TICKET_TYPE="1"/>
6   <TE_TRANSACTION OPERATOR_ID="OPERATOR-111" GPE_ID="GPE-111" DEV_ID="111" MERCHANT_ID="111" VERSION="0"  TYPE="302" TRACE_MESSAGE_ID="${TRACE_MSG_ID}" RESPONSE_CODE="200" TICKET_SERIAL_NO="${TICKET_SERIALNO}" BATCH_NO="200901" SETTLEMENT_FLAG="0" GAME_ID="GAME-111"/>
7 </dataset>
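For readers who skipped the full listing of BaseAcceptanceTest, here is roughly what assertTable() amounts to. This is an illustration under my own assumptions (the DbAssertTable accessors and the expectedDataFile() helper that derives the *.expected.xml resource name are hypothetical), not the actual implementation:

// Rough sketch only -- the real assertTable() in BaseAcceptanceTest may differ.
// Uses org.dbunit.Assertion, org.dbunit.dataset.SortedTable, org.dbunit.dataset.ReplacementDataSet,
// org.dbunit.dataset.filter.DefaultColumnFilter and org.dbunit.dataset.xml.FlatXmlDataSetBuilder.
protected void assertTable(Map replacements, List<DbAssertTable> tables) throws Exception {
  IDatabaseConnection connection = new DatabaseConnection(setupConnection(), "ramonal", true);
  try {
    // load the expected flat XML file and substitute the ${...} placeholders
    ReplacementDataSet expected = new ReplacementDataSet(new FlatXmlDataSetBuilder()
            .build(getClass().getResourceAsStream(expectedDataFile()))); // hypothetical helper
    for (Object key : replacements.keySet())
      expected.addReplacementObject(key, replacements.get(key));

    for (DbAssertTable t : tables) {
      ITable actualTable = connection.createQueryTable(t.getTableName(), t.getQuery());
      ITable expectedTable = expected.getTable(t.getTableName());
      if (t.getSortColumns() != null) {
        // sort both sides so multi-row comparisons are deterministic
        actualTable = new SortedTable(actualTable, t.getSortColumns());
        expectedTable = new SortedTable(expectedTable, t.getSortColumns());
      }
      // compare only the columns that appear in the expected data set
      Assertion.assertEquals(expectedTable, DefaultColumnFilter.includedColumnsTable(
              actualTable, expectedTable.getTableMetaData().getColumns()));
    }
  } finally {
    connection.close();
  }
}

The key point is the ReplacementDataSet, which is what makes placeholders such as ${TRACE_MSG_ID} and ${TICKET_SERIALNO} in the expected XML above work.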
There is another open source project, Unitils, which can make using DbUnit easier, but it depends on too many third-party libraries that my own project also depends on; to avoid version conflicts between those libraries, I decided against it. In fact, once BaseAcceptanceTest is implemented, the overhead of writing test code decreases significantly.