Thursday 11 June 2015

Simplified Downloads

Since its re-launch in 2013, gbif.org has supported downloading occurrence data for an arbitrary query, with the download provided as a Darwin Core Archive (DwC-A) file whose internal content is described here. This format contains comprehensive and self-explanatory information, which makes it suitable for referencing in external resources. However, for people who only need the occurrence data in its simplest form, the DwC-A format adds complexity that can make the data hard to use. For that reason we now support a new download format: a zip file containing a single file with the most commonly used fields/terms, where each column is separated by the TAB character. This makes it much easier to import the data into tools such as Microsoft Excel, geographic information systems and relational databases. The existing download functionality has been extended to allow selection of the desired format:

From this point the functionality remains the same: you will eventually receive an email containing a link from which the file can be downloaded.
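The simple format can also be requested directly through the occurrence download API rather than the website. The following Python sketch is only an illustration: the endpoint, the JSON field names and the SIMPLE_CSV format identifier are assumptions that should be verified against the current API documentation.

import requests  # assumes the third-party 'requests' package is installed

# Endpoint, field names and the "SIMPLE_CSV" identifier are assumptions;
# check the occurrence download API documentation before using this.
request_body = {
    "creator": "your_gbif_username",
    "notificationAddresses": ["you@example.org"],
    "sendNotification": True,
    "format": "SIMPLE_CSV",  # "DWCA" would request the Darwin Core Archive format
    "predicate": {"type": "equals", "key": "TAXON_KEY", "value": "1234"},  # any predicate works
}

resp = requests.post(
    "http://api.gbif.org/v1/occurrence/download/request",
    json=request_body,
    auth=("your_gbif_username", "your_password"),
)
resp.raise_for_status()
print("Download key:", resp.text)  # poll /v1/occurrence/download/<key> until the file is ready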

Technical Architecture

The simplified download format was implemented under the technical requirement that further new formats can be added in the near future with minimal impact on the formats already supported. In general, occurrence downloads are implemented using two different sets of technologies, chosen according to the estimated number of records in the download: a threshold of 200,000 records separates small downloads (< 200K) from big downloads (> 200K), and history shows that the vast majority of downloads are “small”. The following chart summarizes the key technologies that enable occurrence downloads:

Download workflow

Occurrence downloads are automated using a workflow engine called Oozie, which coordinates the steps required to produce a single download file. In summary, the workflow proceeds as follows:
  1. Initially, Apache Solr is contacted to determine the number of records that the download file will contain.
  2. Big or small? (a simplified sketch of this branching is shown below)
    1. If the number of records is less than 200,000 (a small download), Apache Solr is queried to iterate over the results; the detail of each occurrence record is fetched from HBase, since it is the authoritative storage of occurrence records. These downloads are produced by a multi-threaded application implemented using the Akka framework; Apache ZooKeeper and the Curator framework are used to limit the number of threads that can run at the same time (this avoids a thread explosion on the machines that run the download workflow).
    2. If the number of records is greater than 200,000 (a big download), Apache Hive is used to retrieve the occurrence data from an HDFS table. To avoid overloading HBase, that HDFS table is created as a daily snapshot of the occurrence data stored in HBase.
  3. Finally, the occurrence records are collected and organized into the requested output format (DwC-A or Simple).
Note: the details of how this is implemented can be found in the GitHub project: https://github.com/gbif/occurrence/tree/master/occurrence-download.
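To make the branching in step 2 concrete, here is a heavily simplified sketch in Python. The helper functions are hypothetical stand-ins for the real Solr count query, the Akka-based small-download builder and the Hive-based big-download path; the real logic lives in the Oozie workflow of the project linked in the note above.

SMALL_DOWNLOAD_LIMIT = 200000

def solr_record_count(query):
    return 150000  # stand-in for the Solr count query (step 1)

def build_small_download(query):
    return "small.zip"  # stand-in for Solr iteration plus HBase lookups via Akka

def build_big_download(query):
    return "big.zip"  # stand-in for the Hive query over the daily HDFS snapshot

def run_download(query, requested_format):
    # Step 2: route the request based on the estimated number of records
    if solr_record_count(query) < SMALL_DOWNLOAD_LIMIT:
        result = build_small_download(query)
    else:
        result = build_big_download(query)
    # Step 3: the result is packaged in the requested format (DwC-A or Simple)
    return result, requested_format

print(run_download("country=DK", "SIMPLE"))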

Conclusion

Reducing both the number of columns and the size (number of bytes) of our downloads has been one of our most requested features, and we hope this makes using GBIF data easier for everyone.


Friday 29 May 2015

Don't fill your HDFS disks (upgrading to CDH 5.4.2)

Just a short post on the dangers of filling your HDFS disks. It's a warning you'll hear at conferences and in best practices blog posts like this one, but usually with only a vague consequence of "bad things will happen". We upgraded from CDH 5.2.0 to CDH 5.4.2 this past weekend and learned the hard way: bad things will happen.

The Machine Configuration

The upgrade went fine in our dev cluster (which has almost no data in HDFS), so we weren't expecting problems in production. Our production cluster is of course slightly different from our (much smaller) dev cluster. In production we have 3 masters, where one holds the NameNode and another holds the SecondaryNameNode (we're not yet using a High Availability setup, but it's in the plan). We have 12 DataNodes, each with 13 disks dedicated to HDFS storage: 12 of them are 1TB and one is 512GB. They are formatted with 0% reserved blocks for root. The machines are evenly split into two racks.

Pre Upgrade Status

We were at about 75% total HDFS usage with only a few percent difference between machines. We were configured to use Round Robin block placement (dfs.datanode.fsdataset.volume.choosing.policy) with 10GB reserved for non-HDFS use (dfs.datanode.du.reserved), which are the defaults in CDH manager. Each of the 1TB disks was around 700GB used (of 932GB usable), and the 512GB disks were all at their limit: 456GB used (of 466GB usable). That left only the configured 10GB free for non-HDFS use on the small disks. Our disks are mounted in the pattern /mnt/disk_a, /mnt/disk_b and so on, with /mnt/disk_m as the small disk. We're using the free version of CDHM so we can't do rolling upgrades, meaning this upgrade would bring everything down. And because our cluster is getting full (> 80% usage is another rumoured "bad things" threshold) we had reduced one class of data (users' occurrence downloads) to a replication factor of 2 (from the default of 3). This is considered somewhere between naughty and criminal, and you'll see why below.

Upgrade Time

We followed the recommended procedure and did the Oozie, Hive, and CDH manager backups, downloaded the latest parcels, and pressed the big Update button. Everything appeared to be going fine until HDFS tried to start up again, where the symptom was that it was taking a really long time (several minutes, after which the CDHM upgrade process finally gave up, saying the DataNodes weren't making contact). Looking at the DataNode logs we saw that each one was performing a "Block Pool Upgrade", which took between 90 and 120 seconds for each of our ~700GB disks. Here's an excerpt of where it worked without problems:


2015-05-23 20:18:53,715 INFO org.apache.hadoop.hdfs.server.common.Storage: Lock on /mnt/disk_a/dfs/dn/in_use.lock acquired by nodename 27117@c4n1.gbif.org
2015-05-23 20:18:53,811 INFO org.apache.hadoop.hdfs.server.common.Storage: Analyzing storage directories for bpid BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:18:53,811 INFO org.apache.hadoop.hdfs.server.common.Storage: Locking is disabled for /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:18:53,823 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrading block pool storage directory /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535.
   old LV = -56; old CTime = 1416737045694.
   new LV = -56; new CTime = 1432405112136
2015-05-23 20:20:33,565 INFO org.apache.hadoop.hdfs.server.common.Storage: HardLinkStats: 59768 Directories, including 53157 Empty Directories, 0 single Link operations, 6611 multi-Link operations, linking 22536 files, total 22536 linkable files.  Also physically copied 0 other files.
2015-05-23 20:20:33,609 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrade of block pool BP-2033573672-130.226.238.178-1367832131535 at /mnt/disk_a/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535 is complete

That upgrade time happens sequentially for each disk, so even though the machines were upgrading in parallel, we were still looking at ~30 minutes of downtime for the whole cluster. As if that wasn't sufficiently worrying, we then got to disk_m, our nearly full 512GB disk:


2015-05-23 20:53:05,814 INFO org.apache.hadoop.hdfs.server.common.Storage: Lock on /mnt/disk_m/dfs/dn/in_use.lock acquired by nodename 12424@c4n1.gbif.org
2015-05-23 20:53:05,869 INFO org.apache.hadoop.hdfs.server.common.Storage: Analyzing storage directories for bpid BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:53:05,870 INFO org.apache.hadoop.hdfs.server.common.Storage: Locking is disabled for /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535
2015-05-23 20:53:05,886 INFO org.apache.hadoop.hdfs.server.common.Storage: Upgrading block pool storage directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535.
   old LV = -56; old CTime = 1416737045694.
   new LV = -56; new CTime = 1432405112136
2015-05-23 20:54:12,469 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to analyze storage directories for block pool BP-2033573672-130.226.238.178-1367832131535
java.io.IOException: Cannot create directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535/current/finalized/subdir91/subdir168
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1259)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1296)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocksHelper(DataStorage.java:1296)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.linkBlocks(DataStorage.java:1023)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.linkAllBlocks(BlockPoolSliceStorage.java:647)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.doUpgrade(BlockPoolSliceStorage.java:456)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.doTransition(BlockPoolSliceStorage.java:390)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.loadStorageDirectory(BlockPoolSliceStorage.java:171)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.loadBpStorageDirectories(BlockPoolSliceStorage.java:214)
        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage.recoverTransitionRead(BlockPoolSliceStorage.java:242)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.addStorageLocations(DataStorage.java:396)
        at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:478)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.initStorage(DataNode.java:1397)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.initBlockPool(DataNode.java:1362)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.verifyAndSetNamespaceInfo(BPOfferService.java:317)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.connectToNNAndHandshake(BPServiceActor.java:227)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:839)
        at java.lang.Thread.run(Thread.java:745)
2015-05-23 20:54:12,476 WARN org.apache.hadoop.hdfs.server.common.Storage: Failed to add storage for block pool: BP-2033573672-130.226.238.178-1367832131535 : Cannot create directory /mnt/disk_m/dfs/dn/current/BP-2033573672-130.226.238.178-1367832131535/current/finalized/subdir91/subdir168

The somewhat misleading "Cannot create directory" is not a file permission problem but rather a disk full problem. During this block pool upgrade some temporary space is needed for rewriting metadata, and that space is apparently more than the 10G that was available to "non-HDFS" (which we've concluded means "not HDFS storage files, but everything else is fair game"). Because some space is available to start the upgrade, it begins, but then when it exhausts the disk it fails, and This Kills The DataNode. It does clean up after itself, but prevents the DataNode from starting, meaning our cluster was on its knees and in no danger of standing up.

So the problem was lack of free space, which on 10 of our 12 machines we were able to solve by wiping temporary files from the colocated YARN directory. Those 10 machines were then able to upgrade their disk_m and started up. We still had two nodes down, and unfortunately they were in different racks, which meant we had a big pile of our replication factor 2 files missing blocks (the default HDFS block placement policy puts the second and subsequent copies on a different rack from the first copy).

While digging around in the different properties that we thought could affect our disks and HDFS behaviour we were also restarting the failing DataNodes regularly. At some point the log message changed to:

WARN org.apache.hadoop.hdfs.server.common.Storage: java.io.FileNotFoundException: /mnt/disk_m/dfs/dn/in_use.lock (No space left on device)

After that message the DataNode started, but with disk_m marked as a failed volume. We're not sure why this happened, but presume that after one of our failures it didn't clean up its temp files on disk_m, and on subsequent restarts it found the disk completely full, (rightly) considered it unusable, and tried to carry on. With the final two DataNodes up we had almost all of our cluster, minus the two failed volumes. There were only 35 corrupted files (missing blocks) left after they came up. These were files set to replication factor 2 that by bad luck had both copies of some of their blocks on the two failed disk_m volumes (one in rack1, one in rack2).

It would not have been the end of the world to just delete the corrupted user downloads (they were all over a year old) but on principle, it would not be The Right Thing To Do.

On inodes and hardlinks

The normal directory structure of the dfs dir in a DataNode is /dfs/dn/current/<blockpool name>/current/finalized, and within finalized is a whole series of directories that fan out the various blocks the volume contains. During the block pool upgrade a copy of 'finalized' is made, called previous.tmp. It's not a normal copy, however - it uses hardlinks in order to avoid duplicating all of the data (which obviously wouldn't work). The copy is needed during the upgrade and is removed afterwards. Since our upgrade failed halfway through, we had both directories and had no choice but to move the entire /dfs directory off of /disk_m to a temporary disk and complete the upgrade there. We first tried a copy (use cp -a to preserve hardlinks) to a mounted NFS share. The copy looked fine, but on startup the DataNode didn't understand the mounted drive ("drive not formatted"). Then we tried copying to a USB drive plugged into the machine, and that ultimately worked (despite feeling decidedly un-Yahoo). Once the USB drive was upgraded and online in the cluster, replication took over and copied all of its blocks to new homes in rack2. We then unmounted the USB drive, wiped both /disk_m's, and let replication balance out again. Final result: no lost blocks.
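If hardlinks are unfamiliar: a hardlink is just a second directory entry pointing at the same inode, so the data exists once on disk no matter how many names reference it. The short Python illustration below (the file names are made up) demonstrates the property that a hardlink-preserving copy such as cp -a needs to keep intact; it is not anything we ran during the upgrade, just a demonstration.

import os
import tempfile

# Demonstration of hardlinks: two names, one inode, one copy of the data.
with tempfile.TemporaryDirectory() as d:
    block = os.path.join(d, "blk_123456")            # stands in for a block file
    with open(block, "wb") as f:
        f.write(b"block data")
    linked = os.path.join(d, "blk_123456.hardlink")  # stands in for the previous.tmp entry
    os.link(block, linked)

    a, b = os.stat(block), os.stat(linked)
    print(a.st_ino == b.st_ino)  # True: both names point at the same inode
    print(a.st_nlink)            # 2: two directory entries reference the data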

Mitigation

With the cluster happy again we made a few changes to hopefully ensure this doesn't happen again:
  • dfs.datanode.du.reserved: 25GB. This guarantees 25GB free on each volume (up from 10GB) and should be enough to allow a future upgrade to happen.
  • dfs.datanode.fsdataset.volume.choosing.policy: AvailableSpace
  • dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction: 1.0. Together these two settings direct new blocks to the disks that have more free space, thereby leaving our now-full /disk_m alone (a rough sketch of the idea follows below).
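To illustrate what those last two settings do, here is a simplified conceptual model in Python of an available-space volume choosing policy. It is not the actual Hadoop implementation (and the 10GB threshold is an assumed default), but it shows why a preference fraction of 1.0 keeps new blocks away from the nearly full disk.

import random

BALANCED_THRESHOLD = 10 * 1024**3  # assumed default: volumes within 10GB are "balanced"
PREFERENCE_FRACTION = 1.0          # our setting: always prefer the roomier disks

def choose_volume(free_bytes_by_volume):
    """free_bytes_by_volume: dict mapping volume name to free bytes."""
    most_free = max(free_bytes_by_volume.values())
    least_free = min(free_bytes_by_volume.values())
    if most_free - least_free <= BALANCED_THRESHOLD:
        # Volumes are roughly balanced: any of them will do
        return random.choice(list(free_bytes_by_volume))
    # Otherwise split volumes into "more free space" and "less free space" groups
    cutoff = least_free + BALANCED_THRESHOLD
    roomy = [v for v, free in free_bytes_by_volume.items() if free > cutoff]
    full = [v for v, free in free_bytes_by_volume.items() if free <= cutoff]
    if random.random() < PREFERENCE_FRACTION:
        return random.choice(roomy)  # with fraction 1.0 we always end up here
    return random.choice(full)

# A nearly full disk_m no longer receives new blocks:
volumes = {"disk_a": 230 * 1024**3, "disk_m": 10 * 1024**3}
print(choose_volume(volumes))  # always "disk_a" with a fraction of 1.0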

Conclusion

This was one small taste of what can go wrong when you fill heterogeneous disks in an HDFS cluster. We're sure there are worse dangers lurking on the full-disk horizon, so hopefully you've learned from our pain and will give yourself some breathing room when things start to fill up. Also, don't use a replication factor of less than 3 if there's any way you can help it.





Monday 30 March 2015

Improving the GBIF Backbone matching

In GBIF, occurrence records are matched to a taxon in the backbone taxonomy using the species match API. This is important for reducing spelling variations and creating consistent metrics and searches based on a single classification and synonymy.
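For reference, the species match API can be called directly over HTTP. The small Python sketch below illustrates such a call; the parameter and response field names follow the public API documentation but are worth double-checking before relying on them.

import requests  # assumes the third-party 'requests' package is installed

resp = requests.get(
    "http://api.gbif.org/v1/species/match",
    params={"name": "Xysticus sp.", "kingdom": "Animalia", "verbose": "true"},
)
match = resp.json()
# matchType indicates whether the match was exact, fuzzy or only to a higher rank
print(match.get("matchType"), match.get("rank"), match.get("scientificName"))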

Over the past years we have been alerted to various bad matches. Most of the reported issues concern a false fuzzy match for a name that is missing from our backbone.

In order to improve the taxonomic classification of occurrence records, we are undertaking two activities. The first is to improve the algorithms we use to fuzzily match names, and the second will be to improve the algorithms used to assemble the backbone taxonomy itself. Here I explain some of the work currently underway to tackle the former, which is visible on the test environment.

1. Name parsing of undetermined species

In occurrence records we see many partly undetermined names such as Lucanus spec. These rank markers were erroneously treated as real species epithets, which, combined with fuzzy matching, produced poor results.

Examples
  • Xysticus sp. used to wrongly match Xysticus spiethi while it now just matches the genus Xysticus.
  • Triodia sp. used to match the family Poaceae, while it now matches the genus Triodia.
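As a toy illustration of the idea (this is not the GBIF name parser, just a sketch), rank markers can be detected and stripped so that only the genus is sent to the matching step:

import re

# Strip trailing rank markers such as "sp.", "spec." or "ssp." so the marker is
# not mistaken for a species epithet; the list of markers here is illustrative only.
RANK_MARKER = re.compile(r"\s+(sp|spec|ssp|cf|aff)\.?\s*$", re.IGNORECASE)

def strip_rank_marker(name):
    return RANK_MARKER.sub("", name.strip())

print(strip_rank_marker("Xysticus sp."))   # -> "Xysticus", so only the genus is matched
print(strip_rank_marker("Lucanus spec."))  # -> "Lucanus"
print(strip_rank_marker("Triodia sp"))     # -> "Triodia"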

2. Damerau–Levenshtein distance algorithm

For scoring fuzzy matches we have so far applied the Jaro-Winkler distance, which is often used for matching person names. It tends to allow rather fuzzy matches at the end of long strings. This is desirable for scientific names, but the allowed fuzziness was too great, so we decided to revert to the classical and more predictable Damerau–Levenshtein distance. This reduces false positive fuzzy matches considerably, even though we lose a few good matches at the same time.
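For readers unfamiliar with the metric, here is a minimal Python implementation of the optimal string alignment variant of the Damerau–Levenshtein distance. It is only an illustration of how the distance counts edits and transpositions, not the code used in the matching service.

def damerau_levenshtein(a, b):
    """Minimum number of insertions, deletions, substitutions and adjacent
    transpositions needed to turn string a into string b (OSA variant)."""
    d = [[0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(len(a) + 1):
        d[i][0] = i
    for j in range(len(b) + 1):
        d[0][j] = j
    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,         # deletion
                          d[i][j - 1] + 1,         # insertion
                          d[i - 1][j - 1] + cost)  # substitution
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)  # adjacent transposition
    return d[len(a)][len(b)]

print(damerau_levenshtein("Xysticus", "Xysticus"))  # 0: identical strings
print(damerau_levenshtein("Xysticus", "Xysticsu"))  # 1: one adjacent transposition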

Examples

Matching results

The distinct, verbatim classifications of 528 million records were passed through the original and the new fuzzy matching algorithms - this amounted to 10.5 million distinct classifications in total. The results show that 428 thousand classifications (4%), representing 5,323,758 occurrence records, produce a different match. So far we have taken a random subsample of the records that change and manually inspected the results - we can hardly spot any regressions or wrong matches.

We have published the complete matching comparison as well as the subset of changed records at Zenodo as tab-delimited files:


The files have three groups of columns, each containing the scientificName, the GBIF taxonKey and the higher Darwin Core classification terms for every matched record: the verbatim values are prefixed with v_, the old matching results carry an _old suffix, and the new matching results use the plain terms, e.g. v_scientificName, scientificName_old, scientificName.
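As a small example of working with these files, the Python sketch below lists the names whose backbone match changed. The file name is hypothetical and a header row is assumed, but the column names follow the schema just described.

import csv

# Hypothetical file name; columns as described above (v_ = verbatim, _old = previous matching).
with open("name_matches.tsv", newline="", encoding="utf-8") as f:
    for row in csv.DictReader(f, delimiter="\t"):
        if row["taxonKey"] != row["taxonKey_old"]:
            print(row["v_scientificName"],
                  row["scientificName_old"], "->", row["scientificName"])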


We welcome any feedback on further improvements or on bad matches we need to fix in the next iteration of this work. Please get in touch with Markus Döring, mdoering@gbif.org.

Appendix

Create distinct occurrence names table

-- One row per distinct verbatim classification, with occurrence and dataset counts
CREATE TABLE markus.names AS 
SELECT count(*) as numocc, count(distinct datasetKey) as numdatasets, v_scientificName, v_kingdom, v_phylum, v_class, v_order_ as v_order, v_family, v_genus, v_subgenus, v_specificEpithet, v_infraspecificEpithet, v_scientificNameAuthorship, v_taxonrank, v_higherClassification 
FROM prod_b.occurrence_hdfs 
GROUP BY v_scientificName, v_kingdom, v_phylum, v_class, v_order_, v_family, v_genus, v_subgenus, v_specificEpithet, v_infraspecificEpithet, v_scientificNameAuthorship, v_taxonrank, v_higherClassification 
ORDER BY v_scientificName, numocc DESC;

Look up the taxonKey with both the old and the new matching

-- match() is a custom UDF performing the species match for the environment given
-- in its first argument: 'PROD' = current matching, 'UAT' = new matching on the test environment
CREATE TABLE markus.name_matches AS
SELECT 
  n.numocc, 
  n.numdatasets, 
  n.v_scientificName, 
  n.v_kingdom, 
  n.v_phylum, 
  n.v_class, 
  n.v_order, 
  n.v_family, 
  n.v_genus, 
  n.v_subgenus, 
  n.v_specificEpithet, 
  n.v_infraspecificEpithet, 
  n.v_scientificNameAuthorship, 
  n.v_taxonrank, 
  n.v_higherClassification, 

  prod.taxonKey as taxonKey_old,
  prod.scientificName as scientificName_old,
  prod.rank as rank_old,
  prod.status as status_old,
  prod.matchType as matchType_old,
  prod.confidence as confidence_old,
  prod.kingdomKey as kingdomKey_old,
  prod.phylumKey as phylumKey_old,
  prod.classKey as classKey_old,
  prod.orderKey as orderKey_old,
  prod.familyKey as familyKey_old,
  prod.genusKey as genusKey_old,
  prod.speciesKey as speciesKey_old,
  prod.kingdom as kingdom_old,
  prod.phylum as phylum_old,
  prod.class_ as class_old,
  prod.order_ as order_old,
  prod.family as family_old,
  prod.genus as genus_old,
  prod.species as species_old,

  uat.taxonKey as taxonKey,
  uat.scientificName as scientificName,
  uat.rank as rank,
  uat.status as status,
  uat.matchType as matchType,
  uat.confidence as confidence,
  uat.kingdomKey as kingdomKey,
  uat.phylumKey as phylumKey,
  uat.classKey as classKey,
  uat.orderKey as orderKey,
  uat.familyKey as familyKey,
  uat.genusKey as genusKey,
  uat.speciesKey as speciesKey,
  uat.kingdom as kingdom,
  uat.phylum as phylum,
  uat.class_ as class_,
  uat.order_ as order_,
  uat.family as family,
  uat.genus as genus,
  uat.species as species

FROM (
  SELECT 
    numocc, 
    numdatasets, 
    v_scientificName, 
    v_kingdom, v_phylum, v_class, v_order, v_family, v_genus, v_subgenus, 
    v_specificEpithet, 
    v_infraspecificEpithet, 
    v_scientificNameAuthorship, 
    v_taxonrank, 
    v_higherClassification, 
    match('PROD', v_kingdom, v_phylum, v_class, v_order, v_family, v_genus, v_scientificName, v_specificEpithet, v_infraspecificEpithet) prod, 
    match('UAT', v_kingdom, v_phylum, v_class, v_order, v_family, v_genus, v_scientificName, v_specificEpithet, v_infraspecificEpithet) uat
  FROM markus.names
) n;

Hive exports

-- Export only the classifications whose backbone match changed
CREATE TABLE markus.matches_changed 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' NULL DEFINED AS '' AS 
SELECT * from markus.name_matches 
WHERE taxonKey!=taxonKey_old;

-- Export the complete matching comparison
CREATE TABLE markus.matches_all 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' NULL DEFINED AS '' AS 
SELECT * from markus.name_matches;

Friday 27 March 2015

IPT v2.2 – Making data citable through DataCite

GBIF is pleased to release IPT 2.2, now capable of automatically connecting with either DataCite or EZID to assign DOIs to datasets. This new feature makes biodiversity data easier to access on the Web and facilitates tracking its re-use.

DataCite integration explained

DataCite specialises in assigning DOIs to datasets. It was established in 2009 with three fundamental goals(1):
                 
  1. Establish easier access to research data on the Internet
  2. Increase acceptance of research data as citable contributions to the scholarly record
  3. Support research data archiving to permit results to be verified and re-purposed for future study